searchengine/util/tsrc/cpixtoolsunittest/src/jobqueuetests.cpp
changeset 0 671dee74050a
equal deleted inserted replaced
-1:000000000000 0:671dee74050a
       
     1 /*
       
     2 * Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
       
     3 * All rights reserved.
       
     4 * This component and the accompanying materials are made available
       
     5 * under the terms of "Eclipse Public License v1.0"
       
     6 * which accompanies this distribution, and is available
       
     7 * at the URL "http://www.eclipse.org/legal/epl-v10.html".
       
     8 *
       
     9 * Initial Contributors:
       
    10 * Nokia Corporation - initial contribution.
       
    11 *
       
    12 * Contributors:
       
    13 *
       
    14 * Description: 
       
    15 *
       
    16 */
       
    17 
       
    18 #include <unistd.h>
       
    19 #include <stdio.h>
       
    20 #include <stdlib.h>
       
    21 #include <string.h>
       
    22 
       
    23 #include <list>
       
    24 #include <vector>
       
    25 
       
    26 #include "itk.h"
       
    27 
       
    28 #include "cpixjobqueue.h"
       
    29 
       
    30 #define MSG(fmt, args...)  printf(fmt, ##args); fflush(stdout);
       
    31 
       
    32 class Job
       
    33 {
       
    34     int        id_;
       
    35     int        rounds_;
       
    36 
       
    37     bool       cancelled_;
       
    38     
       
    39 public:
       
    40     Job(int id,
       
    41         int rounds)
       
    42         : id_(id),
       
    43           rounds_(rounds),
       
    44           cancelled_(false)
       
    45     {
       
    46         ;
       
    47     }
       
    48 
       
    49 
       
    50     int id() const
       
    51     {
       
    52         return id_;
       
    53     }
       
    54     
       
    55 
       
    56     void cancel(int id) // TODO OBS id
       
    57     {
       
    58         if (id != id_)
       
    59             {
       
    60                 ITK_PANIC("Messed up cancel id");
       
    61             }
       
    62 
       
    63         cancelled_ = true;
       
    64         MSG("JOB %d - cancelled\n",
       
    65             id_);
       
    66     }
       
    67 
       
    68     
       
    69     void doJob()
       
    70     {
       
    71         MSG("Starting JOB %d - %d rounds\n",
       
    72             id_,
       
    73             rounds_);
       
    74 
       
    75         for (; rounds_ > 0 && !cancelled_; --rounds_)
       
    76             {
       
    77                 sleep(1);
       
    78 
       
    79                 MSG("JOB %d : %d rounds to go\n",
       
    80                     id_,
       
    81                     rounds_);
       
    82             }
       
    83 
       
    84         MSG("JOB %d - %s\n",
       
    85             id_,
       
    86             cancelled_ ? "CANCELLED" : "COMPLETED");
       
    87     }
       
    88     
       
    89 };
       
    90 
       
    91 
       
    92 typedef Cpt::JobQueue<Job*> TestJobQueue;
       
    93 
       
    94 class JobVector : public std::vector<Job*>, public Cpt::SyncEntity
       
    95 {
       
    96 public:
       
    97 
       
    98     ~JobVector()
       
    99     {
       
   100         std::vector<Job*>::iterator
       
   101             i = begin(),
       
   102             e = end();
       
   103 
       
   104         for (; i != e; ++i)
       
   105             {
       
   106                 delete *i;
       
   107             }
       
   108     }
       
   109 };
       
   110 
       
   111 
       
   112 
       
   113 struct ThreadParam
       
   114 {
       
   115     TestJobQueue    * q_;
       
   116     JobVector       * v_;
       
   117     Itk::TestMgr    * testMgr_;
       
   118 };
       
   119 
       
   120 
       
   121 static void * producer(void * param)
       
   122 {
       
   123     using namespace std;
       
   124 
       
   125     struct ThreadParam
       
   126         * p = reinterpret_cast<ThreadParam *>(param);
       
   127 
       
   128     MSG("Producer thread started\n");
       
   129 
       
   130     try
       
   131         {
       
   132             for (int i = 0; i < 10; ++i)
       
   133                 {
       
   134                     std::auto_ptr<Job>
       
   135                         job(new Job(i,
       
   136                                     i));
       
   137 
       
   138                     { // SYNC
       
   139                         Cpt::SyncRegion
       
   140                             sr(*p->v_);
       
   141 
       
   142                         p->v_->push_back(job.get());
       
   143                         p->q_->put(job.get());
       
   144                     } // SYNC
       
   145 
       
   146                     job.release();
       
   147 
       
   148                     MSG("Producer thread created job #%d\n",
       
   149                         i);
       
   150                     
       
   151                     ITK_DBGMSG(p->testMgr_, ":");
       
   152 
       
   153                     sleep(1);
       
   154                 }
       
   155 
       
   156             sleep(3);
       
   157 
       
   158             for (int i = 0; i < 5; ++i)
       
   159                 {
       
   160                     { // SYNC
       
   161                         Cpt::SyncRegion
       
   162                             sr(*p->v_);
       
   163 
       
   164                         int
       
   165                             id = 2*i;
       
   166 
       
   167                         Job
       
   168                             * job = NULL;
       
   169                         bool
       
   170                             found = p->q_->findJob(compose1(bind2nd(equal_to<int>(),
       
   171                                                                     id),
       
   172                                                             mem_fun(&Job::id)),
       
   173                                                    &job);
       
   174 
       
   175                         if (found)
       
   176                             {
       
   177                                 job->cancel(id);
       
   178                                 MSG("Producer thread cancelled job #%d\n",
       
   179                                     id);
       
   180                             }
       
   181                         else
       
   182                             {
       
   183                                 MSG("Started job was not found");
       
   184                             }
       
   185 
       
   186                         ITK_DBGMSG(p->testMgr_, "!");
       
   187 
       
   188                     } // SYNC
       
   189                 }
       
   190 
       
   191             sleep(6);
       
   192 
       
   193             p->q_->stopProcessing();
       
   194             
       
   195         }
       
   196     catch (...)
       
   197         {
       
   198             MSG("Producer thread: cancelled\n");
       
   199         }
       
   200 
       
   201     MSG("Producer thread stopped\n");
       
   202 
       
   203     return NULL;
       
   204 }
       
   205 
       
   206 
       
   207 
       
   208 static void * consumer(void * param)
       
   209 {
       
   210     struct ThreadParam
       
   211         * p = reinterpret_cast<ThreadParam *>(param);
       
   212 
       
   213     MSG("Consumer thread started\n");
       
   214 
       
   215     try
       
   216         {
       
   217             bool
       
   218                 result = true;
       
   219 
       
   220             while (result)
       
   221                 {
       
   222                     Job
       
   223                         * job = NULL;
       
   224                     
       
   225                     MSG("Consumer thread getting ...\n");
       
   226 
       
   227                     result = p->q_->get(&job);
       
   228 
       
   229                     MSG("Consumer thread has got (%d) ...\n",
       
   230                         result);
       
   231                     
       
   232                     ITK_DBGMSG(p->testMgr_, ".");
       
   233 
       
   234                     if (result)
       
   235                         {
       
   236                             job->doJob();
       
   237                             p->q_->jobCompleted(job);
       
   238                         }
       
   239                 }
       
   240         }
       
   241     catch (...)
       
   242         {
       
   243             MSG("Consumer thread: cancelled\n");
       
   244         }
       
   245 
       
   246     MSG("Consumer thread stopped\n");
       
   247 
       
   248     return NULL;
       
   249 }
       
   250 
       
   251 
       
   252 
       
   253 void testCancel(Itk::TestMgr * testMgr)
       
   254 {
       
   255     TestJobQueue
       
   256         jobQueue(10);
       
   257     JobVector
       
   258         jobVector;
       
   259     
       
   260     struct ThreadParam threadParam = 
       
   261         {
       
   262             &jobQueue,
       
   263             &jobVector,
       
   264             testMgr
       
   265         };
       
   266 
       
   267     std::list<pthread_t>
       
   268         threadHndls;
       
   269 
       
   270     int
       
   271         result;
       
   272 
       
   273     MSG("Creating 1 producer and 1 consumer\n");
       
   274 
       
   275     pthread_t
       
   276         threadHndl;
       
   277     
       
   278     result = pthread_create(&threadHndl,
       
   279                             NULL,
       
   280                             &producer,
       
   281                             &threadParam);
       
   282     ITK_ASSERT(testMgr,
       
   283                result == 0,
       
   284                "Could not create producer thread");
       
   285     
       
   286     threadHndls.push_back(threadHndl);
       
   287     
       
   288     result = pthread_create(&threadHndl,
       
   289                             NULL,
       
   290                             &consumer,
       
   291                             &threadParam);
       
   292     
       
   293     ITK_ASSERT(testMgr,
       
   294                result == 0,
       
   295                "Could not create consumer thread");
       
   296 
       
   297     threadHndls.push_back(threadHndl);
       
   298 
       
   299     MSG("joining threads\n");
       
   300 
       
   301     int
       
   302         jIdx = 0;
       
   303 
       
   304     std::list<pthread_t>::iterator
       
   305         i = threadHndls.begin(),
       
   306         end = threadHndls.end();
       
   307     for (; i != end; ++i)
       
   308         {
       
   309             void
       
   310                 * retVal;
       
   311 
       
   312             MSG("j:%d\n",
       
   313                 jIdx);
       
   314 
       
   315             result = pthread_join(*i,
       
   316                                   &retVal);
       
   317             ITK_EXPECT(testMgr,
       
   318                        result == 0,
       
   319                        "Failed to join %s thread %d",
       
   320                        ((jIdx % 2) == 0 ? "producer" : "consumer"),
       
   321                        jIdx);
       
   322 
       
   323             ++jIdx;
       
   324         }
       
   325 
       
   326     MSG("Joined all workers\n");
       
   327     MSG("session done.\n");
       
   328 }
       
   329 
       
   330 
       
   331 
       
   332 Itk::TesterBase * CreateJobQueueTests()
       
   333 {
       
   334     using namespace Itk;
       
   335 
       
   336     SuiteTester
       
   337         * jobQueueTests = new SuiteTester("jobqueue");
       
   338 
       
   339 #define TEST "cancel"
       
   340     jobQueueTests->add(TEST,
       
   341                        testCancel,
       
   342                        TEST,
       
   343                        SuiteTester::REDIRECT_ONLY);
       
   344 #undef TEST
       
   345 
       
   346 
       
   347     // ... add more tests to suite
       
   348     
       
   349     return jobQueueTests;
       
   350     
       
   351 }