--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/searchengine/util/tsrc/cpixtoolsunittest/src/syncqueuetests.cpp Mon Apr 19 14:40:16 2010 +0300
@@ -0,0 +1,409 @@
+/*
+* Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description:
+*
+*/
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <list>
+
+#include "itk.h"
+
+#include "cpixsynctools.h"
+#include "cpixsyncqueue.h"
+
+
+#define LENIENT_STR "#IGNORE "
+
+#define MSG(fmt, args...) printf(fmt, ##args); fflush(stdout);
+
+
+typedef Cpt::SyncQueue<int> IntQueue;
+
+
+struct ProducerParam
+{
+ IntQueue * queue_;
+ int maxCount_;
+ int maxSleepMs_;
+ Itk::TestMgr * testMgr_;
+};
+
+
+void * producer(void * param)
+{
+ struct ProducerParam
+ const * p = reinterpret_cast<ProducerParam const*>(param);
+
+ int
+ ms = rand();
+ ms = ms % p->maxSleepMs_;
+
+ enum
+ {
+ MSEC_PER_SEC = 1000,
+ NSEC_PER_MSEC = 1000000
+ };
+
+ struct timespec
+ tim;
+ tim.tv_sec = ms / MSEC_PER_SEC;
+ tim.tv_nsec = ms % MSEC_PER_SEC * NSEC_PER_MSEC;
+
+ MSG("[B:%d] ",
+ ms);
+
+ try
+ {
+ bool
+ goOn = true;
+
+ for (int i = 0;
+ i <= p->maxCount_ && goOn;
+ ++i)
+ {
+
+ nanosleep(&tim,
+ NULL);
+ goOn = p->queue_->put(i);
+ MSG(">%d ",
+ i);
+ ITK_DBGMSG(p->testMgr_, ":");
+ }
+ }
+ catch (Cpt::SyncExc & exc)
+ {
+ // we should not get exceptions
+ MSG("\nSync exception in producer ");
+ }
+
+ MSG("\n" LENIENT_STR "[E] ");
+
+ return NULL;
+}
+
+
+/**
+ * A simple integer threads can add to in a synchronized way.
+ */
+class SyncSum : public Cpt::SyncEntity
+{
+ int sum_;
+public:
+ SyncSum()
+ : sum_(0)
+ {
+ ;
+ }
+
+
+ int sum()
+ {
+ { // SYNC
+ Cpt::SyncRegion
+ sr(*this);
+
+ return sum_;
+
+ } // SYNC
+ }
+
+
+ void inc(int delta)
+ {
+ { // SYNC
+ Cpt::SyncRegion
+ sr(*this);
+
+ sum_ += delta;
+
+ } // SYNC
+ }
+};
+
+
+struct ConsumerParam
+{
+ IntQueue * queue_;
+ SyncSum * sum_;
+ Itk::TestMgr * testMgr_;
+};
+
+
+void * consumer(void * param)
+{
+ struct ConsumerParam
+ const * p = reinterpret_cast<ConsumerParam const *>(param);
+
+ MSG("(B) ");
+
+ bool
+ x = false;
+
+ try
+ {
+ bool
+ goOn = true;
+
+ while (goOn)
+ {
+ MSG(x ? "xB " : ". ");
+ int
+ i;
+ goOn = p->queue_->get(&i);
+ if (goOn)
+ {
+ p->sum_->inc(i);
+ MSG("<%d ",
+ i);
+ }
+ else
+ {
+ MSG("\n#IGNORE !X ");
+ x = true;
+ }
+ MSG(x ? "xA " : ". ");
+ ITK_DBGMSG(p->testMgr_, ".");
+ }
+ }
+ catch (Cpt::SyncExc & exc)
+ {
+ // we should not get exceptions
+ MSG("\nSync exception in consumer ");
+ }
+
+ MSG("\n" LENIENT_STR "(E) ");
+
+ return NULL;
+}
+
+
+
+void testMultiThreadSession(Itk::TestMgr * testMgr,
+ int count,
+ int mainSleepSec)
+{
+ IntQueue
+ queue(5);
+ SyncSum
+ sum;
+
+ struct ProducerParam
+ producerParam = {
+ &queue,
+ 10, // max count
+ 1100, // max sleep millisecs
+ testMgr
+ };
+
+ struct ConsumerParam
+ consumerParam = {
+ &queue,
+ &sum,
+ testMgr
+ };
+
+ std::list<pthread_t>
+ threadHndls;
+
+ int
+ result;
+
+ MSG("Creating %d producers and %d consumers\n",
+ count,
+ count);
+
+ MSG(LENIENT_STR);
+
+ for (int i = 0; i < count; ++i)
+ {
+ pthread_t
+ threadHndl;
+
+ result = pthread_create(&threadHndl,
+ NULL,
+ &producer,
+ &producerParam);
+ ITK_ASSERT(testMgr,
+ result == 0,
+ "Could not create producer thread %d",
+ 2*i);
+
+ threadHndls.push_back(threadHndl);
+
+ result = pthread_create(&threadHndl,
+ NULL,
+ &consumer,
+ &consumerParam);
+
+ ITK_ASSERT(testMgr,
+ result == 0,
+ "Could not create consumer thread %d",
+ 2*i + 1);
+
+ threadHndls.push_back(threadHndl);
+ }
+
+ result = sleep(mainSleepSec);
+
+ MSG("forcing stop ");
+
+ queue.stopProcessing();
+
+ MSG("forced stop ");
+
+ MSG("joining threads ");
+
+ int
+ jIdx = 0;
+
+ std::list<pthread_t>::iterator
+ i = threadHndls.begin(),
+ end = threadHndls.end();
+ for (; i != end; ++i)
+ {
+ void
+ * retVal;
+
+ MSG("j:%d ",
+ jIdx);
+
+ result = pthread_join(*i,
+ &retVal);
+ ITK_EXPECT(testMgr,
+ result == 0,
+ "Failed to join %s thread %d",
+ ((jIdx % 2) == 0 ? "producer" : "consumer"),
+ jIdx);
+
+ ++jIdx;
+ }
+
+ MSG("\nJoined all workers\n");
+ MSG("\n" LENIENT_STR "SUM: %d\n",
+ sum.sum());
+ MSG("\n session done.");
+}
+
+
+
+void testMTEarlyStop1(Itk::TestMgr * testMgr)
+{
+ testMultiThreadSession(testMgr,
+ 1, // count
+ 2); // sleep
+}
+
+
+void testMTEarlyStop2(Itk::TestMgr * testMgr)
+{
+ testMultiThreadSession(testMgr,
+ 2, // count
+ 2); // sleep
+}
+
+
+void testMTEarlyStop5(Itk::TestMgr * testMgr)
+{
+ testMultiThreadSession(testMgr,
+ 5, // count
+ 2); // sleep
+}
+
+
+void testMTLateStop1(Itk::TestMgr * testMgr)
+{
+ testMultiThreadSession(testMgr,
+ 1, // count
+ 12); // sleep
+}
+
+
+void testMTLateStop2(Itk::TestMgr * testMgr)
+{
+ testMultiThreadSession(testMgr,
+ 2, // count
+ 12); // sleep
+}
+
+
+void testMTLateStop5(Itk::TestMgr * testMgr)
+{
+ testMultiThreadSession(testMgr,
+ 5, // count
+ 12); // sleep
+}
+
+
+
+
+
+Itk::TesterBase * CreateSyncQueueTests()
+{
+ using namespace Itk;
+
+ SuiteTester
+ * syncQueueTests = new SuiteTester("syncqueue");
+
+#define TEST "earlyStop1"
+ syncQueueTests->add(TEST,
+ testMTEarlyStop1,
+ TEST,
+ LENIENT_STR);
+#undef TEST
+
+#define TEST "earlyStop2"
+ syncQueueTests->add(TEST,
+ testMTEarlyStop2,
+ TEST,
+ LENIENT_STR);
+#undef TEST
+
+#define TEST "earlyStop5"
+ syncQueueTests->add(TEST,
+ testMTEarlyStop5,
+ TEST,
+ LENIENT_STR);
+#undef TEST
+
+#define TEST "lateStop1"
+ syncQueueTests->add(TEST,
+ testMTLateStop1,
+ TEST,
+ LENIENT_STR);
+#undef TEST
+
+#define TEST "lateStop2"
+ syncQueueTests->add(TEST,
+ testMTLateStop2,
+ TEST,
+ LENIENT_STR);
+#undef TEST
+
+#define TEST "lateStop5"
+ syncQueueTests->add(TEST,
+ testMTLateStop5,
+ TEST,
+ LENIENT_STR);
+#undef TEST
+
+ // ... add more tests to suite
+
+ return syncQueueTests;
+
+}