# HG changeset patch # User Tom Sutcliffe # Date 1279489248 -3600 # Node ID 3a357d1808794d2a9151166c7856acbc1a4be5cc # Parent 99de8c43cede87ab7be975362cc3621ee2b7c132 Improvements to robustness of CThreadPool. Rationalised CThreadPool a bit, so that only the main thread is used for thread death notifications. This means that the death of parent threads no longer causes a panic when the child threads is shut down. diff -r 99de8c43cede -r 3a357d180879 core/src/command_factory.cpp --- a/core/src/command_factory.cpp Sun Jul 18 18:57:41 2010 +0100 +++ b/core/src/command_factory.cpp Sun Jul 18 22:40:48 2010 +0100 @@ -303,7 +303,7 @@ AddThreadCommandL(CCmdStart::NewLC); AddThreadCommandL(CCmdCompare::NewLC); AddThreadCommandL(CCmdTime::NewLC); - AddThreadCommandL(CCmdRepeat::NewLC); + AddThreadCommandL(CCmdRepeat::NewLC); // TODO: Should this have EUpdateEnvironment? It seems weird that source and foreach do but repeat doesn't. -TomS AddThreadCommandL(CCmdDebug::NewLC); AddThreadCommandL(CCmdReadMem::NewLC); AddThreadCommandL(CCmdE32Header::NewLC); diff -r 99de8c43cede -r 3a357d180879 core/src/worker_thread.cpp --- a/core/src/worker_thread.cpp Sun Jul 18 18:57:41 2010 +0100 +++ b/core/src/worker_thread.cpp Sun Jul 18 22:40:48 2010 +0100 @@ -11,34 +11,33 @@ // /* + Some notes on how CThreadPool works: * There are 2 types of CWorkerThread - ones that share a specific heap, and those that have their own heap. * Tasks that need a shared heap can only be run using a CWorkerThread that is bound to that heap (one will be created if necessary) * Tasks that don't need a shared heap can be run on any CWorkerThread that has its own heap. - * Excess CWorkerThreads are deleted after they've been idle for KCacheTime (default 1 second) + * Excess CWorkerThreads are deleted after they've been idle for KCacheTime. + * CThreadPool uses an RFastLock on all operations that could be called from multiple threads. * All member data of CThreadPool (including CWorkerThreads and their members) is kept on iThreadPoolAllocator, which was the heap - that the CThreadPool was created on. This heap is switched to (using User::SwitchAllocator() where necessary. - * Operations on CThreadPool are protected by iLock where necesesary so they are thread-safe - * When a task is queued, a non-busy CWorkerThread is found (or created), and that CWorkerThread adds itself to the - caller's active scheduler (which is not necessarily in the same thread as when the CWorkerThread was created) so it can - handle the death or completion of the worker thread in the context of the caller - * When a task is completed in a worker thread, it completes its CWorkerThread iStatus. CWorkerThread::RunL then runs in the context of - the parent thread so it can cancel the thread death watcher and detach itself from the thread ready to be reused (potentially by - a different thread). - * If the parent of a CWorkerThread and the worker thread itself both die at the same time, then Bad Things will happen. Consider this a TODO! - Probably fix it by making the iThreadDeathWatchers all run in the CThreadPool main thread, on the assumption that if *that* dies all bets - would be off anyway. It's just tricky to arrange because the call to NewTaskInSeparateThreadL is not necessarily going to be from that thread, - and you can't queue an AO in another thread. - * If a parent dies before its child this seems to cause problems too! TODO! + that the CThreadPool was created on. User::SwitchAllocator() is used where necessary to manipulate CThreadPool data from other threads + while the Lock is held + * When a task is queued, a non-busy CWorkerThread is found (or created). + * When a task is completed in a worker thread, it calls iParentThread.RequestComplete(iCompletionStatus, err). It then calls + CThreadPool::WorkerFinished() to set the CWorkerThread to be non-busy again. + * When a CWorkerThread is created it arranges (via SignalSelf()) to add a CThreadDeathWatcher to the main thread. + It follows that notifications about tasks failed because of thread death are always serviced by the main thread, which must therefore + remain responsive. (In the non-dead case, the completion notification is sent directly from the worker thread). + */ #include "worker_thread.h" #include -//#define WT_LOG(args...) +#define WT_LOG(args...) // These defines are for debugging only -#define WT_LOG(args...) RDebug::Print(args) +//#define WT_LOG(args...) RDebug::Print(args) +// These probably don't even work any more - use with extreme caution! //#define NO_REUSE //#define IMMEDIATE_CLEANUP @@ -62,7 +61,7 @@ void CThreadPool::ConstructL() { User::LeaveIfError(iMainThread.Open(RThread().Id())); - iIdleTimer = CPeriodic::NewL(CActive::EPriorityLow); + iIdleTimer = CPeriodic::NewL(CActive::EPriorityLow-1); // Interactions around iPendingThreadLogons mean it's probably a good idea to have this priority below that of CThreadPool itself iThreadPoolAllocator = &User::Allocator(); User::LeaveIfError(iLock.CreateLocal()); iThreads.ReserveL(2); @@ -81,6 +80,7 @@ iLock.Close(); delete iIdleTimer; iMainThread.Close(); + iPendingThreadLogons.Close(); } void CThreadPool::Lock() @@ -159,6 +159,8 @@ WT_LOG(_L("Creating new worker thread %d"), TUint(foundThread->GetThreadId())); iThreads.AppendL(foundThread); CleanupStack::Pop(foundThread); + iPendingThreadLogons.AppendL(foundThread); + SignalSelf(); // So the iPendingThreadLogons gets sorted out in context of main thread RestoreHeap(); } @@ -174,7 +176,9 @@ void CThreadPool::WorkerDied(CWorkerThread* aWorker) { Lock(); - SwitchToThreadPoolHeap(); + // This is now always called in the main thread + ASSERT(RThread().Id() == iMainThread.Id()); + // Find it and remove it - the next request will create a new worker if needed for (TInt i = 0; i < iThreads.Count(); i++) { @@ -193,25 +197,22 @@ { Lock(); aWorker->SetBusy(EFalse); -#if defined(IMMEDIATE_CLEANUP) - // Delete the thread straight away - SwitchToThreadPoolHeap(); - for (TInt i = 0; i < iThreads.Count(); i++) - { - CWorkerThread* worker = iThreads[i]; - if (worker == aWorker) - { - worker->Shutdown(); - delete worker; - iThreads.Remove(i); - break; - } - } -#elif defined(NO_REUSE) +#if defined(NO_REUSE) // Nothing #else // This is the normal case - queue ourself to run (on the main thread) so we can trigger the idle timer from our RunL // Can't do that directly as timers are all thread-local + SignalSelf(); +#endif + Unlock(); + } + + +void CThreadPool::SignalSelf() + { + // TODO: Optimise if we actually are the main thread? + + // Must be holding lock iPendingCallbacks++; TRequestStatus* stat = &iStatus; if (iPendingCallbacks == 1) // Can't use IsActive() as we might not be in the same thread @@ -220,8 +221,6 @@ SetActive(); } iMainThread.RequestComplete(stat, KErrNone); // Fortunately RThread::RequestComplete doesn't set the status to KRequestPending before completing it (unlike User::RequestComplete) -#endif - Unlock(); } void CThreadPool::CleanupAnyWorkersSharingAllocator(RAllocator* aAllocator) @@ -251,7 +250,13 @@ WT_LOG(_L("Idle timer expired, cleaning up spare workers")); Lock(); ASSERT(&User::Allocator() == iThreadPoolAllocator); // We're running in the thread pool's main thread so no need to switch allocators + +#ifdef IMMEDIATE_CLEANUP + // Everything gets nuked if this is defined + TBool foundSpareWorker = ETrue; +#else TBool foundSpareWorker = EFalse; +#endif for (TInt i = iThreads.Count() - 1; i >= 0; i--) { CWorkerThread* worker = iThreads[i]; @@ -291,9 +296,28 @@ iPendingCallbacks--; } + // We kinda overload our RunL, using it for both registering logons and triggering the idle cleanup. Ah well. + + if (iPendingThreadLogons.Count()) + { + for (TInt i = 0; i < iPendingThreadLogons.Count(); i++) + { + iPendingThreadLogons[i]->RegisterThreadDeathWatcherOnCurrentThread(); + } + SwitchToThreadPoolHeap(); + iPendingThreadLogons.Reset(); + } + +#ifdef IMMEDIATE_CLEANUP + Unlock(); + // Delete the thread straight away + PerformHouseKeeping(); +#else + // Normally go through the timer iIdleTimer->Cancel(); // Reset it if it's already counting iIdleTimer->Start(KCacheTime, KCacheTime, TCallBack(&TimerCallback, this)); Unlock(); +#endif } TInt CThreadPool::TimerCallback(TAny* aSelf) @@ -324,11 +348,6 @@ iThread.Logon(iStatus); SetActive(); } - void StopWatching() - { - Cancel(); - if (IsAdded()) Deque(); - } private: void DoCancel() { @@ -336,6 +355,7 @@ } void RunL() { + Deque(); // Our work is done. Might as well extract ourselves from whatever scheduler we're in while we can iWorker->ThreadDied(); } @@ -387,7 +407,7 @@ } CWorkerThread::CWorkerThread(CThreadPool* aParentPool, RAllocator* aSharedAllocator) - : CActive(CActive::EPriorityStandard), iParentPool(aParentPool), iSharedAllocator(aSharedAllocator) + : iParentPool(aParentPool), iSharedAllocator(aSharedAllocator) { iWorkerThread.SetHandle(0); } @@ -437,11 +457,6 @@ iWorkerThread.Close(); } -void CWorkerThread::DoCancel() - { - ASSERT(EFalse); // We should never reach here as we never call Cancel, and we assert in our destructor that we're not active - } - TInt CWorkerThread::Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext) { ASSERT(!Busy()); @@ -453,13 +468,10 @@ TInt err = iParentThread.Open(RThread().Id()); if (!err) { - ASSERT(!IsAdded()); - CActiveScheduler::Add(this); iTaskId = aTaskId; iName = &aThreadName; iFn = aThreadFunction; iContext = aThreadContext; - iThreadDeathWatcher->StartWatching(); } return err; } @@ -476,9 +488,6 @@ TRequestStatus rendezvousStat; iWorkerThread.Rendezvous(rendezvousStat); - iStatus = KRequestPending; // Do this before signalling the other thread, it may complete quickly - SetActive(); - // Signal the worker thread to do its thing TRequestStatus* dispatchStat = iDispatchStatus; iWorkerThread.RequestComplete(dispatchStat, KErrNone); @@ -509,13 +518,10 @@ void CWorkerThread::AbortTask() { // Undo setup - ASSERT(!IsActive()); iName = NULL; iFn = NULL; iContext = NULL; iParentThread.Close(); - Deque(); - iThreadDeathWatcher->StopWatching(); iParentPool->WorkerFinished(this); } @@ -552,8 +558,8 @@ } // And signal back the result - TRequestStatus* completionStat = &iStatus; - iParentThread.RequestComplete(completionStat, err); + iParentPool->WorkerFinished(this); + iParentThread.RequestComplete(iCompletionStatus, err); // Finally put our name back to what it was #ifdef _DEBUG @@ -573,7 +579,6 @@ { __UHEAP_MARKEND; } - } TBool CWorkerThread::Busy() const @@ -596,37 +601,16 @@ return iSharedAllocator; } -void CWorkerThread::RunL() - { - WT_LOG(_L("8. Finished task %d (%S) result=%d exittype=%d"), iTaskId, iName, iStatus.Int(), iWorkerThread.ExitType()); - Deque(); - iThreadDeathWatcher->StopWatching(); - iParentThread.Close(); - - // Need to signal to completionstatus - if (iCompletionStatus) - { - User::RequestComplete(iCompletionStatus, iStatus.Int()); - } - - if (iWorkerThread.ExitType() == EExitPending) - { - iParentPool->WorkerFinished(this); - } - else - { - iParentPool->WorkerDied(this); - } - } - void CWorkerThread::ThreadDied() { WT_LOG(_L("Task %d died with exittype %d reason %d"), iTaskId, iWorkerThread.ExitType(), iWorkerThread.ExitReason()); TInt err = iWorkerThread.ExitReason(); if (err >= 0) err = KErrDied; - ASSERT(IsActive()); - TRequestStatus* ourStat = &iStatus; - User::RequestComplete(ourStat, err); + if (iCompletionStatus) + { + iParentThread.RequestComplete(iCompletionStatus, err); + } + iParentPool->WorkerDied(this); } TInt CWorkerThread::ThreadFn(TAny* aSelf) @@ -677,13 +661,16 @@ void CWorkerThread::Shutdown() { + // Can be called from (potentially) any thread WT_LOG(_L("Shutting down worker thread %d whose exittype is %d"), TUint(GetThreadId()), iWorkerThread.ExitType()); - ASSERT(iCompletionStatus == NULL && !IsActive() && !Busy()); // If we're active the logic below is flawed! - // We don't need to cancel iThreadDeathWatcher, we called StopWatching() when the task completed - // Hope like hell fshell doesn't call CmndKill() after the task has officially completed (because we won't pick if up cos we close the thread watcher when the task finishes - but it'll probably cause an ASSERT(IsRunning()) to be hit later) - - if (iWorkerThread.ExitType() == EExitPending) + if (Busy()) + { + WT_LOG(_L("Thread is still busy - killing it")); + iWorkerThread.Kill(KErrAbort); + // The thread death watcher should take care of everything else, eventually + } + else if (iWorkerThread.ExitType() == EExitPending) { TRequestStatus stat; iWorkerThread.Logon(stat); @@ -691,8 +678,9 @@ User::WaitForRequest(stat); WT_LOG(_L("Shut down worker %d exit=%d"), TUint(GetThreadId()), iWorkerThread.ExitType()); } - /*if (iCompletionStatus) - { - User::RequestComplete(iCompletionStatus, KErrCancel); - }*/ } + +void CWorkerThread::RegisterThreadDeathWatcherOnCurrentThread() + { + iThreadDeathWatcher->StartWatching(); + } diff -r 99de8c43cede -r 3a357d180879 core/src/worker_thread.h --- a/core/src/worker_thread.h Sun Jul 18 18:57:41 2010 +0100 +++ b/core/src/worker_thread.h Sun Jul 18 22:40:48 2010 +0100 @@ -51,6 +51,7 @@ void PerformHouseKeeping(); void RunL(); void DoCancel(); + void SignalSelf(); void Lock(); void LockLC(); @@ -69,12 +70,13 @@ TInt iPendingCallbacks; CPeriodic* iIdleTimer; TInt iCountThreadsCreated; // This is for statistics gathering, not involved in the logic + RArray iPendingThreadLogons; // Not owned }; class CThreadDeathWatcher; -class CWorkerThread : public CActive, public MThreadedTask +class CWorkerThread : public CBase, public MThreadedTask { public: static CWorkerThread* NewLC(CThreadPool* aParentPool, RAllocator* aSharedAllocator); @@ -87,6 +89,7 @@ TInt Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext); void Shutdown(); ~CWorkerThread(); + void RegisterThreadDeathWatcherOnCurrentThread(); TBool Running() const { return iWorkerThread.Handle() && iWorkerThread.ExitType() == EExitPending; } @@ -105,9 +108,6 @@ static TInt ThreadFn(TAny* aSelf); void ThreadFnL(); - void RunL(); - void DoCancel(); - private: CThreadPool* iParentPool; RThread iWorkerThread;