|
1 # |
|
2 # Module implementing synchronization primitives |
|
3 # |
|
4 # multiprocessing/synchronize.py |
|
5 # |
|
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt |
|
7 # |
|
8 |
|
# Names exported by `from multiprocessing.synchronize import *`.
__all__ = [
    'Lock',
    'RLock',
    'Semaphore',
    'BoundedSemaphore',
    'Condition',
    'Event',
]
|
12 |
|
13 import threading |
|
14 import os |
|
15 import sys |
|
16 |
|
17 from time import time as _time, sleep as _sleep |
|
18 |
|
19 import _multiprocessing |
|
20 from multiprocessing.process import current_process |
|
21 from multiprocessing.util import Finalize, register_after_fork, debug |
|
22 from multiprocessing.forking import assert_spawning, Popen |
|
23 |
|
# Probe for the C-level SemLock type before defining anything else.  When the
# import fails, re-raise ImportError with a message explaining that the
# platform lacks a working sem_open implementation.  See issue 3770.
try:
    from _multiprocessing import SemLock
except ImportError:
    raise ImportError("This platform lacks a functioning sem_open"
                      " implementation, therefore, the required"
                      " synchronization primitives needed will not"
                      " function, see issue 3770.")
|
34 |
|
35 # |
|
36 # Constants |
|
37 # |
|
38 |
|
# Values passed as the `kind` argument of `_multiprocessing.SemLock`.
RECURSIVE_MUTEX = 0
SEMAPHORE = 1

# Upper bound for a semaphore value, as published by the C extension.
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
|
41 |
|
42 # |
|
43 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` |
|
44 # |
|
45 |
|
class SemLock(object):
    """Base class for the semaphore and mutex types of this module.

    Wraps a `_multiprocessing.SemLock` instance (kept in `self._semlock`),
    re-exports its `acquire` and `release` methods and supports the `with`
    statement by delegating to the wrapped object.
    """

    def __init__(self, kind, value, maxvalue):
        """Create the wrapped semlock.

        `kind`, `value` and `maxvalue` are forwarded unchanged to
        `_multiprocessing.SemLock`.
        """
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            # Everywhere except Windows, have the wrapped semlock's
            # `_after_fork()` hook invoked through `register_after_fork`.
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        """Bind `acquire` and `release` straight to the wrapped semlock."""
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release

    # `__enter__` and `__exit__` are real methods rather than instance
    # attributes: the `with` statement looks special methods up on the type
    # of a new-style object, so attributes set on the instance are ignored.
    def __enter__(self):
        """Enter the wrapped semlock's context and return its result."""
        return self._semlock.__enter__()

    def __exit__(self, *args):
        """Leave the wrapped semlock's context, forwarding the exception info."""
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return `(handle for the child, kind, maxvalue)` for pickling.

        `assert_spawning` is called first; presumably it rejects pickling
        outside of process spawning -- confirm in `multiprocessing.forking`.
        """
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from the tuple made by `__getstate__`."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()
|
73 |
|
74 # |
|
75 # Semaphore |
|
76 # |
|
77 |
|
class Semaphore(SemLock):
    """Semaphore-kind semlock whose maximum value is `SEM_VALUE_MAX`."""

    def __init__(self, value=1):
        """Create the semaphore, forwarding `value` to the base class."""
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)

    def get_value(self):
        """Return whatever the wrapped semlock's `_get_value()` reports."""
        return self._semlock._get_value()

    def __repr__(self):
        """Return `<Semaphore(value=...)>`; the value is 'unknown' on error."""
        try:
            current = self._semlock._get_value()
        except Exception:
            # Reading the value failed; still produce a usable repr.
            current = 'unknown'
        return '<Semaphore(value=%s)>' % current
|
92 |
|
93 # |
|
94 # Bounded semaphore |
|
95 # |
|
96 |
|
class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1):
        """Create the semaphore with `value` as both value and maxvalue."""
        SemLock.__init__(self, SEMAPHORE, value, value)

    def __repr__(self):
        """Return `<BoundedSemaphore(value=..., maxvalue=...)>`."""
        try:
            current = self._semlock._get_value()
        except Exception:
            # Reading the value failed; still produce a usable repr.
            current = 'unknown'
        template = '<BoundedSemaphore(value=%s, maxvalue=%s)>'
        return template % (current, self._semlock.maxvalue)
|
109 |
|
110 # |
|
111 # Non-recursive lock |
|
112 # |
|
113 |
|
class Lock(SemLock):
    """Non-recursive lock: a semaphore-kind semlock with value and max of 1."""

    def __init__(self):
        SemLock.__init__(self, SEMAPHORE, 1, 1)

    def __repr__(self):
        """Return `<Lock(owner=...)>` describing the apparent holder."""
        try:
            if self._semlock._is_mine():
                # Held by the caller: process name, plus the thread name
                # when the caller is not the main thread.
                owner = current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
            elif self._semlock._get_value() == 1:
                # Value is back at 1, so nobody holds the lock.
                owner = 'None'
            elif self._semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
        except Exception:
            owner = 'unknown'
        return '<Lock(owner=%s)>' % owner
|
134 |
|
135 # |
|
136 # Recursive lock |
|
137 # |
|
138 |
|
class RLock(SemLock):
    """Recursive lock: a `RECURSIVE_MUTEX` semlock with value and max of 1."""

    def __init__(self):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)

    def __repr__(self):
        """Return `<RLock(owner, count)>` describing the apparent holder."""
        try:
            if self._semlock._is_mine():
                # Held by the caller: process name, plus the thread name
                # when the caller is not the main thread, and the count
                # reported by the semlock.
                owner = current_process().name
                thread_name = threading.current_thread().name
                if thread_name != 'MainThread':
                    owner = owner + '|' + thread_name
                depth = self._semlock._count()
            elif self._semlock._get_value() == 1:
                # Value is back at 1, so nobody holds the lock.
                owner = 'None'
                depth = 0
            elif self._semlock._count() > 0:
                owner = 'SomeOtherThread'
                depth = 'nonzero'
            else:
                owner = 'SomeOtherProcess'
                depth = 'nonzero'
        except Exception:
            owner = 'unknown'
            depth = 'unknown'
        return '<RLock(%s, %s)>' % (owner, depth)
|
160 |
|
161 # |
|
162 # Condition variable |
|
163 # |
|
164 |
|
class Condition(object):
    """Condition variable built from a lock and three semaphores.

    `_sleeping_count` is released once by every waiter about to sleep,
    `_woken_count` once by every waiter that has woken up (whether notified
    or timed out), and `_wait_semaphore` is what waiters actually block on.
    """

    def __init__(self, lock=None):
        """Create the condition.

        `lock` is the lock guarding the condition; a new `RLock` is created
        when it is omitted (or otherwise false).
        """
        self._lock = lock or RLock()
        self._sleeping_count = Semaphore(0)
        self._woken_count = Semaphore(0)
        self._wait_semaphore = Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        """Return the lock and the three semaphores as a tuple for pickling."""
        assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        """Restore the attributes from the tuple made by `__getstate__`."""
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def _make_methods(self):
        """Bind `acquire` and `release` straight to the underlying lock."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    # `__enter__` and `__exit__` are real methods rather than instance
    # attributes: the `with` statement looks special methods up on the type
    # of a new-style object, so attributes set on the instance are ignored.
    def __enter__(self):
        """Enter the underlying lock's context and return its result."""
        return self._lock.__enter__()

    def __exit__(self, *args):
        """Leave the underlying lock's context, forwarding the exception info."""
        return self._lock.__exit__(*args)

    def __repr__(self):
        """Return `<Condition(lock, waiters)>`; waiters is 'unknown' on error."""
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, reacquire it.

        The caller must hold the lock; this is checked with an assertion.
        `timeout` is forwarded to the acquire call on the wait semaphore.
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock as many times as its count says it is held
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock the same number of times it was released
            for _ in range(count):
                self._lock.acquire()

    def notify(self):
        """Wake up at most one waiter; the caller must hold the lock."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        if self._sleeping_count.acquire(False):  # try grabbing a sleeper
            self._wait_semaphore.release()       # wake up one sleeper
            self._woken_count.acquire()          # wait for the sleeper to wake

            # rezero _wait_semaphore in case a timeout just happened
            self._wait_semaphore.acquire(False)

    def notify_all(self):
        """Wake up every current waiter; the caller must hold the lock."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass
|
260 |
|
261 # |
|
262 # Event |
|
263 # |
|
264 |
|
class Event(object):
    """Flag that can be set, cleared and waited on.

    The flag is kept as the value of the semaphore `_flag` (a successful
    non-blocking acquire means "set"); `_cond` guards every access to it
    and is used to wake up waiters when the flag gets set.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def is_set(self):
        """Return True if the flag is currently set, otherwise False."""
        self._cond.acquire()
        try:
            # Probe the flag without blocking and put the value straight
            # back so that the probe leaves it unchanged.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()

    def set(self):
        """Set the flag and wake up everybody waiting for it."""
        self._cond.acquire()
        try:
            # Drain a possibly set flag first so that the release below does
            # not push the semaphore value beyond one.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()
        finally:
            self._cond.release()

    def clear(self):
        """Reset the flag; a no-op when it is not set."""
        self._cond.acquire()
        try:
            self._flag.acquire(False)
        finally:
            self._cond.release()

    def wait(self, timeout=None):
        """Block until the flag is set or the wait on the condition ends.

        `timeout` is forwarded to the condition's `wait()`.  Returns True
        if the flag is set on return and False otherwise, so that callers
        can tell a timeout from the flag having been set.
        """
        self._cond.acquire()
        try:
            if self._flag.acquire(False):
                # Already set: restore the value and return immediately.
                self._flag.release()
                return True

            self._cond.wait(timeout)

            # Re-check after waking up: the wait may have ended because of
            # the timeout rather than because the flag was set.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()