symbian-qemu-0.9.1-12/python-2.6.1/Lib/multiprocessing/synchronize.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # Module implementing synchronization primitives
       
     3 #
       
     4 # multiprocessing/synchronize.py
       
     5 #
       
     6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
       
     7 #
       
     8 
       
     9 __all__ = [
       
    10     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
       
    11     ]
       
    12 
       
    13 import threading
       
    14 import os
       
    15 import sys
       
    16 
       
    17 from time import time as _time, sleep as _sleep
       
    18 
       
    19 import _multiprocessing
       
    20 from multiprocessing.process import current_process
       
    21 from multiprocessing.util import Finalize, register_after_fork, debug
       
    22 from multiprocessing.forking import assert_spawning, Popen
       
    23 
       
    24 # Try to import the mp.synchronize module cleanly, if it fails
       
    25 # raise ImportError for platforms lacking a working sem_open implementation.
       
    26 # See issue 3770
       
    27 try:
       
    28     from _multiprocessing import SemLock
       
    29 except (ImportError):
       
    30     raise ImportError("This platform lacks a functioning sem_open" +
       
    31                       " implementation, therefore, the required" +
       
    32                       " synchronization primitives needed will not" +
       
    33                       " function, see issue 3770.")
       
    34 
       
    35 #
       
    36 # Constants
       
    37 #
       
    38 
       
    39 RECURSIVE_MUTEX, SEMAPHORE = range(2)
       
    40 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
       
    41 
       
    42 #
       
    43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
       
    44 #
       
    45 
       
    46 class SemLock(object):
       
    47 
       
    48     def __init__(self, kind, value, maxvalue):
       
    49         sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
       
    50         debug('created semlock with handle %s' % sl.handle)
       
    51         self._make_methods()
       
    52 
       
    53         if sys.platform != 'win32':
       
    54             def _after_fork(obj):
       
    55                 obj._semlock._after_fork()
       
    56             register_after_fork(self, _after_fork)
       
    57 
       
    58     def _make_methods(self):
       
    59         self.acquire = self._semlock.acquire
       
    60         self.release = self._semlock.release
       
    61         self.__enter__ = self._semlock.__enter__
       
    62         self.__exit__ = self._semlock.__exit__
       
    63 
       
    64     def __getstate__(self):
       
    65         assert_spawning(self)
       
    66         sl = self._semlock
       
    67         return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
       
    68 
       
    69     def __setstate__(self, state):
       
    70         self._semlock = _multiprocessing.SemLock._rebuild(*state)
       
    71         debug('recreated blocker with handle %r' % state[0])
       
    72         self._make_methods()
       
    73 
       
    74 #
       
    75 # Semaphore
       
    76 #
       
    77 
       
    78 class Semaphore(SemLock):
       
    79 
       
    80     def __init__(self, value=1):
       
    81         SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
       
    82 
       
    83     def get_value(self):
       
    84         return self._semlock._get_value()
       
    85 
       
    86     def __repr__(self):
       
    87         try:
       
    88             value = self._semlock._get_value()
       
    89         except Exception:
       
    90             value = 'unknown'
       
    91         return '<Semaphore(value=%s)>' % value
       
    92 
       
    93 #
       
    94 # Bounded semaphore
       
    95 #
       
    96 
       
    97 class BoundedSemaphore(Semaphore):
       
    98 
       
    99     def __init__(self, value=1):
       
   100         SemLock.__init__(self, SEMAPHORE, value, value)
       
   101 
       
   102     def __repr__(self):
       
   103         try:
       
   104             value = self._semlock._get_value()
       
   105         except Exception:
       
   106             value = 'unknown'
       
   107         return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
       
   108                (value, self._semlock.maxvalue)
       
   109 
       
   110 #
       
   111 # Non-recursive lock
       
   112 #
       
   113 
       
   114 class Lock(SemLock):
       
   115 
       
   116     def __init__(self):
       
   117         SemLock.__init__(self, SEMAPHORE, 1, 1)
       
   118 
       
   119     def __repr__(self):
       
   120         try:
       
   121             if self._semlock._is_mine():
       
   122                 name = current_process().name
       
   123                 if threading.current_thread().name != 'MainThread':
       
   124                     name += '|' + threading.current_thread().name
       
   125             elif self._semlock._get_value() == 1:
       
   126                 name = 'None'
       
   127             elif self._semlock._count() > 0:
       
   128                 name = 'SomeOtherThread'
       
   129             else:
       
   130                 name = 'SomeOtherProcess'
       
   131         except Exception:
       
   132             name = 'unknown'
       
   133         return '<Lock(owner=%s)>' % name
       
   134 
       
   135 #
       
   136 # Recursive lock
       
   137 #
       
   138 
       
   139 class RLock(SemLock):
       
   140 
       
   141     def __init__(self):
       
   142         SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
       
   143 
       
   144     def __repr__(self):
       
   145         try:
       
   146             if self._semlock._is_mine():
       
   147                 name = current_process().name
       
   148                 if threading.current_thread().name != 'MainThread':
       
   149                     name += '|' + threading.current_thread().name
       
   150                 count = self._semlock._count()
       
   151             elif self._semlock._get_value() == 1:
       
   152                 name, count = 'None', 0
       
   153             elif self._semlock._count() > 0:
       
   154                 name, count = 'SomeOtherThread', 'nonzero'
       
   155             else:
       
   156                 name, count = 'SomeOtherProcess', 'nonzero'
       
   157         except Exception:
       
   158             name, count = 'unknown', 'unknown'
       
   159         return '<RLock(%s, %s)>' % (name, count)
       
   160 
       
   161 #
       
   162 # Condition variable
       
   163 #
       
   164 
       
   165 class Condition(object):
       
   166 
       
   167     def __init__(self, lock=None):
       
   168         self._lock = lock or RLock()
       
   169         self._sleeping_count = Semaphore(0)
       
   170         self._woken_count = Semaphore(0)
       
   171         self._wait_semaphore = Semaphore(0)
       
   172         self._make_methods()
       
   173 
       
   174     def __getstate__(self):
       
   175         assert_spawning(self)
       
   176         return (self._lock, self._sleeping_count,
       
   177                 self._woken_count, self._wait_semaphore)
       
   178 
       
   179     def __setstate__(self, state):
       
   180         (self._lock, self._sleeping_count,
       
   181          self._woken_count, self._wait_semaphore) = state
       
   182         self._make_methods()
       
   183 
       
   184     def _make_methods(self):
       
   185         self.acquire = self._lock.acquire
       
   186         self.release = self._lock.release
       
   187         self.__enter__ = self._lock.__enter__
       
   188         self.__exit__ = self._lock.__exit__
       
   189 
       
   190     def __repr__(self):
       
   191         try:
       
   192             num_waiters = (self._sleeping_count._semlock._get_value() -
       
   193                            self._woken_count._semlock._get_value())
       
   194         except Exception:
       
   195             num_waiters = 'unkown'
       
   196         return '<Condition(%s, %s)>' % (self._lock, num_waiters)
       
   197 
       
   198     def wait(self, timeout=None):
       
   199         assert self._lock._semlock._is_mine(), \
       
   200                'must acquire() condition before using wait()'
       
   201 
       
   202         # indicate that this thread is going to sleep
       
   203         self._sleeping_count.release()
       
   204 
       
   205         # release lock
       
   206         count = self._lock._semlock._count()
       
   207         for i in xrange(count):
       
   208             self._lock.release()
       
   209 
       
   210         try:
       
   211             # wait for notification or timeout
       
   212             self._wait_semaphore.acquire(True, timeout)
       
   213         finally:
       
   214             # indicate that this thread has woken
       
   215             self._woken_count.release()
       
   216 
       
   217             # reacquire lock
       
   218             for i in xrange(count):
       
   219                 self._lock.acquire()
       
   220 
       
   221     def notify(self):
       
   222         assert self._lock._semlock._is_mine(), 'lock is not owned'
       
   223         assert not self._wait_semaphore.acquire(False)
       
   224 
       
   225         # to take account of timeouts since last notify() we subtract
       
   226         # woken_count from sleeping_count and rezero woken_count
       
   227         while self._woken_count.acquire(False):
       
   228             res = self._sleeping_count.acquire(False)
       
   229             assert res
       
   230 
       
   231         if self._sleeping_count.acquire(False): # try grabbing a sleeper
       
   232             self._wait_semaphore.release()      # wake up one sleeper
       
   233             self._woken_count.acquire()         # wait for the sleeper to wake
       
   234 
       
   235             # rezero _wait_semaphore in case a timeout just happened
       
   236             self._wait_semaphore.acquire(False)
       
   237 
       
   238     def notify_all(self):
       
   239         assert self._lock._semlock._is_mine(), 'lock is not owned'
       
   240         assert not self._wait_semaphore.acquire(False)
       
   241 
       
   242         # to take account of timeouts since last notify*() we subtract
       
   243         # woken_count from sleeping_count and rezero woken_count
       
   244         while self._woken_count.acquire(False):
       
   245             res = self._sleeping_count.acquire(False)
       
   246             assert res
       
   247 
       
   248         sleepers = 0
       
   249         while self._sleeping_count.acquire(False):
       
   250             self._wait_semaphore.release()        # wake up one sleeper
       
   251             sleepers += 1
       
   252 
       
   253         if sleepers:
       
   254             for i in xrange(sleepers):
       
   255                 self._woken_count.acquire()       # wait for a sleeper to wake
       
   256 
       
   257             # rezero wait_semaphore in case some timeouts just happened
       
   258             while self._wait_semaphore.acquire(False):
       
   259                 pass
       
   260 
       
   261 #
       
   262 # Event
       
   263 #
       
   264 
       
   265 class Event(object):
       
   266 
       
   267     def __init__(self):
       
   268         self._cond = Condition(Lock())
       
   269         self._flag = Semaphore(0)
       
   270 
       
   271     def is_set(self):
       
   272         self._cond.acquire()
       
   273         try:
       
   274             if self._flag.acquire(False):
       
   275                 self._flag.release()
       
   276                 return True
       
   277             return False
       
   278         finally:
       
   279             self._cond.release()
       
   280 
       
   281     def set(self):
       
   282         self._cond.acquire()
       
   283         try:
       
   284             self._flag.acquire(False)
       
   285             self._flag.release()
       
   286             self._cond.notify_all()
       
   287         finally:
       
   288             self._cond.release()
       
   289 
       
   290     def clear(self):
       
   291         self._cond.acquire()
       
   292         try:
       
   293             self._flag.acquire(False)
       
   294         finally:
       
   295             self._cond.release()
       
   296 
       
   297     def wait(self, timeout=None):
       
   298         self._cond.acquire()
       
   299         try:
       
   300             if self._flag.acquire(False):
       
   301                 self._flag.release()
       
   302             else:
       
   303                 self._cond.wait(timeout)
       
   304         finally:
       
   305             self._cond.release()