symbian-qemu-0.9.1-12/python-win32-2.6.1/lib/multiprocessing/queues.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # Module implementing queues
       
     3 #
       
     4 # multiprocessing/queues.py
       
     5 #
       
     6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
       
     7 #
       
     8 
       
     9 __all__ = ['Queue', 'SimpleQueue']
       
    10 
       
    11 import sys
       
    12 import os
       
    13 import threading
       
    14 import collections
       
    15 import time
       
    16 import atexit
       
    17 import weakref
       
    18 
       
    19 from Queue import Empty, Full
       
    20 import _multiprocessing
       
    21 from multiprocessing import Pipe
       
    22 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
       
    23 from multiprocessing.util import debug, info, Finalize, register_after_fork
       
    24 from multiprocessing.forking import assert_spawning
       
    25 
       
    26 #
       
    27 # Queue type using a pipe, buffer and thread
       
    28 #
       
    29 
       
    30 class Queue(object):
       
    31 
       
    32     def __init__(self, maxsize=0):
       
    33         if maxsize <= 0:
       
    34             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
       
    35         self._maxsize = maxsize
       
    36         self._reader, self._writer = Pipe(duplex=False)
       
    37         self._rlock = Lock()
       
    38         self._opid = os.getpid()
       
    39         if sys.platform == 'win32':
       
    40             self._wlock = None
       
    41         else:
       
    42             self._wlock = Lock()
       
    43         self._sem = BoundedSemaphore(maxsize)
       
    44 
       
    45         self._after_fork()
       
    46 
       
    47         if sys.platform != 'win32':
       
    48             register_after_fork(self, Queue._after_fork)
       
    49 
       
    50     def __getstate__(self):
       
    51         assert_spawning(self)
       
    52         return (self._maxsize, self._reader, self._writer,
       
    53                 self._rlock, self._wlock, self._sem, self._opid)
       
    54 
       
    55     def __setstate__(self, state):
       
    56         (self._maxsize, self._reader, self._writer,
       
    57          self._rlock, self._wlock, self._sem, self._opid) = state
       
    58         self._after_fork()
       
    59 
       
    60     def _after_fork(self):
       
    61         debug('Queue._after_fork()')
       
    62         self._notempty = threading.Condition(threading.Lock())
       
    63         self._buffer = collections.deque()
       
    64         self._thread = None
       
    65         self._jointhread = None
       
    66         self._joincancelled = False
       
    67         self._closed = False
       
    68         self._close = None
       
    69         self._send = self._writer.send
       
    70         self._recv = self._reader.recv
       
    71         self._poll = self._reader.poll
       
    72 
       
    73     def put(self, obj, block=True, timeout=None):
       
    74         assert not self._closed
       
    75         if not self._sem.acquire(block, timeout):
       
    76             raise Full
       
    77 
       
    78         self._notempty.acquire()
       
    79         try:
       
    80             if self._thread is None:
       
    81                 self._start_thread()
       
    82             self._buffer.append(obj)
       
    83             self._notempty.notify()
       
    84         finally:
       
    85             self._notempty.release()
       
    86 
       
    87     def get(self, block=True, timeout=None):
       
    88         if block and timeout is None:
       
    89             self._rlock.acquire()
       
    90             try:
       
    91                 res = self._recv()
       
    92                 self._sem.release()
       
    93                 return res
       
    94             finally:
       
    95                 self._rlock.release()
       
    96 
       
    97         else:
       
    98             if block:
       
    99                 deadline = time.time() + timeout
       
   100             if not self._rlock.acquire(block, timeout):
       
   101                 raise Empty
       
   102             try:
       
   103                 if not self._poll(block and (deadline-time.time()) or 0.0):
       
   104                     raise Empty
       
   105                 res = self._recv()
       
   106                 self._sem.release()
       
   107                 return res
       
   108             finally:
       
   109                 self._rlock.release()
       
   110 
       
   111     def qsize(self):
       
   112         # Raises NotImplementError on Mac OSX because of broken sem_getvalue()
       
   113         return self._maxsize - self._sem._semlock._get_value()
       
   114 
       
   115     def empty(self):
       
   116         return not self._poll()
       
   117 
       
   118     def full(self):
       
   119         return self._sem._semlock._is_zero()
       
   120 
       
   121     def get_nowait(self):
       
   122         return self.get(False)
       
   123 
       
   124     def put_nowait(self, obj):
       
   125         return self.put(obj, False)
       
   126 
       
   127     def close(self):
       
   128         self._closed = True
       
   129         self._reader.close()
       
   130         if self._close:
       
   131             self._close()
       
   132 
       
   133     def join_thread(self):
       
   134         debug('Queue.join_thread()')
       
   135         assert self._closed
       
   136         if self._jointhread:
       
   137             self._jointhread()
       
   138 
       
   139     def cancel_join_thread(self):
       
   140         debug('Queue.cancel_join_thread()')
       
   141         self._joincancelled = True
       
   142         try:
       
   143             self._jointhread.cancel()
       
   144         except AttributeError:
       
   145             pass
       
   146 
       
   147     def _start_thread(self):
       
   148         debug('Queue._start_thread()')
       
   149 
       
   150         # Start thread which transfers data from buffer to pipe
       
   151         self._buffer.clear()
       
   152         self._thread = threading.Thread(
       
   153             target=Queue._feed,
       
   154             args=(self._buffer, self._notempty, self._send,
       
   155                   self._wlock, self._writer.close),
       
   156             name='QueueFeederThread'
       
   157             )
       
   158         self._thread.daemon = True
       
   159 
       
   160         debug('doing self._thread.start()')
       
   161         self._thread.start()
       
   162         debug('... done self._thread.start()')
       
   163 
       
   164         # On process exit we will wait for data to be flushed to pipe.
       
   165         #
       
   166         # However, if this process created the queue then all
       
   167         # processes which use the queue will be descendants of this
       
   168         # process.  Therefore waiting for the queue to be flushed
       
   169         # is pointless once all the child processes have been joined.
       
   170         created_by_this_process = (self._opid == os.getpid())
       
   171         if not self._joincancelled and not created_by_this_process:
       
   172             self._jointhread = Finalize(
       
   173                 self._thread, Queue._finalize_join,
       
   174                 [weakref.ref(self._thread)],
       
   175                 exitpriority=-5
       
   176                 )
       
   177 
       
   178         # Send sentinel to the thread queue object when garbage collected
       
   179         self._close = Finalize(
       
   180             self, Queue._finalize_close,
       
   181             [self._buffer, self._notempty],
       
   182             exitpriority=10
       
   183             )
       
   184 
       
   185     @staticmethod
       
   186     def _finalize_join(twr):
       
   187         debug('joining queue thread')
       
   188         thread = twr()
       
   189         if thread is not None:
       
   190             thread.join()
       
   191             debug('... queue thread joined')
       
   192         else:
       
   193             debug('... queue thread already dead')
       
   194 
       
   195     @staticmethod
       
   196     def _finalize_close(buffer, notempty):
       
   197         debug('telling queue thread to quit')
       
   198         notempty.acquire()
       
   199         try:
       
   200             buffer.append(_sentinel)
       
   201             notempty.notify()
       
   202         finally:
       
   203             notempty.release()
       
   204 
       
   205     @staticmethod
       
   206     def _feed(buffer, notempty, send, writelock, close):
       
   207         debug('starting thread to feed data to pipe')
       
   208         from .util import is_exiting
       
   209 
       
   210         nacquire = notempty.acquire
       
   211         nrelease = notempty.release
       
   212         nwait = notempty.wait
       
   213         bpopleft = buffer.popleft
       
   214         sentinel = _sentinel
       
   215         if sys.platform != 'win32':
       
   216             wacquire = writelock.acquire
       
   217             wrelease = writelock.release
       
   218         else:
       
   219             wacquire = None
       
   220 
       
   221         try:
       
   222             while 1:
       
   223                 nacquire()
       
   224                 try:
       
   225                     if not buffer:
       
   226                         nwait()
       
   227                 finally:
       
   228                     nrelease()
       
   229                 try:
       
   230                     while 1:
       
   231                         obj = bpopleft()
       
   232                         if obj is sentinel:
       
   233                             debug('feeder thread got sentinel -- exiting')
       
   234                             close()
       
   235                             return
       
   236 
       
   237                         if wacquire is None:
       
   238                             send(obj)
       
   239                         else:
       
   240                             wacquire()
       
   241                             try:
       
   242                                 send(obj)
       
   243                             finally:
       
   244                                 wrelease()
       
   245                 except IndexError:
       
   246                     pass
       
   247         except Exception, e:
       
   248             # Since this runs in a daemon thread the resources it uses
       
   249             # may be become unusable while the process is cleaning up.
       
   250             # We ignore errors which happen after the process has
       
   251             # started to cleanup.
       
   252             try:
       
   253                 if is_exiting():
       
   254                     info('error in queue thread: %s', e)
       
   255                 else:
       
   256                     import traceback
       
   257                     traceback.print_exc()
       
   258             except Exception:
       
   259                 pass
       
   260 
       
   261 _sentinel = object()
       
   262 
       
   263 #
       
   264 # A queue type which also supports join() and task_done() methods
       
   265 #
       
   266 # Note that if you do not call task_done() for each finished task then
       
   267 # eventually the counter's semaphore may overflow causing Bad Things
       
   268 # to happen.
       
   269 #
       
   270 
       
   271 class JoinableQueue(Queue):
       
   272 
       
   273     def __init__(self, maxsize=0):
       
   274         Queue.__init__(self, maxsize)
       
   275         self._unfinished_tasks = Semaphore(0)
       
   276         self._cond = Condition()
       
   277 
       
   278     def __getstate__(self):
       
   279         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
       
   280 
       
   281     def __setstate__(self, state):
       
   282         Queue.__setstate__(self, state[:-2])
       
   283         self._cond, self._unfinished_tasks = state[-2:]
       
   284 
       
   285     def put(self, item, block=True, timeout=None):
       
   286         Queue.put(self, item, block, timeout)
       
   287         self._unfinished_tasks.release()
       
   288 
       
   289     def task_done(self):
       
   290         self._cond.acquire()
       
   291         try:
       
   292             if not self._unfinished_tasks.acquire(False):
       
   293                 raise ValueError('task_done() called too many times')
       
   294             if self._unfinished_tasks._semlock._is_zero():
       
   295                 self._cond.notify_all()
       
   296         finally:
       
   297             self._cond.release()
       
   298 
       
   299     def join(self):
       
   300         self._cond.acquire()
       
   301         try:
       
   302             if not self._unfinished_tasks._semlock._is_zero():
       
   303                 self._cond.wait()
       
   304         finally:
       
   305             self._cond.release()
       
   306 
       
   307 #
       
   308 # Simplified Queue type -- really just a locked pipe
       
   309 #
       
   310 
       
   311 class SimpleQueue(object):
       
   312 
       
   313     def __init__(self):
       
   314         self._reader, self._writer = Pipe(duplex=False)
       
   315         self._rlock = Lock()
       
   316         if sys.platform == 'win32':
       
   317             self._wlock = None
       
   318         else:
       
   319             self._wlock = Lock()
       
   320         self._make_methods()
       
   321 
       
   322     def empty(self):
       
   323         return not self._reader.poll()
       
   324 
       
   325     def __getstate__(self):
       
   326         assert_spawning(self)
       
   327         return (self._reader, self._writer, self._rlock, self._wlock)
       
   328 
       
   329     def __setstate__(self, state):
       
   330         (self._reader, self._writer, self._rlock, self._wlock) = state
       
   331         self._make_methods()
       
   332 
       
   333     def _make_methods(self):
       
   334         recv = self._reader.recv
       
   335         racquire, rrelease = self._rlock.acquire, self._rlock.release
       
   336         def get():
       
   337             racquire()
       
   338             try:
       
   339                 return recv()
       
   340             finally:
       
   341                 rrelease()
       
   342         self.get = get
       
   343 
       
   344         if self._wlock is None:
       
   345             # writes to a message oriented win32 pipe are atomic
       
   346             self.put = self._writer.send
       
   347         else:
       
   348             send = self._writer.send
       
   349             wacquire, wrelease = self._wlock.acquire, self._wlock.release
       
   350             def put(obj):
       
   351                 wacquire()
       
   352                 try:
       
   353                     return send(obj)
       
   354                 finally:
       
   355                     wrelease()
       
   356             self.put = put