searchengine/cpix/tsrc/cpixunittest/src/asynctests.cpp
changeset 0 671dee74050a
child 3 ae3f1779f6da
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/searchengine/cpix/tsrc/cpixunittest/src/asynctests.cpp	Mon Apr 19 14:40:16 2010 +0300
@@ -0,0 +1,1670 @@
+/*
+* Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description: 
+*
+*/
+
+#include <wchar.h>
+#include <stddef.h>
+#include <unistd.h>
+
+#include <pthread.h>
+
+#include <algorithm>
+#include <iostream>
+#include <string>
+
+#include "cpixtools.h"
+
+#include "itk.h"
+
+#include "cpix_async.h"
+
+#include "config.h"
+#include "suggestion.h"
+#include "testutils.h"
+#include "testcorpus.h"
+#include "setupsentry.h"
+
+
+// TODO PROPER, EXAMPLARY error clearing (cpix_ClearError())
+
+
+bool AddField(cpix_Document * doc,
+              const wchar_t * name,
+              const wchar_t * value,
+              int             flags)
+{
+    bool
+        rv = false;
+
+    cpix_Field
+        field;
+
+    cpix_Field_initialize(&field,
+                          name,
+                          value,
+                          flags);
+
+    if (cpix_Succeeded(&field))
+        {
+            cpix_Document_add(doc,
+                              &field);
+
+            if (cpix_Succeeded(doc))
+                {
+                    rv = true;
+                }
+            else
+                {
+                    cpix_Field_release(&field);
+                }
+        }
+
+    return rv;
+}
+
+
+cpix_Document * CreateSmsDoc(Itk::TestMgr  * testMgr,
+                             size_t          docUid,
+                             const wchar_t * body,
+                             Cpt::Mutex    & cpixMutex)
+{
+    using namespace std;
+
+    wstring 
+        docUidStr = GetItemId(docUid); 
+    cpix_Result
+        result;
+
+    Cpt::SyncRegion
+        sr(cpixMutex);
+
+    cpix_Document
+        * doc = cpix_Document_create(&result,
+                                     docUidStr.c_str(),
+                                     SMSAPPCLASS,
+                                     body,
+                                     NULL);
+    ITK_ASSERT(testMgr,
+               cpix_Succeeded(&result),
+               "Failed to create document for SMS");
+
+    bool
+        success = AddField(doc,
+                           LTO_FIELD,
+                          L"+3585554444",
+                          cpix_STORE_YES | cpix_INDEX_UNTOKENIZED);
+
+    if (success)
+        {
+            success = AddField(doc,
+                               LBODY_FIELD,
+                               body,
+                               cpix_STORE_NO | cpix_INDEX_TOKENIZED);
+        }
+
+    if (!success)
+        {
+            cpix_Document_destroy(doc);
+
+            ITK_ASSERT(testMgr,
+                       false,
+                       "Could not add field(s) to SMS doc");
+        }
+
+    return doc;
+}
+                              
+
+// TODO these SyncedAdd operations must evolve to lock on Itk::TestMgr
+// * instance (or have a synced Itk::TestMgr derivant class)
+
+
+
+/**
+ * Reports the CPIX error if the last operation on cpix_Obj
+ * failed. Also clears the error status on cpix_Obj (necessary for
+ * error info lifetime mgmt).
+ *
+ * @return if the last operation cpix_Obj was successful, that is,
+ * it's return valus is equivalent to calling
+ * cpix_Succeeded(cpix_Obj). (But note, that this call clears the
+ * error status, thus a cpix_Succeeded(cpix_Obj) call after a call to
+ * this function will alsways return true).
+ */
+template <typename CPIX_OBJ>
+bool ReportIfFailed(Itk::TestMgr * testMgr,
+                    CPIX_OBJ     * cpix_Obj,
+                    Cpt::Mutex   & cpixMutex)
+{
+    bool
+        succeeded = true;
+
+    if (cpix_Failed(cpix_Obj))
+        {
+            succeeded = false;
+
+            wchar_t
+                buffer[256];
+
+            Cpt::SyncRegion
+                sr(cpixMutex);
+
+            cpix_Error_report(cpix_Obj->err_,
+                              buffer,
+                              sizeof(buffer) / sizeof(wchar_t));
+            ITK_MSG(testMgr,
+                    "CPIX failure: %S",
+                    buffer);
+            cpix_ClearError(cpix_Obj);
+        }
+
+    return succeeded;
+}
+
+
+
+/**
+ * Adds an SMS item to an index using the async API, but it is used as
+ * a sync API here.
+ */
+class SyncedAdd
+{
+    Cpt::SyncQueue<cpix_JobId>   q_;
+
+    Cpt::Mutex                 & cpixMutex_;
+
+public:
+    
+    SyncedAdd(Cpt::Mutex & cpixMutex)
+        : q_(1),
+          cpixMutex_(cpixMutex)
+    {
+        ;
+    }
+
+
+    void call(cpix_IdxDb    * idxDb,
+              cpix_Document * doc,
+              cpix_Analyzer * analyzer,
+              Itk::TestMgr  * testMgr)
+    {
+        cpix_JobId
+            jobId;
+
+        {
+            Cpt::SyncRegion
+                sr(cpixMutex_);
+
+            jobId = cpix_IdxDb_asyncAdd(idxDb,
+                                        doc,
+                                        analyzer,
+                                        this,
+                                        &callback);
+        }
+
+        ITK_ASSERT(testMgr,
+                   ReportIfFailed(testMgr,
+                                  idxDb,
+                                  cpixMutex_),
+                   "Making async call failed");
+
+        cpix_JobId
+            jobId2;
+
+        bool 
+            couldGet = q_.get(&jobId2);
+        
+        ITK_ASSERT(testMgr,
+                   jobId == jobId2,
+                   "Messed up callback!");
+
+        if (couldGet)
+            {
+                {
+                    Cpt::SyncRegion
+                        sr(cpixMutex_);
+                    
+                    cpix_IdxDb_asyncAddResults(idxDb,
+                                               jobId);
+                }
+
+                ITK_EXPECT(testMgr,
+                           ReportIfFailed(testMgr,
+                                          idxDb,
+                                          cpixMutex_),
+                           "Synced addition failed");
+
+                if (cpix_Succeeded(idxDb))
+                    {
+                        ITK_MSG(testMgr,
+                                "Indexed");
+                    }
+            }
+        else
+            {
+                ITK_ASSERT(testMgr,
+                           false,
+                           "Could not get result from async call");
+            }
+    }
+     
+
+private:
+    static void callback(void       * cookie,
+                         cpix_JobId   jobId)
+    {
+        SyncedAdd
+            * this_ = reinterpret_cast<SyncedAdd*>(cookie);
+
+        this_->q_.put(jobId);
+    }
+};
+
+
+
+/**
+ * Deletes an SMS item from an index using the async API, but it is used as
+ * a sync API here.
+ */
+class SyncedDel
+{
+    Cpt::SyncQueue<cpix_JobId>   q_;
+    Cpt::Mutex                 & cpixMutex_;
+
+
+public:
+    
+    SyncedDel(Cpt::Mutex & cpixMutex)
+        : q_(1),
+          cpixMutex_(cpixMutex)
+    {
+        ;
+    }
+
+
+    void call(cpix_IdxDb    * idxDb,
+              const wchar_t * docUid,
+              Itk::TestMgr  * testMgr)
+    {
+        cpix_JobId
+            jobId;
+
+        {
+            Cpt::SyncRegion
+                sr(cpixMutex_);
+
+            jobId = cpix_IdxDb_asyncDeleteDocuments(idxDb,
+                                                    docUid,
+                                                    this,
+                                                    &callback);
+        }
+
+        ITK_ASSERT(testMgr,
+                   ReportIfFailed(testMgr,
+                                  idxDb,
+                                  cpixMutex_),
+                   "could not make async call");
+
+        cpix_JobId
+            jobId2;
+
+        bool
+            couldGet = q_.get(&jobId2);
+
+        ITK_ASSERT(testMgr,
+                   jobId == jobId2,
+                   "Messed up callback");
+
+        if (couldGet)
+            {
+                int32_t
+                    rv;
+
+                {
+                    Cpt::SyncRegion
+                        sr(cpixMutex_);
+
+                    rv = cpix_IdxDb_asyncDeleteDocumentsResults(idxDb,
+                                                                jobId);
+                }
+
+                ITK_EXPECT(testMgr,
+                           ReportIfFailed(testMgr,
+                                          idxDb,
+                                          cpixMutex_),
+                           "Synced delete failed");
+
+                if (cpix_Succeeded(idxDb))
+                    {
+                        ITK_MSG(testMgr,
+                                "Deleted %d",
+                                rv);
+                    }
+            }
+        else
+            {
+                ITK_ASSERT(testMgr,
+                           false,
+                           "Could not get result from async call");
+            }
+    }
+     
+
+private:
+    static void callback(void       * cookie,
+                         cpix_JobId   jobId)
+    {
+        SyncedDel
+            * this_ = reinterpret_cast<SyncedDel*>(cookie);
+
+        this_->q_.put(jobId);
+    }
+};
+
+
+
+/**
+ * Updates an SMS item to an index using the async API, but it is used as
+ * a sync API here.
+ */
+class SyncedUpdate
+{
+    Cpt::SyncQueue<cpix_JobId>   q_;
+
+    Cpt::Mutex                 & cpixMutex_;
+
+public:
+    
+    SyncedUpdate(Cpt::Mutex & cpixMutex)
+        : q_(1),
+          cpixMutex_(cpixMutex)
+    {
+        ;
+    }
+
+
+    void call(cpix_IdxDb    * idxDb,
+              cpix_Document * doc,
+              cpix_Analyzer * analyzer,
+              Itk::TestMgr  * testMgr)
+    {
+        cpix_JobId
+            jobId;
+
+        {
+            Cpt::SyncRegion
+                sr(cpixMutex_);
+
+            jobId = cpix_IdxDb_asyncUpdate(idxDb,
+                                           doc,
+                                           analyzer,
+                                           this,
+                                           &callback);
+        }
+
+        ITK_ASSERT(testMgr,
+                   ReportIfFailed(testMgr,
+                                  idxDb,
+                                  cpixMutex_),
+                   "Could not make async call");
+
+        cpix_JobId
+            jobId2;
+
+        bool
+            couldGet = q_.get(&jobId2);
+
+        ITK_ASSERT(testMgr,
+                   jobId == jobId2,
+                   "Messed up callback");
+
+        if (couldGet)
+            {
+                {
+                    Cpt::SyncRegion
+                        sr(cpixMutex_);
+                    cpix_IdxDb_asyncUpdateResults(idxDb,
+                                                  jobId);
+                }
+                    
+                ITK_EXPECT(testMgr,
+                           ReportIfFailed(testMgr,
+                                          idxDb,
+                                          cpixMutex_),
+                           "Synced update failed");
+
+                if (cpix_Succeeded(idxDb))
+                    {
+                        ITK_MSG(testMgr,
+                                "Updated");
+                    }
+            }
+        else
+            {
+                ITK_ASSERT(testMgr,
+                           false,
+                           "Could not get result from async call");
+            }
+    }
+     
+
+private:
+    static void callback(void       * cookie,
+                         cpix_JobId   jobId)
+    {
+        SyncedUpdate
+            * this_ = reinterpret_cast<SyncedUpdate*>(cookie);
+
+        this_->q_.put(jobId);
+    }
+};
+
+
+
+class SyncedHousekeep
+{
+    Cpt::SyncQueue<cpix_JobId>   q_;
+    Cpt::Mutex                 & cpixMutex_;
+
+
+public:
+    SyncedHousekeep(Cpt::Mutex & cpixMutex)
+        : q_(1),
+          cpixMutex_(cpixMutex)
+    {
+        ;
+    }
+
+
+    void call(Itk::TestMgr   * testMgr)
+    {
+        cpix_JobId
+            jobId;
+
+        cpix_Result
+            result;
+
+        {
+            Cpt::SyncRegion
+                sr(cpixMutex_);
+
+            jobId = cpix_asyncDoHousekeeping(&result,
+                                             this,
+                                             &callback);
+        }
+
+        ITK_ASSERT(testMgr,
+                   ReportIfFailed(testMgr,
+                                  &result,
+                                  cpixMutex_),
+                   "Performing async housekeeping failed");
+
+        cpix_JobId
+            jobId2;
+
+        bool
+            couldGet = q_.get(&jobId2);
+
+        ITK_ASSERT(testMgr,
+                   jobId == jobId2,
+                   "Messed up callback!");
+
+        if (couldGet)
+            {
+                {
+                    Cpt::SyncRegion
+                        sr(cpixMutex_);
+
+                    cpix_asyncDoHousekeepingResults(&result,
+                                                    jobId);
+
+                    ITK_EXPECT(testMgr,
+                               ReportIfFailed(testMgr,
+                                              &result,
+                                              cpixMutex_),
+                               "Synced doHousekeeping failed");
+
+                    if (cpix_Succeeded(&result))
+                        {
+                            ITK_MSG(testMgr,
+                                    "Housekeeping done");
+                        }
+                }
+            }
+        else
+            {
+                ITK_ASSERT(testMgr,
+                           false,
+                           "Could not get result from async call");
+            }
+    }
+
+
+
+private:
+    static void callback(void               * cookie,
+                         cpix_JobId           jobId)
+    {
+        SyncedHousekeep
+            * this_ = reinterpret_cast<SyncedHousekeep*>(cookie);
+
+        this_->q_.put(jobId);
+    }
+
+
+};
+
+
+
+class SyncedDoc
+{
+    Cpt::SyncQueue<cpix_JobId>   q_;
+
+    Cpt::Mutex                 & cpixMutex_;
+
+public:
+    SyncedDoc(Cpt::Mutex & cpixMutex)
+        : q_(1),
+          cpixMutex_(cpixMutex)
+    {
+        ;
+    }
+
+
+    bool call(cpix_Hits     * hits,
+              int32_t         index,
+              cpix_Document * target,
+              Itk::TestMgr  * testMgr,
+              bool            cancel)
+    {
+        bool
+            rv = false;
+
+        cpix_JobId
+            jobId;
+
+        {
+            Cpt::SyncRegion
+                sr(cpixMutex_);
+
+            jobId = cpix_Hits_asyncDoc(hits,
+                                       index,
+                                       target,
+                                       this,
+                                       &callback);
+        }
+        
+        ITK_ASSERT(testMgr,
+                   ReportIfFailed(testMgr,
+                                  hits,
+                                  cpixMutex_),
+                   "Could not make async doc retrieval");
+
+        if (cancel)
+            {
+                cpix_CancelAction
+                    cancelAction = cpix_CANCEL_ERROR;
+
+                {
+                    Cpt::SyncRegion
+                        sr(cpixMutex_);
+                    
+                    cancelAction = cpix_Hits_cancelDoc(hits,
+                                                       jobId);
+                }
+
+                ReportIfFailed(testMgr,
+                               hits,
+                               cpixMutex_);
+
+                ITK_MSG(testMgr,
+                        "Doc retrieval CANCELLED (%d)",
+                        cancelAction);
+
+                // !!! Even if we did call cancel(), the callback
+                //     might have hitted us before the cancel()
+                //     returned.  we have to clear the status, but we
+                //     must not collect any results.
+                if (q_.full())
+                    {
+                        ITK_MSG(testMgr,
+                                "Clearing cancelled callback status...\n");
+                        cpix_JobId
+                            jobId2;
+                        
+                        bool
+                            couldGet = q_.get(&jobId2);
+                        
+                        ITK_ASSERT(testMgr,
+                                   jobId = jobId2,
+                                   "Messed up callback");
+                        ITK_MSG(testMgr,
+                                "... clearing cancelled callback status done\n");
+                    }
+            }
+        else
+            {
+                cpix_JobId
+                    jobId2;
+
+                bool
+                    couldGet = q_.get(&jobId2);
+
+                ITK_ASSERT(testMgr,
+                           jobId == jobId2,
+                           "Messed up callback");
+
+                if (couldGet)
+                    {
+                        {
+                            Cpt::SyncRegion
+                                sr(cpixMutex_);
+                            
+                            cpix_Hits_asyncDocResults(hits,
+                                                      jobId);
+                        }
+
+                        ITK_EXPECT(testMgr,
+                                   ReportIfFailed(testMgr,
+                                                  hits,
+                                                  cpixMutex_),
+                                   "Synced doc retrieval failed");
+
+                        if (cpix_Succeeded(hits))
+                            {
+                                ITK_MSG(testMgr,
+                                        "Doc is retrieved");
+                                rv = true;
+                            }
+                    }
+                else
+                    {
+                        // not assert: this can happen
+                        ITK_MSG(testMgr,
+                                false,
+                                "Could not get result from async hit doc");
+                    }
+            }
+
+        return rv;
+    }
+
+private:
+    static void callback(void       * cookie,
+                         cpix_JobId   jobId)
+    {
+        SyncedDoc
+            * this_ = reinterpret_cast<SyncedDoc*>(cookie);
+
+        this_->q_.put(jobId);
+    }
+
+};
+
+
+
+
+
+class SyncedSearch
+{
+    Cpt::SyncQueue<cpix_JobId>    q_;
+
+    SyncedDoc                     syncedDoc_;
+
+    cpix_Document                 targetDoc_;
+
+    Cpt::Mutex                  & cpixMutex_;
+
+public:
+    
+    SyncedSearch(Cpt::Mutex & cpixMutex)
+        : q_(1),
+          syncedDoc_(cpixMutex),
+          cpixMutex_(cpixMutex)
+    {
+        ;
+    }
+
+    void call(cpix_IdxSearcher  * searcher,
+              cpix_Query        * query,
+              Itk::TestMgr      * testMgr,
+              bool                cancel)
+    {
+        // cancel = false;
+
+        cpix_JobId
+            jobId;
+
+        {
+            Cpt::SyncRegion
+                sr(cpixMutex_);
+
+            jobId = cpix_IdxSearcher_asyncSearch(searcher,
+                                                 query,
+                                                 this,
+                                                 &callback);
+        }
+
+        ITK_ASSERT(testMgr,
+                   ReportIfFailed(testMgr,
+                                  searcher,
+                                  cpixMutex_),
+                   "Could not make async call");
+
+        if (cancel)
+            {
+                cpix_CancelAction
+                    cancelAction = cpix_CANCEL_ERROR;
+
+                {
+                    Cpt::SyncRegion
+                        sr(cpixMutex_);
+
+                    cancelAction = cpix_IdxSearcher_cancelSearch(searcher,
+                                                                 jobId);
+                }
+
+                ReportIfFailed(testMgr,
+                               searcher,
+                               cpixMutex_);
+                ITK_MSG(testMgr,
+                        "Search/terms query CANCELLED (%d)",
+                        cancelAction);
+
+                // !!! Even if we did call cancel(), the callback
+                //     might have hitted us before the cancel()
+                //     returned.  we have to clear the status, but we
+                //     must not collect any results.
+                if (q_.full())
+                    {
+                        ITK_MSG(testMgr,
+                                "Clearing cancelled callback status...\n");
+                        cpix_JobId
+                            jobId2;
+                        
+                        bool
+                            couldGet = q_.get(&jobId2);
+                        
+                        ITK_ASSERT(testMgr,
+                                   jobId = jobId2,
+                                   "Messed up callback");
+                        ITK_MSG(testMgr,
+                                "... clearing cancelled callback status done\n");
+                    }
+            }
+        else
+            {
+
+                cpix_JobId
+                    jobId2;
+
+                bool
+                    couldGet = q_.get(&jobId2);
+
+                ITK_ASSERT(testMgr,
+                           jobId = jobId2,
+                           "Messed up callback");
+
+                if (couldGet)
+                    {
+                        cpix_Hits
+                            * hits = NULL;
+
+                        {
+                            Cpt::SyncRegion
+                                sr(cpixMutex_);
+                            
+                            hits = cpix_IdxSearcher_asyncSearchResults(searcher,
+                                                                       jobId);
+                        }
+
+                        ITK_EXPECT(testMgr,
+                                   ReportIfFailed(testMgr,
+                                                  searcher,
+                                                  cpixMutex_),
+                                   "Synced search failed");
+
+                        if (cpix_Succeeded(searcher))
+                            {
+                                ITK_MSG(testMgr,
+                                        "Search/terms query SUCCEEDED");
+
+                                printSome(hits,
+                                          testMgr);
+                            }
+
+                        {
+                            Cpt::SyncRegion
+                                sr(cpixMutex_);
+                            cpix_Hits_destroy(hits);
+                        }
+                    }
+                else
+                    {
+                        ITK_ASSERT(testMgr,
+                                   false,
+                                   "Could not get result from async call");
+                    }
+            }
+    }
+     
+
+private:
+
+    
+    void printSome(cpix_Hits    * hits,
+                   Itk::TestMgr * testMgr)
+    {
+        int32_t
+            length;
+
+        {
+            Cpt::SyncRegion
+                sr(cpixMutex_);
+
+            length = cpix_Hits_length(hits);
+        }
+
+        ITK_MSG(testMgr,
+                "Num of hits: %d",
+                length);
+
+        enum 
+        { 
+            // The setup of havig SOME=10 and cancelling doc retrieval
+            // operations for doc indexes on the condition "(i%4) ==
+            // 1" is special because this way
+            //
+            // (a) sooner or later there is going to be an indexing
+            //     operation during doc enumeration, invalidating the
+            //     hits
+            //
+            // (b) the last doc retrieval (index 9) is cancelled,
+            //     after which this hits is destroyed
+            //
+            // (c) this situation previously has crashed CPix
+            SOME = 10
+        };
+
+        for (int32_t i = 0; i < SOME && i < length; ++i)
+            {
+                bool
+                    succeeded = syncedDoc_.call(hits,
+                                                i,
+                                                &targetDoc_,
+                                                testMgr,
+                                                (i%4) == 1); // cancel or not
+
+                if (succeeded)
+                    {
+                        Cpt::SyncRegion
+                            sr(cpixMutex_);
+
+                        PrintHit(&targetDoc_,
+                                 testMgr);
+                    }
+            }
+    }
+
+
+
+    static void callback(void       * cookie,
+                         cpix_JobId   jobId)
+    {
+        SyncedSearch
+            * this_ = reinterpret_cast<SyncedSearch*>(cookie);
+
+        this_->q_.put(jobId);
+    }
+    
+
+};
+
+
+
+
+
+
+class ASyncContext : public Itk::ITestContext
+{
+private:
+    cpix_IdxDb       * idxDb1_;
+    cpix_IdxDb       * idxDb2_;
+    cpix_IdxSearcher * searcher1_;
+    cpix_IdxSearcher * searcher2_;
+    cpix_Analyzer    * analyzerI_; // for indexing thread
+    cpix_Analyzer    * analyzerQ_; // for query thread
+    cpix_QueryParser * queryParser_;
+    cpix_Query       * searchQuery_;
+    cpix_Query       * termsQuery_;
+    
+    LineTestCorpusRef  corpus_;
+
+    /* OBS
+    // TODO currently protecting both 
+    //   (a) CPix (non-thread safe) and
+    //   (b) Itk::TestMgr
+    //
+    // turn Itk::TestMgr into protected later, this tempMutex_ is to
+    // be renamed, and it should protect individual calls (async and
+    // sync alike) to cpix within these tests
+    Cpt::Mutex         tempMutex_;
+    */
+
+    /**
+     * CPix itself is to be invoked from a single thread (API is not
+     * multithreaded). However, for sake of clarity, we test it from
+     * multiple threads (indexing and searching client threads). So
+     * they need to be synchronized - no two threads should be
+     * executing an sync / async call at the same time.
+     */
+    Cpt::Mutex         cpixMutex_;
+
+    bool               keepIndexing_;
+
+    Itk::TestMgr     * testMgr_;
+
+    /**
+     * Statistics POV
+     */
+    struct Stats
+    {
+        std::string    name_;
+        size_t         opCount_;
+        long           opSumMs_;
+
+        Stats(const char * name)
+            : name_(name),
+              opCount_(0),
+              opSumMs_(0)
+        {
+            ;
+        }
+
+
+        void update(Cpt::StopperWatch & stopperWatch)
+        {
+            ++opCount_;
+            opSumMs_ += stopperWatch.elapsedMSecs();
+        }
+
+        
+        void output(Itk::TestMgr * testMgr)
+        {
+            if (opCount_ > 0)
+                {
+                    ITK_REPORT(testMgr,
+                               name_.c_str(),
+                               "%ld ops in %ld ms (%.3f ms/op)",
+                               opCount_,
+                               opSumMs_,
+                               static_cast<double>(opSumMs_) 
+                               / static_cast<double>(opCount_));
+                }
+            else
+                {
+                    ITK_REPORT(testMgr,
+                               name_.c_str(),
+                               "%ld ops in %ld ms (### ms/op)",
+                               opCount_,
+                               opSumMs_);
+                }
+        }
+
+
+        void reset()
+        {
+            opCount_ = 0;
+            opSumMs_ = 0;
+        }
+    };
+
+
+    // statistics for indexing
+    Stats              addStats_;
+    Stats              delStats_;
+    Stats              updateStats_;
+
+    // statistics for search
+    Stats              searchStats_;
+    Stats              termsStats_;
+
+public:
+    virtual void setup() throw (Itk::PanicExc)
+    {
+        SetupSentry
+            ss(*this);
+
+        cpix_Result
+            result;
+
+        cpix_IdxDb_dbgScrapAll(&result);
+        if (cpix_Failed(&result))
+            {
+                ITK_PANIC("Could not dbg scrapp all indexes");
+            }
+
+        const Volume
+            *p = Volumes;
+
+        for (; p->qbac_ != NULL; ++p)
+            {
+                cpix_IdxDb_defineVolume(&result,
+                                        p->qbac_,
+                                        p->path_);
+
+                if (cpix_Failed(&result))
+                    {
+                        ITK_PANIC("Could not define volume '%s'",
+                                  p->qbac_);
+                    }
+            }
+
+        cpix_IdxDb_defineVolume(&result,
+                                SMS_QBASEAPPCLASS,
+                                NULL);
+
+        if (cpix_Failed(&result))
+            {
+                ITK_PANIC("Could not define index for SMS");
+            }
+
+        idxDb1_ = cpix_IdxDb_openDb(&result,
+                                    SMS_QBASEAPPCLASS,
+                                    cpix_IDX_CREATE);
+        if (cpix_Failed(&result))
+            {
+                ITK_PANIC("Could not open(create) index for SMS");
+            }
+
+        static const char
+            sms2Qbac[] = SMS_QBASEAPPCLASS "2";
+
+        cpix_IdxDb_defineVolume(&result,
+                                sms2Qbac,
+                                NULL);
+
+        if (cpix_Failed(&result))
+            {
+                ITK_PANIC("Could not define index for SMS(2)");
+            }
+
+        idxDb2_ = cpix_IdxDb_openDb(&result,
+                                    sms2Qbac,
+                                    cpix_IDX_CREATE);
+        if (cpix_Failed(&result))
+            {
+                ITK_PANIC("Could not open(create) index for SMS(2)");
+            }
+
+        searcher1_ = cpix_IdxSearcher_openDb(&result,
+                                             "root msg");
+        if (searcher1_ == NULL)
+            {
+                ITK_PANIC("Could not create searcher");
+            }
+
+        searcher2_ = cpix_IdxSearcher_openDb(&result,
+                                             "root");
+        if (searcher2_ == NULL)
+            {
+                ITK_PANIC("Could not create searcher");
+            }
+        
+        analyzerI_ = cpix_CreateSimpleAnalyzer(&result);
+        
+        if (analyzerI_ == NULL)
+            {
+                ITK_PANIC("Could not create analyzer");
+            }
+
+        analyzerQ_ = cpix_CreateSimpleAnalyzer(&result);
+        
+        if (analyzerQ_ == NULL)
+            {
+                ITK_PANIC("Could not create analyzer");
+            }
+
+        queryParser_ = cpix_QueryParser_create(&result,
+                                               LCPIX_DEFAULT_FIELD,
+                                               analyzerQ_);
+        if (queryParser_ == NULL)
+            {
+                ITK_PANIC("Could not create query parser");
+            }
+        
+        searchQuery_ = cpix_QueryParser_parse(queryParser_,
+                                              L"happy");
+        if (cpix_Failed(queryParser_)
+            || searchQuery_ == NULL)
+            {
+                ITK_PANIC("Could not parse search query");
+            }
+
+        cpix_QueryParser_destroy(queryParser_);
+        queryParser_ = NULL;
+
+        queryParser_ = cpix_QueryParser_create(&result,
+                                               L"SecondaryTokens",
+                                               analyzerQ_);
+        if (queryParser_ == NULL)
+            {
+                ITK_PANIC("Could not create query parser");
+            }
+
+        termsQuery_ = cpix_QueryParser_parse(queryParser_,
+                                             L"ha* OR _aggregate:happy");
+        if (cpix_Failed(queryParser_)
+            || termsQuery_ == NULL)
+            {
+                ITK_PANIC("Could not parse terms query");
+            }
+
+        ss.setupComplete();
+    }
+
+
+    virtual void tearDown() throw()
+    {
+        cleanup();
+    }
+
+
+    virtual ~ASyncContext()
+    {
+        cleanup();
+    }
+
+
+    //
+    // public operations
+    //
+    ASyncContext()
+        : idxDb1_(NULL),
+          idxDb2_(NULL),
+          searcher1_(NULL),
+          searcher2_(NULL),
+          analyzerI_(NULL),
+          analyzerQ_(NULL),
+          queryParser_(NULL),
+          searchQuery_(NULL),
+          termsQuery_(NULL),
+          corpus_(DEFAULT_TEST_CORPUS_PATH),
+          keepIndexing_(true),
+          addStats_("adding"),
+          delStats_("deleting"),
+          updateStats_("updating"),
+          searchStats_("searching"),
+          termsStats_("term-qrying")
+    {
+        ;
+    }
+
+
+    
+    void testSingleThreadIdx(Itk::TestMgr * testMgr)
+    {
+        testMgr_ = testMgr;
+
+        // cpix_setLogLevel(CPIX_LL_DEBUG);
+
+        pthread_t
+            indexerThreadHndl;
+
+        void
+            * rv;
+
+        int
+            result = pthread_create(&indexerThreadHndl,
+                                    NULL,
+                                    &IndexerThreadFunc,
+                                    this);
+        ITK_ASSERT(testMgr,
+                   result == 0,
+                   "Could not create POSIX thread for indexing");
+
+        printf("MAIN: Created indexer thread\n");
+
+        printf("MAIN: waiting for worker thread (indexer)\n");
+
+        result = pthread_join(indexerThreadHndl,
+                              &rv);
+        
+        ITK_EXPECT(testMgr,
+                   result == 0,
+                   "Could not join indexer thread");
+
+        // cpix_setLogLevel(CPIX_LL_TRACE);
+
+        printStatistics();
+    }
+    
+
+
+    void testSingleThreadQry(Itk::TestMgr * testMgr)
+    {
+        testMgr_ = testMgr;
+
+        // cpix_setLogLevel(CPIX_LL_DEBUG);
+
+        pthread_t
+            searcherThreadHndl;
+
+        void
+            * rv;
+
+        int
+            result = pthread_create(&searcherThreadHndl,
+                                    NULL,
+                                    &SearcherThreadFunc,
+                                    this);
+        ITK_ASSERT(testMgr,
+                   result == 0,
+                   "Could not create POSIX thread for searching");
+
+        printf("MAIN: Created searcher thread\n");
+
+        printf("MAIN: waiting for worker thread (searcher)\n");
+
+        result = pthread_join(searcherThreadHndl,
+                              &rv);
+
+        ITK_EXPECT(testMgr,
+                   result == 0,
+                   "Could not join searcher thread");
+
+        // cpix_setLogLevel(CPIX_LL_TRACE);
+
+        printStatistics();
+    }
+
+
+
+    void testMultiThreads(Itk::TestMgr * testMgr)
+    {
+        cleanup();
+        setup();
+
+        testMgr_ = testMgr;
+
+        // cpix_setLogLevel(CPIX_LL_DEBUG);
+
+        pthread_t
+            indexerThreadHndl,
+            searcherThreadHndl;
+
+        void
+            * rv;
+
+        int
+            result = pthread_create(&indexerThreadHndl,
+                                    NULL,
+                                    &IndexerThreadFunc,
+                                    this);
+        ITK_ASSERT(testMgr,
+                   result == 0,
+                   "Could not create POSIX thread for indexing");
+
+        printf("MAIN: Created indexer thread\n");
+
+        result = pthread_create(&searcherThreadHndl,
+                                NULL,
+                                &SearcherThreadFunc,
+                                this);
+
+        if (result != 0)
+            {
+                keepIndexing_ = false;
+
+                result = pthread_join(indexerThreadHndl,
+                                      &rv);
+                ITK_EXPECT(testMgr,
+                           result == 0,
+                           "Could not force closure of indexer thread");
+                ITK_ASSERT(testMgr,
+                           false,
+                           "Could not create POSIX thread for searching");
+            }
+
+
+        printf("MAIN: Created searcher thread\n");
+        
+        printf("MAIN: waiting for both worker threads\n");
+
+        result = pthread_join(indexerThreadHndl,
+                              &rv);
+        
+        ITK_EXPECT(testMgr,
+                   result == 0,
+                   "Could not join indexer thread");
+
+        result = pthread_join(searcherThreadHndl,
+                              &rv);
+
+        ITK_EXPECT(testMgr,
+                   result == 0,
+                   "Could not join searcher thread");
+
+        // cpix_setLogLevel(CPIX_LL_TRACE);
+
+        printStatistics();
+    }
+
+
+private:
+    void cleanup()
+    {
+        cpix_IdxDb_releaseDb(idxDb1_);
+        idxDb1_ = NULL;
+
+        cpix_IdxDb_releaseDb(idxDb2_);
+        idxDb2_ = NULL;
+        
+        cpix_IdxSearcher_releaseDb(searcher1_);
+        searcher1_ = NULL;
+
+        cpix_IdxSearcher_releaseDb(searcher2_);
+        searcher2_ = NULL;
+
+        cpix_Analyzer_destroy(analyzerI_);
+        analyzerI_ = NULL;
+
+        cpix_Analyzer_destroy(analyzerQ_);
+        analyzerQ_ = NULL;
+
+        cpix_QueryParser_destroy(queryParser_);
+        queryParser_ = NULL;
+
+        cpix_Query_destroy(searchQuery_);
+        searchQuery_ = NULL;
+
+        cpix_Query_destroy(termsQuery_);
+        termsQuery_ = NULL;
+    }
+
+    
+    void printStatistics()
+    {
+        addStats_.output(testMgr_);
+        delStats_.output(testMgr_);
+        updateStats_.output(testMgr_);
+        searchStats_.output(testMgr_);
+        termsStats_.output(testMgr_);
+    }
+
+
+    void * indexStuff()
+    {
+        using namespace std;
+        using namespace Cpt;
+
+        enum 
+        { ROUNDS        = 200, 
+          SAMPLE_START  = 180,
+          ACTION_PERIOD = 5,
+        };
+
+        Cpt::StopperWatch
+            stopperWatch;
+
+        SyncedAdd
+            syncedAdd(cpixMutex_);
+        SyncedDel
+            syncedDel(cpixMutex_);
+        SyncedUpdate
+            syncedUpdate(cpixMutex_);
+        SyncedHousekeep
+            syncedHousekeep(cpixMutex_);
+
+        try
+            {
+                int
+                    curId = 0;
+                
+                for (size_t i = 0; i < ROUNDS && keepIndexing_; ++i)
+                    {
+                        if (i == SAMPLE_START)
+                            {
+                                addStats_.reset();
+                                delStats_.reset();
+                                updateStats_.reset();
+                            }
+
+                        ++curId;
+                        std::wstring
+                            body(corpus_.item(curId));
+                        
+                        stopperWatch.reset();
+
+                        cpix_Document
+                            * doc = CreateSmsDoc(testMgr_,
+                                                 curId,
+                                                 body.c_str(),
+                                                 cpixMutex_);
+                        
+                        syncedAdd.call(idxDb1_,
+                                       doc,
+                                       analyzerI_,
+                                       testMgr_);
+
+                        {
+                            Cpt::SyncRegion
+                                sr(cpixMutex_);
+                            cpix_Document_destroy(doc);
+                            doc = NULL;
+                        }
+
+                        addStats_.update(stopperWatch);
+
+                        stopperWatch.reset();
+
+                        doc = CreateSmsDoc(testMgr_,
+                                           curId,
+                                           body.c_str(),
+                                           cpixMutex_);
+                        
+                        syncedAdd.call(idxDb2_,
+                                       doc,
+                                       analyzerI_,
+                                       testMgr_);
+
+                        {
+                            Cpt::SyncRegion
+                                sr(cpixMutex_);
+                            cpix_Document_destroy(doc);
+                            doc = NULL;
+                        }
+
+                        addStats_.update(stopperWatch);
+
+                        if (curId % ACTION_PERIOD == 0)
+                            {
+                                stopperWatch.reset();
+
+                                wstring
+                                    docUid = GetItemId(curId/ACTION_PERIOD);
+
+                                syncedDel.call(idxDb1_,
+                                               docUid.c_str(),
+                                               testMgr_);
+
+                                delStats_.update(stopperWatch);
+
+                                stopperWatch.reset();
+
+                                doc = CreateSmsDoc(testMgr_,
+                                                   curId/ACTION_PERIOD,
+                                                   L"happy bla",
+                                                   cpixMutex_);
+
+                                syncedUpdate.call(idxDb2_,
+                                                  doc,
+                                                  analyzerI_,
+                                                  testMgr_);
+                                
+                                {
+                                    Cpt::SyncRegion
+                                        sr(cpixMutex_);
+                                    cpix_Document_destroy(doc);
+                                    doc = NULL;
+                                }
+
+                                updateStats_.update(stopperWatch);
+
+                                ITK_DBGMSG(testMgr_, "+");
+
+                                syncedHousekeep.call(testMgr_);
+                            }
+                        else
+                            {
+                                ITK_DBGMSG(testMgr_, ".");
+                            }
+                    }
+            }
+        catch (...)
+            {
+                ITK_EXPECT(testMgr_,
+                           false,
+                           "INDEXER: Failed indexing\n");
+                throw;
+            }
+
+        syncedHousekeep.call(testMgr_);
+
+        return NULL;
+    }
+
+
+    static void * IndexerThreadFunc(void * p)
+    {
+        ASyncContext
+            * thisPtr = reinterpret_cast<ASyncContext*>(p);
+        return thisPtr->indexStuff();
+    }
+
+
+    void * searchStuff()
+    {
+        using namespace Cpt;
+
+        sleep(43);
+
+        // on emulator, this amount of search happens together with
+        // indexing/deleting/updating, the rest runs alone, therefore
+        // distoring statistics (we are interested in how slow
+        // searching when (a) runs alone ("sq" test case) or when (b) runs mixed
+        // with indexing ("mt" test case).
+        enum 
+        { 
+            ROUNDS       = 21,
+            SAMPLE_START = 1
+        };
+
+        Cpt::StopperWatch
+            stopperWatch;
+
+        SyncedSearch
+            syncedSearch(cpixMutex_);
+
+        try
+            {
+                for (size_t i = 0; i < ROUNDS; ++i)
+                    {
+                        if (i == SAMPLE_START)
+                            {
+                                searchStats_.reset();
+                                termsStats_.reset();
+                            }
+
+                        stopperWatch.reset();
+                            syncedSearch.call(searcher1_,
+                                              searchQuery_,
+                                              testMgr_,
+                                              i%2 == 1);
+
+                        searchStats_.update(stopperWatch);
+
+                        stopperWatch.reset();
+                            syncedSearch.call(searcher2_,
+                                              termsQuery_,
+                                              testMgr_,
+                                              i%2 == 1);
+
+                        termsStats_.update(stopperWatch);
+                        
+                        ITK_DBGMSG(testMgr_, ":");
+                    }
+            }
+        catch (...)
+            {
+                ITK_EXPECT(testMgr_,
+                           false,
+                           "SEARCHER: Failed searching\n");
+                throw;
+            }
+
+        return NULL;
+    }
+
+
+    static void * SearcherThreadFunc(void * p)
+    {
+        ASyncContext
+            * thisPtr = reinterpret_cast<ASyncContext*>(p);
+        return thisPtr->searchStuff();
+    }
+
+
+};
+
+
+
+Itk::TesterBase * CreateASyncTests()
+{
+    using namespace Itk;
+
+    ASyncContext
+        * asyncContext = new ASyncContext();
+    ContextTester
+        * contextTester = new ContextTester("async",
+                                           asyncContext);
+
+#define TEST "si"
+    contextTester->add(TEST,
+                       asyncContext,
+                       &ASyncContext::testSingleThreadIdx,
+                       TEST);
+#undef TEST
+
+#define TEST "sq"
+    contextTester->add(TEST,
+                       asyncContext,
+                       &ASyncContext::testSingleThreadQry,
+                       TEST,
+                       "CANCELLED"); // cancellation may happen in
+                                     // different ways
+#undef TEST
+
+#define TEST "mt"
+    contextTester->add(TEST,
+                       asyncContext,
+                       &ASyncContext::testMultiThreads,
+                       TEST,
+                       SuiteTester::REDIRECT_ONLY);
+#undef TEST
+
+    return contextTester;
+}