searchengine/cpix/tsrc/cpixunittest/src/asynctests.cpp
author hgs
Mon, 09 Aug 2010 10:51:30 +0530
changeset 14 8bd192d47aaa
parent 3 ae3f1779f6da
child 24 65456528cac2
permissions -rw-r--r--
201031

/*
* Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
* All rights reserved.
* This component and the accompanying materials are made available
* under the terms of "Eclipse Public License v1.0"
* which accompanies this distribution, and is available
* at the URL "http://www.eclipse.org/legal/epl-v10.html".
*
* Initial Contributors:
* Nokia Corporation - initial contribution.
*
* Contributors:
*
* Description: 
*
*/

#include <wchar.h>
#include <stddef.h>
#include <unistd.h>

#include <pthread.h>

#include <algorithm>
#include <iostream>
#include <string>

#include "cpixtools.h"

#include "itk.h"

#include "cpix_async.h"

#include "config.h"
#include "suggestion.h"
#include "testutils.h"
#include "testcorpus.h"
#include "setupsentry.h"
#include "testutils.h"
#include "std_log_result.h"

// TODO PROPER, EXAMPLARY error clearing (cpix_ClearError())


bool AddField(cpix_Document * doc,
              const wchar_t * name,
              const wchar_t * value,
              int             flags)
{
    bool
        rv = false;

    cpix_Field
        field;

    cpix_Field_initialize(&field,
                          name,
                          value,
                          flags);

    if (cpix_Succeeded(&field))
        {
            cpix_Document_add(doc,
                              &field);

            if (cpix_Succeeded(doc))
                {
                    rv = true;
                }
            else
                {
                    cpix_Field_release(&field);
                }
        }

    return rv;
}


cpix_Document * CreateSmsDoc(Itk::TestMgr  * testMgr,
                             size_t          docUid,
                             const wchar_t * body,
                             Cpt::Mutex    & cpixMutex)
{
    using namespace std;

    wstring 
        docUidStr = GetItemId(docUid); 
    cpix_Result
        result;

    Cpt::SyncRegion
        sr(cpixMutex);

    cpix_Document
        * doc = cpix_Document_create(&result,
                                     docUidStr.c_str(),
                                     SMSAPPCLASS,
                                     body,
                                     NULL);
    ITK_ASSERT(testMgr,
               cpix_Succeeded(&result),
               "Failed to create document for SMS");

    bool
        success = AddField(doc,
                           LTO_FIELD,
                          L"+3585554444",
                          cpix_STORE_YES | cpix_INDEX_UNTOKENIZED);

    if (success)
        {
            success = AddField(doc,
                               LBODY_FIELD,
                               body,
                               cpix_STORE_NO | cpix_INDEX_TOKENIZED);
        }

    if (!success)
        {
            cpix_Document_destroy(doc);

            ITK_ASSERT(testMgr,
                       false,
                       "Could not add field(s) to SMS doc");
        }

    return doc;
}
                              

// TODO these SyncedAdd operations must evolve to lock on Itk::TestMgr
// * instance (or have a synced Itk::TestMgr derivant class)



/**
 * Reports the CPIX error if the last operation on cpix_Obj
 * failed. Also clears the error status on cpix_Obj (necessary for
 * error info lifetime mgmt).
 *
 * @return if the last operation cpix_Obj was successful, that is,
 * it's return valus is equivalent to calling
 * cpix_Succeeded(cpix_Obj). (But note, that this call clears the
 * error status, thus a cpix_Succeeded(cpix_Obj) call after a call to
 * this function will alsways return true).
 */
template <typename CPIX_OBJ>
bool ReportIfFailed(Itk::TestMgr * testMgr,
                    CPIX_OBJ     * cpix_Obj,
                    Cpt::Mutex   & cpixMutex)
{
    bool
        succeeded = true;

    if (cpix_Failed(cpix_Obj))
        {
            succeeded = false;

            wchar_t
                buffer[256];

            Cpt::SyncRegion
                sr(cpixMutex);

            cpix_Error_report(cpix_Obj->err_,
                              buffer,
                              sizeof(buffer) / sizeof(wchar_t));
            ITK_MSG(testMgr,
                    "CPIX failure: %S",
                    buffer);
            cpix_ClearError(cpix_Obj);
        }

    return succeeded;
}



/**
 * Adds an SMS item to an index using the async API, but it is used as
 * a sync API here.
 */
class SyncedAdd
{
    Cpt::SyncQueue<cpix_JobId>   q_;

    Cpt::Mutex                 & cpixMutex_;

public:
    
    SyncedAdd(Cpt::Mutex & cpixMutex)
        : q_(1),
          cpixMutex_(cpixMutex)
    {
        ;
    }


    void call(cpix_IdxDb    * idxDb,
              cpix_Document * doc,
              cpix_Analyzer * analyzer,
              Itk::TestMgr  * testMgr)
    {
        cpix_JobId
            jobId;

        {
            Cpt::SyncRegion
                sr(cpixMutex_);

            jobId = cpix_IdxDb_asyncAdd(idxDb,
                                        doc,
                                        analyzer,
                                        this,
                                        &callback);
        }

        ITK_ASSERT(testMgr,
                   ReportIfFailed(testMgr,
                                  idxDb,
                                  cpixMutex_),
                   "Making async call failed");

        cpix_JobId
            jobId2;

        bool 
            couldGet = q_.get(&jobId2);
        
        ITK_ASSERT(testMgr,
                   jobId == jobId2,
                   "Messed up callback!");

        if (couldGet)
            {
                {
                    Cpt::SyncRegion
                        sr(cpixMutex_);
                    
                    cpix_IdxDb_asyncAddResults(idxDb,
                                               jobId);
                }

                ITK_EXPECT(testMgr,
                           ReportIfFailed(testMgr,
                                          idxDb,
                                          cpixMutex_),
                           "Synced addition failed");

                if (cpix_Succeeded(idxDb))
                    {
                        ITK_MSG(testMgr,
                                "Indexed");
                    }
            }
        else
            {
                ITK_ASSERT(testMgr,
                           false,
                           "Could not get result from async call");
            }
    }
     

private:
    static void callback(void       * cookie,
                         cpix_JobId   jobId)
    {
        SyncedAdd
            * this_ = reinterpret_cast<SyncedAdd*>(cookie);

        this_->q_.put(jobId);
    }
};



/**
 * Deletes an SMS item from an index using the async API, but it is used as
 * a sync API here.
 */
class SyncedDel
{
    Cpt::SyncQueue<cpix_JobId>   q_;
    Cpt::Mutex                 & cpixMutex_;


public:
    
    SyncedDel(Cpt::Mutex & cpixMutex)
        : q_(1),
          cpixMutex_(cpixMutex)
    {
        ;
    }


    void call(cpix_IdxDb    * idxDb,
              const wchar_t * docUid,
              Itk::TestMgr  * testMgr)
    {
        cpix_JobId
            jobId;

        {
            Cpt::SyncRegion
                sr(cpixMutex_);

            jobId = cpix_IdxDb_asyncDeleteDocuments(idxDb,
                                                    docUid,
                                                    this,
                                                    &callback);
        }

        ITK_ASSERT(testMgr,
                   ReportIfFailed(testMgr,
                                  idxDb,
                                  cpixMutex_),
                   "could not make async call");

        cpix_JobId
            jobId2;

        bool
            couldGet = q_.get(&jobId2);

        ITK_ASSERT(testMgr,
                   jobId == jobId2,
                   "Messed up callback");

        if (couldGet)
            {
                int32_t
                    rv;

                {
                    Cpt::SyncRegion
                        sr(cpixMutex_);

                    rv = cpix_IdxDb_asyncDeleteDocumentsResults(idxDb,
                                                                jobId);
                }

                ITK_EXPECT(testMgr,
                           ReportIfFailed(testMgr,
                                          idxDb,
                                          cpixMutex_),
                           "Synced delete failed");

                if (cpix_Succeeded(idxDb))
                    {
                        ITK_MSG(testMgr,
                                "Deleted %d",
                                rv);
                    }
            }
        else
            {
                ITK_ASSERT(testMgr,
                           false,
                           "Could not get result from async call");
            }
    }
     

private:
    static void callback(void       * cookie,
                         cpix_JobId   jobId)
    {
        SyncedDel
            * this_ = reinterpret_cast<SyncedDel*>(cookie);

        this_->q_.put(jobId);
    }
};



/**
 * Updates an SMS item to an index using the async API, but it is used as
 * a sync API here.
 */
class SyncedUpdate
{
    Cpt::SyncQueue<cpix_JobId>   q_;

    Cpt::Mutex                 & cpixMutex_;

public:
    
    SyncedUpdate(Cpt::Mutex & cpixMutex)
        : q_(1),
          cpixMutex_(cpixMutex)
    {
        ;
    }


    void call(cpix_IdxDb    * idxDb,
              cpix_Document * doc,
              cpix_Analyzer * analyzer,
              Itk::TestMgr  * testMgr)
    {
        cpix_JobId
            jobId;

        {
            Cpt::SyncRegion
                sr(cpixMutex_);

            jobId = cpix_IdxDb_asyncUpdate(idxDb,
                                           doc,
                                           analyzer,
                                           this,
                                           &callback);
        }

        ITK_ASSERT(testMgr,
                   ReportIfFailed(testMgr,
                                  idxDb,
                                  cpixMutex_),
                   "Could not make async call");

        cpix_JobId
            jobId2;

        bool
            couldGet = q_.get(&jobId2);

        ITK_ASSERT(testMgr,
                   jobId == jobId2,
                   "Messed up callback");

        if (couldGet)
            {
                {
                    Cpt::SyncRegion
                        sr(cpixMutex_);
                    cpix_IdxDb_asyncUpdateResults(idxDb,
                                                  jobId);
                }
                    
                ITK_EXPECT(testMgr,
                           ReportIfFailed(testMgr,
                                          idxDb,
                                          cpixMutex_),
                           "Synced update failed");

                if (cpix_Succeeded(idxDb))
                    {
                        ITK_MSG(testMgr,
                                "Updated");
                    }
            }
        else
            {
                ITK_ASSERT(testMgr,
                           false,
                           "Could not get result from async call");
            }
    }
     

private:
    static void callback(void       * cookie,
                         cpix_JobId   jobId)
    {
        SyncedUpdate
            * this_ = reinterpret_cast<SyncedUpdate*>(cookie);

        this_->q_.put(jobId);
    }
};



class SyncedHousekeep
{
    Cpt::SyncQueue<cpix_JobId>   q_;
    Cpt::Mutex                 & cpixMutex_;


public:
    SyncedHousekeep(Cpt::Mutex & cpixMutex)
        : q_(1),
          cpixMutex_(cpixMutex)
    {
        ;
    }


    void call(Itk::TestMgr   * testMgr)
    {
        cpix_JobId
            jobId;

        cpix_Result
            result;

        {
            Cpt::SyncRegion
                sr(cpixMutex_);

            jobId = cpix_asyncDoHousekeeping(&result,
                                             this,
                                             &callback);
        }

        ITK_ASSERT(testMgr,
                   ReportIfFailed(testMgr,
                                  &result,
                                  cpixMutex_),
                   "Performing async housekeeping failed");

        cpix_JobId
            jobId2;

        bool
            couldGet = q_.get(&jobId2);

        ITK_ASSERT(testMgr,
                   jobId == jobId2,
                   "Messed up callback!");

        if (couldGet)
            {
                {
                    Cpt::SyncRegion
                        sr(cpixMutex_);

                    cpix_asyncDoHousekeepingResults(&result,
                                                    jobId);

                    ITK_EXPECT(testMgr,
                               ReportIfFailed(testMgr,
                                              &result,
                                              cpixMutex_),
                               "Synced doHousekeeping failed");

                    if (cpix_Succeeded(&result))
                        {
                            ITK_MSG(testMgr,
                                    "Housekeeping done");
                        }
                }
            }
        else
            {
                ITK_ASSERT(testMgr,
                           false,
                           "Could not get result from async call");
            }
    }



private:
    static void callback(void               * cookie,
                         cpix_JobId           jobId)
    {
        SyncedHousekeep
            * this_ = reinterpret_cast<SyncedHousekeep*>(cookie);

        this_->q_.put(jobId);
    }


};



class SyncedDoc
{
    Cpt::SyncQueue<cpix_JobId>   q_;

    Cpt::Mutex                 & cpixMutex_;

public:
    SyncedDoc(Cpt::Mutex & cpixMutex)
        : q_(1),
          cpixMutex_(cpixMutex)
    {
        ;
    }


    bool call(cpix_Hits     * hits,
              int32_t         index,
              cpix_Document ** target,
              Itk::TestMgr  * testMgr,
              bool            cancel)
    {
        bool
            rv = false;

        cpix_JobId
            jobId;

        {
            Cpt::SyncRegion
                sr(cpixMutex_);

            jobId = cpix_Hits_asyncDoc(hits,
                                       index,
                                       target,
                                       this,
                                       &callback,
                                       1);
        }
        
        ITK_ASSERT(testMgr,
                   ReportIfFailed(testMgr,
                                  hits,
                                  cpixMutex_),
                   "Could not make async doc retrieval");

        if (cancel)
            {
                cpix_CancelAction
                    cancelAction = cpix_CANCEL_ERROR;

                {
                    Cpt::SyncRegion
                        sr(cpixMutex_);
                    
                    cancelAction = cpix_Hits_cancelDoc(hits,
                                                       jobId);
                }

                ReportIfFailed(testMgr,
                               hits,
                               cpixMutex_);

                ITK_MSG(testMgr,
                        "Doc retrieval CANCELLED (%d)",
                        cancelAction);

                // !!! Even if we did call cancel(), the callback
                //     might have hitted us before the cancel()
                //     returned.  we have to clear the status, but we
                //     must not collect any results.
                if (q_.full())
                    {
                        ITK_MSG(testMgr,
                                "Clearing cancelled callback status...\n");
                        cpix_JobId
                            jobId2;
                        
                        bool
                            couldGet = q_.get(&jobId2);
                        
                        ITK_ASSERT(testMgr,
                                   jobId = jobId2,
                                   "Messed up callback");
                        ITK_MSG(testMgr,
                                "... clearing cancelled callback status done\n");
                    }
            }
        else
            {
                cpix_JobId
                    jobId2;

                bool
                    couldGet = q_.get(&jobId2);

                ITK_ASSERT(testMgr,
                           jobId == jobId2,
                           "Messed up callback");

                if (couldGet)
                    {
                        {
                            Cpt::SyncRegion
                                sr(cpixMutex_);
                            
                            cpix_Hits_asyncDocResults(hits,
                                                      jobId);
                        }

                        ITK_EXPECT(testMgr,
                                   ReportIfFailed(testMgr,
                                                  hits,
                                                  cpixMutex_),
                                   "Synced doc retrieval failed");

                        if (cpix_Succeeded(hits))
                            {
                                ITK_MSG(testMgr,
                                        "Doc is retrieved");
                                rv = true;
                            }
                    }
                else
                    {
                        // not assert: this can happen
                        ITK_MSG(testMgr,
                                false,
                                "Could not get result from async hit doc");
                    }
            }

        return rv;
    }

private:
    static void callback(void       * cookie,
                         cpix_JobId   jobId)
    {
        SyncedDoc
            * this_ = reinterpret_cast<SyncedDoc*>(cookie);

        this_->q_.put(jobId);
    }

};





class SyncedSearch
{
    Cpt::SyncQueue<cpix_JobId>    q_;

    SyncedDoc                     syncedDoc_;

    cpix_Document                 **targetDoc_;

    Cpt::Mutex                  & cpixMutex_;

public:
    
    SyncedSearch(Cpt::Mutex & cpixMutex)
        : q_(1),
          syncedDoc_(cpixMutex),
          cpixMutex_(cpixMutex)
    {
        ALLOC_DOC(targetDoc_, 1);
    }
    
    ~SyncedSearch() {
        FREE_DOC(targetDoc_, 1);
    }
    
    void call(cpix_IdxSearcher  * searcher,
              cpix_Query        * query,
              Itk::TestMgr      * testMgr,
              bool                cancel)
    {
        // cancel = false;

        cpix_JobId
            jobId;

        {
            Cpt::SyncRegion
                sr(cpixMutex_);

            jobId = cpix_IdxSearcher_asyncSearch(searcher,
                                                 query,
                                                 this,
                                                 &callback);
        }

        ITK_ASSERT(testMgr,
                   ReportIfFailed(testMgr,
                                  searcher,
                                  cpixMutex_),
                   "Could not make async call");

        if (cancel)
            {
                cpix_CancelAction
                    cancelAction = cpix_CANCEL_ERROR;

                {
                    Cpt::SyncRegion
                        sr(cpixMutex_);

                    cancelAction = cpix_IdxSearcher_cancelSearch(searcher,
                                                                 jobId);
                }

                ReportIfFailed(testMgr,
                               searcher,
                               cpixMutex_);
                ITK_MSG(testMgr,
                        "Search/terms query CANCELLED (%d)",
                        cancelAction);

                // !!! Even if we did call cancel(), the callback
                //     might have hitted us before the cancel()
                //     returned.  we have to clear the status, but we
                //     must not collect any results.
                if (q_.full())
                    {
                        ITK_MSG(testMgr,
                                "Clearing cancelled callback status...\n");
                        cpix_JobId
                            jobId2;
                        
                        bool
                            couldGet = q_.get(&jobId2);
                        
                        ITK_ASSERT(testMgr,
                                   jobId = jobId2,
                                   "Messed up callback");
                        ITK_MSG(testMgr,
                                "... clearing cancelled callback status done\n");
                    }
            }
        else
            {

                cpix_JobId
                    jobId2;

                bool
                    couldGet = q_.get(&jobId2);

                ITK_ASSERT(testMgr,
                           jobId = jobId2,
                           "Messed up callback");

                if (couldGet)
                    {
                        cpix_Hits
                            * hits = NULL;

                        {
                            Cpt::SyncRegion
                                sr(cpixMutex_);
                            
                            hits = cpix_IdxSearcher_asyncSearchResults(searcher,
                                                                       jobId);
                        }

                        ITK_EXPECT(testMgr,
                                   ReportIfFailed(testMgr,
                                                  searcher,
                                                  cpixMutex_),
                                   "Synced search failed");

                        if (cpix_Succeeded(searcher))
                            {
                                ITK_MSG(testMgr,
                                        "Search/terms query SUCCEEDED");

                                printSome(hits,
                                          testMgr);
                            }

                        {
                            Cpt::SyncRegion
                                sr(cpixMutex_);
                            cpix_Hits_destroy(hits);
                        }
                    }
                else
                    {
                        ITK_ASSERT(testMgr,
                                   false,
                                   "Could not get result from async call");
                    }
            }
    }
     

private:

    
    void printSome(cpix_Hits    * hits,
                   Itk::TestMgr * testMgr)
    {
        int32_t
            length;

        {
            Cpt::SyncRegion
                sr(cpixMutex_);

            length = cpix_Hits_length(hits);
        }

        ITK_MSG(testMgr,
                "Num of hits: %d",
                length);

        enum 
        { 
            // The setup of havig SOME=10 and cancelling doc retrieval
            // operations for doc indexes on the condition "(i%4) ==
            // 1" is special because this way
            //
            // (a) sooner or later there is going to be an indexing
            //     operation during doc enumeration, invalidating the
            //     hits
            //
            // (b) the last doc retrieval (index 9) is cancelled,
            //     after which this hits is destroyed
            //
            // (c) this situation previously has crashed CPix
            SOME = 10
        };

        for (int32_t i = 0; i < SOME && i < length; ++i)
            {
                bool
                    succeeded = syncedDoc_.call(hits,
                                                i,
                                                targetDoc_,
                                                testMgr,
                                                (i%4) == 1); // cancel or not

                if (succeeded)
                    {
                        Cpt::SyncRegion
                            sr(cpixMutex_);

                        PrintHit(targetDoc_[0],
                                 testMgr);
                    }
            }
    }



    static void callback(void       * cookie,
                         cpix_JobId   jobId)
    {
        SyncedSearch
            * this_ = reinterpret_cast<SyncedSearch*>(cookie);

        this_->q_.put(jobId);
    }
    

};






class ASyncContext : public Itk::ITestContext
{
private:
    cpix_IdxDb       * idxDb1_;
    cpix_IdxDb       * idxDb2_;
    cpix_IdxSearcher * searcher1_;
    cpix_IdxSearcher * searcher2_;
    cpix_Analyzer    * analyzerI_; // for indexing thread
    cpix_Analyzer    * analyzerQ_; // for query thread
    cpix_QueryParser * queryParser_;
    cpix_Query       * searchQuery_;
    cpix_Query       * termsQuery_;
    
    LineTestCorpusRef  corpus_;

    /* OBS
    // TODO currently protecting both 
    //   (a) CPix (non-thread safe) and
    //   (b) Itk::TestMgr
    //
    // turn Itk::TestMgr into protected later, this tempMutex_ is to
    // be renamed, and it should protect individual calls (async and
    // sync alike) to cpix within these tests
    Cpt::Mutex         tempMutex_;
    */

    /**
     * CPix itself is to be invoked from a single thread (API is not
     * multithreaded). However, for sake of clarity, we test it from
     * multiple threads (indexing and searching client threads). So
     * they need to be synchronized - no two threads should be
     * executing an sync / async call at the same time.
     */
    Cpt::Mutex         cpixMutex_;

    bool               keepIndexing_;

    Itk::TestMgr     * testMgr_;

    /**
     * Statistics POV
     */
    struct Stats
    {
        std::string    name_;
        size_t         opCount_;
        long           opSumMs_;

        Stats(const char * name)
            : name_(name),
              opCount_(0),
              opSumMs_(0)
        {
            ;
        }


        void update(Cpt::StopperWatch & stopperWatch)
        {
            ++opCount_;
            opSumMs_ += stopperWatch.elapsedMSecs();
        }

        
        void output(Itk::TestMgr * testMgr)
        {
            if (opCount_ > 0)
                {
                    ITK_REPORT(testMgr,
                               name_.c_str(),
                               "%ld ops in %ld ms (%.3f ms/op)",
                               opCount_,
                               opSumMs_,
                               static_cast<double>(opSumMs_) 
                               / static_cast<double>(opCount_));
                }
            else
                {
                    ITK_REPORT(testMgr,
                               name_.c_str(),
                               "%ld ops in %ld ms (### ms/op)",
                               opCount_,
                               opSumMs_);
                }
        }


        void reset()
        {
            opCount_ = 0;
            opSumMs_ = 0;
        }
    };


    // statistics for indexing
    Stats              addStats_;
    Stats              delStats_;
    Stats              updateStats_;

    // statistics for search
    Stats              searchStats_;
    Stats              termsStats_;

public:
    virtual void setup() throw (Itk::PanicExc)
    {
        SetupSentry
            ss(*this);

        cpix_Result
            result;

        cpix_IdxDb_dbgScrapAll(&result);
        if (cpix_Failed(&result))
            {
                ITK_PANIC("Could not dbg scrapp all indexes");
            }

        const Volume
            *p = Volumes;

        for (; p->qbac_ != NULL; ++p)
            {
                cpix_IdxDb_defineVolume(&result,
                                        p->qbac_,
                                        p->path_);

                if (cpix_Failed(&result))
                    {
                        ITK_PANIC("Could not define volume '%s'",
                                  p->qbac_);
                    }
            }

        cpix_IdxDb_defineVolume(&result,
                                SMS_QBASEAPPCLASS,
                                NULL);

        if (cpix_Failed(&result))
            {
                ITK_PANIC("Could not define index for SMS");
            }

        idxDb1_ = cpix_IdxDb_openDb(&result,
                                    SMS_QBASEAPPCLASS,
                                    cpix_IDX_CREATE);
        if (cpix_Failed(&result))
            {
                ITK_PANIC("Could not open(create) index for SMS");
            }

        static const char
            sms2Qbac[] = SMS_QBASEAPPCLASS "2";

        cpix_IdxDb_defineVolume(&result,
                                sms2Qbac,
                                NULL);

        if (cpix_Failed(&result))
            {
                ITK_PANIC("Could not define index for SMS(2)");
            }

        idxDb2_ = cpix_IdxDb_openDb(&result,
                                    sms2Qbac,
                                    cpix_IDX_CREATE);
        if (cpix_Failed(&result))
            {
                ITK_PANIC("Could not open(create) index for SMS(2)");
            }

        searcher1_ = cpix_IdxSearcher_openDb(&result,
                                             "root msg");
        if (searcher1_ == NULL)
            {
                ITK_PANIC("Could not create searcher");
            }

        searcher2_ = cpix_IdxSearcher_openDb(&result,
                                             "root");
        if (searcher2_ == NULL)
            {
                ITK_PANIC("Could not create searcher");
            }
        
        analyzerI_ = cpix_CreateSimpleAnalyzer(&result);
        
        if (analyzerI_ == NULL)
            {
                ITK_PANIC("Could not create analyzer");
            }

        analyzerQ_ = cpix_CreateSimpleAnalyzer(&result);
        
        if (analyzerQ_ == NULL)
            {
                ITK_PANIC("Could not create analyzer");
            }

        queryParser_ = cpix_QueryParser_create(&result,
                                               LCPIX_DEFAULT_FIELD,
                                               analyzerQ_);
        if (queryParser_ == NULL)
            {
                ITK_PANIC("Could not create query parser");
            }
        
        searchQuery_ = cpix_QueryParser_parse(queryParser_,
                                              L"happy");
        if (cpix_Failed(queryParser_)
            || searchQuery_ == NULL)
            {
                ITK_PANIC("Could not parse search query");
            }

        cpix_QueryParser_destroy(queryParser_);
        queryParser_ = NULL;

        queryParser_ = cpix_QueryParser_create(&result,
                                               L"SecondaryTokens",
                                               analyzerQ_);
        if (queryParser_ == NULL)
            {
                ITK_PANIC("Could not create query parser");
            }

        termsQuery_ = cpix_QueryParser_parse(queryParser_,
                                             L"ha* OR _aggregate:happy");
        if (cpix_Failed(queryParser_)
            || termsQuery_ == NULL)
            {
                ITK_PANIC("Could not parse terms query");
            }

        ss.setupComplete();
    }


    virtual void tearDown() throw()
    {
        cleanup();
    }


    virtual ~ASyncContext()
    {
        cleanup();
    }


    //
    // public operations
    //
    ASyncContext()
        : idxDb1_(NULL),
          idxDb2_(NULL),
          searcher1_(NULL),
          searcher2_(NULL),
          analyzerI_(NULL),
          analyzerQ_(NULL),
          queryParser_(NULL),
          searchQuery_(NULL),
          termsQuery_(NULL),
          corpus_(DEFAULT_TEST_CORPUS_PATH),
          keepIndexing_(true),
          addStats_("adding"),
          delStats_("deleting"),
          updateStats_("updating"),
          searchStats_("searching"),
          termsStats_("term-qrying")
    {
        ;
    }


    
    void testSingleThreadIdx(Itk::TestMgr * testMgr)
    {
        char *xml_file = (char*)__FUNCTION__;
        assert_failed = 0;
        testMgr_ = testMgr;

        // cpix_setLogLevel(CPIX_LL_DEBUG);

        pthread_t
            indexerThreadHndl;

        void
            * rv;

        int
            result = pthread_create(&indexerThreadHndl,
                                    NULL,
                                    &IndexerThreadFunc,
                                    this);
        ITK_ASSERT(testMgr,
                   result == 0,
                   "Could not create POSIX thread for indexing");

        printf("MAIN: Created indexer thread\n");

        printf("MAIN: waiting for worker thread (indexer)\n");

        result = pthread_join(indexerThreadHndl,
                              &rv);
        
        ITK_EXPECT(testMgr,
                   result == 0,
                   "Could not join indexer thread");
        if(result != 0) 
            {
            assert_failed = 1;
            }
        testResultXml(xml_file);
        // cpix_setLogLevel(CPIX_LL_TRACE);

        printStatistics();
    }
    


    void testSingleThreadQry(Itk::TestMgr * testMgr)
    {
        char *xml_file = (char*)__FUNCTION__;
        assert_failed = 0;
        testMgr_ = testMgr;

        // cpix_setLogLevel(CPIX_LL_DEBUG);

        pthread_t
            searcherThreadHndl;

        void
            * rv;

        int
            result = pthread_create(&searcherThreadHndl,
                                    NULL,
                                    &SearcherThreadFunc,
                                    this);
        ITK_ASSERT(testMgr,
                   result == 0,
                   "Could not create POSIX thread for searching");

        printf("MAIN: Created searcher thread\n");

        printf("MAIN: waiting for worker thread (searcher)\n");

        result = pthread_join(searcherThreadHndl,
                              &rv);

        ITK_EXPECT(testMgr,
                   result == 0,
                   "Could not join searcher thread");
        if(result != 0) 
            {
            assert_failed = 1;
            }
        testResultXml(xml_file);
        // cpix_setLogLevel(CPIX_LL_TRACE);

        printStatistics();
    }



    void testMultiThreads(Itk::TestMgr * testMgr)
    {
        char *xml_file = (char*)__FUNCTION__;
        assert_failed = 0;
        cleanup();
        setup();

        testMgr_ = testMgr;

        // cpix_setLogLevel(CPIX_LL_DEBUG);

        pthread_t
            indexerThreadHndl,
            searcherThreadHndl;

        void
            * rv;

        int
            result = pthread_create(&indexerThreadHndl,
                                    NULL,
                                    &IndexerThreadFunc,
                                    this);
        ITK_ASSERT(testMgr,
                   result == 0,
                   "Could not create POSIX thread for indexing");

        printf("MAIN: Created indexer thread\n");

        result = pthread_create(&searcherThreadHndl,
                                NULL,
                                &SearcherThreadFunc,
                                this);

        if (result != 0)
            {
                keepIndexing_ = false;

                result = pthread_join(indexerThreadHndl,
                                      &rv);
                ITK_EXPECT(testMgr,
                           result == 0,
                           "Could not force closure of indexer thread");
                ITK_ASSERT(testMgr,
                           false,
                           "Could not create POSIX thread for searching");
            }


        printf("MAIN: Created searcher thread\n");
        
        printf("MAIN: waiting for both worker threads\n");

        result = pthread_join(indexerThreadHndl,
                              &rv);
        
        ITK_EXPECT(testMgr,
                   result == 0,
                   "Could not join indexer thread");
        if(result != 0) 
            {
            assert_failed = 1;
            }
        
        result = pthread_join(searcherThreadHndl,
                              &rv);

        ITK_EXPECT(testMgr,
                   result == 0,
                   "Could not join searcher thread");
        if(result != 0) 
            {
            assert_failed = 1;
            }
        testResultXml(xml_file);
        // cpix_setLogLevel(CPIX_LL_TRACE);

        printStatistics();
    }


private:
    void cleanup()
    {
        cpix_IdxDb_releaseDb(idxDb1_);
        idxDb1_ = NULL;

        cpix_IdxDb_releaseDb(idxDb2_);
        idxDb2_ = NULL;
        
        cpix_IdxSearcher_releaseDb(searcher1_);
        searcher1_ = NULL;

        cpix_IdxSearcher_releaseDb(searcher2_);
        searcher2_ = NULL;

        cpix_Analyzer_destroy(analyzerI_);
        analyzerI_ = NULL;

        cpix_Analyzer_destroy(analyzerQ_);
        analyzerQ_ = NULL;

        cpix_QueryParser_destroy(queryParser_);
        queryParser_ = NULL;

        cpix_Query_destroy(searchQuery_);
        searchQuery_ = NULL;

        cpix_Query_destroy(termsQuery_);
        termsQuery_ = NULL;
    }

    
    void printStatistics()
    {
        addStats_.output(testMgr_);
        delStats_.output(testMgr_);
        updateStats_.output(testMgr_);
        searchStats_.output(testMgr_);
        termsStats_.output(testMgr_);
    }


    void * indexStuff()
    {
        using namespace std;
        using namespace Cpt;

        enum 
        { ROUNDS        = 200, 
          SAMPLE_START  = 180,
          ACTION_PERIOD = 5,
        };

        Cpt::StopperWatch
            stopperWatch;

        SyncedAdd
            syncedAdd(cpixMutex_);
        SyncedDel
            syncedDel(cpixMutex_);
        SyncedUpdate
            syncedUpdate(cpixMutex_);
        SyncedHousekeep
            syncedHousekeep(cpixMutex_);

        try
            {
                int
                    curId = 0;
                
                for (size_t i = 0; i < ROUNDS && keepIndexing_; ++i)
                    {
                        if (i == SAMPLE_START)
                            {
                                addStats_.reset();
                                delStats_.reset();
                                updateStats_.reset();
                            }

                        ++curId;
                        std::wstring
                            body(corpus_.item(curId));
                        
                        stopperWatch.reset();

                        cpix_Document
                            * doc = CreateSmsDoc(testMgr_,
                                                 curId,
                                                 body.c_str(),
                                                 cpixMutex_);
                        
                        syncedAdd.call(idxDb1_,
                                       doc,
                                       analyzerI_,
                                       testMgr_);

                        {
                            Cpt::SyncRegion
                                sr(cpixMutex_);
                            cpix_Document_destroy(doc);
                            doc = NULL;
                        }

                        addStats_.update(stopperWatch);

                        stopperWatch.reset();

                        doc = CreateSmsDoc(testMgr_,
                                           curId,
                                           body.c_str(),
                                           cpixMutex_);
                        
                        syncedAdd.call(idxDb2_,
                                       doc,
                                       analyzerI_,
                                       testMgr_);

                        {
                            Cpt::SyncRegion
                                sr(cpixMutex_);
                            cpix_Document_destroy(doc);
                            doc = NULL;
                        }

                        addStats_.update(stopperWatch);

                        if (curId % ACTION_PERIOD == 0)
                            {
                                stopperWatch.reset();

                                wstring
                                    docUid = GetItemId(curId/ACTION_PERIOD);

                                syncedDel.call(idxDb1_,
                                               docUid.c_str(),
                                               testMgr_);

                                delStats_.update(stopperWatch);

                                stopperWatch.reset();

                                doc = CreateSmsDoc(testMgr_,
                                                   curId/ACTION_PERIOD,
                                                   L"happy bla",
                                                   cpixMutex_);

                                syncedUpdate.call(idxDb2_,
                                                  doc,
                                                  analyzerI_,
                                                  testMgr_);
                                
                                {
                                    Cpt::SyncRegion
                                        sr(cpixMutex_);
                                    cpix_Document_destroy(doc);
                                    doc = NULL;
                                }

                                updateStats_.update(stopperWatch);

                                ITK_DBGMSG(testMgr_, "+");

                                syncedHousekeep.call(testMgr_);
                            }
                        else
                            {
                                ITK_DBGMSG(testMgr_, ".");
                            }
                    }
            }
        catch (...)
            {
                assert_failed = 1;
                ITK_EXPECT(testMgr_,
                           false,
                           "INDEXER: Failed indexing\n");
                throw;
            }

        syncedHousekeep.call(testMgr_);

        return NULL;
    }


    static void * IndexerThreadFunc(void * p)
    {
        ASyncContext
            * thisPtr = reinterpret_cast<ASyncContext*>(p);
        return thisPtr->indexStuff();
    }


    void * searchStuff()
    {
        using namespace Cpt;

        sleep(43);

        // on emulator, this amount of search happens together with
        // indexing/deleting/updating, the rest runs alone, therefore
        // distoring statistics (we are interested in how slow
        // searching when (a) runs alone ("sq" test case) or when (b) runs mixed
        // with indexing ("mt" test case).
        enum 
        { 
            ROUNDS       = 21,
            SAMPLE_START = 1
        };

        Cpt::StopperWatch
            stopperWatch;

        SyncedSearch
            syncedSearch(cpixMutex_);

        try
            {
                for (size_t i = 0; i < ROUNDS; ++i)
                    {
                        if (i == SAMPLE_START)
                            {
                                searchStats_.reset();
                                termsStats_.reset();
                            }

                        stopperWatch.reset();
                            syncedSearch.call(searcher1_,
                                              searchQuery_,
                                              testMgr_,
                                              i%2 == 1);

                        searchStats_.update(stopperWatch);

                        stopperWatch.reset();
                            syncedSearch.call(searcher2_,
                                              termsQuery_,
                                              testMgr_,
                                              i%2 == 1);

                        termsStats_.update(stopperWatch);
                        
                        ITK_DBGMSG(testMgr_, ":");
                    }
            }
        catch (...)
            {
                assert_failed = 1;
                ITK_EXPECT(testMgr_,
                           false,
                           "SEARCHER: Failed searching\n");
                throw;
            }

        return NULL;
    }


    static void * SearcherThreadFunc(void * p)
    {
        ASyncContext
            * thisPtr = reinterpret_cast<ASyncContext*>(p);
        return thisPtr->searchStuff();
    }


};



Itk::TesterBase * CreateASyncTests()
{
    using namespace Itk;

    ASyncContext
        * asyncContext = new ASyncContext();
    ContextTester
        * contextTester = new ContextTester("async",
                                           asyncContext);

#define TEST "si"
    contextTester->add(TEST,
                       asyncContext,
                       &ASyncContext::testSingleThreadIdx,
                       TEST);
#undef TEST

#define TEST "sq"
    contextTester->add(TEST,
                       asyncContext,
                       &ASyncContext::testSingleThreadQry,
                       TEST,
                       "CANCELLED"); // cancellation may happen in
                                     // different ways
#undef TEST

#define TEST "mt"
    contextTester->add(TEST,
                       asyncContext,
                       &ASyncContext::testMultiThreads,
                       TEST,
                       SuiteTester::REDIRECT_ONLY);
#undef TEST

    return contextTester;
}