--- a/core/src/worker_thread.cpp Sun Jul 18 18:57:41 2010 +0100
+++ b/core/src/worker_thread.cpp Sun Jul 18 22:40:48 2010 +0100
@@ -11,34 +11,33 @@
//
/*
+
Some notes on how CThreadPool works:
* There are 2 types of CWorkerThread - ones that share a specific heap, and those that have their own heap.
* Tasks that need a shared heap can only be run using a CWorkerThread that is bound to that heap (one will be created if necessary)
* Tasks that don't need a shared heap can be run on any CWorkerThread that has its own heap.
- * Excess CWorkerThreads are deleted after they've been idle for KCacheTime (default 1 second)
+ * Excess CWorkerThreads are deleted after they've been idle for KCacheTime.
+ * CThreadPool uses an RFastLock on all operations that could be called from multiple threads.
* All member data of CThreadPool (including CWorkerThreads and their members) is kept on iThreadPoolAllocator, which was the heap
- that the CThreadPool was created on. This heap is switched to (using User::SwitchAllocator() where necessary.
- * Operations on CThreadPool are protected by iLock where necesesary so they are thread-safe
- * When a task is queued, a non-busy CWorkerThread is found (or created), and that CWorkerThread adds itself to the
- caller's active scheduler (which is not necessarily in the same thread as when the CWorkerThread was created) so it can
- handle the death or completion of the worker thread in the context of the caller
- * When a task is completed in a worker thread, it completes its CWorkerThread iStatus. CWorkerThread::RunL then runs in the context of
- the parent thread so it can cancel the thread death watcher and detach itself from the thread ready to be reused (potentially by
- a different thread).
- * If the parent of a CWorkerThread and the worker thread itself both die at the same time, then Bad Things will happen. Consider this a TODO!
- Probably fix it by making the iThreadDeathWatchers all run in the CThreadPool main thread, on the assumption that if *that* dies all bets
- would be off anyway. It's just tricky to arrange because the call to NewTaskInSeparateThreadL is not necessarily going to be from that thread,
- and you can't queue an AO in another thread.
- * If a parent dies before its child this seems to cause problems too! TODO!
+ that the CThreadPool was created on. User::SwitchAllocator() is used where necessary to manipulate CThreadPool data from other threads
+ while the Lock is held
+ * When a task is queued, a non-busy CWorkerThread is found (or created).
+ * When a task is completed in a worker thread, it calls iParentThread.RequestComplete(iCompletionStatus, err). It then calls
+ CThreadPool::WorkerFinished() to set the CWorkerThread to be non-busy again.
+ * When a CWorkerThread is created it arranges (via SignalSelf()) to add a CThreadDeathWatcher to the main thread.
+ It follows that notifications about tasks failed because of thread death are always serviced by the main thread, which must therefore
+ remain responsive. (In the non-dead case, the completion notification is sent directly from the worker thread).
+
*/
#include "worker_thread.h"
#include <e32debug.h>
-//#define WT_LOG(args...)
+#define WT_LOG(args...)
// These defines are for debugging only
-#define WT_LOG(args...) RDebug::Print(args)
+//#define WT_LOG(args...) RDebug::Print(args)
+// These probably don't even work any more - use with extreme caution!
//#define NO_REUSE
//#define IMMEDIATE_CLEANUP
@@ -62,7 +61,7 @@
void CThreadPool::ConstructL()
{
User::LeaveIfError(iMainThread.Open(RThread().Id()));
- iIdleTimer = CPeriodic::NewL(CActive::EPriorityLow);
+ iIdleTimer = CPeriodic::NewL(CActive::EPriorityLow-1); // Interactions around iPendingThreadLogons mean it's probably a good idea to have this priority below that of CThreadPool itself
iThreadPoolAllocator = &User::Allocator();
User::LeaveIfError(iLock.CreateLocal());
iThreads.ReserveL(2);
@@ -81,6 +80,7 @@
iLock.Close();
delete iIdleTimer;
iMainThread.Close();
+ iPendingThreadLogons.Close();
}
void CThreadPool::Lock()
@@ -159,6 +159,8 @@
WT_LOG(_L("Creating new worker thread %d"), TUint(foundThread->GetThreadId()));
iThreads.AppendL(foundThread);
CleanupStack::Pop(foundThread);
+ iPendingThreadLogons.AppendL(foundThread);
+ SignalSelf(); // So the iPendingThreadLogons gets sorted out in context of main thread
RestoreHeap();
}
@@ -174,7 +176,9 @@
void CThreadPool::WorkerDied(CWorkerThread* aWorker)
{
Lock();
- SwitchToThreadPoolHeap();
+ // This is now always called in the main thread
+ ASSERT(RThread().Id() == iMainThread.Id());
+
// Find it and remove it - the next request will create a new worker if needed
for (TInt i = 0; i < iThreads.Count(); i++)
{
@@ -193,25 +197,22 @@
{
Lock();
aWorker->SetBusy(EFalse);
-#if defined(IMMEDIATE_CLEANUP)
- // Delete the thread straight away
- SwitchToThreadPoolHeap();
- for (TInt i = 0; i < iThreads.Count(); i++)
- {
- CWorkerThread* worker = iThreads[i];
- if (worker == aWorker)
- {
- worker->Shutdown();
- delete worker;
- iThreads.Remove(i);
- break;
- }
- }
-#elif defined(NO_REUSE)
+#if defined(NO_REUSE)
// Nothing
#else
// This is the normal case - queue ourself to run (on the main thread) so we can trigger the idle timer from our RunL
// Can't do that directly as timers are all thread-local
+ SignalSelf();
+#endif
+ Unlock();
+ }
+
+
+void CThreadPool::SignalSelf()
+ {
+ // TODO: Optimise if we actually are the main thread?
+
+ // Must be holding lock
iPendingCallbacks++;
TRequestStatus* stat = &iStatus;
if (iPendingCallbacks == 1) // Can't use IsActive() as we might not be in the same thread
@@ -220,8 +221,6 @@
SetActive();
}
iMainThread.RequestComplete(stat, KErrNone); // Fortunately RThread::RequestComplete doesn't set the status to KRequestPending before completing it (unlike User::RequestComplete)
-#endif
- Unlock();
}
void CThreadPool::CleanupAnyWorkersSharingAllocator(RAllocator* aAllocator)
@@ -251,7 +250,13 @@
WT_LOG(_L("Idle timer expired, cleaning up spare workers"));
Lock();
ASSERT(&User::Allocator() == iThreadPoolAllocator); // We're running in the thread pool's main thread so no need to switch allocators
+
+#ifdef IMMEDIATE_CLEANUP
+ // Everything gets nuked if this is defined
+ TBool foundSpareWorker = ETrue;
+#else
TBool foundSpareWorker = EFalse;
+#endif
for (TInt i = iThreads.Count() - 1; i >= 0; i--)
{
CWorkerThread* worker = iThreads[i];
@@ -291,9 +296,28 @@
iPendingCallbacks--;
}
+ // We kinda overload our RunL, using it for both registering logons and triggering the idle cleanup. Ah well.
+
+ if (iPendingThreadLogons.Count())
+ {
+ for (TInt i = 0; i < iPendingThreadLogons.Count(); i++)
+ {
+ iPendingThreadLogons[i]->RegisterThreadDeathWatcherOnCurrentThread();
+ }
+ SwitchToThreadPoolHeap();
+ iPendingThreadLogons.Reset();
+ }
+
+#ifdef IMMEDIATE_CLEANUP
+ Unlock();
+ // Delete the thread straight away
+ PerformHouseKeeping();
+#else
+ // Normally go through the timer
iIdleTimer->Cancel(); // Reset it if it's already counting
iIdleTimer->Start(KCacheTime, KCacheTime, TCallBack(&TimerCallback, this));
Unlock();
+#endif
}
TInt CThreadPool::TimerCallback(TAny* aSelf)
@@ -324,11 +348,6 @@
iThread.Logon(iStatus);
SetActive();
}
- void StopWatching()
- {
- Cancel();
- if (IsAdded()) Deque();
- }
private:
void DoCancel()
{
@@ -336,6 +355,7 @@
}
void RunL()
{
+ Deque(); // Our work is done. Might as well extract ourselves from whatever scheduler we're in while we can
iWorker->ThreadDied();
}
@@ -387,7 +407,7 @@
}
CWorkerThread::CWorkerThread(CThreadPool* aParentPool, RAllocator* aSharedAllocator)
- : CActive(CActive::EPriorityStandard), iParentPool(aParentPool), iSharedAllocator(aSharedAllocator)
+ : iParentPool(aParentPool), iSharedAllocator(aSharedAllocator)
{
iWorkerThread.SetHandle(0);
}
@@ -437,11 +457,6 @@
iWorkerThread.Close();
}
-void CWorkerThread::DoCancel()
- {
- ASSERT(EFalse); // We should never reach here as we never call Cancel, and we assert in our destructor that we're not active
- }
-
TInt CWorkerThread::Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext)
{
ASSERT(!Busy());
@@ -453,13 +468,10 @@
TInt err = iParentThread.Open(RThread().Id());
if (!err)
{
- ASSERT(!IsAdded());
- CActiveScheduler::Add(this);
iTaskId = aTaskId;
iName = &aThreadName;
iFn = aThreadFunction;
iContext = aThreadContext;
- iThreadDeathWatcher->StartWatching();
}
return err;
}
@@ -476,9 +488,6 @@
TRequestStatus rendezvousStat;
iWorkerThread.Rendezvous(rendezvousStat);
- iStatus = KRequestPending; // Do this before signalling the other thread, it may complete quickly
- SetActive();
-
// Signal the worker thread to do its thing
TRequestStatus* dispatchStat = iDispatchStatus;
iWorkerThread.RequestComplete(dispatchStat, KErrNone);
@@ -509,13 +518,10 @@
void CWorkerThread::AbortTask()
{
// Undo setup
- ASSERT(!IsActive());
iName = NULL;
iFn = NULL;
iContext = NULL;
iParentThread.Close();
- Deque();
- iThreadDeathWatcher->StopWatching();
iParentPool->WorkerFinished(this);
}
@@ -552,8 +558,8 @@
}
// And signal back the result
- TRequestStatus* completionStat = &iStatus;
- iParentThread.RequestComplete(completionStat, err);
+ iParentPool->WorkerFinished(this);
+ iParentThread.RequestComplete(iCompletionStatus, err);
// Finally put our name back to what it was
#ifdef _DEBUG
@@ -573,7 +579,6 @@
{
__UHEAP_MARKEND;
}
-
}
TBool CWorkerThread::Busy() const
@@ -596,37 +601,16 @@
return iSharedAllocator;
}
-void CWorkerThread::RunL()
- {
- WT_LOG(_L("8. Finished task %d (%S) result=%d exittype=%d"), iTaskId, iName, iStatus.Int(), iWorkerThread.ExitType());
- Deque();
- iThreadDeathWatcher->StopWatching();
- iParentThread.Close();
-
- // Need to signal to completionstatus
- if (iCompletionStatus)
- {
- User::RequestComplete(iCompletionStatus, iStatus.Int());
- }
-
- if (iWorkerThread.ExitType() == EExitPending)
- {
- iParentPool->WorkerFinished(this);
- }
- else
- {
- iParentPool->WorkerDied(this);
- }
- }
-
void CWorkerThread::ThreadDied()
{
WT_LOG(_L("Task %d died with exittype %d reason %d"), iTaskId, iWorkerThread.ExitType(), iWorkerThread.ExitReason());
TInt err = iWorkerThread.ExitReason();
if (err >= 0) err = KErrDied;
- ASSERT(IsActive());
- TRequestStatus* ourStat = &iStatus;
- User::RequestComplete(ourStat, err);
+ if (iCompletionStatus)
+ {
+ iParentThread.RequestComplete(iCompletionStatus, err);
+ }
+ iParentPool->WorkerDied(this);
}
TInt CWorkerThread::ThreadFn(TAny* aSelf)
@@ -677,13 +661,16 @@
void CWorkerThread::Shutdown()
{
+ // Can be called from (potentially) any thread
WT_LOG(_L("Shutting down worker thread %d whose exittype is %d"), TUint(GetThreadId()), iWorkerThread.ExitType());
- ASSERT(iCompletionStatus == NULL && !IsActive() && !Busy()); // If we're active the logic below is flawed!
- // We don't need to cancel iThreadDeathWatcher, we called StopWatching() when the task completed
- // Hope like hell fshell doesn't call CmndKill() after the task has officially completed (because we won't pick if up cos we close the thread watcher when the task finishes - but it'll probably cause an ASSERT(IsRunning()) to be hit later)
-
- if (iWorkerThread.ExitType() == EExitPending)
+ if (Busy())
+ {
+ WT_LOG(_L("Thread is still busy - killing it"));
+ iWorkerThread.Kill(KErrAbort);
+ // The thread death watcher should take care of everything else, eventually
+ }
+ else if (iWorkerThread.ExitType() == EExitPending)
{
TRequestStatus stat;
iWorkerThread.Logon(stat);
@@ -691,8 +678,9 @@
User::WaitForRequest(stat);
WT_LOG(_L("Shut down worker %d exit=%d"), TUint(GetThreadId()), iWorkerThread.ExitType());
}
- /*if (iCompletionStatus)
- {
- User::RequestComplete(iCompletionStatus, KErrCancel);
- }*/
}
+
+void CWorkerThread::RegisterThreadDeathWatcherOnCurrentThread()
+ {
+ iThreadDeathWatcher->StartWatching();
+ }
--- a/core/src/worker_thread.h Sun Jul 18 18:57:41 2010 +0100
+++ b/core/src/worker_thread.h Sun Jul 18 22:40:48 2010 +0100
@@ -51,6 +51,7 @@
void PerformHouseKeeping();
void RunL();
void DoCancel();
+ void SignalSelf();
void Lock();
void LockLC();
@@ -69,12 +70,13 @@
TInt iPendingCallbacks;
CPeriodic* iIdleTimer;
TInt iCountThreadsCreated; // This is for statistics gathering, not involved in the logic
+ RArray<CWorkerThread*> iPendingThreadLogons; // Not owned
};
class CThreadDeathWatcher;
-class CWorkerThread : public CActive, public MThreadedTask
+class CWorkerThread : public CBase, public MThreadedTask
{
public:
static CWorkerThread* NewLC(CThreadPool* aParentPool, RAllocator* aSharedAllocator);
@@ -87,6 +89,7 @@
TInt Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext);
void Shutdown();
~CWorkerThread();
+ void RegisterThreadDeathWatcherOnCurrentThread();
TBool Running() const { return iWorkerThread.Handle() && iWorkerThread.ExitType() == EExitPending; }
@@ -105,9 +108,6 @@
static TInt ThreadFn(TAny* aSelf);
void ThreadFnL();
- void RunL();
- void DoCancel();
-
private:
CThreadPool* iParentPool;
RThread iWorkerThread;