|
1 # |
|
2 # Module providing the `SyncManager` class for dealing |
|
3 # with shared objects |
|
4 # |
|
5 # multiprocessing/managers.py |
|
6 # |
|
7 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt |
|
8 # |
|
9 |
|
10 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] |
|
11 |
|
12 # |
|
13 # Imports |
|
14 # |
|
15 |
|
16 import os |
|
17 import sys |
|
18 import weakref |
|
19 import threading |
|
20 import array |
|
21 import Queue |
|
22 |
|
23 from traceback import format_exc |
|
24 from multiprocessing import Process, current_process, active_children, Pool, util, connection |
|
25 from multiprocessing.process import AuthenticationString |
|
26 from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler |
|
27 from multiprocessing.util import Finalize, info |
|
28 |
|
29 try: |
|
30 from cPickle import PicklingError |
|
31 except ImportError: |
|
32 from pickle import PicklingError |
|
33 |
|
34 # |
|
35 # Register some things for pickling |
|
36 # |
|
37 |
|
38 def reduce_array(a): |
|
39 return array.array, (a.typecode, a.tostring()) |
|
40 ForkingPickler.register(array.array, reduce_array) |
|
41 |
|
42 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] |
|
43 |
|
44 # |
|
45 # Type for identifying shared objects |
|
46 # |
|
47 |
|
48 class Token(object): |
|
49 ''' |
|
50 Type to uniquely indentify a shared object |
|
51 ''' |
|
52 __slots__ = ('typeid', 'address', 'id') |
|
53 |
|
54 def __init__(self, typeid, address, id): |
|
55 (self.typeid, self.address, self.id) = (typeid, address, id) |
|
56 |
|
57 def __getstate__(self): |
|
58 return (self.typeid, self.address, self.id) |
|
59 |
|
60 def __setstate__(self, state): |
|
61 (self.typeid, self.address, self.id) = state |
|
62 |
|
63 def __repr__(self): |
|
64 return 'Token(typeid=%r, address=%r, id=%r)' % \ |
|
65 (self.typeid, self.address, self.id) |
|
66 |
|
67 # |
|
68 # Function for communication with a manager's server process |
|
69 # |
|
70 |
|
71 def dispatch(c, id, methodname, args=(), kwds={}): |
|
72 ''' |
|
73 Send a message to manager using connection `c` and return response |
|
74 ''' |
|
75 c.send((id, methodname, args, kwds)) |
|
76 kind, result = c.recv() |
|
77 if kind == '#RETURN': |
|
78 return result |
|
79 raise convert_to_error(kind, result) |
|
80 |
|
81 def convert_to_error(kind, result): |
|
82 if kind == '#ERROR': |
|
83 return result |
|
84 elif kind == '#TRACEBACK': |
|
85 assert type(result) is str |
|
86 return RemoteError(result) |
|
87 elif kind == '#UNSERIALIZABLE': |
|
88 assert type(result) is str |
|
89 return RemoteError('Unserializable message: %s\n' % result) |
|
90 else: |
|
91 return ValueError('Unrecognized message type') |
|
92 |
|
93 class RemoteError(Exception): |
|
94 def __str__(self): |
|
95 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) |
|
96 |
|
97 # |
|
98 # Functions for finding the method names of an object |
|
99 # |
|
100 |
|
101 def all_methods(obj): |
|
102 ''' |
|
103 Return a list of names of methods of `obj` |
|
104 ''' |
|
105 temp = [] |
|
106 for name in dir(obj): |
|
107 func = getattr(obj, name) |
|
108 if hasattr(func, '__call__'): |
|
109 temp.append(name) |
|
110 return temp |
|
111 |
|
112 def public_methods(obj): |
|
113 ''' |
|
114 Return a list of names of methods of `obj` which do not start with '_' |
|
115 ''' |
|
116 return [name for name in all_methods(obj) if name[0] != '_'] |
|
117 |
|
118 # |
|
119 # Server which is run in a process controlled by a manager |
|
120 # |
|
121 |
|
122 class Server(object): |
|
123 ''' |
|
124 Server class which runs in a process controlled by a manager object |
|
125 ''' |
|
126 public = ['shutdown', 'create', 'accept_connection', 'get_methods', |
|
127 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] |
|
128 |
|
129 def __init__(self, registry, address, authkey, serializer): |
|
130 assert isinstance(authkey, bytes) |
|
131 self.registry = registry |
|
132 self.authkey = AuthenticationString(authkey) |
|
133 Listener, Client = listener_client[serializer] |
|
134 |
|
135 # do authentication later |
|
136 self.listener = Listener(address=address, backlog=5) |
|
137 self.address = self.listener.address |
|
138 |
|
139 self.id_to_obj = {0: (None, ())} |
|
140 self.id_to_refcount = {} |
|
141 self.mutex = threading.RLock() |
|
142 self.stop = 0 |
|
143 |
|
144 def serve_forever(self): |
|
145 ''' |
|
146 Run the server forever |
|
147 ''' |
|
148 current_process()._manager_server = self |
|
149 try: |
|
150 try: |
|
151 while 1: |
|
152 try: |
|
153 c = self.listener.accept() |
|
154 except (OSError, IOError): |
|
155 continue |
|
156 t = threading.Thread(target=self.handle_request, args=(c,)) |
|
157 t.daemon = True |
|
158 t.start() |
|
159 except (KeyboardInterrupt, SystemExit): |
|
160 pass |
|
161 finally: |
|
162 self.stop = 999 |
|
163 self.listener.close() |
|
164 |
|
165 def handle_request(self, c): |
|
166 ''' |
|
167 Handle a new connection |
|
168 ''' |
|
169 funcname = result = request = None |
|
170 try: |
|
171 connection.deliver_challenge(c, self.authkey) |
|
172 connection.answer_challenge(c, self.authkey) |
|
173 request = c.recv() |
|
174 ignore, funcname, args, kwds = request |
|
175 assert funcname in self.public, '%r unrecognized' % funcname |
|
176 func = getattr(self, funcname) |
|
177 except Exception: |
|
178 msg = ('#TRACEBACK', format_exc()) |
|
179 else: |
|
180 try: |
|
181 result = func(c, *args, **kwds) |
|
182 except Exception: |
|
183 msg = ('#TRACEBACK', format_exc()) |
|
184 else: |
|
185 msg = ('#RETURN', result) |
|
186 try: |
|
187 c.send(msg) |
|
188 except Exception, e: |
|
189 try: |
|
190 c.send(('#TRACEBACK', format_exc())) |
|
191 except Exception: |
|
192 pass |
|
193 util.info('Failure to send message: %r', msg) |
|
194 util.info(' ... request was %r', request) |
|
195 util.info(' ... exception was %r', e) |
|
196 |
|
197 c.close() |
|
198 |
|
199 def serve_client(self, conn): |
|
200 ''' |
|
201 Handle requests from the proxies in a particular process/thread |
|
202 ''' |
|
203 util.debug('starting server thread to service %r', |
|
204 threading.current_thread().name) |
|
205 |
|
206 recv = conn.recv |
|
207 send = conn.send |
|
208 id_to_obj = self.id_to_obj |
|
209 |
|
210 while not self.stop: |
|
211 |
|
212 try: |
|
213 methodname = obj = None |
|
214 request = recv() |
|
215 ident, methodname, args, kwds = request |
|
216 obj, exposed, gettypeid = id_to_obj[ident] |
|
217 |
|
218 if methodname not in exposed: |
|
219 raise AttributeError( |
|
220 'method %r of %r object is not in exposed=%r' % |
|
221 (methodname, type(obj), exposed) |
|
222 ) |
|
223 |
|
224 function = getattr(obj, methodname) |
|
225 |
|
226 try: |
|
227 res = function(*args, **kwds) |
|
228 except Exception, e: |
|
229 msg = ('#ERROR', e) |
|
230 else: |
|
231 typeid = gettypeid and gettypeid.get(methodname, None) |
|
232 if typeid: |
|
233 rident, rexposed = self.create(conn, typeid, res) |
|
234 token = Token(typeid, self.address, rident) |
|
235 msg = ('#PROXY', (rexposed, token)) |
|
236 else: |
|
237 msg = ('#RETURN', res) |
|
238 |
|
239 except AttributeError: |
|
240 if methodname is None: |
|
241 msg = ('#TRACEBACK', format_exc()) |
|
242 else: |
|
243 try: |
|
244 fallback_func = self.fallback_mapping[methodname] |
|
245 result = fallback_func( |
|
246 self, conn, ident, obj, *args, **kwds |
|
247 ) |
|
248 msg = ('#RETURN', result) |
|
249 except Exception: |
|
250 msg = ('#TRACEBACK', format_exc()) |
|
251 |
|
252 except EOFError: |
|
253 util.debug('got EOF -- exiting thread serving %r', |
|
254 threading.current_thread().name) |
|
255 sys.exit(0) |
|
256 |
|
257 except Exception: |
|
258 msg = ('#TRACEBACK', format_exc()) |
|
259 |
|
260 try: |
|
261 try: |
|
262 send(msg) |
|
263 except Exception, e: |
|
264 send(('#UNSERIALIZABLE', repr(msg))) |
|
265 except Exception, e: |
|
266 util.info('exception in thread serving %r', |
|
267 threading.current_thread().name) |
|
268 util.info(' ... message was %r', msg) |
|
269 util.info(' ... exception was %r', e) |
|
270 conn.close() |
|
271 sys.exit(1) |
|
272 |
|
273 def fallback_getvalue(self, conn, ident, obj): |
|
274 return obj |
|
275 |
|
276 def fallback_str(self, conn, ident, obj): |
|
277 return str(obj) |
|
278 |
|
279 def fallback_repr(self, conn, ident, obj): |
|
280 return repr(obj) |
|
281 |
|
282 fallback_mapping = { |
|
283 '__str__':fallback_str, |
|
284 '__repr__':fallback_repr, |
|
285 '#GETVALUE':fallback_getvalue |
|
286 } |
|
287 |
|
288 def dummy(self, c): |
|
289 pass |
|
290 |
|
291 def debug_info(self, c): |
|
292 ''' |
|
293 Return some info --- useful to spot problems with refcounting |
|
294 ''' |
|
295 self.mutex.acquire() |
|
296 try: |
|
297 result = [] |
|
298 keys = self.id_to_obj.keys() |
|
299 keys.sort() |
|
300 for ident in keys: |
|
301 if ident != 0: |
|
302 result.append(' %s: refcount=%s\n %s' % |
|
303 (ident, self.id_to_refcount[ident], |
|
304 str(self.id_to_obj[ident][0])[:75])) |
|
305 return '\n'.join(result) |
|
306 finally: |
|
307 self.mutex.release() |
|
308 |
|
309 def number_of_objects(self, c): |
|
310 ''' |
|
311 Number of shared objects |
|
312 ''' |
|
313 return len(self.id_to_obj) - 1 # don't count ident=0 |
|
314 |
|
315 def shutdown(self, c): |
|
316 ''' |
|
317 Shutdown this process |
|
318 ''' |
|
319 try: |
|
320 try: |
|
321 util.debug('manager received shutdown message') |
|
322 c.send(('#RETURN', None)) |
|
323 |
|
324 if sys.stdout != sys.__stdout__: |
|
325 util.debug('resetting stdout, stderr') |
|
326 sys.stdout = sys.__stdout__ |
|
327 sys.stderr = sys.__stderr__ |
|
328 |
|
329 util._run_finalizers(0) |
|
330 |
|
331 for p in active_children(): |
|
332 util.debug('terminating a child process of manager') |
|
333 p.terminate() |
|
334 |
|
335 for p in active_children(): |
|
336 util.debug('terminating a child process of manager') |
|
337 p.join() |
|
338 |
|
339 util._run_finalizers() |
|
340 util.info('manager exiting with exitcode 0') |
|
341 except: |
|
342 import traceback |
|
343 traceback.print_exc() |
|
344 finally: |
|
345 exit(0) |
|
346 |
|
347 def create(self, c, typeid, *args, **kwds): |
|
348 ''' |
|
349 Create a new shared object and return its id |
|
350 ''' |
|
351 self.mutex.acquire() |
|
352 try: |
|
353 callable, exposed, method_to_typeid, proxytype = \ |
|
354 self.registry[typeid] |
|
355 |
|
356 if callable is None: |
|
357 assert len(args) == 1 and not kwds |
|
358 obj = args[0] |
|
359 else: |
|
360 obj = callable(*args, **kwds) |
|
361 |
|
362 if exposed is None: |
|
363 exposed = public_methods(obj) |
|
364 if method_to_typeid is not None: |
|
365 assert type(method_to_typeid) is dict |
|
366 exposed = list(exposed) + list(method_to_typeid) |
|
367 |
|
368 ident = '%x' % id(obj) # convert to string because xmlrpclib |
|
369 # only has 32 bit signed integers |
|
370 util.debug('%r callable returned object with id %r', typeid, ident) |
|
371 |
|
372 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) |
|
373 if ident not in self.id_to_refcount: |
|
374 self.id_to_refcount[ident] = 0 |
|
375 # increment the reference count immediately, to avoid |
|
376 # this object being garbage collected before a Proxy |
|
377 # object for it can be created. The caller of create() |
|
378 # is responsible for doing a decref once the Proxy object |
|
379 # has been created. |
|
380 self.incref(c, ident) |
|
381 return ident, tuple(exposed) |
|
382 finally: |
|
383 self.mutex.release() |
|
384 |
|
385 def get_methods(self, c, token): |
|
386 ''' |
|
387 Return the methods of the shared object indicated by token |
|
388 ''' |
|
389 return tuple(self.id_to_obj[token.id][1]) |
|
390 |
|
391 def accept_connection(self, c, name): |
|
392 ''' |
|
393 Spawn a new thread to serve this connection |
|
394 ''' |
|
395 threading.current_thread().name = name |
|
396 c.send(('#RETURN', None)) |
|
397 self.serve_client(c) |
|
398 |
|
399 def incref(self, c, ident): |
|
400 self.mutex.acquire() |
|
401 try: |
|
402 self.id_to_refcount[ident] += 1 |
|
403 finally: |
|
404 self.mutex.release() |
|
405 |
|
406 def decref(self, c, ident): |
|
407 self.mutex.acquire() |
|
408 try: |
|
409 assert self.id_to_refcount[ident] >= 1 |
|
410 self.id_to_refcount[ident] -= 1 |
|
411 if self.id_to_refcount[ident] == 0: |
|
412 del self.id_to_obj[ident], self.id_to_refcount[ident] |
|
413 util.debug('disposing of obj with id %d', ident) |
|
414 finally: |
|
415 self.mutex.release() |
|
416 |
|
417 # |
|
418 # Class to represent state of a manager |
|
419 # |
|
420 |
|
421 class State(object): |
|
422 __slots__ = ['value'] |
|
423 INITIAL = 0 |
|
424 STARTED = 1 |
|
425 SHUTDOWN = 2 |
|
426 |
|
427 # |
|
428 # Mapping from serializer name to Listener and Client types |
|
429 # |
|
430 |
|
431 listener_client = { |
|
432 'pickle' : (connection.Listener, connection.Client), |
|
433 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) |
|
434 } |
|
435 |
|
436 # |
|
437 # Definition of BaseManager |
|
438 # |
|
439 |
|
440 class BaseManager(object): |
|
441 ''' |
|
442 Base class for managers |
|
443 ''' |
|
444 _registry = {} |
|
445 _Server = Server |
|
446 |
|
447 def __init__(self, address=None, authkey=None, serializer='pickle'): |
|
448 if authkey is None: |
|
449 authkey = current_process().authkey |
|
450 self._address = address # XXX not final address if eg ('', 0) |
|
451 self._authkey = AuthenticationString(authkey) |
|
452 self._state = State() |
|
453 self._state.value = State.INITIAL |
|
454 self._serializer = serializer |
|
455 self._Listener, self._Client = listener_client[serializer] |
|
456 |
|
457 def __reduce__(self): |
|
458 return type(self).from_address, \ |
|
459 (self._address, self._authkey, self._serializer) |
|
460 |
|
461 def get_server(self): |
|
462 ''' |
|
463 Return server object with serve_forever() method and address attribute |
|
464 ''' |
|
465 assert self._state.value == State.INITIAL |
|
466 return Server(self._registry, self._address, |
|
467 self._authkey, self._serializer) |
|
468 |
|
469 def connect(self): |
|
470 ''' |
|
471 Connect manager object to the server process |
|
472 ''' |
|
473 Listener, Client = listener_client[self._serializer] |
|
474 conn = Client(self._address, authkey=self._authkey) |
|
475 dispatch(conn, None, 'dummy') |
|
476 self._state.value = State.STARTED |
|
477 |
|
478 def start(self): |
|
479 ''' |
|
480 Spawn a server process for this manager object |
|
481 ''' |
|
482 assert self._state.value == State.INITIAL |
|
483 |
|
484 # pipe over which we will retrieve address of server |
|
485 reader, writer = connection.Pipe(duplex=False) |
|
486 |
|
487 # spawn process which runs a server |
|
488 self._process = Process( |
|
489 target=type(self)._run_server, |
|
490 args=(self._registry, self._address, self._authkey, |
|
491 self._serializer, writer), |
|
492 ) |
|
493 ident = ':'.join(str(i) for i in self._process._identity) |
|
494 self._process.name = type(self).__name__ + '-' + ident |
|
495 self._process.start() |
|
496 |
|
497 # get address of server |
|
498 writer.close() |
|
499 self._address = reader.recv() |
|
500 reader.close() |
|
501 |
|
502 # register a finalizer |
|
503 self._state.value = State.STARTED |
|
504 self.shutdown = util.Finalize( |
|
505 self, type(self)._finalize_manager, |
|
506 args=(self._process, self._address, self._authkey, |
|
507 self._state, self._Client), |
|
508 exitpriority=0 |
|
509 ) |
|
510 |
|
511 @classmethod |
|
512 def _run_server(cls, registry, address, authkey, serializer, writer): |
|
513 ''' |
|
514 Create a server, report its address and run it |
|
515 ''' |
|
516 # create server |
|
517 server = cls._Server(registry, address, authkey, serializer) |
|
518 |
|
519 # inform parent process of the server's address |
|
520 writer.send(server.address) |
|
521 writer.close() |
|
522 |
|
523 # run the manager |
|
524 util.info('manager serving at %r', server.address) |
|
525 server.serve_forever() |
|
526 |
|
527 def _create(self, typeid, *args, **kwds): |
|
528 ''' |
|
529 Create a new shared object; return the token and exposed tuple |
|
530 ''' |
|
531 assert self._state.value == State.STARTED, 'server not yet started' |
|
532 conn = self._Client(self._address, authkey=self._authkey) |
|
533 try: |
|
534 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) |
|
535 finally: |
|
536 conn.close() |
|
537 return Token(typeid, self._address, id), exposed |
|
538 |
|
539 def join(self, timeout=None): |
|
540 ''' |
|
541 Join the manager process (if it has been spawned) |
|
542 ''' |
|
543 self._process.join(timeout) |
|
544 |
|
545 def _debug_info(self): |
|
546 ''' |
|
547 Return some info about the servers shared objects and connections |
|
548 ''' |
|
549 conn = self._Client(self._address, authkey=self._authkey) |
|
550 try: |
|
551 return dispatch(conn, None, 'debug_info') |
|
552 finally: |
|
553 conn.close() |
|
554 |
|
555 def _number_of_objects(self): |
|
556 ''' |
|
557 Return the number of shared objects |
|
558 ''' |
|
559 conn = self._Client(self._address, authkey=self._authkey) |
|
560 try: |
|
561 return dispatch(conn, None, 'number_of_objects') |
|
562 finally: |
|
563 conn.close() |
|
564 |
|
565 def __enter__(self): |
|
566 return self |
|
567 |
|
568 def __exit__(self, exc_type, exc_val, exc_tb): |
|
569 self.shutdown() |
|
570 |
|
571 @staticmethod |
|
572 def _finalize_manager(process, address, authkey, state, _Client): |
|
573 ''' |
|
574 Shutdown the manager process; will be registered as a finalizer |
|
575 ''' |
|
576 if process.is_alive(): |
|
577 util.info('sending shutdown message to manager') |
|
578 try: |
|
579 conn = _Client(address, authkey=authkey) |
|
580 try: |
|
581 dispatch(conn, None, 'shutdown') |
|
582 finally: |
|
583 conn.close() |
|
584 except Exception: |
|
585 pass |
|
586 |
|
587 process.join(timeout=0.2) |
|
588 if process.is_alive(): |
|
589 util.info('manager still alive') |
|
590 if hasattr(process, 'terminate'): |
|
591 util.info('trying to `terminate()` manager process') |
|
592 process.terminate() |
|
593 process.join(timeout=0.1) |
|
594 if process.is_alive(): |
|
595 util.info('manager still alive after terminate') |
|
596 |
|
597 state.value = State.SHUTDOWN |
|
598 try: |
|
599 del BaseProxy._address_to_local[address] |
|
600 except KeyError: |
|
601 pass |
|
602 |
|
603 address = property(lambda self: self._address) |
|
604 |
|
605 @classmethod |
|
606 def register(cls, typeid, callable=None, proxytype=None, exposed=None, |
|
607 method_to_typeid=None, create_method=True): |
|
608 ''' |
|
609 Register a typeid with the manager type |
|
610 ''' |
|
611 if '_registry' not in cls.__dict__: |
|
612 cls._registry = cls._registry.copy() |
|
613 |
|
614 if proxytype is None: |
|
615 proxytype = AutoProxy |
|
616 |
|
617 exposed = exposed or getattr(proxytype, '_exposed_', None) |
|
618 |
|
619 method_to_typeid = method_to_typeid or \ |
|
620 getattr(proxytype, '_method_to_typeid_', None) |
|
621 |
|
622 if method_to_typeid: |
|
623 for key, value in method_to_typeid.items(): |
|
624 assert type(key) is str, '%r is not a string' % key |
|
625 assert type(value) is str, '%r is not a string' % value |
|
626 |
|
627 cls._registry[typeid] = ( |
|
628 callable, exposed, method_to_typeid, proxytype |
|
629 ) |
|
630 |
|
631 if create_method: |
|
632 def temp(self, *args, **kwds): |
|
633 util.debug('requesting creation of a shared %r object', typeid) |
|
634 token, exp = self._create(typeid, *args, **kwds) |
|
635 proxy = proxytype( |
|
636 token, self._serializer, manager=self, |
|
637 authkey=self._authkey, exposed=exp |
|
638 ) |
|
639 conn = self._Client(token.address, authkey=self._authkey) |
|
640 dispatch(conn, None, 'decref', (token.id,)) |
|
641 return proxy |
|
642 temp.__name__ = typeid |
|
643 setattr(cls, typeid, temp) |
|
644 |
|
645 # |
|
646 # Subclass of set which get cleared after a fork |
|
647 # |
|
648 |
|
649 class ProcessLocalSet(set): |
|
650 def __init__(self): |
|
651 util.register_after_fork(self, lambda obj: obj.clear()) |
|
652 def __reduce__(self): |
|
653 return type(self), () |
|
654 |
|
655 # |
|
656 # Definition of BaseProxy |
|
657 # |
|
658 |
|
659 class BaseProxy(object): |
|
660 ''' |
|
661 A base for proxies of shared objects |
|
662 ''' |
|
663 _address_to_local = {} |
|
664 _mutex = util.ForkAwareThreadLock() |
|
665 |
|
666 def __init__(self, token, serializer, manager=None, |
|
667 authkey=None, exposed=None, incref=True): |
|
668 BaseProxy._mutex.acquire() |
|
669 try: |
|
670 tls_idset = BaseProxy._address_to_local.get(token.address, None) |
|
671 if tls_idset is None: |
|
672 tls_idset = util.ForkAwareLocal(), ProcessLocalSet() |
|
673 BaseProxy._address_to_local[token.address] = tls_idset |
|
674 finally: |
|
675 BaseProxy._mutex.release() |
|
676 |
|
677 # self._tls is used to record the connection used by this |
|
678 # thread to communicate with the manager at token.address |
|
679 self._tls = tls_idset[0] |
|
680 |
|
681 # self._idset is used to record the identities of all shared |
|
682 # objects for which the current process owns references and |
|
683 # which are in the manager at token.address |
|
684 self._idset = tls_idset[1] |
|
685 |
|
686 self._token = token |
|
687 self._id = self._token.id |
|
688 self._manager = manager |
|
689 self._serializer = serializer |
|
690 self._Client = listener_client[serializer][1] |
|
691 |
|
692 if authkey is not None: |
|
693 self._authkey = AuthenticationString(authkey) |
|
694 elif self._manager is not None: |
|
695 self._authkey = self._manager._authkey |
|
696 else: |
|
697 self._authkey = current_process().authkey |
|
698 |
|
699 if incref: |
|
700 self._incref() |
|
701 |
|
702 util.register_after_fork(self, BaseProxy._after_fork) |
|
703 |
|
704 def _connect(self): |
|
705 util.debug('making connection to manager') |
|
706 name = current_process().name |
|
707 if threading.current_thread().name != 'MainThread': |
|
708 name += '|' + threading.current_thread().name |
|
709 conn = self._Client(self._token.address, authkey=self._authkey) |
|
710 dispatch(conn, None, 'accept_connection', (name,)) |
|
711 self._tls.connection = conn |
|
712 |
|
713 def _callmethod(self, methodname, args=(), kwds={}): |
|
714 ''' |
|
715 Try to call a method of the referrent and return a copy of the result |
|
716 ''' |
|
717 try: |
|
718 conn = self._tls.connection |
|
719 except AttributeError: |
|
720 util.debug('thread %r does not own a connection', |
|
721 threading.current_thread().name) |
|
722 self._connect() |
|
723 conn = self._tls.connection |
|
724 |
|
725 conn.send((self._id, methodname, args, kwds)) |
|
726 kind, result = conn.recv() |
|
727 |
|
728 if kind == '#RETURN': |
|
729 return result |
|
730 elif kind == '#PROXY': |
|
731 exposed, token = result |
|
732 proxytype = self._manager._registry[token.typeid][-1] |
|
733 proxy = proxytype( |
|
734 token, self._serializer, manager=self._manager, |
|
735 authkey=self._authkey, exposed=exposed |
|
736 ) |
|
737 conn = self._Client(token.address, authkey=self._authkey) |
|
738 dispatch(conn, None, 'decref', (token.id,)) |
|
739 return proxy |
|
740 raise convert_to_error(kind, result) |
|
741 |
|
742 def _getvalue(self): |
|
743 ''' |
|
744 Get a copy of the value of the referent |
|
745 ''' |
|
746 return self._callmethod('#GETVALUE') |
|
747 |
|
748 def _incref(self): |
|
749 conn = self._Client(self._token.address, authkey=self._authkey) |
|
750 dispatch(conn, None, 'incref', (self._id,)) |
|
751 util.debug('INCREF %r', self._token.id) |
|
752 |
|
753 self._idset.add(self._id) |
|
754 |
|
755 state = self._manager and self._manager._state |
|
756 |
|
757 self._close = util.Finalize( |
|
758 self, BaseProxy._decref, |
|
759 args=(self._token, self._authkey, state, |
|
760 self._tls, self._idset, self._Client), |
|
761 exitpriority=10 |
|
762 ) |
|
763 |
|
764 @staticmethod |
|
765 def _decref(token, authkey, state, tls, idset, _Client): |
|
766 idset.discard(token.id) |
|
767 |
|
768 # check whether manager is still alive |
|
769 if state is None or state.value == State.STARTED: |
|
770 # tell manager this process no longer cares about referent |
|
771 try: |
|
772 util.debug('DECREF %r', token.id) |
|
773 conn = _Client(token.address, authkey=authkey) |
|
774 dispatch(conn, None, 'decref', (token.id,)) |
|
775 except Exception, e: |
|
776 util.debug('... decref failed %s', e) |
|
777 |
|
778 else: |
|
779 util.debug('DECREF %r -- manager already shutdown', token.id) |
|
780 |
|
781 # check whether we can close this thread's connection because |
|
782 # the process owns no more references to objects for this manager |
|
783 if not idset and hasattr(tls, 'connection'): |
|
784 util.debug('thread %r has no more proxies so closing conn', |
|
785 threading.current_thread().name) |
|
786 tls.connection.close() |
|
787 del tls.connection |
|
788 |
|
789 def _after_fork(self): |
|
790 self._manager = None |
|
791 try: |
|
792 self._incref() |
|
793 except Exception, e: |
|
794 # the proxy may just be for a manager which has shutdown |
|
795 util.info('incref failed: %s' % e) |
|
796 |
|
797 def __reduce__(self): |
|
798 kwds = {} |
|
799 if Popen.thread_is_spawning(): |
|
800 kwds['authkey'] = self._authkey |
|
801 |
|
802 if getattr(self, '_isauto', False): |
|
803 kwds['exposed'] = self._exposed_ |
|
804 return (RebuildProxy, |
|
805 (AutoProxy, self._token, self._serializer, kwds)) |
|
806 else: |
|
807 return (RebuildProxy, |
|
808 (type(self), self._token, self._serializer, kwds)) |
|
809 |
|
810 def __deepcopy__(self, memo): |
|
811 return self._getvalue() |
|
812 |
|
813 def __repr__(self): |
|
814 return '<%s object, typeid %r at %s>' % \ |
|
815 (type(self).__name__, self._token.typeid, '0x%x' % id(self)) |
|
816 |
|
817 def __str__(self): |
|
818 ''' |
|
819 Return representation of the referent (or a fall-back if that fails) |
|
820 ''' |
|
821 try: |
|
822 return self._callmethod('__repr__') |
|
823 except Exception: |
|
824 return repr(self)[:-1] + "; '__str__()' failed>" |
|
825 |
|
826 # |
|
827 # Function used for unpickling |
|
828 # |
|
829 |
|
830 def RebuildProxy(func, token, serializer, kwds): |
|
831 ''' |
|
832 Function used for unpickling proxy objects. |
|
833 |
|
834 If possible the shared object is returned, or otherwise a proxy for it. |
|
835 ''' |
|
836 server = getattr(current_process(), '_manager_server', None) |
|
837 |
|
838 if server and server.address == token.address: |
|
839 return server.id_to_obj[token.id][0] |
|
840 else: |
|
841 incref = ( |
|
842 kwds.pop('incref', True) and |
|
843 not getattr(current_process(), '_inheriting', False) |
|
844 ) |
|
845 return func(token, serializer, incref=incref, **kwds) |
|
846 |
|
847 # |
|
848 # Functions to create proxies and proxy types |
|
849 # |
|
850 |
|
851 def MakeProxyType(name, exposed, _cache={}): |
|
852 ''' |
|
853 Return an proxy type whose methods are given by `exposed` |
|
854 ''' |
|
855 exposed = tuple(exposed) |
|
856 try: |
|
857 return _cache[(name, exposed)] |
|
858 except KeyError: |
|
859 pass |
|
860 |
|
861 dic = {} |
|
862 |
|
863 for meth in exposed: |
|
864 exec '''def %s(self, *args, **kwds): |
|
865 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic |
|
866 |
|
867 ProxyType = type(name, (BaseProxy,), dic) |
|
868 ProxyType._exposed_ = exposed |
|
869 _cache[(name, exposed)] = ProxyType |
|
870 return ProxyType |
|
871 |
|
872 |
|
873 def AutoProxy(token, serializer, manager=None, authkey=None, |
|
874 exposed=None, incref=True): |
|
875 ''' |
|
876 Return an auto-proxy for `token` |
|
877 ''' |
|
878 _Client = listener_client[serializer][1] |
|
879 |
|
880 if exposed is None: |
|
881 conn = _Client(token.address, authkey=authkey) |
|
882 try: |
|
883 exposed = dispatch(conn, None, 'get_methods', (token,)) |
|
884 finally: |
|
885 conn.close() |
|
886 |
|
887 if authkey is None and manager is not None: |
|
888 authkey = manager._authkey |
|
889 if authkey is None: |
|
890 authkey = current_process().authkey |
|
891 |
|
892 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) |
|
893 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, |
|
894 incref=incref) |
|
895 proxy._isauto = True |
|
896 return proxy |
|
897 |
|
898 # |
|
899 # Types/callables which we will register with SyncManager |
|
900 # |
|
901 |
|
902 class Namespace(object): |
|
903 def __init__(self, **kwds): |
|
904 self.__dict__.update(kwds) |
|
905 def __repr__(self): |
|
906 items = self.__dict__.items() |
|
907 temp = [] |
|
908 for name, value in items: |
|
909 if not name.startswith('_'): |
|
910 temp.append('%s=%r' % (name, value)) |
|
911 temp.sort() |
|
912 return 'Namespace(%s)' % str.join(', ', temp) |
|
913 |
|
914 class Value(object): |
|
915 def __init__(self, typecode, value, lock=True): |
|
916 self._typecode = typecode |
|
917 self._value = value |
|
918 def get(self): |
|
919 return self._value |
|
920 def set(self, value): |
|
921 self._value = value |
|
922 def __repr__(self): |
|
923 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) |
|
924 value = property(get, set) |
|
925 |
|
926 def Array(typecode, sequence, lock=True): |
|
927 return array.array(typecode, sequence) |
|
928 |
|
929 # |
|
930 # Proxy types used by SyncManager |
|
931 # |
|
932 |
|
933 class IteratorProxy(BaseProxy): |
|
934 # XXX remove methods for Py3.0 and Py2.6 |
|
935 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close') |
|
936 def __iter__(self): |
|
937 return self |
|
938 def __next__(self, *args): |
|
939 return self._callmethod('__next__', args) |
|
940 def next(self, *args): |
|
941 return self._callmethod('next', args) |
|
942 def send(self, *args): |
|
943 return self._callmethod('send', args) |
|
944 def throw(self, *args): |
|
945 return self._callmethod('throw', args) |
|
946 def close(self, *args): |
|
947 return self._callmethod('close', args) |
|
948 |
|
949 |
|
950 class AcquirerProxy(BaseProxy): |
|
951 _exposed_ = ('acquire', 'release') |
|
952 def acquire(self, blocking=True): |
|
953 return self._callmethod('acquire', (blocking,)) |
|
954 def release(self): |
|
955 return self._callmethod('release') |
|
956 def __enter__(self): |
|
957 return self._callmethod('acquire') |
|
958 def __exit__(self, exc_type, exc_val, exc_tb): |
|
959 return self._callmethod('release') |
|
960 |
|
961 |
|
962 class ConditionProxy(AcquirerProxy): |
|
963 # XXX will Condition.notfyAll() name be available in Py3.0? |
|
964 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') |
|
965 def wait(self, timeout=None): |
|
966 return self._callmethod('wait', (timeout,)) |
|
967 def notify(self): |
|
968 return self._callmethod('notify') |
|
969 def notify_all(self): |
|
970 return self._callmethod('notify_all') |
|
971 |
|
972 class EventProxy(BaseProxy): |
|
973 _exposed_ = ('is_set', 'set', 'clear', 'wait') |
|
974 def is_set(self): |
|
975 return self._callmethod('is_set') |
|
976 def set(self): |
|
977 return self._callmethod('set') |
|
978 def clear(self): |
|
979 return self._callmethod('clear') |
|
980 def wait(self, timeout=None): |
|
981 return self._callmethod('wait', (timeout,)) |
|
982 |
|
983 class NamespaceProxy(BaseProxy): |
|
984 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') |
|
985 def __getattr__(self, key): |
|
986 if key[0] == '_': |
|
987 return object.__getattribute__(self, key) |
|
988 callmethod = object.__getattribute__(self, '_callmethod') |
|
989 return callmethod('__getattribute__', (key,)) |
|
990 def __setattr__(self, key, value): |
|
991 if key[0] == '_': |
|
992 return object.__setattr__(self, key, value) |
|
993 callmethod = object.__getattribute__(self, '_callmethod') |
|
994 return callmethod('__setattr__', (key, value)) |
|
995 def __delattr__(self, key): |
|
996 if key[0] == '_': |
|
997 return object.__delattr__(self, key) |
|
998 callmethod = object.__getattribute__(self, '_callmethod') |
|
999 return callmethod('__delattr__', (key,)) |
|
1000 |
|
1001 |
|
1002 class ValueProxy(BaseProxy): |
|
1003 _exposed_ = ('get', 'set') |
|
1004 def get(self): |
|
1005 return self._callmethod('get') |
|
1006 def set(self, value): |
|
1007 return self._callmethod('set', (value,)) |
|
1008 value = property(get, set) |
|
1009 |
|
1010 |
|
1011 BaseListProxy = MakeProxyType('BaseListProxy', ( |
|
1012 '__add__', '__contains__', '__delitem__', '__delslice__', |
|
1013 '__getitem__', '__getslice__', '__len__', '__mul__', |
|
1014 '__reversed__', '__rmul__', '__setitem__', '__setslice__', |
|
1015 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', |
|
1016 'reverse', 'sort', '__imul__' |
|
1017 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 |
|
1018 class ListProxy(BaseListProxy): |
|
1019 def __iadd__(self, value): |
|
1020 self._callmethod('extend', (value,)) |
|
1021 return self |
|
1022 def __imul__(self, value): |
|
1023 self._callmethod('__imul__', (value,)) |
|
1024 return self |
|
1025 |
|
1026 |
|
1027 DictProxy = MakeProxyType('DictProxy', ( |
|
1028 '__contains__', '__delitem__', '__getitem__', '__len__', |
|
1029 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', |
|
1030 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' |
|
1031 )) |
|
1032 |
|
1033 |
|
1034 ArrayProxy = MakeProxyType('ArrayProxy', ( |
|
1035 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' |
|
1036 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 |
|
1037 |
|
1038 |
|
1039 PoolProxy = MakeProxyType('PoolProxy', ( |
|
1040 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', |
|
1041 'map', 'map_async', 'terminate' |
|
1042 )) |
|
1043 PoolProxy._method_to_typeid_ = { |
|
1044 'apply_async': 'AsyncResult', |
|
1045 'map_async': 'AsyncResult', |
|
1046 'imap': 'Iterator', |
|
1047 'imap_unordered': 'Iterator' |
|
1048 } |
|
1049 |
|
1050 # |
|
1051 # Definition of SyncManager |
|
1052 # |
|
1053 |
|
1054 class SyncManager(BaseManager): |
|
1055 ''' |
|
1056 Subclass of `BaseManager` which supports a number of shared object types. |
|
1057 |
|
1058 The types registered are those intended for the synchronization |
|
1059 of threads, plus `dict`, `list` and `Namespace`. |
|
1060 |
|
1061 The `multiprocessing.Manager()` function creates started instances of |
|
1062 this class. |
|
1063 ''' |
|
1064 |
|
1065 SyncManager.register('Queue', Queue.Queue) |
|
1066 SyncManager.register('JoinableQueue', Queue.Queue) |
|
1067 SyncManager.register('Event', threading.Event, EventProxy) |
|
1068 SyncManager.register('Lock', threading.Lock, AcquirerProxy) |
|
1069 SyncManager.register('RLock', threading.RLock, AcquirerProxy) |
|
1070 SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) |
|
1071 SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, |
|
1072 AcquirerProxy) |
|
1073 SyncManager.register('Condition', threading.Condition, ConditionProxy) |
|
1074 SyncManager.register('Pool', Pool, PoolProxy) |
|
1075 SyncManager.register('list', list, ListProxy) |
|
1076 SyncManager.register('dict', dict, DictProxy) |
|
1077 SyncManager.register('Value', Value, ValueProxy) |
|
1078 SyncManager.register('Array', Array, ArrayProxy) |
|
1079 SyncManager.register('Namespace', Namespace, NamespaceProxy) |
|
1080 |
|
1081 # types returned by methods of PoolProxy |
|
1082 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) |
|
1083 SyncManager.register('AsyncResult', create_method=False) |