searchengine/util/tsrc/cpixtoolsunittest/src/syncqueuetests.cpp
changeset 0 671dee74050a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/searchengine/util/tsrc/cpixtoolsunittest/src/syncqueuetests.cpp	Mon Apr 19 14:40:16 2010 +0300
@@ -0,0 +1,409 @@
+/*
+* Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description: 
+*
+*/
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <list>
+
+#include "itk.h"
+
+#include "cpixsynctools.h"
+#include "cpixsyncqueue.h"
+
+
+#define LENIENT_STR "#IGNORE "
+
+#define MSG(fmt, args...)  printf(fmt, ##args); fflush(stdout);
+
+
+typedef Cpt::SyncQueue<int>    IntQueue;
+
+
+struct ProducerParam
+{
+    IntQueue                 * queue_;
+    int                        maxCount_;
+    int                        maxSleepMs_;
+    Itk::TestMgr             * testMgr_;
+};
+
+
+void * producer(void * param)
+{
+    struct ProducerParam
+        const * p = reinterpret_cast<ProducerParam const*>(param);
+
+    int
+        ms = rand();
+    ms = ms % p->maxSleepMs_;
+
+    enum 
+    { 
+        MSEC_PER_SEC = 1000,
+        NSEC_PER_MSEC = 1000000
+    };
+
+    struct timespec 
+        tim;
+    tim.tv_sec = ms / MSEC_PER_SEC;
+    tim.tv_nsec = ms % MSEC_PER_SEC * NSEC_PER_MSEC;
+            
+    MSG("[B:%d] ",
+        ms);
+
+    try
+        {
+            bool
+                goOn = true;
+
+            for (int i = 0; 
+                 i <= p->maxCount_ && goOn; 
+                 ++i)
+                {
+                    
+                    nanosleep(&tim,
+                              NULL);
+                    goOn = p->queue_->put(i);
+                    MSG(">%d ",
+                           i);
+                    ITK_DBGMSG(p->testMgr_, ":");
+                }
+        }
+    catch (Cpt::SyncExc & exc)
+        {
+            // we should not get exceptions
+            MSG("\nSync exception in producer ");
+        }
+
+    MSG("\n" LENIENT_STR "[E] ");
+
+    return NULL;
+}
+
+
+/**
+ * A simple integer threads can add to in a synchronized way.
+ */
+class SyncSum : public Cpt::SyncEntity
+{
+    int        sum_;
+public:
+    SyncSum()
+        : sum_(0)
+    {
+        ;
+    }
+
+    
+    int sum()
+    {
+        { // SYNC
+            Cpt::SyncRegion
+                sr(*this);
+            
+            return sum_;
+
+        } // SYNC
+    }
+
+
+    void inc(int delta)
+    {
+        { // SYNC
+            Cpt::SyncRegion
+                sr(*this);
+            
+            sum_ += delta;
+
+        } // SYNC
+    }
+};
+
+
+struct ConsumerParam
+{
+    IntQueue                 * queue_;
+    SyncSum                  * sum_;
+    Itk::TestMgr             * testMgr_;
+};
+
+
+void * consumer(void * param)
+{
+    struct ConsumerParam
+        const * p = reinterpret_cast<ConsumerParam const *>(param);
+
+    MSG("(B) ");
+
+    bool
+        x = false;
+
+    try
+        {
+            bool
+                goOn = true;
+
+            while (goOn)
+                {
+                    MSG(x ? "xB " : ". ");
+                    int
+                        i;
+                    goOn = p->queue_->get(&i);
+                    if (goOn)
+                        {
+                            p->sum_->inc(i);
+                            MSG("<%d ",
+                                   i);
+                        }
+                    else
+                        {
+                            MSG("\n#IGNORE !X ");
+                            x = true;
+                        }
+                    MSG(x ? "xA " : ". ");
+                    ITK_DBGMSG(p->testMgr_, ".");
+                }
+        }
+    catch (Cpt::SyncExc & exc)
+        {
+            // we should not get exceptions
+            MSG("\nSync exception in consumer ");
+        }
+
+    MSG("\n" LENIENT_STR "(E) ");
+
+    return NULL;
+}
+
+
+
+void testMultiThreadSession(Itk::TestMgr * testMgr,
+                            int            count,
+                            int            mainSleepSec)
+{
+    IntQueue
+        queue(5);
+    SyncSum
+        sum;
+    
+    struct ProducerParam
+        producerParam = {
+        &queue,
+        10,  // max count
+        1100,    // max sleep millisecs
+        testMgr
+    };
+
+    struct ConsumerParam
+        consumerParam = {
+        &queue,
+        &sum,
+        testMgr
+    };
+
+    std::list<pthread_t>
+        threadHndls;
+
+    int
+        result;
+
+    MSG("Creating %d producers and %d consumers\n",
+        count,
+        count);
+
+    MSG(LENIENT_STR);
+
+    for (int i = 0; i < count; ++i)
+        {
+            pthread_t
+                threadHndl;
+
+            result = pthread_create(&threadHndl,
+                                    NULL,
+                                    &producer,
+                                    &producerParam);
+            ITK_ASSERT(testMgr,
+                       result == 0,
+                       "Could not create producer thread %d",
+                       2*i);
+            
+            threadHndls.push_back(threadHndl);
+
+            result = pthread_create(&threadHndl,
+                                    NULL,
+                                    &consumer,
+                                    &consumerParam);
+
+            ITK_ASSERT(testMgr,
+                       result == 0,
+                       "Could not create consumer thread %d",
+                       2*i + 1);
+
+            threadHndls.push_back(threadHndl);
+        }
+
+    result = sleep(mainSleepSec);
+
+    MSG("forcing stop ");
+            
+    queue.stopProcessing();
+            
+    MSG("forced stop ");
+
+    MSG("joining threads ");
+
+    int
+        jIdx = 0;
+
+    std::list<pthread_t>::iterator
+        i = threadHndls.begin(),
+        end = threadHndls.end();
+    for (; i != end; ++i)
+        {
+            void
+                * retVal;
+
+            MSG("j:%d ",
+                jIdx);
+
+            result = pthread_join(*i,
+                                  &retVal);
+            ITK_EXPECT(testMgr,
+                       result == 0,
+                       "Failed to join %s thread %d",
+                       ((jIdx % 2) == 0 ? "producer" : "consumer"),
+                       jIdx);
+
+            ++jIdx;
+        }
+
+    MSG("\nJoined all workers\n");
+    MSG("\n" LENIENT_STR "SUM: %d\n",
+        sum.sum());
+    MSG("\n session done.");
+}
+
+
+
+void testMTEarlyStop1(Itk::TestMgr * testMgr)
+{
+    testMultiThreadSession(testMgr,
+                           1,     // count
+                           2);     // sleep
+}
+
+
+void testMTEarlyStop2(Itk::TestMgr * testMgr)
+{
+    testMultiThreadSession(testMgr,
+                           2,     // count
+                           2);     // sleep
+}
+
+
+void testMTEarlyStop5(Itk::TestMgr * testMgr)
+{
+    testMultiThreadSession(testMgr,
+                           5,     // count
+                           2);     // sleep
+}
+
+
+void testMTLateStop1(Itk::TestMgr * testMgr)
+{
+    testMultiThreadSession(testMgr,
+                           1,     // count
+                           12);     // sleep
+}
+
+
+void testMTLateStop2(Itk::TestMgr * testMgr)
+{
+    testMultiThreadSession(testMgr,
+                           2,     // count
+                           12);    // sleep
+}
+
+
+void testMTLateStop5(Itk::TestMgr * testMgr)
+{
+    testMultiThreadSession(testMgr,
+                           5,     // count
+                           12);     // sleep
+}
+
+
+
+
+
+Itk::TesterBase * CreateSyncQueueTests()
+{
+    using namespace Itk;
+
+    SuiteTester
+        * syncQueueTests = new SuiteTester("syncqueue");
+
+#define TEST "earlyStop1"
+    syncQueueTests->add(TEST,
+                        testMTEarlyStop1,
+                        TEST,
+                        LENIENT_STR);
+#undef TEST
+
+#define TEST "earlyStop2"
+    syncQueueTests->add(TEST,
+                        testMTEarlyStop2,
+                        TEST,
+                        LENIENT_STR);
+#undef TEST
+
+#define TEST "earlyStop5"
+    syncQueueTests->add(TEST,
+                        testMTEarlyStop5,
+                        TEST,
+                        LENIENT_STR);
+#undef TEST
+
+#define TEST "lateStop1"
+    syncQueueTests->add(TEST,
+                        testMTLateStop1,
+                        TEST,
+                        LENIENT_STR);
+#undef TEST
+
+#define TEST "lateStop2"
+    syncQueueTests->add(TEST,
+                        testMTLateStop2,
+                        TEST,
+                        LENIENT_STR);
+#undef TEST
+
+#define TEST "lateStop5"
+    syncQueueTests->add(TEST,
+                        testMTLateStop5,
+                        TEST,
+                        LENIENT_STR);
+#undef TEST
+
+    // ... add more tests to suite
+    
+    return syncQueueTests;
+    
+}