searcher/tsrc/cpixsearchertest/src/multithreadtester.cpp
author Dremov Kirill (Nokia-D-MSW/Tampere) <kirill.dremov@nokia.com>
Thu, 27 May 2010 13:59:44 +0300
changeset 3 ae3f1779f6da
parent 0 671dee74050a
permissions -rw-r--r--
Revision: 201019 Kit: 2010121

/*
* Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
* All rights reserved.
* This component and the accompanying materials are made available
* under the terms of "Eclipse Public License v1.0"
* which accompanies this distribution, and is available
* at the URL "http://www.eclipse.org/legal/epl-v10.html".
*
* Initial Contributors:
* Nokia Corporation - initial contribution.
*
* Contributors:
*
* Description: 
*
*/
#include "multithreadtester.h"
#include "RSearchServerSession.h"
#include "CCPixIndexer.h"
#include "CCPixSearcher.h"
#include "CSearchDocument.h"
#include <e32math.h>

#define KIndexingWorkerCount 4
#define KSearchingWorkerCount 16
#define KTermDeviation 10 // The bigger this word, the more different term 
#define KTermCountDeviation 10 // The bigger this word, the more terms per document
_LIT( KContentsField, "Contents " ); 

class CWorker : public CBase 
    {
public: 
    CWorker() 
    : iWorkerId( iNextFreeWorkerId++ ), 
      iActive( EFalse ), 
      iThread(), 
      iSemaphore()
        {
        }

    void ConstructL() 
        {
        User::LeaveIfError( iSemaphore.CreateLocal( 0 ) ); 
        }
    
    ~CWorker() 
        {
        if ( iActive ) 
            {
            Finish(); 
            }
        iSemaphore.Close(); 
        }
    
    TInt WorkerId() 
        {
        return iWorkerId; 
        }

    TInt Error() 
        {
        return iError; 
        }

    virtual const TDesC& Name() = 0;
    
    void StartL() 
        {
        TThreadFunction function = CWorker::ThreadFunction; 
        
        User::LeaveIfError( iThread.Create( Name(), function, 2048, 0, this ) ); 
         
        iActive = ETrue;
        iThread.Resume(); 
        // Go thread go
    }
    
    void Finish() 
        {
        iActive = EFalse;
        DoCancel(); 
        iSemaphore.Wait(); 
        iThread.Close(); 
        }
    
protected: 

    /**
     * For doing any required preparations in the original thread. 
     */
    virtual void DoPrepareL() = 0;
    virtual void DoCancel() = 0;
    virtual void DoRunL() = 0;

    void RunL() 
        {
        CActiveScheduler* scheduler = new (ELeave) CActiveScheduler();
        CleanupStack::PushL(scheduler);
        CActiveScheduler::Install(scheduler);

        TRAP(iError, DoRunL());
        iSemaphore.Signal(); 
        
        CleanupStack::PopAndDestroy(scheduler);
        }
    
public: 
    
    static TInt ThreadFunction(void *aThis) 
        {
        CTrapCleanup* cleanup = CTrapCleanup::New();
    
        TRAP_IGNORE
            ( 
            (reinterpret_cast<CWorker*>(aThis))->RunL();
            ); 

        delete cleanup;
        return KErrNone;
        }
    
private: 
    
    TInt iWorkerId;     
    TInt iError; 
    static TInt iNextFreeWorkerId;  
    TBool iActive; 
    RThread iThread; 
    RSemaphore iSemaphore; 
    };


TInt CWorker::iNextFreeWorkerId = 0;  

/**
 * Returns random numbers so, that small numbers
 * are most common. Used to approximate the distribution
 * of real world words. In the distribution the probability that n is 
 * one of the numbers following i is 9 times as probable as that n is i. 
 * 
 * NOTE: Doesn't really follow the real distribution of words, because
 *       the word distribution follows Zipf's law and is not logarithmic.
 *       These things won't probably matter too much in any case. 
 */
TInt LogRand( TInt aDeviation ) 
    {
    TInt rv = 0; 
    while ( Math::Random() % aDeviation ) rv++;
    return rv; 
    }

/**
 * Worker, which continuously adds items to the index. Operates on its own
 * thread. 
 */
class CIndexingWorker : public CWorker
    {
public:
    CIndexingWorker() 
    :   iStop( EFalse ),
        iNextFreeUid( 0 ),
        iQualAppClass( 0 ),
        iAppClass( 0 ) 
        {
        }
    
    static CIndexingWorker* NewL( const TDesC& aAppClass ) 
        {
        CIndexingWorker* self = new ( ELeave ) CIndexingWorker(); 
        CleanupStack::PushL( self ); 
        self->ConstructL( aAppClass ); 
        CleanupStack::Pop( self ); 
        return self; 
        }
     
    void ConstructL( const TDesC& aQualAppClass ) 
        {
        CWorker::ConstructL(); 
        iQualAppClass = aQualAppClass.AllocL();
        iAppClass = aQualAppClass.Right( aQualAppClass.Length() - aQualAppClass.Find( _L( ":" ) ) - 1 ).AllocL();
        
        iName = HBufC::NewL( 512 );
        iName->Des().Append( _L( "indexer") ); 
        iName->Des().AppendNum( WorkerId() ); 
        iName->Des().Append( _L( "_") ); 
        iName->Des().Append( *iAppClass ); 
        }

    ~CIndexingWorker() 
        {
        delete iName; 
        delete iAppClass;
        delete iQualAppClass; 
        }
    
public: // From Cworker

    virtual void DoPrepareL() {
        iStop = EFalse; 
    }

    virtual void DoCancel() {
        iStop = ETrue; 
    }
    
    virtual void DoRunL() 
        {
        RSearchServerSession session;
        User::LeaveIfError( session.Connect() );
        CleanupClosePushL( session ); 
        CCPixIndexer* indexer = CCPixIndexer::NewLC( session );
        indexer->OpenDatabaseL( *iQualAppClass );
        
        HBufC* buf = HBufC::NewLC( 1024*8 ); 
        
        while ( !iStop ) 
            {
            TBuf<64> docuid; 
            docuid.Append( _L("iw") ); 
            docuid.AppendNum( WorkerId() ); 
            docuid.Append( _L("_d") );  
            docuid.AppendNum( iNextFreeUid++ ); 

            CSearchDocument* doc = CSearchDocument::NewLC( docuid,
                                                           *iAppClass );
            
            buf->Des().Zero(); 
            TInt n = LogRand( KTermCountDeviation ) + 1; 
            for ( TInt i = 0; i < n; i++ ) 
                {
                buf->Des().AppendNum( LogRand( KTermDeviation ) ); 
                if ( i + 1 < n ) 
                    {
                    buf->Des().Append( _L( " " ) );
                    }
                }
                
            doc->AddFieldL( KContentsField, *buf, CDocumentField::EStoreYes | 
                                                  CDocumentField::EIndexTokenized | 
                                                  CDocumentField::EAggregateNo ); 
            doc->AddExcerptL( *buf ); 
            
            indexer->AddL( *doc ); 
            
            CleanupStack::PopAndDestroy( doc ); 
            }
        CleanupStack::PopAndDestroy( buf ); 
        CleanupStack::PopAndDestroy( indexer ); 
        CleanupStack::PopAndDestroy(); // session 
        }

    virtual const TDesC& Name() {
        return *iName; 
    }
    
private: 
    
    TBool iStop; 
    
    TInt iNextFreeUid; 

    HBufC* iQualAppClass;

    HBufC* iAppClass;

    HBufC* iName;

    };


/**
 * A worker that commits normal searches to the given  search domain. 
 * Worker operates within its own thread.  
 */
class CSearchingWorker : public CWorker
    {
public: 
    CSearchingWorker() 
    :   iStop( EFalse ),
        iAppClass( 0 ), 
        iName( 0 )
        {
        }
    
    static CSearchingWorker* NewL( const TDesC& aAppClass ) 
        {
        CSearchingWorker* self = new ( ELeave ) CSearchingWorker(); 
        CleanupStack::PushL( self ); 
        self->ConstructL( aAppClass ); 
        CleanupStack::Pop( self ); 
        return self; 
        }

    
    void ConstructL( const TDesC& aAppClass ) 
        {   
        CWorker::ConstructL(); 
        
        iAppClass = aAppClass.AllocL(); 

        iName = HBufC::NewL( 512 );
        iName->Des().Append( _L( "searcher") ); 
        iName->Des().AppendNum( WorkerId() ); 
/*      iName->Des().Append( _L( "_") ); 
        iName->Des().Append( aAppClass ); */
        }
    
    ~CSearchingWorker() 
        {
        delete iName;
        delete iAppClass; 
        }

public: // From Cworker

    virtual void DoPrepareL() 
        {
        iStop = EFalse; 
        }

    virtual void DoCancel() 
        {
        iStop = ETrue; 
        }
    
    virtual void DoRunL() 
        {
        RSearchServerSession session;
        User::LeaveIfError( session.Connect() );
        CleanupClosePushL( session ); 
        CCPixSearcher* searcher = CCPixSearcher::NewLC( session, KContentsField );
        
        searcher->OpenDatabaseL( *iAppClass );
        
        while ( !iStop ) 
            {
            TBuf<64> num; 
            num.AppendNum( LogRand( KTermDeviation ) );  
            TInt n = searcher->SearchL( num ); 
            for ( TInt i = 0; i < n; i++ ) 
                {
                TRAP_IGNORE( delete searcher->GetDocumentL( i++ ); ); 
                }
            }
        CleanupStack::PopAndDestroy( searcher ); // session
        CleanupStack::PopAndDestroy(); // session
        }
        
    virtual const TDesC& Name() 
        {
        return *iName; 
        }
    
private:
    
    TBool iStop; 
    HBufC* iAppClass; 
    HBufC* iName; 
    };


void CTestMultiThreading::setUp() 
    {
    }

void CTestMultiThreading::tearDown() 
    {
    }

void CTestMultiThreading::PrepareL() 
    {
    RSearchServerSession session;
    User::LeaveIfError( session.Connect() ); 
    CleanupClosePushL( session ); 
    
    // First database has one reader and one or two searchers, what 
    // is something that can be expected in the actual device
    User::LeaveIfError
        (
        session.DefineVolume( _L( "@df:root test expected" ), 
                              _L( "c:\\data\\indexing\\indexdb\\root\\test\\expected" ) )
        ); 
    
    // Second database has one reader and one or two searchers, what 
    // is quite uncommon example and mainly used to stress the search server
    User::LeaveIfError
        (
        session.DefineVolume( _L( "@df:root test busy" ),
                              _L( "c:\\data\\indexing\\indexdb\\root\\test\\busy" ) )
        );
        
    CCPixIndexer* indexer = CCPixIndexer::NewLC( session );
    indexer->OpenDatabaseL( _L( "@df:root test expected" ) );
    indexer->ResetL(); 
    indexer->OpenDatabaseL( _L( "@df:root test busy" ) );
    indexer->ResetL(); 
    CleanupStack::PopAndDestroy( indexer); 
    CleanupStack::PopAndDestroy(); // session  

    // 
    // Indexers
    // 
    
    // Indexer with one's own index
    iWorkers->Append( CIndexingWorker::NewL( _L( "@df:root test expected" ) ) );
    
    // 3 indexers in the same index
    iWorkers->Append( CIndexingWorker::NewL( _L( "@df:root test busy" ) ) );
    iWorkers->Append( CIndexingWorker::NewL( _L( "@df:root test busy" ) ) );
    iWorkers->Append( CIndexingWorker::NewL( _L( "@df:root test busy" ) ) );
    
    // = 4 indexers in total 
    
    // 
    // Searchers
    // 
    
    // Multisearcher
    iWorkers->Append( CSearchingWorker::NewL( _L( "root test" ) ) );

    // Search qualified index
    iWorkers->Append( CSearchingWorker::NewL( _L( "@df:root test expected") ) );

    // Search big the busy index
    iWorkers->Append( CSearchingWorker::NewL( _L( "@df:root test busy" ) ) );
    iWorkers->Append( CSearchingWorker::NewL( _L( "@df:root test busy" ) ) );
    iWorkers->Append( CSearchingWorker::NewL( _L( "root test busy" ) ) );
    
    // = 5 searchers in total
    }

void CTestMultiThreading::Wait(TInt aSeconds) 
    {
    RTimer timer; 
    TRequestStatus timerStatus; 
    timer.CreateLocal();        
    CleanupClosePushL( timer ); 
    
    timer.After( timerStatus, aSeconds * 1000 * 1000 ); 
    User::WaitForRequest(timerStatus); 
    
    CleanupStack::PopAndDestroy(); // timer 
    }

void CTestMultiThreading::RunL() 
    {
    for ( TInt i = 0; i < iWorkers->Count(); i++ ) 
        {
        (*iWorkers)[i]->StartL(); 
        }
    
    Wait( 5 ); // Wait for 5 seconds
    
    for ( TInt i = 0; i < iWorkers->Count(); i++ ) 
        {
        (*iWorkers)[i]->Finish(); 
        if ( (*iWorkers)[i]->Error() != KErrNone ) 
            {
            TBuf8<512> msg; 
            const TDesC& name = (*iWorkers)[i]->Name();
            for ( TInt c = 0; c < name.Length(); c++ ) {
                msg.Append( name[c] ); 
            }
            msg.Append( _L8( " failed with" ) );
            msg.AppendNum( (*iWorkers)[i]->Error() );
            //TS_FAIL( reinterpret_cast<const char*>( msg.PtrZ() ) ); 
            }
        }
    }

void CTestMultiThreading::Finish() 
    {
    iWorkers->ResetAndDestroy();
    }

void CTestMultiThreading::DoTestMultiThreadingL() 
    {
    iWorkers = new ( ELeave ) RPointerArray<CWorker>(); 
    CleanupStack::PushL( iWorkers ); 
    CleanupClosePushL( *iWorkers ); 
    PrepareL(); 
    TRAPD( err, RunL() ); 
    Finish(); 
    User::LeaveIfError( err ); 
    CleanupStack::PopAndDestroy( iWorkers ); 
    CleanupStack::PopAndDestroy(); // iWorkers  
    }