diff -r 000000000000 -r 818e61de6cd1 crashanalysercmd/PerfToolsSharedLibraries/Engine/SymbianUtils/Threading/MultiThreadedProcessor.cs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/crashanalysercmd/PerfToolsSharedLibraries/Engine/SymbianUtils/Threading/MultiThreadedProcessor.cs Thu Feb 11 15:50:58 2010 +0200 @@ -0,0 +1,262 @@ +/* +* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). +* All rights reserved. +* This component and the accompanying materials are made available +* under the terms of "Eclipse Public License v1.0" +* which accompanies this distribution, and is available +* at the URL "http://www.eclipse.org/legal/epl-v10.html". +* +* Initial Contributors: +* Nokia Corporation - initial contribution. +* +* Contributors: +* +* Description: +* +*/ +using System; +using System.Text; +using System.IO; +using System.Collections.Generic; +using System.Threading; +using System.Reflection; + +namespace SymbianUtils.Threading +{ + public class MultiThreadedProcessor : DisposableObject + { + #region Enumerations + public enum TEvent + { + EEventStarting = 0, + EEventCompleted + } + #endregion + + #region Delegates & events + public delegate void ProcessorEventHandler( TEvent aEvent ); + public event ProcessorEventHandler EventHandler = delegate { }; + // + public delegate void ItemProcessor( T aItem ); + public event ItemProcessor ProcessItem = null; + // + public delegate void ExceptionHandler( Exception aException ); + public event ExceptionHandler Exception; + #endregion + + #region Constructors + public MultiThreadedProcessor( IEnumerable aCollection ) + : this( aCollection, ThreadPriority.Normal ) + { + } + + public MultiThreadedProcessor( IEnumerable aCollection, ThreadPriority aPriority ) + { + PopulateQueue( aCollection ); + iThreadPriorities = aPriority; + iProcessorCount = System.Environment.ProcessorCount; + } + #endregion + + #region Framework API + public virtual void Start( TSynchronicity aSynchronicity ) + { + iSynchronicity = aSynchronicity; + OnEvent( MultiThreadedProcessor.TEvent.EEventStarting ); + + int count = iQueue.Count; + if ( count == 0 ) + { + // Nothing to do! + OnEvent( MultiThreadedProcessor.TEvent.EEventCompleted ); + } + else + { + // For sync mode, we need to block until the operation + // completes. + DestroyBlocker(); + if ( aSynchronicity == TSynchronicity.ESynchronous ) + { + iSynchronousBlocker = new ManualResetEvent( false ); + } + + // Create worker threads to process queue items. One per + // processor core. + CreateThreads(); + + if ( aSynchronicity == TSynchronicity.ESynchronous ) + { + System.Diagnostics.Debug.Assert( iSynchronousBlocker != null ); + + // Now wait. + using ( iSynchronousBlocker ) + { + iSynchronousBlocker.WaitOne(); + } + iSynchronousBlocker = null; + + // See comments in "RunThread" below for details about why + // we do this here - it avoids a race condition. + OperationComplete(); + } + } + } + + protected virtual bool Process( T aItem ) + { + return false; + } + + protected virtual void OnException( Exception aException ) + { + if ( Exception != null ) + { + Exception( aException ); + } + } + #endregion + + #region Properties + #endregion + + #region Internal methods + protected void PopulateQueue( IEnumerable aCollection ) + { + iQueue.Clear(); + // + foreach ( T item in aCollection ) + { + iQueue.Enqueue( item ); + } + } + + private void CreateThreads() + { + Random r = new Random( DateTime.Now.Millisecond ); + // + int count = System.Environment.ProcessorCount; + for ( int i = 0; i < count; i++ ) + { + string name = string.Format( "Processor Thread {0:d3} {1:d8}", i, r.Next() ); + Thread t = new Thread( new ThreadStart( RunThread ) ); + t.IsBackground = true; + t.Priority = iThreadPriorities; + iThreads.Add( t ); + // + t.Start(); + } + } + + private void RunThread() + { + // Process items until none are left. + while ( iQueue.Count > 0 ) + { + T item; + // + bool dequeued = iQueue.TryToDequeue( out item ); + if ( dequeued ) + { + // First try virtual function call. If that fails then + // we'll resort to trying an event handler. + try + { + bool processed = Process( item ); + if ( processed == false && ProcessItem != null ) + { + ProcessItem( item ); + } + } + catch ( Exception e ) + { + // Let the derived class handle exceptions + OnException( e ); + } + } + } + + // If all the threads have finished then the entire + // operation is complete. + bool finished = false; + lock ( iThreads ) + { + iThreads.Remove( Thread.CurrentThread ); + finished = ( iThreads.Count == 0 ); + } + + // Check for completion + if ( finished ) + { + // If we're operating synchronously, then let the main + // thread (the one that is currently blocked) report + // completion. This prevents a race condition whereby the worker + // threads (i.e. the thread in which this function is running) + // notifies about completion before the main thread has started + // blocking (waiting). This can cause an exception whereby + // the client might dispose of this object twice. + if ( iSynchronicity == TSynchronicity.EAsynchronous ) + { + OperationComplete(); + } + else + { + // Will be done by "Start" + } + + // Always release the blocker to "unblock" the main thread + ReleaseBlocker(); + } + } + + private void OperationComplete() + { + OnEvent( MultiThreadedProcessor.TEvent.EEventCompleted ); + } + + private void DestroyBlocker() + { + if ( iSynchronousBlocker != null ) + { + iSynchronousBlocker.Close(); + iSynchronousBlocker = null; + } + } + + private void ReleaseBlocker() + { + if ( iSynchronousBlocker != null ) + { + iSynchronousBlocker.Set(); + } + } + + protected virtual void OnEvent( TEvent aEvent ) + { + EventHandler( aEvent ); + } + #endregion + + #region From DisposableObject + protected override void CleanupManagedResources() + { + try + { + base.CleanupManagedResources(); + } + finally + { + DestroyBlocker(); + } + } + #endregion + + #region Data members + private readonly int iProcessorCount; + private readonly ThreadPriority iThreadPriorities; + private BlockingQueue iQueue = new BlockingQueue(); + private List iThreads = new List(); + private ManualResetEvent iSynchronousBlocker = null; + private TSynchronicity iSynchronicity = TSynchronicity.ESynchronous; + #endregion + } +}