# HG changeset patch # User Tom Sutcliffe # Date 1282771406 -3600 # Node ID da10798406fc892d03e0ca7891f7279628bf58bf # Parent a581f3c08c9aa104519b1bd94d24bbbd443b7063# Parent 698ccde1571316dd82711fbbd9a29ed9a9812f81 Merge from MCL diff -r 698ccde15713 -r da10798406fc build/common/common.mmh diff -r 698ccde15713 -r da10798406fc core/src/command_constructors.cpp --- a/core/src/command_constructors.cpp Wed Aug 25 22:17:52 2010 +0100 +++ b/core/src/command_constructors.cpp Wed Aug 25 22:23:26 2010 +0100 @@ -66,32 +66,32 @@ // CThreadCommandConstructor. // -CThreadCommandConstructor* CThreadCommandConstructor::NewLC(TCommandConstructor aConstructor, TUint aFlags) +CThreadCommandConstructor* CThreadCommandConstructor::NewLC(TCommandConstructor aConstructor, TUint aFlags, MTaskRunner* aTaskRunner) { CCommandBase* command = (*aConstructor)(); - CThreadCommandConstructor* self = CThreadCommandConstructor::NewLC(command->Name(), aConstructor, aFlags); + CThreadCommandConstructor* self = CThreadCommandConstructor::NewLC(command->Name(), aConstructor, aFlags, aTaskRunner); CleanupStack::Pop(self); CleanupStack::PopAndDestroy(command); CleanupStack::PushL(self); return self; } -CThreadCommandConstructor* CThreadCommandConstructor::NewLC(const TDesC& aCommandName, TCommandConstructor aConstructor, TUint aFlags) +CThreadCommandConstructor* CThreadCommandConstructor::NewLC(const TDesC& aCommandName, TCommandConstructor aConstructor, TUint aFlags, MTaskRunner* aTaskRunner) { - CThreadCommandConstructor* self = new(ELeave) CThreadCommandConstructor(aFlags, aConstructor); + CThreadCommandConstructor* self = new(ELeave) CThreadCommandConstructor(aFlags, aConstructor, aTaskRunner); CleanupStack::PushL(self); self->BaseConstructL(aCommandName); return self; } -CThreadCommandConstructor::CThreadCommandConstructor(TUint aFlags, TCommandConstructor aConstructor) - : CCommandConstructorBase(ETypeThread), iFlags(aFlags), iConstructor(aConstructor) +CThreadCommandConstructor::CThreadCommandConstructor(TUint aFlags, TCommandConstructor aConstructor, MTaskRunner* aTaskRunner) + : CCommandConstructorBase(ETypeThread), iFlags(aFlags), iConstructor(aConstructor), iTaskRunner(aTaskRunner) { } MCommand* CThreadCommandConstructor::ConstructCommandL() { - return CThreadCommand::NewL(CommandName(), iConstructor, iFlags); + return CThreadCommand::NewL(CommandName(), iConstructor, iFlags, iTaskRunner); } void CThreadCommandConstructor::AppendDescriptionL(RLtkBuf16& aBuf) const diff -r 698ccde15713 -r da10798406fc core/src/command_constructors.h --- a/core/src/command_constructors.h Wed Aug 25 22:17:52 2010 +0100 +++ b/core/src/command_constructors.h Wed Aug 25 22:23:26 2010 +0100 @@ -18,6 +18,8 @@ #include class MCommand; +class MTaskRunner; + namespace LtkUtils { class RLtkBuf16; } using LtkUtils::RLtkBuf16; @@ -61,16 +63,17 @@ class CThreadCommandConstructor : public CCommandConstructorBase { public: - static CThreadCommandConstructor* NewLC(TCommandConstructor aConstructor, TUint aFlags); - static CThreadCommandConstructor* NewLC(const TDesC& aCommandName, TCommandConstructor aConstructor, TUint aFlags); + static CThreadCommandConstructor* NewLC(TCommandConstructor aConstructor, TUint aFlags, MTaskRunner* aTaskRunner); + static CThreadCommandConstructor* NewLC(const TDesC& aCommandName, TCommandConstructor aConstructor, TUint aFlags, MTaskRunner* aTaskRunner); private: - CThreadCommandConstructor(TUint aFlags, TCommandConstructor aConstructor); + CThreadCommandConstructor(TUint aFlags, TCommandConstructor aConstructor, MTaskRunner* aTaskRunner); private: // From CCommandConstructorBase. virtual MCommand* ConstructCommandL(); virtual void AppendDescriptionL(RLtkBuf16& aBuf) const; private: TUint iFlags; TCommandConstructor iConstructor; + MTaskRunner* iTaskRunner; }; class CExeCommandConstructor : public CCommandConstructorBase diff -r 698ccde15713 -r da10798406fc core/src/command_factory.cpp --- a/core/src/command_factory.cpp Wed Aug 25 22:17:52 2010 +0100 +++ b/core/src/command_factory.cpp Wed Aug 25 22:23:26 2010 +0100 @@ -24,7 +24,7 @@ #include "xmodem.h" #include "ymodem.h" #include "version.h" - +#include "worker_thread.h" // // Constants. @@ -84,6 +84,7 @@ Cancel(); iCommands.ResetAndDestroy(); iLock.Close(); + delete iThreadPool; } TInt CompareCommandNames(const CCommandConstructorBase& aCommand1, const CCommandConstructorBase& aCommand2) @@ -250,6 +251,7 @@ { User::LeaveIfError(iLock.CreateLocal()); User::LeaveIfError(iFs.DriveList(iDriveList)); + iThreadPool = CThreadPool::NewL(); AddThreadCommandL(CCmdExit::NewLC); // Note, this command should never execute as 'exit' has handled explicitly by CParser. It exists so that 'exit' appears in fshell's help list and also to support 'exit --help'. AddThreadCommandL(CCmdHelp::NewLC, CThreadCommand::ESharedHeap); @@ -301,7 +303,7 @@ AddThreadCommandL(CCmdStart::NewLC); AddThreadCommandL(CCmdCompare::NewLC); AddThreadCommandL(CCmdTime::NewLC); - AddThreadCommandL(CCmdRepeat::NewLC); + AddThreadCommandL(CCmdRepeat::NewLC); // TODO: Should this have EUpdateEnvironment? It seems weird that source and foreach do but repeat doesn't. -TomS AddThreadCommandL(CCmdDebug::NewLC); AddThreadCommandL(CCmdReadMem::NewLC); AddThreadCommandL(CCmdE32Header::NewLC); @@ -417,14 +419,14 @@ void CCommandFactory::AddThreadCommandL(TCommandConstructor aConstructor, TUint aFlags) { - CCommandConstructorBase* constructor = CThreadCommandConstructor::NewLC(aConstructor, aFlags); + CCommandConstructorBase* constructor = CThreadCommandConstructor::NewLC(aConstructor, aFlags, iThreadPool); AddCommandL(constructor); CleanupStack::Pop(constructor); } void CCommandFactory::AddThreadCommandL(const TDesC& aCommandName, TCommandConstructor aConstructor, TUint aAttributes, TUint aFlags) { - CCommandConstructorBase* constructor = CThreadCommandConstructor::NewLC(aCommandName, aConstructor, aFlags); + CCommandConstructorBase* constructor = CThreadCommandConstructor::NewLC(aCommandName, aConstructor, aFlags, iThreadPool); constructor->SetAttributes(aAttributes); AddCommandL(constructor); CleanupStack::Pop(constructor); @@ -432,7 +434,7 @@ void CCommandFactory::AddAliasCommandL(const TDesC& aAliasName, TCommandConstructor aConstructor, const TDesC* aAdditionalArguments, const TDesC* aReplacementArguments, TUint aAttributes, TUint aFlags) { - CCommandConstructorBase* aliasedCommand = CThreadCommandConstructor::NewLC(aConstructor, aFlags); + CCommandConstructorBase* aliasedCommand = CThreadCommandConstructor::NewLC(aConstructor, aFlags, iThreadPool); CCommandConstructorBase* constructor = CAliasCommandConstructor::NewLC(aAliasName, aliasedCommand, aAdditionalArguments, aReplacementArguments); CleanupStack::Pop(2, aliasedCommand); // Now owned by "constructor". CleanupStack::PushL(constructor); diff -r 698ccde15713 -r da10798406fc core/src/command_factory.h --- a/core/src/command_factory.h Wed Aug 25 22:17:52 2010 +0100 +++ b/core/src/command_factory.h Wed Aug 25 22:23:26 2010 +0100 @@ -15,7 +15,9 @@ #define __COMMAND_FACTORY_H__ #include -#include "command_constructors.h" +#include "command_wrappers.h" +class CCommandConstructorBase; +class CThreadPool; #include "error.h" class RFs; @@ -51,6 +53,7 @@ virtual void RunL(); virtual void DoCancel(); virtual TInt RunError(TInt aError); + private: RFs& iFs; mutable RMutex iLock; @@ -58,6 +61,7 @@ TDriveList iDriveList; TBool iFileSystemScanned; TBool iFailedToScanFileSystem; + CThreadPool* iThreadPool; }; diff -r 698ccde15713 -r da10798406fc core/src/command_wrappers.cpp --- a/core/src/command_wrappers.cpp Wed Aug 25 22:17:52 2010 +0100 +++ b/core/src/command_wrappers.cpp Wed Aug 25 22:23:26 2010 +0100 @@ -11,20 +11,14 @@ // #include "command_wrappers.h" - - -// -// Constants. -// - -const TInt KMaxHeapSize = KMinHeapSize * 1024; - +#include "worker_thread.h" // // CCommandWrapperBase. // CCommandWrapperBase::CCommandWrapperBase() + : CActive(CActive::EPriorityStandard) { } @@ -96,14 +90,23 @@ delete this; } +void CCommandWrapperBase::RunL() + { + // Optionally for use by subclasses + } + +void CCommandWrapperBase::DoCancel() + { + // Optionally for use by subclasses + } // // CThreadCommand. // -CThreadCommand* CThreadCommand::NewL(const TDesC& aName, TCommandConstructor aCommandConstructor, TUint aFlags) +CThreadCommand* CThreadCommand::NewL(const TDesC& aName, TCommandConstructor aCommandConstructor, TUint aFlags, MTaskRunner* aTaskRunner) { - CThreadCommand* self = new(ELeave) CThreadCommand(aCommandConstructor, aFlags); + CThreadCommand* self = new(ELeave) CThreadCommand(aCommandConstructor, aFlags, aTaskRunner); CleanupStack::PushL(self); self->ConstructL(aName); CleanupStack::Pop(self); @@ -112,14 +115,15 @@ CThreadCommand::~CThreadCommand() { - delete iWatcher; - delete iArgs; + Cancel(); + delete iCommandLine; iThread.Close(); } -CThreadCommand::CThreadCommand(TCommandConstructor aCommandConstructor, TUint aFlags) - : iFlags(aFlags), iCommandConstructor(aCommandConstructor) +CThreadCommand::CThreadCommand(TCommandConstructor aCommandConstructor, TUint aFlags, MTaskRunner* aTaskRunner) + : iFlags(aFlags), iCommandConstructor(aCommandConstructor), iTaskRunner(aTaskRunner) { + CActiveScheduler::Add(this); iThread.SetHandle(0); // By default RThread refers to the current thread. This results in fshell's thread exiting if this object gets killed before it has managed to open a real thread handle. if (iFlags & EUpdateEnvironment) iFlags |= ESharedHeap; // Update environment implies a shared heap, ever since we did away with the explict SwitchAllocator } @@ -127,66 +131,31 @@ void CThreadCommand::ConstructL(const TDesC& aName) { BaseConstructL(aName); - iWatcher = CThreadWatcher::NewL(); } -void CommandThreadStartL(CThreadCommand::TArgs& aArgs) +void CThreadCommand::DoCommandThreadStartL(TAny* aSelf) { - if (aArgs.iFlags & CThreadCommand::ESharedHeap) - { - // If we're sharing the main fshell heap, we have to play by the rules and not crash - User::SetCritical(User::EProcessCritical); - } - - CActiveScheduler* scheduler = new(ELeave) CActiveScheduler; - CleanupStack::PushL(scheduler); - CActiveScheduler::Install(scheduler); - - HBufC* commandLine = aArgs.iCommandLine.AllocLC(); + CThreadCommand* self = static_cast(aSelf); IoUtils::CEnvironment* env; - if (aArgs.iFlags & CThreadCommand::EUpdateEnvironment) + if (self->iFlags & CThreadCommand::EUpdateEnvironment) { - env = aArgs.iEnv.CreateSharedEnvironmentL(); + env = self->iSuppliedEnv->CreateSharedEnvironmentL(); } else { // A straight-forward copy - env = IoUtils::CEnvironment::NewL(aArgs.iEnv); + env = IoUtils::CEnvironment::NewL(*self->iSuppliedEnv); } CleanupStack::PushL(env); - CCommandBase* command = (*aArgs.iCommandConstructor)(); - RThread parentThread; - User::LeaveIfError(parentThread.Open(aArgs.iParentThreadId)); - parentThread.RequestComplete(aArgs.iParentStatus, KErrNone); - parentThread.Close(); - - command->RunCommandL(commandLine, env); - CleanupStack::PopAndDestroy(4, scheduler); // env, command, commandline, scheduler + CCommandBase* command = (*self->iCommandConstructor)(); + //RDebug::Print(_L("5. DoCommandThreadStartL rendezvousing for %S %S"), &self->CmndName(), self->iCommandLine); + RThread::Rendezvous(KErrNone); + command->RunCommandL(self->iCommandLine, env); + CleanupStack::PopAndDestroy(2, env); // command, env } -TInt CommandThreadStart(TAny* aPtr) - { - CThreadCommand::TArgs args = *(CThreadCommand::TArgs*)aPtr; - TBool sharedHeap = (args.iFlags & CThreadCommand::ESharedHeap); - if (!sharedHeap) - { - __UHEAP_MARK; - } - TInt err = KErrNoMemory; - CTrapCleanup* cleanup = CTrapCleanup::New(); - if (cleanup) - { - TRAP(err, CommandThreadStartL(args)); - delete cleanup; - } - if (!sharedHeap) - { - __UHEAP_MARKEND; - } - return err; - } void SetHandleOwnersL(TThreadId aThreadId, RIoReadHandle& aStdin, RIoWriteHandle& aStdout, RIoWriteHandle& aStderr) { @@ -199,64 +168,36 @@ { ASSERT(iObserver == NULL); - TRequestStatus status(KRequestPending); - iArgs = new TArgs(iFlags, aEnv, iCommandConstructor, aCommandLine, status); - if (iArgs == NULL) + MThreadedTask* thread = NULL; + TRAPD(err, thread = iTaskRunner->NewTaskInSeparateThreadL(CmndName(), iFlags & ESharedHeap, &DoCommandThreadStartL, this)); + if (err) return err; + + TRAP(err, SetHandleOwnersL(thread->GetThreadId(), CmndStdin(), CmndStdout(), CmndStderr())); + + if (!err) { - return KErrNoMemory; + iCommandLine = aCommandLine.Alloc(); + if (!iCommandLine) err = KErrNoMemory; } - TInt i = 0; - TName threadName; - TInt err = KErrNone; - do + if (!err) { - const TDesC& name = CmndName(); - threadName.Format(_L("%S_%02d"), &name, i++); - if (iFlags & ESharedHeap) - { - err = iThread.Create(threadName, CommandThreadStart, KDefaultStackSize, NULL, iArgs); - } - else - { - err = iThread.Create(threadName, CommandThreadStart, KDefaultStackSize, KMinHeapSize, KMaxHeapSize, iArgs); - } - } - while (err == KErrAlreadyExists); - - if (err) - { - return err; + err = iThread.Open(thread->GetThreadId()); } - err = iWatcher->Logon(*this, iThread, aObserver); - if (err) + if (!err) { - iThread.Kill(0); - iThread.Close(); - return err; + iSuppliedEnv = &aEnv; + iObserver = &aObserver; + thread->ExecuteTask(iStatus); + SetActive(); + } + else + { + thread->AbortTask(); } - TThreadId threadId = iThread.Id(); - TRAP(err, SetHandleOwnersL(threadId, CmndStdin(), CmndStdout(), CmndStderr())); - if (err) - { - iThread.Kill(0); - iThread.Close(); - return err; - } - - iThread.Resume(); - User::WaitForRequest(status, iWatcher->iStatus); - if (status == KRequestPending) - { - iThread.Close(); - return iWatcher->iStatus.Int(); - } - - iWatcher->SetActive(); - iObserver = &aObserver; - return KErrNone; + return err; } void CThreadCommand::CmndForeground() @@ -300,71 +241,14 @@ return iThread.ExitCategory(); } - -// -// CThreadCommand::TArgs. -// - -CThreadCommand::TArgs::TArgs(TUint aFlags, IoUtils::CEnvironment& aEnv, TCommandConstructor aCommandConstructor, const TDesC& aCommandLine, TRequestStatus& aParentStatus) - : iFlags(aFlags), iEnv(aEnv), iCommandConstructor(aCommandConstructor), iCommandLine(aCommandLine), iParentStatus(&aParentStatus), iParentThreadId(RThread().Id()) +void CThreadCommand::RunL() { - } - - -// -// CThreadCommand::CThreadWatcher. -// - -CThreadCommand::CThreadWatcher* CThreadCommand::CThreadWatcher::NewL() - { - return new(ELeave) CThreadWatcher(); - } - -CThreadCommand::CThreadWatcher::~CThreadWatcher() - { - Cancel(); - } - -CThreadCommand::CThreadWatcher::CThreadWatcher() - : CActive(CActive::EPriorityStandard) - { - CActiveScheduler::Add(this); + iObserver->HandleCommandComplete(*this, iStatus.Int()); } -TInt CThreadCommand::CThreadWatcher::Logon(CThreadCommand& aCommand, RThread& aThread, MCommandObserver& aObserver) +void CThreadCommand::DoCancel() { - TInt ret = KErrNone; - aThread.Logon(iStatus); - if (iStatus != KRequestPending) - { - User::WaitForRequest(iStatus); - ret = iStatus.Int(); - } - else - { - iCommand = &aCommand; - iThread = &aThread; - iObserver = &aObserver; - } - return ret; - } - -void CThreadCommand::CThreadWatcher::SetActive() - { - CActive::SetActive(); - } - -void CThreadCommand::CThreadWatcher::RunL() - { - iObserver->HandleCommandComplete(*iCommand, iStatus.Int()); - } - -void CThreadCommand::CThreadWatcher::DoCancel() - { - if (iThread) - { - iThread->LogonCancel(iStatus); - } + CmndKill(); // This is a bit drastic, but effective... } diff -r 698ccde15713 -r da10798406fc core/src/command_wrappers.h --- a/core/src/command_wrappers.h Wed Aug 25 22:17:52 2010 +0100 +++ b/core/src/command_wrappers.h Wed Aug 25 22:23:26 2010 +0100 @@ -55,8 +55,7 @@ virtual void HandleCommandComplete(MCommand& aCommand, TInt aError) = 0; }; - -class CCommandWrapperBase : public CBase, public MCommand +class CCommandWrapperBase : public CActive, public MCommand { protected: CCommandWrapperBase(); @@ -74,6 +73,10 @@ virtual void CmndDisown(); private: // From MCommand. virtual void CmndRelease(); +protected: // From CActive + void RunL(); + void DoCancel(); + private: HBufC* iName; ///< This is used by concrete classes as they see fit. RIoReadHandle iStdin; @@ -91,11 +94,14 @@ ESharedHeap = 0x00000002, // Any command that accesses gShell must have this set }; public: - static CThreadCommand* NewL(const TDesC& aName, TCommandConstructor aCommandConstructor, TUint aFlags); + static CThreadCommand* NewL(const TDesC& aName, TCommandConstructor aCommandConstructor, TUint aFlags, MTaskRunner* aTaskRunner); ~CThreadCommand(); private: - CThreadCommand(TCommandConstructor aCommandConstructor, TUint aFlags); + CThreadCommand(TCommandConstructor aCommandConstructor, TUint aFlags, MTaskRunner* aTaskRunner); void ConstructL(const TDesC& aName); + void RunL(); + void DoCancel(); + static void DoCommandThreadStartL(TAny* aSelf); private: // From MCommand. virtual TInt CmndRun(const TDesC& aCommandLine, IoUtils::CEnvironment& aEnv, MCommandObserver& aObserver, RIoSession& aIoSession); virtual void CmndForeground(); @@ -105,43 +111,14 @@ virtual TInt CmndResume(); virtual TExitType CmndExitType() const; virtual TExitCategoryName CmndExitCategory() const; -public: - class TArgs - { - public: - TArgs(TUint aFlags, CEnvironment& aEnv, TCommandConstructor aCommandConstructor, const TDesC& aCommandLine, TRequestStatus& aParentStatus); - public: - TUint iFlags; - CEnvironment& iEnv; - TCommandConstructor iCommandConstructor; - const TPtrC iCommandLine; - TRequestStatus* iParentStatus; - TThreadId iParentThreadId; - }; - class CThreadWatcher : public CActive - { - public: - static CThreadWatcher* NewL(); - ~CThreadWatcher(); - TInt Logon(CThreadCommand& aCommand, RThread& aThread, MCommandObserver& aObserver); - void SetActive(); - private: - CThreadWatcher(); - private: // From CActive. - virtual void RunL(); - virtual void DoCancel(); - private: - CThreadCommand* iCommand; - RThread* iThread; - MCommandObserver* iObserver; - }; private: TUint iFlags; TCommandConstructor iCommandConstructor; MCommandObserver* iObserver; - TArgs* iArgs; RThread iThread; - CThreadWatcher* iWatcher; + MTaskRunner* iTaskRunner; + CEnvironment* iSuppliedEnv; + HBufC* iCommandLine; }; diff -r 698ccde15713 -r da10798406fc core/src/fshell.mmp --- a/core/src/fshell.mmp Wed Aug 25 22:17:52 2010 +0100 +++ b/core/src/fshell.mmp Wed Aug 25 22:23:26 2010 +0100 @@ -65,6 +65,7 @@ source lexer.cpp source file_reader.cpp source script_command.cpp +source worker_thread.cpp sourcepath ..\builtins source hello.cpp diff -r 698ccde15713 -r da10798406fc core/src/worker_thread.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/src/worker_thread.cpp Wed Aug 25 22:23:26 2010 +0100 @@ -0,0 +1,686 @@ +// worker_thread.cpp +// +// Copyright (c) 2010 Accenture. All rights reserved. +// This component and the accompanying materials are made available +// under the terms of the "Eclipse Public License v1.0" +// which accompanies this distribution, and is available +// at the URL "http://www.eclipse.org/legal/epl-v10.html". +// +// Initial Contributors: +// Accenture - Initial contribution +// + +/* + +Some notes on how CThreadPool works: + * There are 2 types of CWorkerThread - ones that share a specific heap, and those that have their own heap. + * Tasks that need a shared heap can only be run using a CWorkerThread that is bound to that heap (one will be created if necessary) + * Tasks that don't need a shared heap can be run on any CWorkerThread that has its own heap. + * Excess CWorkerThreads are deleted after they've been idle for KCacheTime. + * CThreadPool uses an RFastLock on all operations that could be called from multiple threads. + * All member data of CThreadPool (including CWorkerThreads and their members) is kept on iThreadPoolAllocator, which was the heap + that the CThreadPool was created on. User::SwitchAllocator() is used where necessary to manipulate CThreadPool data from other threads + while the Lock is held + * When a task is queued, a non-busy CWorkerThread is found (or created). + * When a task is completed in a worker thread, it calls iParentThread.RequestComplete(iCompletionStatus, err). It then calls + CThreadPool::WorkerFinished() to set the CWorkerThread to be non-busy again. + * When a CWorkerThread is created it arranges (via SignalSelf()) to add a CThreadDeathWatcher to the main thread. + It follows that notifications about tasks failed because of thread death are always serviced by the main thread, which must therefore + remain responsive. (In the non-dead case, the completion notification is sent directly from the worker thread). + +*/ + +#include "worker_thread.h" +#include + +#define WT_LOG(args...) + +// These defines are for debugging only +//#define WT_LOG(args...) RDebug::Print(args) +// These probably don't even work any more - use with extreme caution! +//#define NO_REUSE +//#define IMMEDIATE_CLEANUP + +const TInt KCacheTime = 1000000; // 1 second + +CThreadPool* CThreadPool::NewL() + { + CThreadPool* self = new(ELeave) CThreadPool(); + CleanupStack::PushL(self); + self->ConstructL(); + CleanupStack::Pop(self); + return self; + } + +CThreadPool::CThreadPool() + : CActive(CActive::EPriorityLow) // Cleanup doesn't need to be a priority + { + CActiveScheduler::Add(this); + } + +void CThreadPool::ConstructL() + { + User::LeaveIfError(iMainThread.Open(RThread().Id())); + iIdleTimer = CPeriodic::NewL(CActive::EPriorityLow-1); // Interactions around iPendingThreadLogons mean it's probably a good idea to have this priority below that of CThreadPool itself + iThreadPoolAllocator = &User::Allocator(); + User::LeaveIfError(iLock.CreateLocal()); + iThreads.ReserveL(2); + // We don't create any workers by default + } + +CThreadPool::~CThreadPool() + { + WT_LOG(_L("Deleting thread pool. %d threads created during its lifetime"), iCountThreadsCreated); + Cancel(); + for (TInt i = 0; i < iThreads.Count(); i++) + { + iThreads[i]->Shutdown(); + } + iThreads.ResetAndDestroy(); + iLock.Close(); + delete iIdleTimer; + iMainThread.Close(); + iPendingThreadLogons.Close(); + } + +void CThreadPool::Lock() + { + iLock.Wait(); + } + +void CThreadPool::SwitchToThreadPoolHeap() + { + //ASSERT(iLock.IsHeld()); + RAllocator* allocator = &User::Allocator(); + if (allocator != iThreadPoolAllocator) + { + WT_LOG(_L("Thread %d switching from 0x%x to thread pool allocator 0x%x"), TUint(RThread().Id()), allocator, iThreadPoolAllocator); + iTempThreadAllocator = User::SwitchAllocator(iThreadPoolAllocator); + } + } + +void CThreadPool::RestoreHeap() + { + if (iTempThreadAllocator) + { + WT_LOG(_L("Thread %d restoring heap 0x%x"), TUint(RThread().Id()), iTempThreadAllocator); + User::SwitchAllocator(iTempThreadAllocator); + iTempThreadAllocator = NULL; + } + } + +void CThreadPool::Unlock() + { + RestoreHeap(); + iLock.Signal(); + } + +void CThreadPool::LockLC() + { + Lock(); + CleanupStack::PushL(TCleanupItem(&DoUnlock, &iLock)); + } + +#define _LOFF(p,T,f) ((T*)(((TUint8*)(p))-_FOFF(T,f))) // Why isn't this defined user-side? + +void CThreadPool::DoUnlock(TAny* aLock) + { + CThreadPool* self = _LOFF(aLock, CThreadPool, iLock); + self->Unlock(); + } + +MThreadedTask* CThreadPool::NewTaskInSeparateThreadL(const TDesC& aThreadName, TBool aSharedHeap, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext) + { + WT_LOG(_L("1. NewTaskInSeparateThreadL task %d (%S)"), iTaskCounter, &aThreadName); + + LockLC(); + // Hunt for a non-busy thread + CWorkerThread* foundThread = NULL; + RAllocator* requiredAllocator = aSharedHeap ? &User::Allocator() : NULL; +#ifndef NO_REUSE // This code normally does run, except during debugging when NO_REUSE is defined + for (TInt i = 0; i < iThreads.Count(); i++) + { + CWorkerThread* thread = iThreads[i]; + if (!thread->Busy() && thread->SharedAllocator() == requiredAllocator) + { + // If the worker thread is sharing an allocator, it must be sharing the *same* one as this current thread + ASSERT(thread->Running()); + foundThread = thread; + break; + } + } +#endif + + if (foundThread == NULL) + { + SwitchToThreadPoolHeap(); + foundThread = CWorkerThread::NewLC(this, requiredAllocator); + iCountThreadsCreated++; + WT_LOG(_L("Creating new worker thread %d"), TUint(foundThread->GetThreadId())); + iThreads.AppendL(foundThread); + CleanupStack::Pop(foundThread); + iPendingThreadLogons.AppendL(foundThread); + SignalSelf(); // So the iPendingThreadLogons gets sorted out in context of main thread + RestoreHeap(); + } + + WT_LOG(_L("Using worker thread %d for task %d (%S)"), TUint(foundThread->GetThreadId()), iTaskCounter, &aThreadName); + + User::LeaveIfError(foundThread->Setup(iTaskCounter, aThreadName, aThreadFunction, aThreadContext)); + foundThread->SetBusy(ETrue); + iTaskCounter++; + CleanupStack::PopAndDestroy(&iLock); + return foundThread; + } + +void CThreadPool::WorkerDied(CWorkerThread* aWorker) + { + Lock(); + // This is now always called in the main thread + ASSERT(RThread().Id() == iMainThread.Id()); + + // Find it and remove it - the next request will create a new worker if needed + for (TInt i = 0; i < iThreads.Count(); i++) + { + CWorkerThread* worker = iThreads[i]; + if (worker == aWorker) + { + delete worker; + iThreads.Remove(i); + break; + } + } + Unlock(); + } + +void CThreadPool::WorkerFinished(CWorkerThread* aWorker) + { + Lock(); + aWorker->SetBusy(EFalse); +#if defined(NO_REUSE) + // Nothing +#else + // This is the normal case - queue ourself to run (on the main thread) so we can trigger the idle timer from our RunL + // Can't do that directly as timers are all thread-local + SignalSelf(); +#endif + Unlock(); + } + + +void CThreadPool::SignalSelf() + { + // TODO: Optimise if we actually are the main thread? + + // Must be holding lock + iPendingCallbacks++; + TRequestStatus* stat = &iStatus; + if (iPendingCallbacks == 1) // Can't use IsActive() as we might not be in the same thread + { + iStatus = KRequestPending; + SetActive(); + } + iMainThread.RequestComplete(stat, KErrNone); // Fortunately RThread::RequestComplete doesn't set the status to KRequestPending before completing it (unlike User::RequestComplete) + } + +void CThreadPool::CleanupAnyWorkersSharingAllocator(RAllocator* aAllocator) + { + Lock(); + SwitchToThreadPoolHeap(); + for (TInt i = iThreads.Count() - 1; i >= 0; i--) + { + CWorkerThread* worker = iThreads[i]; + if (worker->SharedAllocator() == aAllocator) + { + ASSERT(!worker->Busy()); + worker->Shutdown(); + delete worker; + iThreads.Remove(i); + } + } + Unlock(); + } + +void CThreadPool::PerformHouseKeeping() + { + // Time to do some housekeeping. Algorithm is: + // * Keep a single spare non-busy shared worker around, but only for the main heap (ie a CWorkerThread whose SharedAllocator() equals iThreadPoolAllocator) + // * Bin everything else that isn't busy + + WT_LOG(_L("Idle timer expired, cleaning up spare workers")); + Lock(); + ASSERT(&User::Allocator() == iThreadPoolAllocator); // We're running in the thread pool's main thread so no need to switch allocators + +#ifdef IMMEDIATE_CLEANUP + // Everything gets nuked if this is defined + TBool foundSpareWorker = ETrue; +#else + TBool foundSpareWorker = EFalse; +#endif + for (TInt i = iThreads.Count() - 1; i >= 0; i--) + { + CWorkerThread* worker = iThreads[i]; + if (!worker->Busy()) + { + RAllocator* allocator = worker->SharedAllocator(); + if (allocator == iThreadPoolAllocator && !foundSpareWorker) + { + // This one can stay + foundSpareWorker = ETrue; + } + else + { + // Everything else (that isn't busy) gets cleaned up + worker->Shutdown(); + delete worker; + iThreads.Remove(i); + } + } + } + Unlock(); + } + +void CThreadPool::DoCancel() + { + // There's nothing we can cancel as we complete ourself + } + +void CThreadPool::RunL() + { + Lock(); + iPendingCallbacks--; + while (iPendingCallbacks) + { + // Consume any further signals currently pending - we're somewhat abusing the active scheduler by completing the same repeatedly without it running in between so we have to consume the abuse here to balance things + User::WaitForRequest(iStatus); + iPendingCallbacks--; + } + + // We kinda overload our RunL, using it for both registering logons and triggering the idle cleanup. Ah well. + + if (iPendingThreadLogons.Count()) + { + for (TInt i = 0; i < iPendingThreadLogons.Count(); i++) + { + iPendingThreadLogons[i]->RegisterThreadDeathWatcherOnCurrentThread(); + } + SwitchToThreadPoolHeap(); + iPendingThreadLogons.Reset(); + } + +#ifdef IMMEDIATE_CLEANUP + Unlock(); + // Delete the thread straight away + PerformHouseKeeping(); +#else + // Normally go through the timer + iIdleTimer->Cancel(); // Reset it if it's already counting + iIdleTimer->Start(KCacheTime, KCacheTime, TCallBack(&TimerCallback, this)); + Unlock(); +#endif + } + +TInt CThreadPool::TimerCallback(TAny* aSelf) + { + CThreadPool* self = static_cast(aSelf); + self->iIdleTimer->Cancel(); // Stop the thing being periodic + self->PerformHouseKeeping(); + return 0; + } + + +//// + +class CThreadDeathWatcher : public CActive + { +public: + CThreadDeathWatcher(CWorkerThread* aWorker, RThread& aUnderlyingThread) + : CActive(CActive::EPriorityHigh), iWorker(aWorker), iThread(aUnderlyingThread) + { + } + ~CThreadDeathWatcher() + { + Cancel(); + } + void StartWatching() + { + CActiveScheduler::Add(this); + iThread.Logon(iStatus); + SetActive(); + } +private: + void DoCancel() + { + iThread.LogonCancel(iStatus); + } + void RunL() + { + Deque(); // Our work is done. Might as well extract ourselves from whatever scheduler we're in while we can + iWorker->ThreadDied(); + } + +private: + CWorkerThread* iWorker; + RThread& iThread; + }; + +const TInt KMaxHeapSize = KMinHeapSize * 1024; + +class CWorkerThreadDispatcher : public CActive + { +public: + CWorkerThreadDispatcher(CWorkerThread* aThread) + : CActive(CActive::EPriorityStandard), iThread(aThread) + { + CActiveScheduler::Add(this); + iStatus = KRequestPending; + SetActive(); + } + + void RunL() + { + if (iStatus.Int() == KErrNone) + { + iStatus = KRequestPending; + SetActive(); + iThread->ThreadRun(); + } + else + { + // Time to die + iThread->iAsWait.AsyncStop(); + } + } + + void DoCancel() {} // Can never happen because we can only ever be destroyed as a result of the AsyncStop in RunL from which we can never be active + +private: + CWorkerThread* iThread; + }; + +CWorkerThread* CWorkerThread::NewLC(CThreadPool* aParentPool, RAllocator* aSharedAllocator) + { + CWorkerThread* self = new(ELeave) CWorkerThread(aParentPool, aSharedAllocator); + CleanupStack::PushL(self); + self->ConstructL(); + return self; + } + +CWorkerThread::CWorkerThread(CThreadPool* aParentPool, RAllocator* aSharedAllocator) + : iParentPool(aParentPool), iSharedAllocator(aSharedAllocator) + { + iWorkerThread.SetHandle(0); + } + +void CWorkerThread::ConstructL() + { + TInt err = KErrNone; + TName name; + name.Format(_L("WorkerThread_%x"), this); + if (iSharedAllocator) + { + err = iWorkerThread.Create(name, &ThreadFn, KDefaultStackSize, iSharedAllocator, this); + } + else + { + // Create a new heap with default heap size + err = iWorkerThread.Create(name, &ThreadFn, KDefaultStackSize, KMinHeapSize, KMaxHeapSize, this); + } + User::LeaveIfError(err); + + iThreadDeathWatcher = new(ELeave) CThreadDeathWatcher(this, iWorkerThread); + + TRequestStatus stat; + iWorkerThread.Rendezvous(stat); + + ASSERT(stat == KRequestPending); + if (stat == KRequestPending) + { + iWorkerThread.Resume(); + } + else + { + iWorkerThread.Kill(stat.Int()); + } + + User::WaitForRequest(stat); + User::LeaveIfError(stat.Int()); + // Thread is now ready to do stuff + } + +CWorkerThread::~CWorkerThread() + { + ASSERT(iWorkerThread.Handle() == 0 || iWorkerThread.ExitType() != EExitPending); + //Cancel(); + delete iThreadDeathWatcher; + iParentThread.Close(); + iWorkerThread.Close(); + } + +TInt CWorkerThread::Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext) + { + ASSERT(!Busy()); + + // Things to do in preparation for running a command: + // 1. Check what thread we're running in now and set ourselves and our thread death watcher on its scheduler (may not be the thread that created us originally) + // 2. Store the required context for the worker thread to access + + TInt err = iParentThread.Open(RThread().Id()); + if (!err) + { + iTaskId = aTaskId; + iName = &aThreadName; + iFn = aThreadFunction; + iContext = aThreadContext; + } + return err; + } + +TInt CWorkerThread::ExecuteTask(TRequestStatus& aCompletionStatus) + { + WT_LOG(_L("2. + Go task %d (%S)"), iTaskId, iName); + + ASSERT(iCompletionStatus == NULL); + aCompletionStatus = KRequestPending; + iCompletionStatus = &aCompletionStatus; + + // Setup rendezvous and completion requestStatuses before signalling iDispatchStatus + TRequestStatus rendezvousStat; + iWorkerThread.Rendezvous(rendezvousStat); + + // Signal the worker thread to do its thing + TRequestStatus* dispatchStat = iDispatchStatus; + iWorkerThread.RequestComplete(dispatchStat, KErrNone); + + User::WaitForRequest(rendezvousStat); + + TInt err = rendezvousStat.Int(); + if (iWorkerThread.ExitType() != EExitPending && rendezvousStat.Int() >= 0) + { + err = KErrDied; + } + + WT_LOG(_L("6. - Go task %d (%S) err=%d"), iTaskId, iName, err); + + if (err != KErrNone) + { + // We don't signal completion if there was an error prior to the rendezvous, we just return it here + iCompletionStatus = NULL; + } + return err; + } + +TThreadId CWorkerThread::GetThreadId() const + { + return iWorkerThread.Id(); + } + +void CWorkerThread::AbortTask() + { + // Undo setup + iName = NULL; + iFn = NULL; + iContext = NULL; + iParentThread.Close(); + iParentPool->WorkerFinished(this); + } + +void CWorkerThread::ThreadRun() + { + // Runs in the worker thread + + if (!UsingSharedAllocator()) + { + __UHEAP_MARK; + } + + TInt i = 0; + TInt err = KErrNone; + do + { + TName threadName; + threadName.Format(_L("%S_%02d"), iName, i++); + err = User::RenameThread(threadName); + WT_LOG(_L("3. Running thread for task %d (%S)"), iTaskId, &threadName); + } + while (err == KErrAlreadyExists); + + // Execute the actual function + WT_LOG(_L("4. + ThreadRun for task %d (%S)"), iTaskId, iName); + TRAP(err, (*iFn)(iContext)); + WT_LOG(_L("7. - ThreadRun for task %d (%S) signalling thread %d with err=%d"), iTaskId, iName, TUint(iParentThread.Id()), err); + + if (!UsingSharedAllocator()) + { + // Do this before saying we've actually finished the task, otherwise we risk deadlocking on the thread pool lock when IMMEDIATE_CLEANUP is defined + // because CleanupAnyWorkersSharingAllocator takes the lock, but the lock can already be held by that point around the Shutdown that IMMEDIATE_CLEANUP does. + iParentPool->CleanupAnyWorkersSharingAllocator(&User::Allocator()); // Otherwise the heap check will fail + } + + // And signal back the result + iParentPool->WorkerFinished(this); + iParentThread.RequestComplete(iCompletionStatus, err); + + // Finally put our name back to what it was +#ifdef _DEBUG + _LIT(KDebugName, "WorkerThread_%x (was %S)"); + TName threadName = RThread().Name(); + TPtrC oldName = threadName.Left(threadName.MaxLength() - KDebugName().Length()); + TName newName; + newName.Format(KDebugName, this, &oldName); + User::RenameThread(newName); +#else + TName threadName; + threadName.Format(_L("WorkerThread_%x"), this); + User::RenameThread(threadName); +#endif + + if (!UsingSharedAllocator()) + { + __UHEAP_MARKEND; + } + } + +TBool CWorkerThread::Busy() const + { + return iBusy; + } + +void CWorkerThread::SetBusy(TBool aBusy) + { + iBusy = aBusy; + } + +TBool CWorkerThread::UsingSharedAllocator() const + { + return iSharedAllocator != NULL; + } + +RAllocator* CWorkerThread::SharedAllocator() const + { + return iSharedAllocator; + } + +void CWorkerThread::ThreadDied() + { + WT_LOG(_L("Task %d died with exittype %d reason %d"), iTaskId, iWorkerThread.ExitType(), iWorkerThread.ExitReason()); + TInt err = iWorkerThread.ExitReason(); + if (err >= 0) err = KErrDied; + if (iCompletionStatus) + { + iParentThread.RequestComplete(iCompletionStatus, err); + } + iParentPool->WorkerDied(this); + } + +TInt CWorkerThread::ThreadFn(TAny* aSelf) + { + CWorkerThread* self = static_cast(aSelf); + if (self->UsingSharedAllocator()) + { + // If we're sharing the main fshell heap, we have to play by the rules and not crash + User::SetCritical(User::EProcessCritical); + + // We also need to temporarily switch to the thread pool's allocator, because our CTrapCleanup, CActiveScheduler etc conceptually belong with the worker thread object and not with the heap we're sharing (which could be different... damn repeat command again) + //self->iParentPool->Lock(); + //self->iParentPool->SwitchToThreadPoolHeap(); + } + else + { + __UHEAP_MARK; + } + TInt err = KErrNoMemory; + CTrapCleanup* cleanup = CTrapCleanup::New(); + //WT_LOG(_L("Worker thread %d creating trapcleanup 0x%x"), TUint(RThread().Id()), cleanup); + if (cleanup) + { + TRAP(err, self->ThreadFnL()); + //WT_LOG(_L("Worker thread %d deleting trapcleanup 0x%x"), TUint(RThread().Id()), cleanup); + delete cleanup; + } + if (!self->UsingSharedAllocator()) + { + __UHEAP_MARKEND; + } + return err; + } + +void CWorkerThread::ThreadFnL() + { + CActiveScheduler* scheduler = new(ELeave) CActiveScheduler; + CleanupStack::PushL(scheduler); + CActiveScheduler::Install(scheduler); + + CWorkerThreadDispatcher* dispatcher = new(ELeave) CWorkerThreadDispatcher(this); + CleanupStack::PushL(dispatcher); + iDispatchStatus = &dispatcher->iStatus; + RThread::Rendezvous(KErrNone); + iAsWait.Start(); + CleanupStack::PopAndDestroy(2, scheduler); // dispatcher, scheduler + } + +void CWorkerThread::Shutdown() + { + // Can be called from (potentially) any thread + WT_LOG(_L("Shutting down worker thread %d whose exittype is %d"), TUint(GetThreadId()), iWorkerThread.ExitType()); + + if (Busy()) + { + WT_LOG(_L("Thread is still busy - killing it")); + iWorkerThread.Kill(KErrAbort); + // The thread death watcher should take care of everything else, eventually + } + else if (iWorkerThread.ExitType() == EExitPending) + { + TRequestStatus stat; + iWorkerThread.Logon(stat); + iWorkerThread.RequestComplete(iDispatchStatus, KErrCancel); + User::WaitForRequest(stat); + WT_LOG(_L("Shut down worker %d exit=%d"), TUint(GetThreadId()), iWorkerThread.ExitType()); + } + } + +void CWorkerThread::RegisterThreadDeathWatcherOnCurrentThread() + { + iThreadDeathWatcher->StartWatching(); + } diff -r 698ccde15713 -r da10798406fc core/src/worker_thread.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/core/src/worker_thread.h Wed Aug 25 22:23:26 2010 +0100 @@ -0,0 +1,131 @@ +// worker_thread.h +// +// Copyright (c) 2010 Accenture. All rights reserved. +// This component and the accompanying materials are made available +// under the terms of the "Eclipse Public License v1.0" +// which accompanies this distribution, and is available +// at the URL "http://www.eclipse.org/legal/epl-v10.html". +// +// Initial Contributors: +// Accenture - Initial contribution +// + +#ifndef WORKER_THREAD_H +#define WORKER_THREAD_H + +#include + +class MThreadedTask + { +public: + virtual TThreadId GetThreadId() const=0; + virtual TInt ExecuteTask(TRequestStatus& aCompletionStatus)=0; + virtual void AbortTask()=0; // If you decide you don't want to call ExecuteTask + }; + +class MTaskRunner + { +public: + typedef void (*TThreadFunctionL)(TAny*); + virtual MThreadedTask* NewTaskInSeparateThreadL(const TDesC& aThreadName, TBool aSharedHeap, TThreadFunctionL aThreadFunction, TAny* aThreadContext)=0; + }; + +class CWorkerThread; + +class CThreadPool : public CActive, public MTaskRunner + { +public: + static CThreadPool* NewL(); + ~CThreadPool(); + void WorkerDied(CWorkerThread* aWorker); + void WorkerFinished(CWorkerThread* aWorker); + void CleanupAnyWorkersSharingAllocator(RAllocator* aAllocator); + +public: // From MTaskRunner + virtual MThreadedTask* NewTaskInSeparateThreadL(const TDesC& aThreadName, TBool aSharedHeap, TThreadFunctionL aThreadFunction, TAny* aThreadContext); + +private: + CThreadPool(); + void ConstructL(); + static TInt TimerCallback(TAny* aSelf); + void PerformHouseKeeping(); + void RunL(); + void DoCancel(); + void SignalSelf(); + + void Lock(); + void LockLC(); + void SwitchToThreadPoolHeap(); + void RestoreHeap(); + void Unlock(); + static void DoUnlock(TAny* aLock); + +private: + RFastLock iLock; // Thread pool is shared between eg fshell main thread and any source threads, so needs locking around WorkerFinished and NewTaskInSeparateThreadL etc + RAllocator* iThreadPoolAllocator; // The thread pool can be modified from a thread that isn't using the main heap, so we need to track that and use User::SwitchAllocator as needed + RAllocator* iTempThreadAllocator; // Only used while lock is held + RPointerArray iThreads; + TInt iTaskCounter; + RThread iMainThread; + TInt iPendingCallbacks; + CPeriodic* iIdleTimer; + TInt iCountThreadsCreated; // This is for statistics gathering, not involved in the logic + RArray iPendingThreadLogons; // Not owned + }; + + +class CThreadDeathWatcher; + +class CWorkerThread : public CBase, public MThreadedTask + { +public: + static CWorkerThread* NewLC(CThreadPool* aParentPool, RAllocator* aSharedAllocator); + // For safety these 2 need to be called with the thread pool lock held + TBool Busy() const; + void SetBusy(TBool aBusy); + + TBool UsingSharedAllocator() const; + RAllocator* SharedAllocator() const; + TInt Setup(TInt aTaskId, const TDesC& aThreadName, MTaskRunner::TThreadFunctionL aThreadFunction, TAny* aThreadContext); + void Shutdown(); + ~CWorkerThread(); + void RegisterThreadDeathWatcherOnCurrentThread(); + + TBool Running() const { return iWorkerThread.Handle() && iWorkerThread.ExitType() == EExitPending; } + +public: // From MThreadedTask + TThreadId GetThreadId() const; + TInt ExecuteTask(TRequestStatus& aCompletionStatus); + void AbortTask(); + +public: // For CThreadDeathWatcher to use + void ThreadDied(); + +private: + CWorkerThread(CThreadPool* aParentPool, RAllocator* aSharedAllocator); + void ConstructL(); + void ThreadRun(); + static TInt ThreadFn(TAny* aSelf); + void ThreadFnL(); + +private: + CThreadPool* iParentPool; + RThread iWorkerThread; + RAllocator* iSharedAllocator; + TRequestStatus* iDispatchStatus; + CActiveSchedulerWait iAsWait; // Use a CActiveSchedulerWait to protect ourselves from whatever aThreadFunction may try and do + CThreadDeathWatcher* iThreadDeathWatcher; + TBool iBusy; + + // Things specific to the task currently being executed + RThread iParentThread; // Worker needs this to signal completion to us + const TDesC* iName; + MTaskRunner::TThreadFunctionL iFn; + TAny* iContext; + TInt iTaskId; + TRequestStatus* iCompletionStatus; + + friend class CWorkerThreadDispatcher; + }; + +#endif diff -r 698ccde15713 -r da10798406fc documentation/pod-list.txt diff -r 698ccde15713 -r da10798406fc tools/fsh-builddocs