|
1 # |
|
2 # Module implementing queues |
|
3 # |
|
4 # multiprocessing/queues.py |
|
5 # |
|
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt |
|
7 # |
|
8 |
|
9 __all__ = ['Queue', 'SimpleQueue'] |
|
10 |
|
11 import sys |
|
12 import os |
|
13 import threading |
|
14 import collections |
|
15 import time |
|
16 import atexit |
|
17 import weakref |
|
18 |
|
19 from Queue import Empty, Full |
|
20 import _multiprocessing |
|
21 from multiprocessing import Pipe |
|
22 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition |
|
23 from multiprocessing.util import debug, info, Finalize, register_after_fork |
|
24 from multiprocessing.forking import assert_spawning |
|
25 |
|
26 # |
|
27 # Queue type using a pipe, buffer and thread |
|
28 # |
|
29 |
|
30 class Queue(object): |
|
31 |
|
32 def __init__(self, maxsize=0): |
|
33 if maxsize <= 0: |
|
34 maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX |
|
35 self._maxsize = maxsize |
|
36 self._reader, self._writer = Pipe(duplex=False) |
|
37 self._rlock = Lock() |
|
38 self._opid = os.getpid() |
|
39 if sys.platform == 'win32': |
|
40 self._wlock = None |
|
41 else: |
|
42 self._wlock = Lock() |
|
43 self._sem = BoundedSemaphore(maxsize) |
|
44 |
|
45 self._after_fork() |
|
46 |
|
47 if sys.platform != 'win32': |
|
48 register_after_fork(self, Queue._after_fork) |
|
49 |
|
50 def __getstate__(self): |
|
51 assert_spawning(self) |
|
52 return (self._maxsize, self._reader, self._writer, |
|
53 self._rlock, self._wlock, self._sem, self._opid) |
|
54 |
|
55 def __setstate__(self, state): |
|
56 (self._maxsize, self._reader, self._writer, |
|
57 self._rlock, self._wlock, self._sem, self._opid) = state |
|
58 self._after_fork() |
|
59 |
|
60 def _after_fork(self): |
|
61 debug('Queue._after_fork()') |
|
62 self._notempty = threading.Condition(threading.Lock()) |
|
63 self._buffer = collections.deque() |
|
64 self._thread = None |
|
65 self._jointhread = None |
|
66 self._joincancelled = False |
|
67 self._closed = False |
|
68 self._close = None |
|
69 self._send = self._writer.send |
|
70 self._recv = self._reader.recv |
|
71 self._poll = self._reader.poll |
|
72 |
|
73 def put(self, obj, block=True, timeout=None): |
|
74 assert not self._closed |
|
75 if not self._sem.acquire(block, timeout): |
|
76 raise Full |
|
77 |
|
78 self._notempty.acquire() |
|
79 try: |
|
80 if self._thread is None: |
|
81 self._start_thread() |
|
82 self._buffer.append(obj) |
|
83 self._notempty.notify() |
|
84 finally: |
|
85 self._notempty.release() |
|
86 |
|
87 def get(self, block=True, timeout=None): |
|
88 if block and timeout is None: |
|
89 self._rlock.acquire() |
|
90 try: |
|
91 res = self._recv() |
|
92 self._sem.release() |
|
93 return res |
|
94 finally: |
|
95 self._rlock.release() |
|
96 |
|
97 else: |
|
98 if block: |
|
99 deadline = time.time() + timeout |
|
100 if not self._rlock.acquire(block, timeout): |
|
101 raise Empty |
|
102 try: |
|
103 if not self._poll(block and (deadline-time.time()) or 0.0): |
|
104 raise Empty |
|
105 res = self._recv() |
|
106 self._sem.release() |
|
107 return res |
|
108 finally: |
|
109 self._rlock.release() |
|
110 |
|
111 def qsize(self): |
|
112 # Raises NotImplementError on Mac OSX because of broken sem_getvalue() |
|
113 return self._maxsize - self._sem._semlock._get_value() |
|
114 |
|
115 def empty(self): |
|
116 return not self._poll() |
|
117 |
|
118 def full(self): |
|
119 return self._sem._semlock._is_zero() |
|
120 |
|
121 def get_nowait(self): |
|
122 return self.get(False) |
|
123 |
|
124 def put_nowait(self, obj): |
|
125 return self.put(obj, False) |
|
126 |
|
127 def close(self): |
|
128 self._closed = True |
|
129 self._reader.close() |
|
130 if self._close: |
|
131 self._close() |
|
132 |
|
133 def join_thread(self): |
|
134 debug('Queue.join_thread()') |
|
135 assert self._closed |
|
136 if self._jointhread: |
|
137 self._jointhread() |
|
138 |
|
139 def cancel_join_thread(self): |
|
140 debug('Queue.cancel_join_thread()') |
|
141 self._joincancelled = True |
|
142 try: |
|
143 self._jointhread.cancel() |
|
144 except AttributeError: |
|
145 pass |
|
146 |
|
147 def _start_thread(self): |
|
148 debug('Queue._start_thread()') |
|
149 |
|
150 # Start thread which transfers data from buffer to pipe |
|
151 self._buffer.clear() |
|
152 self._thread = threading.Thread( |
|
153 target=Queue._feed, |
|
154 args=(self._buffer, self._notempty, self._send, |
|
155 self._wlock, self._writer.close), |
|
156 name='QueueFeederThread' |
|
157 ) |
|
158 self._thread.daemon = True |
|
159 |
|
160 debug('doing self._thread.start()') |
|
161 self._thread.start() |
|
162 debug('... done self._thread.start()') |
|
163 |
|
164 # On process exit we will wait for data to be flushed to pipe. |
|
165 # |
|
166 # However, if this process created the queue then all |
|
167 # processes which use the queue will be descendants of this |
|
168 # process. Therefore waiting for the queue to be flushed |
|
169 # is pointless once all the child processes have been joined. |
|
170 created_by_this_process = (self._opid == os.getpid()) |
|
171 if not self._joincancelled and not created_by_this_process: |
|
172 self._jointhread = Finalize( |
|
173 self._thread, Queue._finalize_join, |
|
174 [weakref.ref(self._thread)], |
|
175 exitpriority=-5 |
|
176 ) |
|
177 |
|
178 # Send sentinel to the thread queue object when garbage collected |
|
179 self._close = Finalize( |
|
180 self, Queue._finalize_close, |
|
181 [self._buffer, self._notempty], |
|
182 exitpriority=10 |
|
183 ) |
|
184 |
|
185 @staticmethod |
|
186 def _finalize_join(twr): |
|
187 debug('joining queue thread') |
|
188 thread = twr() |
|
189 if thread is not None: |
|
190 thread.join() |
|
191 debug('... queue thread joined') |
|
192 else: |
|
193 debug('... queue thread already dead') |
|
194 |
|
195 @staticmethod |
|
196 def _finalize_close(buffer, notempty): |
|
197 debug('telling queue thread to quit') |
|
198 notempty.acquire() |
|
199 try: |
|
200 buffer.append(_sentinel) |
|
201 notempty.notify() |
|
202 finally: |
|
203 notempty.release() |
|
204 |
|
205 @staticmethod |
|
206 def _feed(buffer, notempty, send, writelock, close): |
|
207 debug('starting thread to feed data to pipe') |
|
208 from .util import is_exiting |
|
209 |
|
210 nacquire = notempty.acquire |
|
211 nrelease = notempty.release |
|
212 nwait = notempty.wait |
|
213 bpopleft = buffer.popleft |
|
214 sentinel = _sentinel |
|
215 if sys.platform != 'win32': |
|
216 wacquire = writelock.acquire |
|
217 wrelease = writelock.release |
|
218 else: |
|
219 wacquire = None |
|
220 |
|
221 try: |
|
222 while 1: |
|
223 nacquire() |
|
224 try: |
|
225 if not buffer: |
|
226 nwait() |
|
227 finally: |
|
228 nrelease() |
|
229 try: |
|
230 while 1: |
|
231 obj = bpopleft() |
|
232 if obj is sentinel: |
|
233 debug('feeder thread got sentinel -- exiting') |
|
234 close() |
|
235 return |
|
236 |
|
237 if wacquire is None: |
|
238 send(obj) |
|
239 else: |
|
240 wacquire() |
|
241 try: |
|
242 send(obj) |
|
243 finally: |
|
244 wrelease() |
|
245 except IndexError: |
|
246 pass |
|
247 except Exception, e: |
|
248 # Since this runs in a daemon thread the resources it uses |
|
249 # may be become unusable while the process is cleaning up. |
|
250 # We ignore errors which happen after the process has |
|
251 # started to cleanup. |
|
252 try: |
|
253 if is_exiting(): |
|
254 info('error in queue thread: %s', e) |
|
255 else: |
|
256 import traceback |
|
257 traceback.print_exc() |
|
258 except Exception: |
|
259 pass |
|
260 |
|
261 _sentinel = object() |
|
262 |
|
263 # |
|
264 # A queue type which also supports join() and task_done() methods |
|
265 # |
|
266 # Note that if you do not call task_done() for each finished task then |
|
267 # eventually the counter's semaphore may overflow causing Bad Things |
|
268 # to happen. |
|
269 # |
|
270 |
|
271 class JoinableQueue(Queue): |
|
272 |
|
273 def __init__(self, maxsize=0): |
|
274 Queue.__init__(self, maxsize) |
|
275 self._unfinished_tasks = Semaphore(0) |
|
276 self._cond = Condition() |
|
277 |
|
278 def __getstate__(self): |
|
279 return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) |
|
280 |
|
281 def __setstate__(self, state): |
|
282 Queue.__setstate__(self, state[:-2]) |
|
283 self._cond, self._unfinished_tasks = state[-2:] |
|
284 |
|
285 def put(self, item, block=True, timeout=None): |
|
286 Queue.put(self, item, block, timeout) |
|
287 self._unfinished_tasks.release() |
|
288 |
|
289 def task_done(self): |
|
290 self._cond.acquire() |
|
291 try: |
|
292 if not self._unfinished_tasks.acquire(False): |
|
293 raise ValueError('task_done() called too many times') |
|
294 if self._unfinished_tasks._semlock._is_zero(): |
|
295 self._cond.notify_all() |
|
296 finally: |
|
297 self._cond.release() |
|
298 |
|
299 def join(self): |
|
300 self._cond.acquire() |
|
301 try: |
|
302 if not self._unfinished_tasks._semlock._is_zero(): |
|
303 self._cond.wait() |
|
304 finally: |
|
305 self._cond.release() |
|
306 |
|
307 # |
|
308 # Simplified Queue type -- really just a locked pipe |
|
309 # |
|
310 |
|
311 class SimpleQueue(object): |
|
312 |
|
313 def __init__(self): |
|
314 self._reader, self._writer = Pipe(duplex=False) |
|
315 self._rlock = Lock() |
|
316 if sys.platform == 'win32': |
|
317 self._wlock = None |
|
318 else: |
|
319 self._wlock = Lock() |
|
320 self._make_methods() |
|
321 |
|
322 def empty(self): |
|
323 return not self._reader.poll() |
|
324 |
|
325 def __getstate__(self): |
|
326 assert_spawning(self) |
|
327 return (self._reader, self._writer, self._rlock, self._wlock) |
|
328 |
|
329 def __setstate__(self, state): |
|
330 (self._reader, self._writer, self._rlock, self._wlock) = state |
|
331 self._make_methods() |
|
332 |
|
333 def _make_methods(self): |
|
334 recv = self._reader.recv |
|
335 racquire, rrelease = self._rlock.acquire, self._rlock.release |
|
336 def get(): |
|
337 racquire() |
|
338 try: |
|
339 return recv() |
|
340 finally: |
|
341 rrelease() |
|
342 self.get = get |
|
343 |
|
344 if self._wlock is None: |
|
345 # writes to a message oriented win32 pipe are atomic |
|
346 self.put = self._writer.send |
|
347 else: |
|
348 send = self._writer.send |
|
349 wacquire, wrelease = self._wlock.acquire, self._wlock.release |
|
350 def put(obj): |
|
351 wacquire() |
|
352 try: |
|
353 return send(obj) |
|
354 finally: |
|
355 wrelease() |
|
356 self.put = put |