searchengine/util/tsrc/cpixtoolsunittest/src/jobqueuetests.cpp
changeset 0 671dee74050a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/searchengine/util/tsrc/cpixtoolsunittest/src/jobqueuetests.cpp	Mon Apr 19 14:40:16 2010 +0300
@@ -0,0 +1,351 @@
+/*
+* Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description: 
+*
+*/
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <list>
+#include <vector>
+
+#include "itk.h"
+
+#include "cpixjobqueue.h"
+
+#define MSG(fmt, args...)  printf(fmt, ##args); fflush(stdout);
+
+class Job
+{
+    int        id_;
+    int        rounds_;
+
+    bool       cancelled_;
+    
+public:
+    Job(int id,
+        int rounds)
+        : id_(id),
+          rounds_(rounds),
+          cancelled_(false)
+    {
+        ;
+    }
+
+
+    int id() const
+    {
+        return id_;
+    }
+    
+
+    void cancel(int id) // TODO OBS id
+    {
+        if (id != id_)
+            {
+                ITK_PANIC("Messed up cancel id");
+            }
+
+        cancelled_ = true;
+        MSG("JOB %d - cancelled\n",
+            id_);
+    }
+
+    
+    void doJob()
+    {
+        MSG("Starting JOB %d - %d rounds\n",
+            id_,
+            rounds_);
+
+        for (; rounds_ > 0 && !cancelled_; --rounds_)
+            {
+                sleep(1);
+
+                MSG("JOB %d : %d rounds to go\n",
+                    id_,
+                    rounds_);
+            }
+
+        MSG("JOB %d - %s\n",
+            id_,
+            cancelled_ ? "CANCELLED" : "COMPLETED");
+    }
+    
+};
+
+
+typedef Cpt::JobQueue<Job*> TestJobQueue;
+
+class JobVector : public std::vector<Job*>, public Cpt::SyncEntity
+{
+public:
+
+    ~JobVector()
+    {
+        std::vector<Job*>::iterator
+            i = begin(),
+            e = end();
+
+        for (; i != e; ++i)
+            {
+                delete *i;
+            }
+    }
+};
+
+
+
+struct ThreadParam
+{
+    TestJobQueue    * q_;
+    JobVector       * v_;
+    Itk::TestMgr    * testMgr_;
+};
+
+
+static void * producer(void * param)
+{
+    using namespace std;
+
+    struct ThreadParam
+        * p = reinterpret_cast<ThreadParam *>(param);
+
+    MSG("Producer thread started\n");
+
+    try
+        {
+            for (int i = 0; i < 10; ++i)
+                {
+                    std::auto_ptr<Job>
+                        job(new Job(i,
+                                    i));
+
+                    { // SYNC
+                        Cpt::SyncRegion
+                            sr(*p->v_);
+
+                        p->v_->push_back(job.get());
+                        p->q_->put(job.get());
+                    } // SYNC
+
+                    job.release();
+
+                    MSG("Producer thread created job #%d\n",
+                        i);
+                    
+                    ITK_DBGMSG(p->testMgr_, ":");
+
+                    sleep(1);
+                }
+
+            sleep(3);
+
+            for (int i = 0; i < 5; ++i)
+                {
+                    { // SYNC
+                        Cpt::SyncRegion
+                            sr(*p->v_);
+
+                        int
+                            id = 2*i;
+
+                        Job
+                            * job = NULL;
+                        bool
+                            found = p->q_->findJob(compose1(bind2nd(equal_to<int>(),
+                                                                    id),
+                                                            mem_fun(&Job::id)),
+                                                   &job);
+
+                        if (found)
+                            {
+                                job->cancel(id);
+                                MSG("Producer thread cancelled job #%d\n",
+                                    id);
+                            }
+                        else
+                            {
+                                MSG("Started job was not found");
+                            }
+
+                        ITK_DBGMSG(p->testMgr_, "!");
+
+                    } // SYNC
+                }
+
+            sleep(6);
+
+            p->q_->stopProcessing();
+            
+        }
+    catch (...)
+        {
+            MSG("Producer thread: cancelled\n");
+        }
+
+    MSG("Producer thread stopped\n");
+
+    return NULL;
+}
+
+
+
+static void * consumer(void * param)
+{
+    struct ThreadParam
+        * p = reinterpret_cast<ThreadParam *>(param);
+
+    MSG("Consumer thread started\n");
+
+    try
+        {
+            bool
+                result = true;
+
+            while (result)
+                {
+                    Job
+                        * job = NULL;
+                    
+                    MSG("Consumer thread getting ...\n");
+
+                    result = p->q_->get(&job);
+
+                    MSG("Consumer thread has got (%d) ...\n",
+                        result);
+                    
+                    ITK_DBGMSG(p->testMgr_, ".");
+
+                    if (result)
+                        {
+                            job->doJob();
+                            p->q_->jobCompleted(job);
+                        }
+                }
+        }
+    catch (...)
+        {
+            MSG("Consumer thread: cancelled\n");
+        }
+
+    MSG("Consumer thread stopped\n");
+
+    return NULL;
+}
+
+
+
+void testCancel(Itk::TestMgr * testMgr)
+{
+    TestJobQueue
+        jobQueue(10);
+    JobVector
+        jobVector;
+    
+    struct ThreadParam threadParam = 
+        {
+            &jobQueue,
+            &jobVector,
+            testMgr
+        };
+
+    std::list<pthread_t>
+        threadHndls;
+
+    int
+        result;
+
+    MSG("Creating 1 producer and 1 consumer\n");
+
+    pthread_t
+        threadHndl;
+    
+    result = pthread_create(&threadHndl,
+                            NULL,
+                            &producer,
+                            &threadParam);
+    ITK_ASSERT(testMgr,
+               result == 0,
+               "Could not create producer thread");
+    
+    threadHndls.push_back(threadHndl);
+    
+    result = pthread_create(&threadHndl,
+                            NULL,
+                            &consumer,
+                            &threadParam);
+    
+    ITK_ASSERT(testMgr,
+               result == 0,
+               "Could not create consumer thread");
+
+    threadHndls.push_back(threadHndl);
+
+    MSG("joining threads\n");
+
+    int
+        jIdx = 0;
+
+    std::list<pthread_t>::iterator
+        i = threadHndls.begin(),
+        end = threadHndls.end();
+    for (; i != end; ++i)
+        {
+            void
+                * retVal;
+
+            MSG("j:%d\n",
+                jIdx);
+
+            result = pthread_join(*i,
+                                  &retVal);
+            ITK_EXPECT(testMgr,
+                       result == 0,
+                       "Failed to join %s thread %d",
+                       ((jIdx % 2) == 0 ? "producer" : "consumer"),
+                       jIdx);
+
+            ++jIdx;
+        }
+
+    MSG("Joined all workers\n");
+    MSG("session done.\n");
+}
+
+
+
+Itk::TesterBase * CreateJobQueueTests()
+{
+    using namespace Itk;
+
+    SuiteTester
+        * jobQueueTests = new SuiteTester("jobqueue");
+
+#define TEST "cancel"
+    jobQueueTests->add(TEST,
+                       testCancel,
+                       TEST,
+                       SuiteTester::REDIRECT_ONLY);
+#undef TEST
+
+
+    // ... add more tests to suite
+    
+    return jobQueueTests;
+    
+}