src/corelib/concurrent/qthreadpool.cpp
changeset 0 1918ee327afb
child 4 3b1da2848fc7
equal deleted inserted replaced
-1:000000000000 0:1918ee327afb
       
     1 /****************************************************************************
       
     2 **
       
     3 ** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
       
     4 ** All rights reserved.
       
     5 ** Contact: Nokia Corporation (qt-info@nokia.com)
       
     6 **
       
     7 ** This file is part of the QtCore module of the Qt Toolkit.
       
     8 **
       
     9 ** $QT_BEGIN_LICENSE:LGPL$
       
    10 ** No Commercial Usage
       
    11 ** This file contains pre-release code and may not be distributed.
       
    12 ** You may use this file in accordance with the terms and conditions
       
    13 ** contained in the Technology Preview License Agreement accompanying
       
    14 ** this package.
       
    15 **
       
    16 ** GNU Lesser General Public License Usage
       
    17 ** Alternatively, this file may be used under the terms of the GNU Lesser
       
    18 ** General Public License version 2.1 as published by the Free Software
       
    19 ** Foundation and appearing in the file LICENSE.LGPL included in the
       
    20 ** packaging of this file.  Please review the following information to
       
    21 ** ensure the GNU Lesser General Public License version 2.1 requirements
       
    22 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
       
    23 **
       
    24 ** In addition, as a special exception, Nokia gives you certain additional
       
    25 ** rights.  These rights are described in the Nokia Qt LGPL Exception
       
    26 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
       
    27 **
       
    28 ** If you have questions regarding the use of this file, please contact
       
    29 ** Nokia at qt-info@nokia.com.
       
    30 **
       
    31 **
       
    32 **
       
    33 **
       
    34 **
       
    35 **
       
    36 **
       
    37 **
       
    38 ** $QT_END_LICENSE$
       
    39 **
       
    40 ****************************************************************************/
       
    41 
       
    42 #include "qthreadpool.h"
       
    43 #include "qthreadpool_p.h"
       
    44 
       
    45 #ifndef QT_NO_THREAD
       
    46 
       
    47 QT_BEGIN_NAMESPACE
       
    48 
       
    49 inline bool operator<(int priority, const QPair<QRunnable *, int> &p)
       
    50 {
       
    51     return p.second < priority;
       
    52 }
       
    53 inline bool operator<(const QPair<QRunnable *, int> &p, int priority)
       
    54 {
       
    55     return priority < p.second;
       
    56 }
       
    57 
       
    58 Q_GLOBAL_STATIC(QThreadPool, theInstance)
       
    59 
       
    60 /*
       
    61     QThread wrapper, provides synchronizitaion against a ThreadPool
       
    62 */
       
    63 class QThreadPoolThread : public QThread
       
    64 {
       
    65 public:
       
    66     QThreadPoolThread(QThreadPoolPrivate *manager);
       
    67     void run();
       
    68     void registerTheadInactive();
       
    69 
       
    70     QThreadPoolPrivate *manager;
       
    71     QRunnable *runnable;
       
    72 };
       
    73 
       
    74 /*
       
    75     QThreadPool private class.
       
    76 */
       
    77 
       
    78 
       
    79 /*!\internal
       
    80 
       
    81 */
       
    82 QThreadPoolThread::QThreadPoolThread(QThreadPoolPrivate *manager)
       
    83     :manager(manager), runnable(0)
       
    84 { }
       
    85 
       
    86 /* \internal
       
    87 
       
    88 */
       
    89 void QThreadPoolThread::run()
       
    90 {
       
    91     QMutexLocker locker(&manager->mutex);
       
    92     for(;;) {
       
    93         QRunnable *r = runnable;
       
    94         runnable = 0;
       
    95 
       
    96         do {
       
    97             if (r) {
       
    98                 const bool autoDelete = r->autoDelete();
       
    99 
       
   100 
       
   101                 // run the task
       
   102                 locker.unlock();
       
   103 #ifndef QT_NO_EXCEPTIONS
       
   104                 try {
       
   105 #endif
       
   106                     r->run();
       
   107 #ifndef QT_NO_EXCEPTIONS
       
   108                 } catch (...) {
       
   109                     qWarning("Qt Concurrent has caught an exception thrown from a worker thread.\n"
       
   110                              "This is not supported, exceptions thrown in worker threads must be\n"
       
   111                              "caught before control returns to Qt Concurrent.");
       
   112                     registerTheadInactive();
       
   113                     throw;
       
   114                 }
       
   115 #endif
       
   116                 locker.relock();
       
   117 
       
   118                 if (autoDelete && !--r->ref)
       
   119                     delete r;
       
   120             }
       
   121 
       
   122             // if too many threads are active, expire this thread
       
   123             if (manager->tooManyThreadsActive())
       
   124                 break;
       
   125 
       
   126             r = !manager->queue.isEmpty() ? manager->queue.takeFirst().first : 0;
       
   127         } while (r != 0);
       
   128 
       
   129         if (manager->isExiting) {
       
   130             registerTheadInactive();
       
   131             break;
       
   132         }
       
   133 
       
   134         // if too many threads are active, expire this thread
       
   135         bool expired = manager->tooManyThreadsActive();
       
   136         if (!expired) {
       
   137             ++manager->waitingThreads;
       
   138             registerTheadInactive();
       
   139             // wait for work, exiting after the expiry timeout is reached
       
   140             expired = !manager->runnableReady.wait(locker.mutex(), manager->expiryTimeout);
       
   141             ++manager->activeThreads;
       
   142     
       
   143             if (expired)
       
   144                 --manager->waitingThreads;
       
   145         }
       
   146         if (expired) {
       
   147             manager->expiredThreads.enqueue(this);
       
   148             registerTheadInactive();
       
   149             break;
       
   150         }
       
   151     }
       
   152 }
       
   153 
       
   154 void QThreadPoolThread::registerTheadInactive()
       
   155 {
       
   156     if (--manager->activeThreads == 0)
       
   157         manager->noActiveThreads.wakeAll();
       
   158 }
       
   159 
       
   160 
       
   161 /* \internal
       
   162 
       
   163 */
       
   164 QThreadPoolPrivate:: QThreadPoolPrivate()
       
   165     : isExiting(false),
       
   166       expiryTimeout(30000),
       
   167       maxThreadCount(qAbs(QThread::idealThreadCount())),
       
   168       reservedThreads(0),
       
   169       waitingThreads(0),
       
   170       activeThreads(0)
       
   171 { }
       
   172 
       
   173 bool QThreadPoolPrivate::tryStart(QRunnable *task)
       
   174 {
       
   175     if (allThreads.isEmpty()) {
       
   176         // always create at least one thread
       
   177         startThread(task);
       
   178         return true;
       
   179     }
       
   180 
       
   181     // can't do anything if we're over the limit
       
   182     if (activeThreadCount() >= maxThreadCount)
       
   183         return false;
       
   184 
       
   185     if (waitingThreads > 0) {
       
   186         // recycle an available thread
       
   187         --waitingThreads;
       
   188         enqueueTask(task);
       
   189         return true;
       
   190     }
       
   191 
       
   192     if (!expiredThreads.isEmpty()) {
       
   193         // restart an expired thread
       
   194         QThreadPoolThread *thread = expiredThreads.dequeue();
       
   195         Q_ASSERT(thread->runnable == 0);
       
   196 
       
   197         ++activeThreads;
       
   198 
       
   199         if (task->autoDelete())
       
   200             ++task->ref;
       
   201         thread->runnable = task;
       
   202         thread->start();
       
   203         return true;
       
   204     }
       
   205 
       
   206     // start a new thread
       
   207     startThread(task);
       
   208     return true;
       
   209 }
       
   210 
       
   211 void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority)
       
   212 {
       
   213     if (runnable->autoDelete())
       
   214         ++runnable->ref;
       
   215 
       
   216     // put it on the queue
       
   217     QList<QPair<QRunnable *, int> >::iterator at =
       
   218         qUpperBound(queue.begin(), queue.end(), priority);
       
   219     queue.insert(at, qMakePair(runnable, priority));
       
   220     runnableReady.wakeOne();
       
   221 }
       
   222 
       
   223 int QThreadPoolPrivate::activeThreadCount() const
       
   224 {
       
   225     // To improve scalability this function is called without holding 
       
   226     // the mutex lock -- keep it thread-safe.
       
   227     return (allThreads.count()
       
   228             - expiredThreads.count()
       
   229             - waitingThreads
       
   230             + reservedThreads);
       
   231 }
       
   232 
       
   233 void QThreadPoolPrivate::tryToStartMoreThreads()
       
   234 {
       
   235     // try to push tasks on the queue to any available threads
       
   236     while (!queue.isEmpty() && tryStart(queue.first().first))
       
   237         queue.removeFirst();
       
   238 }
       
   239 
       
   240 bool QThreadPoolPrivate::tooManyThreadsActive() const
       
   241 {
       
   242     const int activeThreadCount = this->activeThreadCount();
       
   243     return activeThreadCount > maxThreadCount && (activeThreadCount - reservedThreads) > 1;
       
   244 }
       
   245 
       
   246 /*! \internal
       
   247 
       
   248 */
       
   249 void QThreadPoolPrivate::startThread(QRunnable *runnable)
       
   250 {
       
   251     QScopedPointer <QThreadPoolThread> thread(new QThreadPoolThread(this));
       
   252     allThreads.insert(thread.data());
       
   253     ++activeThreads;
       
   254 
       
   255     if (runnable->autoDelete())
       
   256         ++runnable->ref;
       
   257     thread->runnable = runnable;
       
   258     thread.take()->start();
       
   259 }
       
   260 
       
   261 /*! \internal
       
   262     Makes all threads exit, waits for each tread to exit and deletes it.
       
   263 */
       
   264 void QThreadPoolPrivate::reset()
       
   265 {
       
   266     QMutexLocker locker(&mutex);
       
   267     isExiting = true;
       
   268     runnableReady.wakeAll();
       
   269 
       
   270     do {
       
   271         // make a copy of the set so that we can iterate without the lock
       
   272         QSet<QThreadPoolThread *> allThreadsCopy = allThreads;
       
   273         allThreads.clear();
       
   274         locker.unlock();
       
   275 
       
   276         foreach (QThreadPoolThread *thread, allThreadsCopy) {
       
   277             thread->wait();
       
   278             delete thread;
       
   279         }
       
   280 
       
   281         locker.relock();
       
   282         // repeat until all newly arrived threads have also completed
       
   283     } while (!allThreads.isEmpty());
       
   284 
       
   285     waitingThreads = 0;
       
   286     expiredThreads.clear();
       
   287 
       
   288     isExiting = false;
       
   289 }
       
   290 
       
   291 void QThreadPoolPrivate::waitForDone()
       
   292 {
       
   293     QMutexLocker locker(&mutex);
       
   294     while (!(queue.isEmpty() && activeThreads == 0))
       
   295         noActiveThreads.wait(locker.mutex());
       
   296 }
       
   297 
       
   298 /*! \internal
       
   299     Pulls a runnable from the front queue and runs it in the current thread. Blocks
       
   300     until the runnable has completed. Returns true if a runnable was found.
       
   301 */
       
   302 bool QThreadPoolPrivate::startFrontRunnable()
       
   303 {
       
   304     QMutexLocker locker(&mutex);
       
   305     if (queue.isEmpty())
       
   306         return false;
       
   307 
       
   308     QRunnable *runnable = queue.takeFirst().first;
       
   309     const bool autoDelete = runnable->autoDelete();
       
   310     bool del = autoDelete && !--runnable->ref;
       
   311 
       
   312     locker.unlock();
       
   313     runnable->run();
       
   314     locker.relock();
       
   315 
       
   316     if (del) {
       
   317         delete runnable;
       
   318     }
       
   319 
       
   320     return true;
       
   321 }
       
   322 
       
   323 /*! \internal
       
   324     Seaches for \a runnable in the queue, removes it from the queue and
       
   325     runs it if found. This functon does not return until the runnable
       
   326     has completed.
       
   327 */
       
   328 void QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
       
   329 {
       
   330     if (runnable == 0 || queue.isEmpty())
       
   331         return;
       
   332     bool found = false;
       
   333     {
       
   334         QMutexLocker locker(&mutex);
       
   335         QList<QPair<QRunnable *, int> >::iterator it = queue.begin();
       
   336         QList<QPair<QRunnable *, int> >::iterator end = queue.end();
       
   337 
       
   338         while (it != end) {
       
   339             if (it->first == runnable) {
       
   340                 found = true;
       
   341                 queue.erase(it);
       
   342                 break;
       
   343             }
       
   344             ++it;
       
   345         }
       
   346     }
       
   347 
       
   348     if (!found)
       
   349         return;
       
   350 
       
   351     const bool autoDelete = runnable->autoDelete();
       
   352     bool del = autoDelete && !--runnable->ref;
       
   353 
       
   354     runnable->run();
       
   355 
       
   356     if (del) {
       
   357         delete runnable;
       
   358     }
       
   359 }
       
   360 
       
   361 /*!
       
   362     \class QThreadPool
       
   363     \brief The QThreadPool class manages a collection of QThreads.
       
   364     \since 4.4
       
   365     \threadsafe
       
   366 
       
   367     \ingroup thread
       
   368 
       
   369     QThreadPool manages and recyles individual QThread objects to help reduce
       
   370     thread creation costs in programs that use threads. Each Qt application
       
   371     has one global QThreadPool object, which can be accessed by calling
       
   372     globalInstance().
       
   373 
       
   374     To use one of the QThreadPool threads, subclass QRunnable and implement
       
   375     the run() virtual function. Then create an object of that class and pass
       
   376     it to QThreadPool::start().
       
   377 
       
   378     \snippet doc/src/snippets/code/src_corelib_concurrent_qthreadpool.cpp 0
       
   379 
       
   380     QThreadPool deletes the QRunnable automatically by default. Use 
       
   381     QRunnable::setAutoDelete() to change the auto-deletion flag.
       
   382 
       
   383     QThreadPool supports executing the same QRunnable more than once
       
   384     by calling tryStart(this) from within QRunnable::run(). 
       
   385     If autoDelete is enabled the QRunnable will be deleted when
       
   386     the last thread exits the run function. Calling start()
       
   387     multiple times with the same QRunnable when autoDelete is enabled
       
   388     creates a race condition and is not recommended.
       
   389 
       
   390     Threads that are unused for a certain amount of time will expire. The
       
   391     default expiry timeout is 30000 milliseconds (30 seconds). This can be
       
   392     changed using setExpiryTimeout(). Setting a negative expiry timeout
       
   393     disables the expiry mechanism.
       
   394 
       
   395     Call maxThreadCount() to query the maximum number of threads to be used.
       
   396     If needed, you can change the limit with setMaxThreadCount(). The default
       
   397     maxThreadCount() is QThread::idealThreadCount(). The activeThreadCount()
       
   398     function returns the number of threads currently doing work.
       
   399 
       
   400     The reserveThread() function reserves a thread for external
       
   401     use. Use releaseThread() when your are done with the thread, so
       
   402     that it may be reused.  Essentially, these functions temporarily
       
   403     increase or reduce the active thread count and are useful when
       
   404     implementing time-consuming operations that are not visible to the
       
   405     QThreadPool.
       
   406 
       
   407     Note that QThreadPool is a low-level class for managing threads, see
       
   408     QtConcurrent::run() or the other
       
   409     \l {Concurrent Programming}{Qt Concurrent} APIs for higher
       
   410     level alternatives.
       
   411 
       
   412     \sa QRunnable
       
   413 */
       
   414 
       
   415 /*!
       
   416     Constructs a thread pool with the given \a parent.
       
   417 */
       
   418 QThreadPool::QThreadPool(QObject *parent)
       
   419     : QObject(*new QThreadPoolPrivate, parent)
       
   420 { }
       
   421 
       
   422 /*!
       
   423     Destroys the QThreadPool.
       
   424     This function will block until all runnables have been completed.
       
   425 */
       
   426 QThreadPool::~QThreadPool()
       
   427 {
       
   428     d_func()->waitForDone();
       
   429     d_func()->reset();
       
   430 }
       
   431 
       
   432 /*!
       
   433     Returns the global QThreadPool instance.
       
   434 */
       
   435 QThreadPool *QThreadPool::globalInstance()
       
   436 {
       
   437     return theInstance();
       
   438 }
       
   439 
       
   440 /*!
       
   441     Reserves a thread and uses it to run \a runnable, unless this thread will
       
   442     make the current thread count exceed maxThreadCount().  In that case,
       
   443     \a runnable is added to a run queue instead. The \a priority argument can
       
   444     be used to control the run queue's order of execution.
       
   445 
       
   446     Note that the thread pool takes ownership of the \a runnable if
       
   447     \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns true,
       
   448     and the \a runnable will be deleted automatically by the thread
       
   449     pool after the \l{QRunnable::run()}{runnable->run()} returns. If
       
   450     \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns false,
       
   451     ownership of \a runnable remains with the caller. Note that
       
   452     changing the auto-deletion on \a runnable after calling this
       
   453     functions results in undefined behavior.
       
   454 */
       
   455 void QThreadPool::start(QRunnable *runnable, int priority)
       
   456 {
       
   457     if (!runnable)
       
   458         return;
       
   459 
       
   460     Q_D(QThreadPool);
       
   461     QMutexLocker locker(&d->mutex);
       
   462     if (!d->tryStart(runnable))
       
   463         d->enqueueTask(runnable, priority);
       
   464 }
       
   465 
       
   466 /*!
       
   467     Attempts to reserve a thread to run \a runnable.
       
   468 
       
   469     If no threads are available at the time of calling, then this function
       
   470     does nothing and returns false.  Otherwise, \a runnable is run immediately
       
   471     using one available thread and this function returns true.
       
   472 
       
   473     Note that the thread pool takes ownership of the \a runnable if
       
   474     \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns true,
       
   475     and the \a runnable will be deleted automatically by the thread
       
   476     pool after the \l{QRunnable::run()}{runnable->run()} returns. If
       
   477     \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns false,
       
   478     ownership of \a runnable remains with the caller. Note that
       
   479     changing the auto-deletion on \a runnable after calling this
       
   480     function results in undefined behavior.
       
   481 */
       
   482 bool QThreadPool::tryStart(QRunnable *runnable)
       
   483 {
       
   484     if (!runnable)
       
   485         return false;
       
   486 
       
   487     Q_D(QThreadPool);
       
   488 
       
   489     // To improve scalability perform a check on the thread count
       
   490     // before locking the mutex.
       
   491     if (d->allThreads.isEmpty() == false && d->activeThreadCount() >= d->maxThreadCount)
       
   492         return false;
       
   493 
       
   494     QMutexLocker locker(&d->mutex);
       
   495     return d->tryStart(runnable);
       
   496 }
       
   497 
       
   498 /*! \property QThreadPool::expiryTimeout
       
   499 
       
   500     Threads that are unused for \a expiryTimeout milliseconds are considered
       
   501     to have expired and will exit. Such threads will be restarted as needed.
       
   502     The default \a expiryTimeout is 30000 milliseconds (30 seconds). If
       
   503     \a expiryTimeout is negative, newly created threads will not expire, e.g.,
       
   504     they will not exit until the thread pool is destroyed.
       
   505 
       
   506     Note that setting \a expiryTimeout has no effect on already running
       
   507     threads. Only newly created threads will use the new \a expiryTimeout.
       
   508     We recommend setting the \a expiryTimeout immediately after creating the
       
   509     thread pool, but before calling start().
       
   510 */
       
   511 
       
   512 int QThreadPool::expiryTimeout() const
       
   513 {
       
   514     Q_D(const QThreadPool);
       
   515     return d->expiryTimeout;
       
   516 }
       
   517 
       
   518 void QThreadPool::setExpiryTimeout(int expiryTimeout)
       
   519 {
       
   520     Q_D(QThreadPool);
       
   521     if (d->expiryTimeout == expiryTimeout)
       
   522         return;
       
   523     d->expiryTimeout = expiryTimeout;
       
   524 }
       
   525 
       
   526 /*! \property QThreadPool::maxThreadCount
       
   527 
       
   528     This property represents the maximum number of threads used by the thread
       
   529     pool.
       
   530 
       
   531     \note The thread pool will always use at least 1 thread, even if
       
   532     \a maxThreadCount limit is zero or negative.
       
   533 
       
   534     The default \a maxThreadCount is QThread::idealThreadCount().
       
   535 */
       
   536 
       
   537 int QThreadPool::maxThreadCount() const
       
   538 {
       
   539     Q_D(const QThreadPool);
       
   540     return d->maxThreadCount;
       
   541 }
       
   542 
       
   543 void QThreadPool::setMaxThreadCount(int maxThreadCount)
       
   544 {
       
   545     Q_D(QThreadPool);
       
   546     QMutexLocker locker(&d->mutex);
       
   547 
       
   548     if (maxThreadCount == d->maxThreadCount)
       
   549         return;
       
   550 
       
   551     d->maxThreadCount = maxThreadCount;
       
   552     d->tryToStartMoreThreads();
       
   553 }
       
   554 
       
   555 /*! \property QThreadPool::activeThreadCount
       
   556 
       
   557     This property represents the number of active threads in the thread pool.
       
   558 
       
   559     \note It is possible for this function to return a value that is greater
       
   560     than maxThreadCount(). See reserveThread() for more details.
       
   561 
       
   562     \sa reserveThread(), releaseThread()
       
   563 */
       
   564 
       
   565 int QThreadPool::activeThreadCount() const
       
   566 {
       
   567     Q_D(const QThreadPool);
       
   568     return d->activeThreadCount();
       
   569 }
       
   570 
       
   571 /*!
       
   572     Reserves one thread, disregarding activeThreadCount() and maxThreadCount().
       
   573 
       
   574     Once you are done with the thread, call releaseThread() to allow it to be
       
   575     reused.
       
   576 
       
   577     \note This function will always increase the number of active threads.
       
   578     This means that by using this function, it is possible for
       
   579     activeThreadCount() to return a value greater than maxThreadCount() .
       
   580 
       
   581     \sa releaseThread()
       
   582  */
       
   583 void QThreadPool::reserveThread()
       
   584 {
       
   585     Q_D(QThreadPool);
       
   586     QMutexLocker locker(&d->mutex);
       
   587     ++d->reservedThreads;
       
   588 }
       
   589 
       
   590 /*!
       
   591     Releases a thread previously reserved by a call to reserveThread().
       
   592 
       
   593     \note Calling this function without previously reserving a thread
       
   594     temporarily increases maxThreadCount(). This is useful when a
       
   595     thread goes to sleep waiting for more work, allowing other threads
       
   596     to continue. Be sure to call reserveThread() when done waiting, so
       
   597     that the thread pool can correctly maintain the
       
   598     activeThreadCount().
       
   599 
       
   600     \sa reserveThread()
       
   601 */
       
   602 void QThreadPool::releaseThread()
       
   603 {
       
   604     Q_D(QThreadPool);
       
   605     QMutexLocker locker(&d->mutex);
       
   606     --d->reservedThreads;
       
   607     d->tryToStartMoreThreads();
       
   608 }
       
   609 
       
   610 /*!
       
   611     Waits for each thread to exit and removes all threads from the thread pool.
       
   612 */
       
   613 void QThreadPool::waitForDone()
       
   614 {
       
   615     Q_D(QThreadPool);
       
   616     d->waitForDone();
       
   617     d->reset();
       
   618 }
       
   619 
       
   620 QT_END_NAMESPACE
       
   621 
       
   622 #endif