src/corelib/concurrent/qtconcurrentreducekernel.h
changeset 0 1918ee327afb
child 4 3b1da2848fc7
equal deleted inserted replaced
-1:000000000000 0:1918ee327afb
       
     1 /****************************************************************************
       
     2 **
       
     3 ** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
       
     4 ** All rights reserved.
       
     5 ** Contact: Nokia Corporation (qt-info@nokia.com)
       
     6 **
       
     7 ** This file is part of the QtCore module of the Qt Toolkit.
       
     8 **
       
     9 ** $QT_BEGIN_LICENSE:LGPL$
       
    10 ** No Commercial Usage
       
    11 ** This file contains pre-release code and may not be distributed.
       
    12 ** You may use this file in accordance with the terms and conditions
       
    13 ** contained in the Technology Preview License Agreement accompanying
       
    14 ** this package.
       
    15 **
       
    16 ** GNU Lesser General Public License Usage
       
    17 ** Alternatively, this file may be used under the terms of the GNU Lesser
       
    18 ** General Public License version 2.1 as published by the Free Software
       
    19 ** Foundation and appearing in the file LICENSE.LGPL included in the
       
    20 ** packaging of this file.  Please review the following information to
       
    21 ** ensure the GNU Lesser General Public License version 2.1 requirements
       
    22 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
       
    23 **
       
    24 ** In addition, as a special exception, Nokia gives you certain additional
       
    25 ** rights.  These rights are described in the Nokia Qt LGPL Exception
       
    26 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
       
    27 **
       
    28 ** If you have questions regarding the use of this file, please contact
       
    29 ** Nokia at qt-info@nokia.com.
       
    30 **
       
    31 **
       
    32 **
       
    33 **
       
    34 **
       
    35 **
       
    36 **
       
    37 **
       
    38 ** $QT_END_LICENSE$
       
    39 **
       
    40 ****************************************************************************/
       
    41 
       
    42 #ifndef QTCONCURRENT_REDUCEKERNEL_H
       
    43 #define QTCONCURRENT_REDUCEKERNEL_H
       
    44 
       
    45 #include <QtCore/qglobal.h>
       
    46 
       
    47 #ifndef QT_NO_CONCURRENT
       
    48 
       
    49 #include <QtCore/qatomic.h>
       
    50 #include <QtCore/qlist.h>
       
    51 #include <QtCore/qmap.h>
       
    52 #include <QtCore/qmutex.h>
       
    53 #include <QtCore/qthread.h>
       
    54 #include <QtCore/qthreadpool.h>
       
    55 #include <QtCore/qvector.h>
       
    56 
       
    57 QT_BEGIN_HEADER
       
    58 QT_BEGIN_NAMESPACE
       
    59 
       
    60 QT_MODULE(Core)
       
    61 
       
    62 namespace QtConcurrent {
       
    63 
       
    64 #ifndef qdoc
       
    65 
       
    66 /*
       
    67     The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
       
    68     limit the reduce queue size for MapReduce. When the number of
       
    69     reduce blocks in the queue exceeds ReduceQueueStartLimit,
       
    70     MapReduce won't start any new threads, and when it exceeds
       
    71     ReduceQueueThrottleLimit running threads will be stopped.
       
    72 */
       
    73 enum {
       
    74     ReduceQueueStartLimit = 20,
       
    75     ReduceQueueThrottleLimit = 30
       
    76 };
       
    77 
       
    78 // IntermediateResults holds a block of intermediate results from a
       
    79 // map or filter functor. The begin/end offsets indicates the origin
       
    80 // and range of the block.
       
    81 template <typename T>
       
    82 class IntermediateResults
       
    83 {
       
    84 public:
       
    85     int begin, end;
       
    86     QVector<T> vector;
       
    87 };
       
    88 
       
    89 #endif // qdoc
       
    90 
       
    91 enum ReduceOption {
       
    92     UnorderedReduce = 0x1,
       
    93     OrderedReduce = 0x2,
       
    94     SequentialReduce = 0x4
       
    95     // ParallelReduce = 0x8
       
    96 };
       
    97 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
       
    98 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
       
    99 
       
   100 #ifndef qdoc
       
   101 
       
   102 // supports both ordered and out-of-order reduction
       
   103 template <typename ReduceFunctor, typename ReduceResultType, typename T>
       
   104 class ReduceKernel
       
   105 {
       
   106     typedef QMap<int, IntermediateResults<T> > ResultsMap;
       
   107 
       
   108     const ReduceOptions reduceOptions;
       
   109 
       
   110     QMutex mutex;
       
   111     int progress, resultsMapSize, threadCount;
       
   112     ResultsMap resultsMap;
       
   113 
       
   114     bool canReduce(int begin) const
       
   115     {
       
   116         return (((reduceOptions & UnorderedReduce)
       
   117                  && progress == 0)
       
   118                 || ((reduceOptions & OrderedReduce)
       
   119                     && progress == begin));
       
   120     }
       
   121 
       
   122     void reduceResult(ReduceFunctor &reduce,
       
   123                       ReduceResultType &r,
       
   124                       const IntermediateResults<T> &result)
       
   125     {
       
   126         for (int i = 0; i < result.vector.size(); ++i) {
       
   127             reduce(r, result.vector.at(i));
       
   128         }
       
   129     }
       
   130 
       
   131     void reduceResults(ReduceFunctor &reduce,
       
   132                        ReduceResultType &r,
       
   133                        ResultsMap &map)
       
   134     {
       
   135         typename ResultsMap::iterator it = map.begin();
       
   136         while (it != map.end()) {
       
   137             reduceResult(reduce, r, it.value());
       
   138             ++it;
       
   139         }
       
   140     }
       
   141 
       
   142 public:
       
   143     ReduceKernel(ReduceOptions _reduceOptions)
       
   144         : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0), 
       
   145           threadCount(QThreadPool::globalInstance()->maxThreadCount())
       
   146     { }
       
   147 
       
   148     void runReduce(ReduceFunctor &reduce,
       
   149                    ReduceResultType &r,
       
   150                    const IntermediateResults<T> &result)
       
   151     {
       
   152         QMutexLocker locker(&mutex);
       
   153         if (!canReduce(result.begin)) {
       
   154             ++resultsMapSize;
       
   155             resultsMap.insert(result.begin, result);
       
   156             return;
       
   157         }
       
   158 
       
   159         if (reduceOptions & UnorderedReduce) {
       
   160             // UnorderedReduce
       
   161             progress = -1;
       
   162 
       
   163             // reduce this result
       
   164             locker.unlock();
       
   165             reduceResult(reduce, r, result);
       
   166             locker.relock();
       
   167 
       
   168             // reduce all stored results as well
       
   169             while (!resultsMap.isEmpty()) {
       
   170                 ResultsMap resultsMapCopy = resultsMap;
       
   171                 resultsMap.clear();
       
   172 
       
   173                 locker.unlock();
       
   174                 reduceResults(reduce, r, resultsMapCopy);
       
   175                 locker.relock();
       
   176 
       
   177                 resultsMapSize -= resultsMapCopy.size();
       
   178             }
       
   179 
       
   180             progress = 0;
       
   181         } else {
       
   182             // reduce this result
       
   183             locker.unlock();
       
   184             reduceResult(reduce, r, result);
       
   185             locker.relock();
       
   186 
       
   187             // OrderedReduce
       
   188             progress += result.end - result.begin;
       
   189 
       
   190             // reduce as many other results as possible
       
   191             typename ResultsMap::iterator it = resultsMap.begin();
       
   192             while (it != resultsMap.end()) {
       
   193                 if (it.value().begin != progress)
       
   194                     break;
       
   195 
       
   196                 locker.unlock();
       
   197                 reduceResult(reduce, r, it.value());
       
   198                 locker.relock();
       
   199 
       
   200                 --resultsMapSize;
       
   201                 progress += it.value().end - it.value().begin;
       
   202                 it = resultsMap.erase(it);
       
   203             }
       
   204         }
       
   205     }
       
   206 
       
   207     // final reduction
       
   208     void finish(ReduceFunctor &reduce, ReduceResultType &r)
       
   209     {
       
   210         reduceResults(reduce, r, resultsMap);
       
   211     }
       
   212 
       
   213     inline bool shouldThrottle()
       
   214     {
       
   215         return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
       
   216     }
       
   217 
       
   218     inline bool shouldStartThread()
       
   219     {
       
   220         return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
       
   221     }
       
   222 };
       
   223 
       
   224 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
       
   225 struct SequenceHolder2 : public Base
       
   226 {
       
   227     SequenceHolder2(const Sequence &_sequence,
       
   228                     Functor1 functor1,
       
   229                     Functor2 functor2,
       
   230                     ReduceOptions reduceOptions)
       
   231         : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
       
   232           sequence(_sequence)
       
   233     { }
       
   234 
       
   235     Sequence sequence;
       
   236 
       
   237     void finish()
       
   238     {
       
   239         Base::finish();
       
   240         // Clear the sequence to make sure all temporaries are destroyed
       
   241         // before finished is signaled.
       
   242         sequence = Sequence();
       
   243     }
       
   244 };
       
   245 
       
   246 #endif //qdoc
       
   247 
       
   248 } // namespace QtConcurrent
       
   249 
       
   250 QT_END_NAMESPACE
       
   251 QT_END_HEADER
       
   252 
       
   253 #endif // QT_NO_CONCURRENT
       
   254 
       
   255 #endif