Improvements to robustness of CThreadPool.
authorTom Sutcliffe <thomas.sutcliffe@accenture.com>
Sun, 18 Jul 2010 22:40:48 +0100
changeset 37 3a357d180879
parent 36 99de8c43cede
child 38 75024fba2fb1
Improvements to robustness of CThreadPool. Rationalised CThreadPool a bit, so that only the main thread is used for thread death notifications. This means that the death of parent threads no longer causes a panic when the child threads is shut down.
core/src/command_factory.cpp
core/src/worker_thread.cpp
core/src/worker_thread.h
--- a/core/src/command_factory.cpp	Sun Jul 18 18:57:41 2010 +0100
+++ b/core/src/command_factory.cpp	Sun Jul 18 22:40:48 2010 +0100
@@ -303,7 +303,7 @@
 	AddThreadCommandL(CCmdStart::NewLC);
 	AddThreadCommandL(CCmdCompare::NewLC);
 	AddThreadCommandL(CCmdTime::NewLC);
-	AddThreadCommandL(CCmdRepeat::NewLC);
+	AddThreadCommandL(CCmdRepeat::NewLC); // TODO: Should this have EUpdateEnvironment? It seems weird that source and foreach do but repeat doesn't. -TomS
 	AddThreadCommandL(CCmdDebug::NewLC);
 	AddThreadCommandL(CCmdReadMem::NewLC);
 	AddThreadCommandL(CCmdE32Header::NewLC);
--- a/core/src/worker_thread.cpp	Sun Jul 18 18:57:41 2010 +0100
+++ b/core/src/worker_thread.cpp	Sun Jul 18 22:40:48 2010 +0100
@@ -11,34 +11,33 @@
 //
 
 /* 
+
 Some notes on how CThreadPool works:
  * There are 2 types of CWorkerThread - ones that share a specific heap, and those that have their own heap.
  * Tasks that need a shared heap can only be run using a CWorkerThread that is bound to that heap (one will be created if necessary)
  * Tasks that don't need a shared heap can be run on any CWorkerThread that has its own heap.
- * Excess CWorkerThreads are deleted after they've been idle for KCacheTime (default 1 second)
+ * Excess CWorkerThreads are deleted after they've been idle for KCacheTime.
+ * CThreadPool uses an RFastLock on all operations that could be called from multiple threads.
  * All member data of CThreadPool (including CWorkerThreads and their members) is kept on iThreadPoolAllocator, which was the heap
-   that the CThreadPool was created on. This heap is switched to (using User::SwitchAllocator() where necessary.
- * Operations on CThreadPool are protected by iLock where necesesary so they are thread-safe
- * When a task is queued, a non-busy CWorkerThread is found (or created), and that CWorkerThread adds itself to the 
-   caller's active scheduler (which is not necessarily in the same thread as when the CWorkerThread was created) so it can 
-   handle the death or completion of the worker thread in the context of the caller
- * When a task is completed in a worker thread, it completes its CWorkerThread iStatus. CWorkerThread::RunL then runs in the context of
-   the parent thread so it can cancel the thread death watcher and detach itself from the thread ready to be reused (potentially by 
-   a different thread).
- * If the parent of a CWorkerThread and the worker thread itself both die at the same time, then Bad Things will happen. Consider this a TODO!
-   Probably fix it by making the iThreadDeathWatchers all run in the CThreadPool main thread, on the assumption that if *that* dies all bets
-   would be off anyway. It's just tricky to arrange because the call to NewTaskInSeparateThreadL is not necessarily going to be from that thread, 
-   and you can't queue an AO in another thread.
- * If a parent dies before its child this seems to cause problems too! TODO!
+   that the CThreadPool was created on. User::SwitchAllocator() is used where necessary to manipulate CThreadPool data from other threads
+   while the Lock is held
+ * When a task is queued, a non-busy CWorkerThread is found (or created).
+ * When a task is completed in a worker thread, it calls iParentThread.RequestComplete(iCompletionStatus, err). It then calls
+   CThreadPool::WorkerFinished() to set the CWorkerThread to be non-busy again.
+ * When a CWorkerThread is created it arranges (via SignalSelf()) to add a CThreadDeathWatcher to the main thread.
+   It follows that notifications about tasks failed because of thread death are always serviced by the main thread, which must therefore
+   remain responsive. (In the non-dead case, the completion notification is sent directly from the worker thread).
+
 */
 
 #include "worker_thread.h"
 #include <e32debug.h>
 
-//#define WT_LOG(args...)
+#define WT_LOG(args...)
 
 // These defines are for debugging only
-#define WT_LOG(args...) RDebug::Print(args)
+//#define WT_LOG(args...) RDebug::Print(args)
+// These probably don't even work any more - use with extreme caution!
 //#define NO_REUSE
 //#define IMMEDIATE_CLEANUP
 
@@ -62,7 +61,7 @@
 void CThreadPool::ConstructL()
 	{
 	User::LeaveIfError(iMainThread.Open(RThread().Id()));
-	iIdleTimer = CPeriodic::NewL(CActive::EPriorityLow);
+	iIdleTimer = CPeriodic::NewL(CActive::EPriorityLow-1); // Interactions around iPendingThreadLogons mean it's probably a good idea to have this priority below that of CThreadPool itself
 	iThreadPoolAllocator = &User::Allocator();
 	User::LeaveIfError(iLock.CreateLocal());
 	iThreads.ReserveL(2);
@@ -81,6 +80,7 @@
 	iLock.Close();
 	delete iIdleTimer;
 	iMainThread.Close();
+	iPendingThreadLogons.Close();
 	}
 
 void CThreadPool::Lock()
@@ -159,6 +159,8 @@
 		WT_LOG(_L("Creating new worker thread %d"), TUint(foundThread->GetThreadId()));
 		iThreads.AppendL(foundThread);
 		CleanupStack::Pop(foundThread);
+		iPendingThreadLogons.AppendL(foundThread);
+		SignalSelf(); // So the iPendingThreadLogons gets sorted out in context of main thread
 		RestoreHeap();
 		}
 
@@ -174,7 +176,9 @@
 void CThreadPool::WorkerDied(CWorkerThread* aWorker)
 	{
 	Lock();
-	SwitchToThreadPoolHeap();
+	// This is now always called in the main thread
+	ASSERT(RThread().Id() == iMainThread.Id());
+
 	// Find it and remove it - the next request will create a new worker if needed
 	for (TInt i = 0; i < iThreads.Count(); i++)
 		{
@@ -193,25 +197,22 @@
 	{
 	Lock();
 	aWorker->SetBusy(EFalse);
-#if defined(IMMEDIATE_CLEANUP)
-	// Delete the thread straight away
-	SwitchToThreadPoolHeap();
-	for (TInt i = 0; i < iThreads.Count(); i++)
-		{
-		CWorkerThread* worker = iThreads[i];
-		if (worker == aWorker)
-			{
-			worker->Shutdown();
-			delete worker;
-			iThreads.Remove(i);
-			break;
-			}
-		}
-#elif defined(NO_REUSE)
+#if defined(NO_REUSE)
 	// Nothing
 #else
 	// This is the normal case - queue ourself to run (on the main thread) so we can trigger the idle timer from our RunL
 	// Can't do that directly as timers are all thread-local
+	SignalSelf();
+#endif
+	Unlock();
+	}
+
+
+void CThreadPool::SignalSelf()
+	{
+	// TODO: Optimise if we actually are the main thread?
+
+	// Must be holding lock
 	iPendingCallbacks++;
 	TRequestStatus* stat = &iStatus;
 	if (iPendingCallbacks == 1) // Can't use IsActive() as we might not be in the same thread
@@ -220,8 +221,6 @@
 		SetActive();
 		}
 	iMainThread.RequestComplete(stat, KErrNone); // Fortunately RThread::RequestComplete doesn't set the status to KRequestPending before completing it (unlike User::RequestComplete)
-#endif
-	Unlock();
 	}
 
 void CThreadPool::CleanupAnyWorkersSharingAllocator(RAllocator* aAllocator)
@@ -251,7 +250,13 @@
 	WT_LOG(_L("Idle timer expired, cleaning up spare workers"));
 	Lock();
 	ASSERT(&User::Allocator() == iThreadPoolAllocator); // We're running in the thread pool's main thread so no need to switch allocators
+
+#ifdef IMMEDIATE_CLEANUP
+	// Everything gets nuked if this is defined
+	TBool foundSpareWorker = ETrue;
+#else
 	TBool foundSpareWorker = EFalse;
+#endif
 	for (TInt i = iThreads.Count() - 1; i >= 0; i--)
 		{
 		CWorkerThread* worker = iThreads[i];
@@ -291,9 +296,28 @@
 		iPendingCallbacks--;
 		}
 
+	// We kinda overload our RunL, using it for both registering logons and triggering the idle cleanup. Ah well.
+
+	if (iPendingThreadLogons.Count())
+		{
+		for (TInt i = 0; i < iPendingThreadLogons.Count(); i++)
+			{
+			iPendingThreadLogons[i]->RegisterThreadDeathWatcherOnCurrentThread();
+			}
+		SwitchToThreadPoolHeap();
+		iPendingThreadLogons.Reset();
+		}
+
+#ifdef IMMEDIATE_CLEANUP
+	Unlock();
+	// Delete the thread straight away
+	PerformHouseKeeping();
+#else
+	// Normally go through the timer
 	iIdleTimer->Cancel(); // Reset it if it's already counting
 	iIdleTimer->Start(KCacheTime, KCacheTime, TCallBack(&TimerCallback, this));
 	Unlock();
+#endif
 	}
 
 TInt CThreadPool::TimerCallback(TAny* aSelf)
@@ -324,11 +348,6 @@
 		iThread.Logon(iStatus);
 		SetActive();
 		}
-	void StopWatching()
-		{
-		Cancel();
-		if (IsAdded()) Deque();
-		}
 private:
 	void DoCancel()
 		{
@@ -336,6 +355,7 @@
 		}
 	void RunL()
 		{
+		Deque(); // Our work is done. Might as well extract ourselves from whatever scheduler we're in while we can
 		iWorker->ThreadDied();
 		}
 
@@ -387,7 +407,7 @@
 	}
 
 CWorkerThread::CWorkerThread(CThreadPool* aParentPool, RAllocator* aSharedAllocator)
-	: CActive(CActive::EPriorityStandard), iParentPool(aParentPool), iSharedAllocator(aSharedAllocator)
+	: iParentPool(aParentPool), iSharedAllocator(aSharedAllocator)
 	{
 	iWorkerThread.SetHandle(0);
 	}
@@ -437,11 +457,6 @@
 	iWorkerThread.Close();
 	}
 
-void CWorkerThread::DoCancel()
-	{
-	ASSERT(EFalse); // We should never reach here as we never call Cancel, and we assert in our destructor that we're not active
-	}
-
 TInt CWorkerThread::Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext)
 	{
 	ASSERT(!Busy());
@@ -453,13 +468,10 @@
 	TInt err = iParentThread.Open(RThread().Id());
 	if (!err)
 		{
-		ASSERT(!IsAdded());
-		CActiveScheduler::Add(this);
 		iTaskId = aTaskId;
 		iName = &aThreadName;
 		iFn = aThreadFunction;
 		iContext = aThreadContext;
-		iThreadDeathWatcher->StartWatching();
 		}
 	return err;
 	}
@@ -476,9 +488,6 @@
 	TRequestStatus rendezvousStat;
 	iWorkerThread.Rendezvous(rendezvousStat);
 
-	iStatus = KRequestPending; // Do this before signalling the other thread, it may complete quickly
-	SetActive();
-
 	// Signal the worker thread to do its thing
 	TRequestStatus* dispatchStat = iDispatchStatus;
 	iWorkerThread.RequestComplete(dispatchStat, KErrNone);
@@ -509,13 +518,10 @@
 void CWorkerThread::AbortTask()
 	{
 	// Undo setup
-	ASSERT(!IsActive());
 	iName = NULL;
 	iFn = NULL;
 	iContext = NULL;
 	iParentThread.Close();
-	Deque();
-	iThreadDeathWatcher->StopWatching();
 	iParentPool->WorkerFinished(this);
 	}
 
@@ -552,8 +558,8 @@
 		}
 
 	// And signal back the result
-	TRequestStatus* completionStat = &iStatus;
-	iParentThread.RequestComplete(completionStat, err);
+	iParentPool->WorkerFinished(this);
+	iParentThread.RequestComplete(iCompletionStatus, err);
 
 	// Finally put our name back to what it was
 #ifdef _DEBUG
@@ -573,7 +579,6 @@
 		{
 		__UHEAP_MARKEND;
 		}
-
 	}
 
 TBool CWorkerThread::Busy() const
@@ -596,37 +601,16 @@
 	return iSharedAllocator;
 	}
 
-void CWorkerThread::RunL()
-	{
-	WT_LOG(_L("8. Finished task %d (%S) result=%d exittype=%d"), iTaskId, iName, iStatus.Int(), iWorkerThread.ExitType());
-	Deque();
-	iThreadDeathWatcher->StopWatching();
-	iParentThread.Close();
-
-	// Need to signal to completionstatus
-	if (iCompletionStatus)
-		{
-		User::RequestComplete(iCompletionStatus, iStatus.Int());
-		}
-
-	if (iWorkerThread.ExitType() == EExitPending)
-		{
-		iParentPool->WorkerFinished(this);
-		}
-	else
-		{
-		iParentPool->WorkerDied(this);
-		}
-	}
-
 void CWorkerThread::ThreadDied()
 	{
 	WT_LOG(_L("Task %d died with exittype %d reason %d"), iTaskId, iWorkerThread.ExitType(), iWorkerThread.ExitReason());
 	TInt err = iWorkerThread.ExitReason();
 	if (err >= 0) err = KErrDied;
-	ASSERT(IsActive());
-	TRequestStatus* ourStat = &iStatus;
-	User::RequestComplete(ourStat, err);
+	if (iCompletionStatus)
+		{
+		iParentThread.RequestComplete(iCompletionStatus, err);
+		}
+	iParentPool->WorkerDied(this);
 	}
 
 TInt CWorkerThread::ThreadFn(TAny* aSelf)
@@ -677,13 +661,16 @@
 
 void CWorkerThread::Shutdown()
 	{
+	// Can be called from (potentially) any thread
 	WT_LOG(_L("Shutting down worker thread %d whose exittype is %d"), TUint(GetThreadId()), iWorkerThread.ExitType());
-	ASSERT(iCompletionStatus == NULL && !IsActive() && !Busy()); // If we're active the logic below is flawed!
 
-	// We don't need to cancel iThreadDeathWatcher, we called StopWatching() when the task completed
-	// Hope like hell fshell doesn't call CmndKill() after the task has officially completed (because we won't pick if up cos we close the thread watcher when the task finishes - but it'll probably cause an ASSERT(IsRunning()) to be hit later)
-	
-	if (iWorkerThread.ExitType() == EExitPending)
+	if (Busy())
+		{
+		WT_LOG(_L("Thread is still busy - killing it"));
+		iWorkerThread.Kill(KErrAbort);
+		// The thread death watcher should take care of everything else, eventually
+		}
+	else if (iWorkerThread.ExitType() == EExitPending)
 		{
 		TRequestStatus stat;
 		iWorkerThread.Logon(stat);
@@ -691,8 +678,9 @@
 		User::WaitForRequest(stat);
 		WT_LOG(_L("Shut down worker %d exit=%d"), TUint(GetThreadId()), iWorkerThread.ExitType());
 		}
-	/*if (iCompletionStatus)
-		{
-		User::RequestComplete(iCompletionStatus, KErrCancel);
-		}*/
 	}
+
+void CWorkerThread::RegisterThreadDeathWatcherOnCurrentThread()
+	{
+	iThreadDeathWatcher->StartWatching();
+	}
--- a/core/src/worker_thread.h	Sun Jul 18 18:57:41 2010 +0100
+++ b/core/src/worker_thread.h	Sun Jul 18 22:40:48 2010 +0100
@@ -51,6 +51,7 @@
 	void PerformHouseKeeping();
 	void RunL();
 	void DoCancel();
+	void SignalSelf();
 
 	void Lock();
 	void LockLC();
@@ -69,12 +70,13 @@
 	TInt iPendingCallbacks;
 	CPeriodic* iIdleTimer;
 	TInt iCountThreadsCreated; // This is for statistics gathering, not involved in the logic
+	RArray<CWorkerThread*> iPendingThreadLogons; // Not owned
 	};
 
 
 class CThreadDeathWatcher;
 
-class CWorkerThread : public CActive, public MThreadedTask
+class CWorkerThread : public CBase, public MThreadedTask
 	{
 public:
 	static CWorkerThread* NewLC(CThreadPool* aParentPool, RAllocator* aSharedAllocator);
@@ -87,6 +89,7 @@
 	TInt Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext);
 	void Shutdown();
 	~CWorkerThread();
+	void RegisterThreadDeathWatcherOnCurrentThread();
 
 	TBool Running() const { return iWorkerThread.Handle() && iWorkerThread.ExitType() == EExitPending; }
 
@@ -105,9 +108,6 @@
 	static TInt ThreadFn(TAny* aSelf);
 	void ThreadFnL();
 
-	void RunL();
-	void DoCancel();
-
 private:
 	CThreadPool* iParentPool;
 	RThread iWorkerThread;