diff -r 000000000000 -r 1918ee327afb src/corelib/concurrent/qthreadpool.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/corelib/concurrent/qthreadpool.cpp Mon Jan 11 14:00:40 2010 +0000 @@ -0,0 +1,622 @@ +/**************************************************************************** +** +** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies). +** All rights reserved. +** Contact: Nokia Corporation (qt-info@nokia.com) +** +** This file is part of the QtCore module of the Qt Toolkit. +** +** $QT_BEGIN_LICENSE:LGPL$ +** No Commercial Usage +** This file contains pre-release code and may not be distributed. +** You may use this file in accordance with the terms and conditions +** contained in the Technology Preview License Agreement accompanying +** this package. +** +** GNU Lesser General Public License Usage +** Alternatively, this file may be used under the terms of the GNU Lesser +** General Public License version 2.1 as published by the Free Software +** Foundation and appearing in the file LICENSE.LGPL included in the +** packaging of this file. Please review the following information to +** ensure the GNU Lesser General Public License version 2.1 requirements +** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html. +** +** In addition, as a special exception, Nokia gives you certain additional +** rights. These rights are described in the Nokia Qt LGPL Exception +** version 1.1, included in the file LGPL_EXCEPTION.txt in this package. +** +** If you have questions regarding the use of this file, please contact +** Nokia at qt-info@nokia.com. +** +** +** +** +** +** +** +** +** $QT_END_LICENSE$ +** +****************************************************************************/ + +#include "qthreadpool.h" +#include "qthreadpool_p.h" + +#ifndef QT_NO_THREAD + +QT_BEGIN_NAMESPACE + +inline bool operator<(int priority, const QPair &p) +{ + return p.second < priority; +} +inline bool operator<(const QPair &p, int priority) +{ + return priority < p.second; +} + +Q_GLOBAL_STATIC(QThreadPool, theInstance) + +/* + QThread wrapper, provides synchronizitaion against a ThreadPool +*/ +class QThreadPoolThread : public QThread +{ +public: + QThreadPoolThread(QThreadPoolPrivate *manager); + void run(); + void registerTheadInactive(); + + QThreadPoolPrivate *manager; + QRunnable *runnable; +}; + +/* + QThreadPool private class. +*/ + + +/*!\internal + +*/ +QThreadPoolThread::QThreadPoolThread(QThreadPoolPrivate *manager) + :manager(manager), runnable(0) +{ } + +/* \internal + +*/ +void QThreadPoolThread::run() +{ + QMutexLocker locker(&manager->mutex); + for(;;) { + QRunnable *r = runnable; + runnable = 0; + + do { + if (r) { + const bool autoDelete = r->autoDelete(); + + + // run the task + locker.unlock(); +#ifndef QT_NO_EXCEPTIONS + try { +#endif + r->run(); +#ifndef QT_NO_EXCEPTIONS + } catch (...) { + qWarning("Qt Concurrent has caught an exception thrown from a worker thread.\n" + "This is not supported, exceptions thrown in worker threads must be\n" + "caught before control returns to Qt Concurrent."); + registerTheadInactive(); + throw; + } +#endif + locker.relock(); + + if (autoDelete && !--r->ref) + delete r; + } + + // if too many threads are active, expire this thread + if (manager->tooManyThreadsActive()) + break; + + r = !manager->queue.isEmpty() ? manager->queue.takeFirst().first : 0; + } while (r != 0); + + if (manager->isExiting) { + registerTheadInactive(); + break; + } + + // if too many threads are active, expire this thread + bool expired = manager->tooManyThreadsActive(); + if (!expired) { + ++manager->waitingThreads; + registerTheadInactive(); + // wait for work, exiting after the expiry timeout is reached + expired = !manager->runnableReady.wait(locker.mutex(), manager->expiryTimeout); + ++manager->activeThreads; + + if (expired) + --manager->waitingThreads; + } + if (expired) { + manager->expiredThreads.enqueue(this); + registerTheadInactive(); + break; + } + } +} + +void QThreadPoolThread::registerTheadInactive() +{ + if (--manager->activeThreads == 0) + manager->noActiveThreads.wakeAll(); +} + + +/* \internal + +*/ +QThreadPoolPrivate:: QThreadPoolPrivate() + : isExiting(false), + expiryTimeout(30000), + maxThreadCount(qAbs(QThread::idealThreadCount())), + reservedThreads(0), + waitingThreads(0), + activeThreads(0) +{ } + +bool QThreadPoolPrivate::tryStart(QRunnable *task) +{ + if (allThreads.isEmpty()) { + // always create at least one thread + startThread(task); + return true; + } + + // can't do anything if we're over the limit + if (activeThreadCount() >= maxThreadCount) + return false; + + if (waitingThreads > 0) { + // recycle an available thread + --waitingThreads; + enqueueTask(task); + return true; + } + + if (!expiredThreads.isEmpty()) { + // restart an expired thread + QThreadPoolThread *thread = expiredThreads.dequeue(); + Q_ASSERT(thread->runnable == 0); + + ++activeThreads; + + if (task->autoDelete()) + ++task->ref; + thread->runnable = task; + thread->start(); + return true; + } + + // start a new thread + startThread(task); + return true; +} + +void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority) +{ + if (runnable->autoDelete()) + ++runnable->ref; + + // put it on the queue + QList >::iterator at = + qUpperBound(queue.begin(), queue.end(), priority); + queue.insert(at, qMakePair(runnable, priority)); + runnableReady.wakeOne(); +} + +int QThreadPoolPrivate::activeThreadCount() const +{ + // To improve scalability this function is called without holding + // the mutex lock -- keep it thread-safe. + return (allThreads.count() + - expiredThreads.count() + - waitingThreads + + reservedThreads); +} + +void QThreadPoolPrivate::tryToStartMoreThreads() +{ + // try to push tasks on the queue to any available threads + while (!queue.isEmpty() && tryStart(queue.first().first)) + queue.removeFirst(); +} + +bool QThreadPoolPrivate::tooManyThreadsActive() const +{ + const int activeThreadCount = this->activeThreadCount(); + return activeThreadCount > maxThreadCount && (activeThreadCount - reservedThreads) > 1; +} + +/*! \internal + +*/ +void QThreadPoolPrivate::startThread(QRunnable *runnable) +{ + QScopedPointer thread(new QThreadPoolThread(this)); + allThreads.insert(thread.data()); + ++activeThreads; + + if (runnable->autoDelete()) + ++runnable->ref; + thread->runnable = runnable; + thread.take()->start(); +} + +/*! \internal + Makes all threads exit, waits for each tread to exit and deletes it. +*/ +void QThreadPoolPrivate::reset() +{ + QMutexLocker locker(&mutex); + isExiting = true; + runnableReady.wakeAll(); + + do { + // make a copy of the set so that we can iterate without the lock + QSet allThreadsCopy = allThreads; + allThreads.clear(); + locker.unlock(); + + foreach (QThreadPoolThread *thread, allThreadsCopy) { + thread->wait(); + delete thread; + } + + locker.relock(); + // repeat until all newly arrived threads have also completed + } while (!allThreads.isEmpty()); + + waitingThreads = 0; + expiredThreads.clear(); + + isExiting = false; +} + +void QThreadPoolPrivate::waitForDone() +{ + QMutexLocker locker(&mutex); + while (!(queue.isEmpty() && activeThreads == 0)) + noActiveThreads.wait(locker.mutex()); +} + +/*! \internal + Pulls a runnable from the front queue and runs it in the current thread. Blocks + until the runnable has completed. Returns true if a runnable was found. +*/ +bool QThreadPoolPrivate::startFrontRunnable() +{ + QMutexLocker locker(&mutex); + if (queue.isEmpty()) + return false; + + QRunnable *runnable = queue.takeFirst().first; + const bool autoDelete = runnable->autoDelete(); + bool del = autoDelete && !--runnable->ref; + + locker.unlock(); + runnable->run(); + locker.relock(); + + if (del) { + delete runnable; + } + + return true; +} + +/*! \internal + Seaches for \a runnable in the queue, removes it from the queue and + runs it if found. This functon does not return until the runnable + has completed. +*/ +void QThreadPoolPrivate::stealRunnable(QRunnable *runnable) +{ + if (runnable == 0 || queue.isEmpty()) + return; + bool found = false; + { + QMutexLocker locker(&mutex); + QList >::iterator it = queue.begin(); + QList >::iterator end = queue.end(); + + while (it != end) { + if (it->first == runnable) { + found = true; + queue.erase(it); + break; + } + ++it; + } + } + + if (!found) + return; + + const bool autoDelete = runnable->autoDelete(); + bool del = autoDelete && !--runnable->ref; + + runnable->run(); + + if (del) { + delete runnable; + } +} + +/*! + \class QThreadPool + \brief The QThreadPool class manages a collection of QThreads. + \since 4.4 + \threadsafe + + \ingroup thread + + QThreadPool manages and recyles individual QThread objects to help reduce + thread creation costs in programs that use threads. Each Qt application + has one global QThreadPool object, which can be accessed by calling + globalInstance(). + + To use one of the QThreadPool threads, subclass QRunnable and implement + the run() virtual function. Then create an object of that class and pass + it to QThreadPool::start(). + + \snippet doc/src/snippets/code/src_corelib_concurrent_qthreadpool.cpp 0 + + QThreadPool deletes the QRunnable automatically by default. Use + QRunnable::setAutoDelete() to change the auto-deletion flag. + + QThreadPool supports executing the same QRunnable more than once + by calling tryStart(this) from within QRunnable::run(). + If autoDelete is enabled the QRunnable will be deleted when + the last thread exits the run function. Calling start() + multiple times with the same QRunnable when autoDelete is enabled + creates a race condition and is not recommended. + + Threads that are unused for a certain amount of time will expire. The + default expiry timeout is 30000 milliseconds (30 seconds). This can be + changed using setExpiryTimeout(). Setting a negative expiry timeout + disables the expiry mechanism. + + Call maxThreadCount() to query the maximum number of threads to be used. + If needed, you can change the limit with setMaxThreadCount(). The default + maxThreadCount() is QThread::idealThreadCount(). The activeThreadCount() + function returns the number of threads currently doing work. + + The reserveThread() function reserves a thread for external + use. Use releaseThread() when your are done with the thread, so + that it may be reused. Essentially, these functions temporarily + increase or reduce the active thread count and are useful when + implementing time-consuming operations that are not visible to the + QThreadPool. + + Note that QThreadPool is a low-level class for managing threads, see + QtConcurrent::run() or the other + \l {Concurrent Programming}{Qt Concurrent} APIs for higher + level alternatives. + + \sa QRunnable +*/ + +/*! + Constructs a thread pool with the given \a parent. +*/ +QThreadPool::QThreadPool(QObject *parent) + : QObject(*new QThreadPoolPrivate, parent) +{ } + +/*! + Destroys the QThreadPool. + This function will block until all runnables have been completed. +*/ +QThreadPool::~QThreadPool() +{ + d_func()->waitForDone(); + d_func()->reset(); +} + +/*! + Returns the global QThreadPool instance. +*/ +QThreadPool *QThreadPool::globalInstance() +{ + return theInstance(); +} + +/*! + Reserves a thread and uses it to run \a runnable, unless this thread will + make the current thread count exceed maxThreadCount(). In that case, + \a runnable is added to a run queue instead. The \a priority argument can + be used to control the run queue's order of execution. + + Note that the thread pool takes ownership of the \a runnable if + \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns true, + and the \a runnable will be deleted automatically by the thread + pool after the \l{QRunnable::run()}{runnable->run()} returns. If + \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns false, + ownership of \a runnable remains with the caller. Note that + changing the auto-deletion on \a runnable after calling this + functions results in undefined behavior. +*/ +void QThreadPool::start(QRunnable *runnable, int priority) +{ + if (!runnable) + return; + + Q_D(QThreadPool); + QMutexLocker locker(&d->mutex); + if (!d->tryStart(runnable)) + d->enqueueTask(runnable, priority); +} + +/*! + Attempts to reserve a thread to run \a runnable. + + If no threads are available at the time of calling, then this function + does nothing and returns false. Otherwise, \a runnable is run immediately + using one available thread and this function returns true. + + Note that the thread pool takes ownership of the \a runnable if + \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns true, + and the \a runnable will be deleted automatically by the thread + pool after the \l{QRunnable::run()}{runnable->run()} returns. If + \l{QRunnable::autoDelete()}{runnable->autoDelete()} returns false, + ownership of \a runnable remains with the caller. Note that + changing the auto-deletion on \a runnable after calling this + function results in undefined behavior. +*/ +bool QThreadPool::tryStart(QRunnable *runnable) +{ + if (!runnable) + return false; + + Q_D(QThreadPool); + + // To improve scalability perform a check on the thread count + // before locking the mutex. + if (d->allThreads.isEmpty() == false && d->activeThreadCount() >= d->maxThreadCount) + return false; + + QMutexLocker locker(&d->mutex); + return d->tryStart(runnable); +} + +/*! \property QThreadPool::expiryTimeout + + Threads that are unused for \a expiryTimeout milliseconds are considered + to have expired and will exit. Such threads will be restarted as needed. + The default \a expiryTimeout is 30000 milliseconds (30 seconds). If + \a expiryTimeout is negative, newly created threads will not expire, e.g., + they will not exit until the thread pool is destroyed. + + Note that setting \a expiryTimeout has no effect on already running + threads. Only newly created threads will use the new \a expiryTimeout. + We recommend setting the \a expiryTimeout immediately after creating the + thread pool, but before calling start(). +*/ + +int QThreadPool::expiryTimeout() const +{ + Q_D(const QThreadPool); + return d->expiryTimeout; +} + +void QThreadPool::setExpiryTimeout(int expiryTimeout) +{ + Q_D(QThreadPool); + if (d->expiryTimeout == expiryTimeout) + return; + d->expiryTimeout = expiryTimeout; +} + +/*! \property QThreadPool::maxThreadCount + + This property represents the maximum number of threads used by the thread + pool. + + \note The thread pool will always use at least 1 thread, even if + \a maxThreadCount limit is zero or negative. + + The default \a maxThreadCount is QThread::idealThreadCount(). +*/ + +int QThreadPool::maxThreadCount() const +{ + Q_D(const QThreadPool); + return d->maxThreadCount; +} + +void QThreadPool::setMaxThreadCount(int maxThreadCount) +{ + Q_D(QThreadPool); + QMutexLocker locker(&d->mutex); + + if (maxThreadCount == d->maxThreadCount) + return; + + d->maxThreadCount = maxThreadCount; + d->tryToStartMoreThreads(); +} + +/*! \property QThreadPool::activeThreadCount + + This property represents the number of active threads in the thread pool. + + \note It is possible for this function to return a value that is greater + than maxThreadCount(). See reserveThread() for more details. + + \sa reserveThread(), releaseThread() +*/ + +int QThreadPool::activeThreadCount() const +{ + Q_D(const QThreadPool); + return d->activeThreadCount(); +} + +/*! + Reserves one thread, disregarding activeThreadCount() and maxThreadCount(). + + Once you are done with the thread, call releaseThread() to allow it to be + reused. + + \note This function will always increase the number of active threads. + This means that by using this function, it is possible for + activeThreadCount() to return a value greater than maxThreadCount() . + + \sa releaseThread() + */ +void QThreadPool::reserveThread() +{ + Q_D(QThreadPool); + QMutexLocker locker(&d->mutex); + ++d->reservedThreads; +} + +/*! + Releases a thread previously reserved by a call to reserveThread(). + + \note Calling this function without previously reserving a thread + temporarily increases maxThreadCount(). This is useful when a + thread goes to sleep waiting for more work, allowing other threads + to continue. Be sure to call reserveThread() when done waiting, so + that the thread pool can correctly maintain the + activeThreadCount(). + + \sa reserveThread() +*/ +void QThreadPool::releaseThread() +{ + Q_D(QThreadPool); + QMutexLocker locker(&d->mutex); + --d->reservedThreads; + d->tryToStartMoreThreads(); +} + +/*! + Waits for each thread to exit and removes all threads from the thread pool. +*/ +void QThreadPool::waitForDone() +{ + Q_D(QThreadPool); + d->waitForDone(); + d->reset(); +} + +QT_END_NAMESPACE + +#endif