buildframework/helium/external/python/lib/common/threadpool.py
author wbernard
Wed, 23 Dec 2009 19:29:07 +0200
changeset 179 d8ac696cc51f
permissions -rw-r--r--
helium_7.0-r14027
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
179
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     1
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     2
# -*- coding: UTF-8 -*-
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     3
"""Easy to use object-oriented thread pool framework.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     4
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     5
A thread pool is an object that maintains a pool of worker threads to perform
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     6
time consuming operations in parallel. It assigns jobs to the threads
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     7
by putting them in a work request queue, where they are picked up by the
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     8
next available thread. This then performs the requested operation in the
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
     9
background and puts the results in a another queue.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    10
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    11
The thread pool object can then collect the results from all threads from
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    12
this queue as soon as they become available or after all threads have
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    13
finished their work. It's also possible, to define callbacks to handle
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    14
each result as it comes in.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    15
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    16
The basic concept and some code was taken from the book "Python in a Nutshell"
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    17
by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    18
"Threaded Program Architecture". I wrapped the main program logic in the
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    19
ThreadPool class, added the WorkRequest class and the callback system and
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    20
tweaked the code here and there. Kudos also to Florent Aide for the exception
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    21
handling mechanism.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    22
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    23
Basic usage:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    24
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    25
>>> pool = TreadPool(poolsize)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    26
>>> requests = makeRequests(some_callable, list_of_args, callback)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    27
>>> [pool.putRequest(req) for req in requests]
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    28
>>> pool.wait()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    29
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    30
See the end of the module code for a brief, annotated usage example.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    31
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    32
Website : http://chrisarndt.de/en/software/python/threadpool/
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    33
"""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    34
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    35
__all__ = [
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    36
  'makeRequests',
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    37
  'NoResultsPending',
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    38
  'NoWorkersAvailable',
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    39
  'ThreadPool',
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    40
  'WorkRequest',
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    41
  'WorkerThread'
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    42
]
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    43
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    44
__author__ = "Christopher Arndt"
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    45
__version__ = "1.2.3"
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    46
__revision__ = "$Revision: 1.5 $"
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    47
__date__ = "$Date: 2006/06/23 12:32:25 $"
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    48
__license__ = 'Python license'
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    49
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    50
# standard library modules
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    51
import sys
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    52
import threading
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    53
import Queue
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    54
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    55
# exceptions
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    56
class NoResultsPending(Exception):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    57
    """All work requests have been processed."""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    58
    pass
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    59
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    60
class NoWorkersAvailable(Exception):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    61
    """No worker threads available to process remaining requests."""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    62
    pass
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    63
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    64
# classes
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    65
class WorkerThread(threading.Thread):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    66
    """Background thread connected to the requests/results queues.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    67
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    68
    A worker thread sits in the background and picks up work requests from
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    69
    one queue and puts the results in another until it is dismissed.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    70
    """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    71
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    72
    def __init__(self, requestsQueue, resultsQueue, **kwds):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    73
        """Set up thread in daemonic mode and start it immediatedly.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    74
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    75
        requestsQueue and resultQueue are instances of Queue.Queue passed
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    76
        by the ThreadPool class when it creates a new worker thread.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    77
        """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    78
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    79
        threading.Thread.__init__(self, **kwds)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    80
        self.setDaemon(1)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    81
        self.workRequestQueue = requestsQueue
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    82
        self.resultQueue = resultsQueue
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    83
        self._dismissed = threading.Event()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    84
        self.start()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    85
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    86
    def run(self):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    87
        """Repeatedly process the job queue until told to exit."""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    88
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    89
        while not self._dismissed.isSet():
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    90
            # thread blocks here, if queue empty
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    91
            request = self.workRequestQueue.get()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    92
            if self._dismissed.isSet():
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    93
                # if told to exit, return the work request we just picked up
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    94
                self.workRequestQueue.put(request)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    95
                break # and exit
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    96
            try:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    97
                self.resultQueue.put(
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    98
                    (request, request.callable(*request.args, **request.kwds))
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
    99
                )
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   100
            except:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   101
                request.exception = True
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   102
                self.resultQueue.put((request, sys.exc_info()))
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   103
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   104
    def dismiss(self):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   105
        """Sets a flag to tell the thread to exit when done with current job.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   106
        """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   107
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   108
        self._dismissed.set()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   109
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   110
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   111
class WorkRequest:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   112
    """A request to execute a callable for putting in the request queue later.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   113
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   114
    See the module function makeRequests() for the common case
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   115
    where you want to build several WorkRequests for the same callable
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   116
    but with different arguments for each call.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   117
    """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   118
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   119
    def __init__(self, callable, args=None, kwds=None, requestID=None,
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   120
      callback=None, exc_callback=None):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   121
        """Create a work request for a callable and attach callbacks.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   122
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   123
        A work request consists of the a callable to be executed by a
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   124
        worker thread, a list of positional arguments, a dictionary
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   125
        of keyword arguments.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   126
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   127
        A callback function can be specified, that is called when the results
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   128
        of the request are picked up from the result queue. It must accept
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   129
        two arguments, the request object and the results of the callable,
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   130
        in that order. If you want to pass additional information to the
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   131
        callback, just stick it on the request object.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   132
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   133
        You can also give a callback for when an exception occurs. It should
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   134
        also accept two arguments, the work request and a tuple with the
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   135
        exception details as returned by sys.exc_info().
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   136
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   137
        requestID, if given, must be hashable since it is used by the
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   138
        ThreadPool object to store the results of that work request in a
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   139
        dictionary. It defaults to the return value of id(self).
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   140
        """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   141
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   142
        if requestID is None:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   143
            self.requestID = id(self)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   144
        else:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   145
            try:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   146
                hash(requestID)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   147
            except TypeError:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   148
                raise TypeError("requestID must be hashable.")
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   149
            self.requestID = requestID
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   150
        self.exception = False
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   151
        self.callback = callback
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   152
        self.exc_callback = exc_callback
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   153
        self.callable = callable
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   154
        self.args = args or []
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   155
        self.kwds = kwds or {}
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   156
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   157
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   158
class ThreadPool:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   159
    """A thread pool, distributing work requests and collecting results.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   160
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   161
    See the module doctring for more information.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   162
    """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   163
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   164
    def __init__(self, num_workers, q_size=0):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   165
        """Set up the thread pool and start num_workers worker threads.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   166
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   167
        num_workers is the number of worker threads to start initialy.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   168
        If q_size > 0 the size of the work request queue is limited and
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   169
        the thread pool blocks when the queue is full and it tries to put
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   170
        more work requests in it (see putRequest method).
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   171
        """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   172
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   173
        self.requestsQueue = Queue.Queue(q_size)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   174
        self.resultsQueue = Queue.Queue()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   175
        self.workers = []
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   176
        self.workRequests = {}
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   177
        self.createWorkers(num_workers)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   178
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   179
    def createWorkers(self, num_workers):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   180
        """Add num_workers worker threads to the pool."""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   181
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   182
        for i in range(num_workers):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   183
            self.workers.append(WorkerThread(self.requestsQueue,
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   184
              self.resultsQueue))
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   185
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   186
    def dismissWorkers(self, num_workers):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   187
        """Tell num_workers worker threads to quit after their current task.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   188
        """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   189
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   190
        for i in range(min(num_workers, len(self.workers))):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   191
            worker = self.workers.pop()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   192
            worker.dismiss()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   193
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   194
    def addWork(self, callable, args=None, kwds=None, requestID=None, callback=None, exc_callback=None, block=True, timeout=0):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   195
        request = WorkRequest(callable, args, kwds, requestID, callback, exc_callback)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   196
        self.putRequest(request, block, timeout)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   197
        
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   198
    def putRequest(self, request, block=True, timeout=0):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   199
        """Put work request into work queue and save its id for later."""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   200
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   201
        assert isinstance(request, WorkRequest)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   202
        self.requestsQueue.put(request, block, timeout)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   203
        self.workRequests[request.requestID] = request
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   204
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   205
    def poll(self, block=False):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   206
        """Process any new results in the queue."""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   207
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   208
        while True:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   209
            # still results pending?
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   210
            if not self.workRequests:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   211
                raise NoResultsPending
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   212
            # are there still workers to process remaining requests?
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   213
            elif block and not self.workers:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   214
                raise NoWorkersAvailable
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   215
            try:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   216
                # get back next results
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   217
                request, result = self.resultsQueue.get(block=block)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   218
                # has an exception occured?
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   219
                if request.exception and request.exc_callback:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   220
                    request.exc_callback(request, result)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   221
                # hand results to callback, if any
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   222
                if request.callback and not \
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   223
                  (request.exception and request.exc_callback):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   224
                    request.callback(request, result)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   225
                del self.workRequests[request.requestID]
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   226
            except Queue.Empty:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   227
                break
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   228
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   229
    def wait(self):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   230
        """Wait for results, blocking until all have arrived."""
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   231
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   232
        while 1:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   233
            try:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   234
                self.poll(True)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   235
            except NoResultsPending:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   236
                break
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   237
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   238
# helper functions
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   239
def makeRequests(callable, args_list, callback=None, exc_callback=None):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   240
    """Create several work requests for same callable with different arguments.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   241
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   242
    Convenience function for creating several work requests for the same
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   243
    callable where each invocation of the callable receives different values
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   244
    for its arguments.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   245
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   246
    args_list contains the parameters for each invocation of callable.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   247
    Each item in 'args_list' should be either a 2-item tuple of the list of
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   248
    positional arguments and a dictionary of keyword arguments or a single,
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   249
    non-tuple argument.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   250
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   251
    See docstring for WorkRequest for info on callback and exc_callback.
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   252
    """
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   253
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   254
    requests = []
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   255
    for item in args_list:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   256
        if isinstance(item, tuple):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   257
            requests.append(
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   258
              WorkRequest(callable, item[0], item[1], callback=callback,
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   259
                exc_callback=exc_callback)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   260
            )
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   261
        else:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   262
            requests.append(
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   263
              WorkRequest(callable, [item], None, callback=callback,
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   264
                exc_callback=exc_callback)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   265
            )
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   266
    return requests
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   267
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   268
################
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   269
# USAGE EXAMPLE
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   270
################
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   271
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   272
if __name__ == '__main__':
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   273
    import random
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   274
    import time
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   275
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   276
    # the work the threads will have to do (rather trivial in our example)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   277
    def do_something(data):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   278
        time.sleep(random.randint(1, 5))
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   279
        result = round(random.random() * data, 5)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   280
        # just to show off, we throw an exception once in a while
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   281
        if result > 3:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   282
            raise RuntimeError("Something extraordinary happened!")
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   283
        return result
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   284
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   285
    # this will be called each time a result is available
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   286
    def print_result(request, result):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   287
        print "**Result: %s from request #%s" % (result, request.requestID)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   288
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   289
    # this will be called when an exception occurs within a thread
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   290
    def handle_exception(request, exc_info):
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   291
        print "Exception occured in request #%s: %s" % \
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   292
          (request.requestID, exc_info[1])
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   293
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   294
    # assemble the arguments for each job to a list...
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   295
    data = [random.randint(1, 10) for i in range(20)]
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   296
    # ... and build a WorkRequest object for each item in data
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   297
    requests = makeRequests(do_something, data, print_result, handle_exception)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   298
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   299
    # or the other form of args_lists accepted by makeRequests: ((,), {})
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   300
    data = [((random.randint(1, 10), ), {}) for i in range(20)]
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   301
    requests.extend(
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   302
      makeRequests(do_something, data, print_result, handle_exception)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   303
    )
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   304
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   305
    # we create a pool of 3 worker threads
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   306
    main = ThreadPool(3)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   307
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   308
    # then we put the work requests in the queue...
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   309
    for req in requests:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   310
        main.putRequest(req)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   311
        print "Work request #%s added." % req.requestID
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   312
    # or shorter:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   313
    # [main.putRequest(req) for req in requests]
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   314
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   315
    # ...and wait for the results to arrive in the result queue
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   316
    # by using ThreadPool.wait(). This would block until results for
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   317
    # all work requests have arrived:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   318
    # main.wait()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   319
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   320
    # instead we can poll for results while doing something else:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   321
    i = 0
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   322
    while 1:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   323
        try:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   324
            main.poll()
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   325
            print "Main thread working..."
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   326
            time.sleep(0.5)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   327
            if i == 10:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   328
                print "Adding 3 more worker threads..."
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   329
                main.createWorkers(3)
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   330
            i += 1
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   331
        except KeyboardInterrupt:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   332
            print "Interrupted!"
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   333
            break
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   334
        except NoResultsPending:
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   335
            print "All results collected."
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   336
            break
d8ac696cc51f helium_7.0-r14027
wbernard
parents:
diff changeset
   337