searchengine/util/tsrc/cpixtoolsunittest/src/jobqueuetests.cpp
author Dremov Kirill (Nokia-D-MSW/Tampere) <kirill.dremov@nokia.com>
Mon, 19 Apr 2010 14:40:16 +0300
changeset 0 671dee74050a
permissions -rw-r--r--
Revision: 201011 Kit: 201015

/*
* Copyright (c) 2010 Nokia Corporation and/or its subsidiary(-ies).
* All rights reserved.
* This component and the accompanying materials are made available
* under the terms of "Eclipse Public License v1.0"
* which accompanies this distribution, and is available
* at the URL "http://www.eclipse.org/legal/epl-v10.html".
*
* Initial Contributors:
* Nokia Corporation - initial contribution.
*
* Contributors:
*
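* Description: Unit test for Cpt::JobQueue - a producer thread queues and cancels jobs while a consumer thread executes them.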
*
*/

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <list>
#include <vector>

#include "itk.h"

#include "cpixjobqueue.h"

// Prints a printf-style message to stdout and flushes it immediately.
// The body is wrapped in do { } while (0) so that the macro expands to
// exactly one statement: "MSG(...);" can then be used safely as the
// body of an unbraced if / else / loop.
#define MSG(fmt, args...)  do { printf(fmt, ##args); fflush(stdout); } while (0)

/**
 * Test job: "works" by sleeping one second per round and can be asked
 * to stop early through cancel().
 *
 * NOTE(review): cancelled_ is a plain bool. In this file cancel() is
 * called from the producer thread while doJob() runs on the consumer
 * thread - confirm that this unsynchronized access is acceptable.
 */
class Job
{
    int        id_;
    int        rounds_;

    bool       cancelled_;

public:
    /**
     * @param id     identifier reported by id() and printed in messages
     * @param rounds number of one-second rounds doJob() will run
     */
    Job(int id,
        int rounds)
        : id_(id),
          rounds_(rounds),
          cancelled_(false)
    {
    }


    /** @return the identifier given at construction */
    int id() const
    {
        return id_;
    }


    /**
     * Marks this job as cancelled and prints a message.
     *
     * @param id must equal this job's own id, otherwise ITK_PANIC is
     *           invoked
     */
    void cancel(int id) // TODO OBS id
    {
        const bool
            sameId = (id == id_);

        if (!sameId)
            {
                ITK_PANIC("Messed up cancel id");
            }

        cancelled_ = true;

        MSG("JOB %d - cancelled\n",
            id_);
    }


    /**
     * Runs the job: one second of sleep per remaining round, stopping
     * as soon as the job is seen to be cancelled. Prints progress and
     * the final outcome.
     */
    void doJob()
    {
        MSG("Starting JOB %d - %d rounds\n",
            id_,
            rounds_);

        while (rounds_ > 0 && !cancelled_)
            {
                sleep(1);

                // reported before the counter is decremented
                MSG("JOB %d : %d rounds to go\n",
                    id_,
                    rounds_);

                --rounds_;
            }

        const char
            * outcome = "COMPLETED";

        if (cancelled_)
            {
                outcome = "CANCELLED";
            }

        MSG("JOB %d - %s\n",
            id_,
            outcome);
    }

};


// Job queue instantiation used by the tests in this file; its elements
// are raw Job pointers.
typedef Cpt::JobQueue<Job*> TestJobQueue;

class JobVector : public std::vector<Job*>, public Cpt::SyncEntity
{
public:

    ~JobVector()
    {
        std::vector<Job*>::iterator
            i = begin(),
            e = end();

        for (; i != e; ++i)
            {
                delete *i;
            }
    }
};



// Bundle of pointers handed to the producer and consumer threads through
// the void * argument of pthread_create (see testCancel). The member
// order matters: testCancel fills the struct with an aggregate
// initializer.
struct ThreadParam
{
    TestJobQueue    * q_;        // job queue used by both threads
    JobVector       * v_;        // receives every created job; also locked via Cpt::SyncRegion
    Itk::TestMgr    * testMgr_;  // test manager passed to ITK_DBGMSG
};


/**
 * Thread entry point of the producer.
 *
 * Creates ten jobs (ids 0..9, job i having i rounds) one second apart;
 * each job is stored in the job vector and put on the queue. After a
 * pause it looks up the jobs with the even ids 0, 2, 4, 6, 8 on the
 * queue and cancels those it finds. After a further pause it tells the
 * queue to stop processing. Any exception is caught and only reported
 * on stdout.
 *
 * @param param pointer to a ThreadParam, received as void * from
 *              pthread_create
 * @return always NULL
 */
static void * producer(void * param)
{
    using namespace std;

    // param originates from a ThreadParam *, so static_cast is the
    // matching conversion back from void *.
    struct ThreadParam
        * p = static_cast<ThreadParam *>(param);

    MSG("Producer thread started\n");

    try
        {
            for (int i = 0; i < 10; ++i)
                {
                    // guards the new job until the vector has taken it
                    std::auto_ptr<Job>
                        job(new Job(i,
                                    i));

                    { // SYNC
                        Cpt::SyncRegion
                            sr(*p->v_);

                        p->v_->push_back(job.get());

                        // The vector deletes its elements in its
                        // destructor, so after a successful push_back
                        // it is the owner. Ownership is released
                        // before put(): should put() throw, the job
                        // must not be deleted a second time by the
                        // auto_ptr.
                        p->q_->put(job.release());
                    } // SYNC

                    MSG("Producer thread created job #%d\n",
                        i);
                    
                    ITK_DBGMSG(p->testMgr_, ":");

                    sleep(1);
                }

            sleep(3);

            for (int i = 0; i < 5; ++i)
                {
                    { // SYNC
                        Cpt::SyncRegion
                            sr(*p->v_);

                        // only the even ids are cancelled
                        int
                            id = 2*i;

                        Job
                            * job = NULL;

                        // predicate: job->id() == id
                        bool
                            found = p->q_->findJob(compose1(bind2nd(equal_to<int>(),
                                                                    id),
                                                            mem_fun(&Job::id)),
                                                   &job);

                        if (found)
                            {
                                job->cancel(id);
                                MSG("Producer thread cancelled job #%d\n",
                                    id);
                            }
                        else
                            {
                                MSG("Started job was not found");
                            }

                        ITK_DBGMSG(p->testMgr_, "!");

                    } // SYNC
                }

            sleep(6);

            p->q_->stopProcessing();
            
        }
    catch (...)
        {
            MSG("Producer thread: cancelled\n");
        }

    MSG("Producer thread stopped\n");

    return NULL;
}



static void * consumer(void * param)
{
    struct ThreadParam
        * p = reinterpret_cast<ThreadParam *>(param);

    MSG("Consumer thread started\n");

    try
        {
            bool
                result = true;

            while (result)
                {
                    Job
                        * job = NULL;
                    
                    MSG("Consumer thread getting ...\n");

                    result = p->q_->get(&job);

                    MSG("Consumer thread has got (%d) ...\n",
                        result);
                    
                    ITK_DBGMSG(p->testMgr_, ".");

                    if (result)
                        {
                            job->doJob();
                            p->q_->jobCompleted(job);
                        }
                }
        }
    catch (...)
        {
            MSG("Consumer thread: cancelled\n");
        }

    MSG("Consumer thread stopped\n");

    return NULL;
}



/**
 * Test case: starts one producer and one consumer thread that share a
 * job queue and a job vector, then joins both threads.
 *
 * @param testMgr test manager used by the ITK_ASSERT / ITK_EXPECT
 *                checks and handed on to the worker threads
 */
void testCancel(Itk::TestMgr * testMgr)
{
    TestJobQueue
        jobQueue(10);
    JobVector
        jobVector;

    // everything the worker threads need, passed as their void * argument
    ThreadParam
        threadParam;
    threadParam.q_       = &jobQueue;
    threadParam.v_       = &jobVector;
    threadParam.testMgr_ = testMgr;

    std::list<pthread_t>
        threadHndls;

    MSG("Creating 1 producer and 1 consumer\n");

    pthread_t
        threadHndl;

    // producer first ...
    int
        result = pthread_create(&threadHndl,
                                NULL,
                                &producer,
                                &threadParam);
    ITK_ASSERT(testMgr,
               result == 0,
               "Could not create producer thread");

    threadHndls.push_back(threadHndl);

    // ... then the consumer
    result = pthread_create(&threadHndl,
                            NULL,
                            &consumer,
                            &threadParam);
    ITK_ASSERT(testMgr,
               result == 0,
               "Could not create consumer thread");

    threadHndls.push_back(threadHndl);

    MSG("joining threads\n");

    // Handles are joined in creation order, so even indices are the
    // producer and odd ones the consumer.
    int
        jIdx = 0;

    for (std::list<pthread_t>::iterator i = threadHndls.begin();
         i != threadHndls.end();
         ++i)
        {
            MSG("j:%d\n",
                jIdx);

            const char
                * role = ((jIdx % 2) == 0 ? "producer" : "consumer");

            // the threads' return values are not needed
            result = pthread_join(*i,
                                  NULL);
            ITK_EXPECT(testMgr,
                       result == 0,
                       "Failed to join %s thread %d",
                       role,
                       jIdx);

            ++jIdx;
        }

    MSG("Joined all workers\n");
    MSG("session done.\n");
}



/**
 * Builds the "jobqueue" test suite and registers the "cancel" test
 * case (testCancel) in it.
 *
 * @return the newly allocated suite, as a raw pointer
 */
Itk::TesterBase * CreateJobQueueTests()
{
    using namespace Itk;

    SuiteTester
        * suite = new SuiteTester("jobqueue");

    // TEST supplies both string arguments of add()
#define TEST "cancel"
    suite->add(TEST,
               testCancel,
               TEST,
               SuiteTester::REDIRECT_ONLY);
#undef TEST

    // ... add more tests to suite

    return suite;
}