changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
     1 #
     2 # Module implementing queues
     3 #
     4 # multiprocessing/
     5 #
     6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
     7 #
     9 __all__ = ['Queue', 'SimpleQueue']
    11 import sys
    12 import os
    13 import threading
    14 import collections
    15 import time
    16 import atexit
    17 import weakref
    19 from Queue import Empty, Full
    20 import _multiprocessing
    21 from multiprocessing import Pipe
    22 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
    23 from multiprocessing.util import debug, info, Finalize, register_after_fork
    24 from multiprocessing.forking import assert_spawning
    26 #
    27 # Queue type using a pipe, buffer and thread
    28 #
    30 class Queue(object):
    32     def __init__(self, maxsize=0):
    33         if maxsize <= 0:
    34             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
    35         self._maxsize = maxsize
    36         self._reader, self._writer = Pipe(duplex=False)
    37         self._rlock = Lock()
    38         self._opid = os.getpid()
    39         if sys.platform == 'win32':
    40             self._wlock = None
    41         else:
    42             self._wlock = Lock()
    43         self._sem = BoundedSemaphore(maxsize)
    45         self._after_fork()
    47         if sys.platform != 'win32':
    48             register_after_fork(self, Queue._after_fork)
    50     def __getstate__(self):
    51         assert_spawning(self)
    52         return (self._maxsize, self._reader, self._writer,
    53                 self._rlock, self._wlock, self._sem, self._opid)
    55     def __setstate__(self, state):
    56         (self._maxsize, self._reader, self._writer,
    57          self._rlock, self._wlock, self._sem, self._opid) = state
    58         self._after_fork()
    60     def _after_fork(self):
    61         debug('Queue._after_fork()')
    62         self._notempty = threading.Condition(threading.Lock())
    63         self._buffer = collections.deque()
    64         self._thread = None
    65         self._jointhread = None
    66         self._joincancelled = False
    67         self._closed = False
    68         self._close = None
    69         self._send = self._writer.send
    70         self._recv = self._reader.recv
    71         self._poll = self._reader.poll
    73     def put(self, obj, block=True, timeout=None):
    74         assert not self._closed
    75         if not self._sem.acquire(block, timeout):
    76             raise Full
    78         self._notempty.acquire()
    79         try:
    80             if self._thread is None:
    81                 self._start_thread()
    82             self._buffer.append(obj)
    83             self._notempty.notify()
    84         finally:
    85             self._notempty.release()
    87     def get(self, block=True, timeout=None):
    88         if block and timeout is None:
    89             self._rlock.acquire()
    90             try:
    91                 res = self._recv()
    92                 self._sem.release()
    93                 return res
    94             finally:
    95                 self._rlock.release()
    97         else:
    98             if block:
    99                 deadline = time.time() + timeout
   100             if not self._rlock.acquire(block, timeout):
   101                 raise Empty
   102             try:
   103                 if not self._poll(block and (deadline-time.time()) or 0.0):
   104                     raise Empty
   105                 res = self._recv()
   106                 self._sem.release()
   107                 return res
   108             finally:
   109                 self._rlock.release()
   111     def qsize(self):
   112         # Raises NotImplementError on Mac OSX because of broken sem_getvalue()
   113         return self._maxsize - self._sem._semlock._get_value()
   115     def empty(self):
   116         return not self._poll()
   118     def full(self):
   119         return self._sem._semlock._is_zero()
   121     def get_nowait(self):
   122         return self.get(False)
   124     def put_nowait(self, obj):
   125         return self.put(obj, False)
   127     def close(self):
   128         self._closed = True
   129         self._reader.close()
   130         if self._close:
   131             self._close()
   133     def join_thread(self):
   134         debug('Queue.join_thread()')
   135         assert self._closed
   136         if self._jointhread:
   137             self._jointhread()
   139     def cancel_join_thread(self):
   140         debug('Queue.cancel_join_thread()')
   141         self._joincancelled = True
   142         try:
   143             self._jointhread.cancel()
   144         except AttributeError:
   145             pass
   147     def _start_thread(self):
   148         debug('Queue._start_thread()')
   150         # Start thread which transfers data from buffer to pipe
   151         self._buffer.clear()
   152         self._thread = threading.Thread(
   153             target=Queue._feed,
   154             args=(self._buffer, self._notempty, self._send,
   155                   self._wlock, self._writer.close),
   156             name='QueueFeederThread'
   157             )
   158         self._thread.daemon = True
   160         debug('doing self._thread.start()')
   161         self._thread.start()
   162         debug('... done self._thread.start()')
   164         # On process exit we will wait for data to be flushed to pipe.
   165         #
   166         # However, if this process created the queue then all
   167         # processes which use the queue will be descendants of this
   168         # process.  Therefore waiting for the queue to be flushed
   169         # is pointless once all the child processes have been joined.
   170         created_by_this_process = (self._opid == os.getpid())
   171         if not self._joincancelled and not created_by_this_process:
   172             self._jointhread = Finalize(
   173                 self._thread, Queue._finalize_join,
   174                 [weakref.ref(self._thread)],
   175                 exitpriority=-5
   176                 )
   178         # Send sentinel to the thread queue object when garbage collected
   179         self._close = Finalize(
   180             self, Queue._finalize_close,
   181             [self._buffer, self._notempty],
   182             exitpriority=10
   183             )
   185     @staticmethod
   186     def _finalize_join(twr):
   187         debug('joining queue thread')
   188         thread = twr()
   189         if thread is not None:
   190             thread.join()
   191             debug('... queue thread joined')
   192         else:
   193             debug('... queue thread already dead')
   195     @staticmethod
   196     def _finalize_close(buffer, notempty):
   197         debug('telling queue thread to quit')
   198         notempty.acquire()
   199         try:
   200             buffer.append(_sentinel)
   201             notempty.notify()
   202         finally:
   203             notempty.release()
   205     @staticmethod
   206     def _feed(buffer, notempty, send, writelock, close):
   207         debug('starting thread to feed data to pipe')
   208         from .util import is_exiting
   210         nacquire = notempty.acquire
   211         nrelease = notempty.release
   212         nwait = notempty.wait
   213         bpopleft = buffer.popleft
   214         sentinel = _sentinel
   215         if sys.platform != 'win32':
   216             wacquire = writelock.acquire
   217             wrelease = writelock.release
   218         else:
   219             wacquire = None
   221         try:
   222             while 1:
   223                 nacquire()
   224                 try:
   225                     if not buffer:
   226                         nwait()
   227                 finally:
   228                     nrelease()
   229                 try:
   230                     while 1:
   231                         obj = bpopleft()
   232                         if obj is sentinel:
   233                             debug('feeder thread got sentinel -- exiting')
   234                             close()
   235                             return
   237                         if wacquire is None:
   238                             send(obj)
   239                         else:
   240                             wacquire()
   241                             try:
   242                                 send(obj)
   243                             finally:
   244                                 wrelease()
   245                 except IndexError:
   246                     pass
   247         except Exception, e:
   248             # Since this runs in a daemon thread the resources it uses
   249             # may be become unusable while the process is cleaning up.
   250             # We ignore errors which happen after the process has
   251             # started to cleanup.
   252             try:
   253                 if is_exiting():
   254                     info('error in queue thread: %s', e)
   255                 else:
   256                     import traceback
   257                     traceback.print_exc()
   258             except Exception:
   259                 pass
# Unique marker object.  Queue._finalize_close() appends it to a queue's
# buffer; when Queue._feed() pops it, the feeder thread closes the writing
# end of the pipe and returns.
_sentinel = object()
   263 #
   264 # A queue type which also supports join() and task_done() methods
   265 #
   266 # Note that if you do not call task_done() for each finished task then
   267 # eventually the counter's semaphore may overflow causing Bad Things
   268 # to happen.
   269 #
   271 class JoinableQueue(Queue):
   273     def __init__(self, maxsize=0):
   274         Queue.__init__(self, maxsize)
   275         self._unfinished_tasks = Semaphore(0)
   276         self._cond = Condition()
   278     def __getstate__(self):
   279         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
   281     def __setstate__(self, state):
   282         Queue.__setstate__(self, state[:-2])
   283         self._cond, self._unfinished_tasks = state[-2:]
   285     def put(self, item, block=True, timeout=None):
   286         Queue.put(self, item, block, timeout)
   287         self._unfinished_tasks.release()
   289     def task_done(self):
   290         self._cond.acquire()
   291         try:
   292             if not self._unfinished_tasks.acquire(False):
   293                 raise ValueError('task_done() called too many times')
   294             if self._unfinished_tasks._semlock._is_zero():
   295                 self._cond.notify_all()
   296         finally:
   297             self._cond.release()
   299     def join(self):
   300         self._cond.acquire()
   301         try:
   302             if not self._unfinished_tasks._semlock._is_zero():
   303                 self._cond.wait()
   304         finally:
   305             self._cond.release()
   307 #
   308 # Simplified Queue type -- really just a locked pipe
   309 #
   311 class SimpleQueue(object):
   313     def __init__(self):
   314         self._reader, self._writer = Pipe(duplex=False)
   315         self._rlock = Lock()
   316         if sys.platform == 'win32':
   317             self._wlock = None
   318         else:
   319             self._wlock = Lock()
   320         self._make_methods()
   322     def empty(self):
   323         return not self._reader.poll()
   325     def __getstate__(self):
   326         assert_spawning(self)
   327         return (self._reader, self._writer, self._rlock, self._wlock)
   329     def __setstate__(self, state):
   330         (self._reader, self._writer, self._rlock, self._wlock) = state
   331         self._make_methods()
   333     def _make_methods(self):
   334         recv = self._reader.recv
   335         racquire, rrelease = self._rlock.acquire, self._rlock.release
   336         def get():
   337             racquire()
   338             try:
   339                 return recv()
   340             finally:
   341                 rrelease()
   342         self.get = get
   344         if self._wlock is None:
   345             # writes to a message oriented win32 pipe are atomic
   346             self.put = self._writer.send
   347         else:
   348             send = self._writer.send
   349             wacquire, wrelease = self._wlock.acquire, self._wlock.release
   350             def put(obj):
   351                 wacquire()
   352                 try:
   353                     return send(obj)
   354                 finally:
   355                     wrelease()
   356             self.put = put