symbian-qemu-0.9.1-12/python-2.6.1/Lib/multiprocessing/managers.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # Module providing the `SyncManager` class for dealing
       
     3 # with shared objects
       
     4 #
       
     5 # multiprocessing/managers.py
       
     6 #
       
     7 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
       
     8 #
       
     9 
       
    10 __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
       
    11 
       
    12 #
       
    13 # Imports
       
    14 #
       
    15 
       
    16 import os
       
    17 import sys
       
    18 import weakref
       
    19 import threading
       
    20 import array
       
    21 import Queue
       
    22 
       
    23 from traceback import format_exc
       
    24 from multiprocessing import Process, current_process, active_children, Pool, util, connection
       
    25 from multiprocessing.process import AuthenticationString
       
    26 from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
       
    27 from multiprocessing.util import Finalize, info
       
    28 
       
    29 try:
       
    30     from cPickle import PicklingError
       
    31 except ImportError:
       
    32     from pickle import PicklingError
       
    33 
       
    34 #
       
    35 # Register some things for pickling
       
    36 #
       
    37 
       
    38 def reduce_array(a):
       
    39     return array.array, (a.typecode, a.tostring())
       
    40 ForkingPickler.register(array.array, reduce_array)
       
    41 
       
    42 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
       
    43 
       
    44 #
       
    45 # Type for identifying shared objects
       
    46 #
       
    47 
       
    48 class Token(object):
       
    49     '''
       
    50     Type to uniquely indentify a shared object
       
    51     '''
       
    52     __slots__ = ('typeid', 'address', 'id')
       
    53 
       
    54     def __init__(self, typeid, address, id):
       
    55         (self.typeid, self.address, self.id) = (typeid, address, id)
       
    56 
       
    57     def __getstate__(self):
       
    58         return (self.typeid, self.address, self.id)
       
    59 
       
    60     def __setstate__(self, state):
       
    61         (self.typeid, self.address, self.id) = state
       
    62 
       
    63     def __repr__(self):
       
    64         return 'Token(typeid=%r, address=%r, id=%r)' % \
       
    65                (self.typeid, self.address, self.id)
       
    66 
       
    67 #
       
    68 # Function for communication with a manager's server process
       
    69 #
       
    70 
       
    71 def dispatch(c, id, methodname, args=(), kwds={}):
       
    72     '''
       
    73     Send a message to manager using connection `c` and return response
       
    74     '''
       
    75     c.send((id, methodname, args, kwds))
       
    76     kind, result = c.recv()
       
    77     if kind == '#RETURN':
       
    78         return result
       
    79     raise convert_to_error(kind, result)
       
    80 
       
    81 def convert_to_error(kind, result):
       
    82     if kind == '#ERROR':
       
    83         return result
       
    84     elif kind == '#TRACEBACK':
       
    85         assert type(result) is str
       
    86         return  RemoteError(result)
       
    87     elif kind == '#UNSERIALIZABLE':
       
    88         assert type(result) is str
       
    89         return RemoteError('Unserializable message: %s\n' % result)
       
    90     else:
       
    91         return ValueError('Unrecognized message type')
       
    92 
       
    93 class RemoteError(Exception):
       
    94     def __str__(self):
       
    95         return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
       
    96 
       
    97 #
       
    98 # Functions for finding the method names of an object
       
    99 #
       
   100 
       
   101 def all_methods(obj):
       
   102     '''
       
   103     Return a list of names of methods of `obj`
       
   104     '''
       
   105     temp = []
       
   106     for name in dir(obj):
       
   107         func = getattr(obj, name)
       
   108         if hasattr(func, '__call__'):
       
   109             temp.append(name)
       
   110     return temp
       
   111 
       
   112 def public_methods(obj):
       
   113     '''
       
   114     Return a list of names of methods of `obj` which do not start with '_'
       
   115     '''
       
   116     return [name for name in all_methods(obj) if name[0] != '_']
       
   117 
       
   118 #
       
   119 # Server which is run in a process controlled by a manager
       
   120 #
       
   121 
       
   122 class Server(object):
       
   123     '''
       
   124     Server class which runs in a process controlled by a manager object
       
   125     '''
       
   126     public = ['shutdown', 'create', 'accept_connection', 'get_methods',
       
   127               'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
       
   128 
       
   129     def __init__(self, registry, address, authkey, serializer):
       
   130         assert isinstance(authkey, bytes)
       
   131         self.registry = registry
       
   132         self.authkey = AuthenticationString(authkey)
       
   133         Listener, Client = listener_client[serializer]
       
   134 
       
   135         # do authentication later
       
   136         self.listener = Listener(address=address, backlog=5)
       
   137         self.address = self.listener.address
       
   138 
       
   139         self.id_to_obj = {0: (None, ())}
       
   140         self.id_to_refcount = {}
       
   141         self.mutex = threading.RLock()
       
   142         self.stop = 0
       
   143 
       
   144     def serve_forever(self):
       
   145         '''
       
   146         Run the server forever
       
   147         '''
       
   148         current_process()._manager_server = self
       
   149         try:
       
   150             try:
       
   151                 while 1:
       
   152                     try:
       
   153                         c = self.listener.accept()
       
   154                     except (OSError, IOError):
       
   155                         continue
       
   156                     t = threading.Thread(target=self.handle_request, args=(c,))
       
   157                     t.daemon = True
       
   158                     t.start()
       
   159             except (KeyboardInterrupt, SystemExit):
       
   160                 pass
       
   161         finally:
       
   162             self.stop = 999
       
   163             self.listener.close()
       
   164 
       
   165     def handle_request(self, c):
       
   166         '''
       
   167         Handle a new connection
       
   168         '''
       
   169         funcname = result = request = None
       
   170         try:
       
   171             connection.deliver_challenge(c, self.authkey)
       
   172             connection.answer_challenge(c, self.authkey)
       
   173             request = c.recv()
       
   174             ignore, funcname, args, kwds = request
       
   175             assert funcname in self.public, '%r unrecognized' % funcname
       
   176             func = getattr(self, funcname)
       
   177         except Exception:
       
   178             msg = ('#TRACEBACK', format_exc())
       
   179         else:
       
   180             try:
       
   181                 result = func(c, *args, **kwds)
       
   182             except Exception:
       
   183                 msg = ('#TRACEBACK', format_exc())
       
   184             else:
       
   185                 msg = ('#RETURN', result)
       
   186         try:
       
   187             c.send(msg)
       
   188         except Exception, e:
       
   189             try:
       
   190                 c.send(('#TRACEBACK', format_exc()))
       
   191             except Exception:
       
   192                 pass
       
   193             util.info('Failure to send message: %r', msg)
       
   194             util.info(' ... request was %r', request)
       
   195             util.info(' ... exception was %r', e)
       
   196 
       
   197         c.close()
       
   198 
       
   199     def serve_client(self, conn):
       
   200         '''
       
   201         Handle requests from the proxies in a particular process/thread
       
   202         '''
       
   203         util.debug('starting server thread to service %r',
       
   204                    threading.current_thread().name)
       
   205 
       
   206         recv = conn.recv
       
   207         send = conn.send
       
   208         id_to_obj = self.id_to_obj
       
   209 
       
   210         while not self.stop:
       
   211 
       
   212             try:
       
   213                 methodname = obj = None
       
   214                 request = recv()
       
   215                 ident, methodname, args, kwds = request
       
   216                 obj, exposed, gettypeid = id_to_obj[ident]
       
   217 
       
   218                 if methodname not in exposed:
       
   219                     raise AttributeError(
       
   220                         'method %r of %r object is not in exposed=%r' %
       
   221                         (methodname, type(obj), exposed)
       
   222                         )
       
   223 
       
   224                 function = getattr(obj, methodname)
       
   225 
       
   226                 try:
       
   227                     res = function(*args, **kwds)
       
   228                 except Exception, e:
       
   229                     msg = ('#ERROR', e)
       
   230                 else:
       
   231                     typeid = gettypeid and gettypeid.get(methodname, None)
       
   232                     if typeid:
       
   233                         rident, rexposed = self.create(conn, typeid, res)
       
   234                         token = Token(typeid, self.address, rident)
       
   235                         msg = ('#PROXY', (rexposed, token))
       
   236                     else:
       
   237                         msg = ('#RETURN', res)
       
   238 
       
   239             except AttributeError:
       
   240                 if methodname is None:
       
   241                     msg = ('#TRACEBACK', format_exc())
       
   242                 else:
       
   243                     try:
       
   244                         fallback_func = self.fallback_mapping[methodname]
       
   245                         result = fallback_func(
       
   246                             self, conn, ident, obj, *args, **kwds
       
   247                             )
       
   248                         msg = ('#RETURN', result)
       
   249                     except Exception:
       
   250                         msg = ('#TRACEBACK', format_exc())
       
   251 
       
   252             except EOFError:
       
   253                 util.debug('got EOF -- exiting thread serving %r',
       
   254                            threading.current_thread().name)
       
   255                 sys.exit(0)
       
   256 
       
   257             except Exception:
       
   258                 msg = ('#TRACEBACK', format_exc())
       
   259 
       
   260             try:
       
   261                 try:
       
   262                     send(msg)
       
   263                 except Exception, e:
       
   264                     send(('#UNSERIALIZABLE', repr(msg)))
       
   265             except Exception, e:
       
   266                 util.info('exception in thread serving %r',
       
   267                         threading.current_thread().name)
       
   268                 util.info(' ... message was %r', msg)
       
   269                 util.info(' ... exception was %r', e)
       
   270                 conn.close()
       
   271                 sys.exit(1)
       
   272 
       
   273     def fallback_getvalue(self, conn, ident, obj):
       
   274         return obj
       
   275 
       
   276     def fallback_str(self, conn, ident, obj):
       
   277         return str(obj)
       
   278 
       
   279     def fallback_repr(self, conn, ident, obj):
       
   280         return repr(obj)
       
   281 
       
   282     fallback_mapping = {
       
   283         '__str__':fallback_str,
       
   284         '__repr__':fallback_repr,
       
   285         '#GETVALUE':fallback_getvalue
       
   286         }
       
   287 
       
   288     def dummy(self, c):
       
   289         pass
       
   290 
       
   291     def debug_info(self, c):
       
   292         '''
       
   293         Return some info --- useful to spot problems with refcounting
       
   294         '''
       
   295         self.mutex.acquire()
       
   296         try:
       
   297             result = []
       
   298             keys = self.id_to_obj.keys()
       
   299             keys.sort()
       
   300             for ident in keys:
       
   301                 if ident != 0:
       
   302                     result.append('  %s:       refcount=%s\n    %s' %
       
   303                                   (ident, self.id_to_refcount[ident],
       
   304                                    str(self.id_to_obj[ident][0])[:75]))
       
   305             return '\n'.join(result)
       
   306         finally:
       
   307             self.mutex.release()
       
   308 
       
   309     def number_of_objects(self, c):
       
   310         '''
       
   311         Number of shared objects
       
   312         '''
       
   313         return len(self.id_to_obj) - 1      # don't count ident=0
       
   314 
       
   315     def shutdown(self, c):
       
   316         '''
       
   317         Shutdown this process
       
   318         '''
       
   319         try:
       
   320             try:
       
   321                 util.debug('manager received shutdown message')
       
   322                 c.send(('#RETURN', None))
       
   323 
       
   324                 if sys.stdout != sys.__stdout__:
       
   325                     util.debug('resetting stdout, stderr')
       
   326                     sys.stdout = sys.__stdout__
       
   327                     sys.stderr = sys.__stderr__
       
   328 
       
   329                 util._run_finalizers(0)
       
   330 
       
   331                 for p in active_children():
       
   332                     util.debug('terminating a child process of manager')
       
   333                     p.terminate()
       
   334 
       
   335                 for p in active_children():
       
   336                     util.debug('terminating a child process of manager')
       
   337                     p.join()
       
   338 
       
   339                 util._run_finalizers()
       
   340                 util.info('manager exiting with exitcode 0')
       
   341             except:
       
   342                 import traceback
       
   343                 traceback.print_exc()
       
   344         finally:
       
   345             exit(0)
       
   346 
       
   347     def create(self, c, typeid, *args, **kwds):
       
   348         '''
       
   349         Create a new shared object and return its id
       
   350         '''
       
   351         self.mutex.acquire()
       
   352         try:
       
   353             callable, exposed, method_to_typeid, proxytype = \
       
   354                       self.registry[typeid]
       
   355 
       
   356             if callable is None:
       
   357                 assert len(args) == 1 and not kwds
       
   358                 obj = args[0]
       
   359             else:
       
   360                 obj = callable(*args, **kwds)
       
   361 
       
   362             if exposed is None:
       
   363                 exposed = public_methods(obj)
       
   364             if method_to_typeid is not None:
       
   365                 assert type(method_to_typeid) is dict
       
   366                 exposed = list(exposed) + list(method_to_typeid)
       
   367 
       
   368             ident = '%x' % id(obj)  # convert to string because xmlrpclib
       
   369                                     # only has 32 bit signed integers
       
   370             util.debug('%r callable returned object with id %r', typeid, ident)
       
   371 
       
   372             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
       
   373             if ident not in self.id_to_refcount:
       
   374                 self.id_to_refcount[ident] = 0
       
   375             # increment the reference count immediately, to avoid
       
   376             # this object being garbage collected before a Proxy
       
   377             # object for it can be created.  The caller of create()
       
   378             # is responsible for doing a decref once the Proxy object
       
   379             # has been created.
       
   380             self.incref(c, ident)
       
   381             return ident, tuple(exposed)
       
   382         finally:
       
   383             self.mutex.release()
       
   384 
       
   385     def get_methods(self, c, token):
       
   386         '''
       
   387         Return the methods of the shared object indicated by token
       
   388         '''
       
   389         return tuple(self.id_to_obj[token.id][1])
       
   390 
       
   391     def accept_connection(self, c, name):
       
   392         '''
       
   393         Spawn a new thread to serve this connection
       
   394         '''
       
   395         threading.current_thread().name = name
       
   396         c.send(('#RETURN', None))
       
   397         self.serve_client(c)
       
   398 
       
   399     def incref(self, c, ident):
       
   400         self.mutex.acquire()
       
   401         try:
       
   402             self.id_to_refcount[ident] += 1
       
   403         finally:
       
   404             self.mutex.release()
       
   405 
       
   406     def decref(self, c, ident):
       
   407         self.mutex.acquire()
       
   408         try:
       
   409             assert self.id_to_refcount[ident] >= 1
       
   410             self.id_to_refcount[ident] -= 1
       
   411             if self.id_to_refcount[ident] == 0:
       
   412                 del self.id_to_obj[ident], self.id_to_refcount[ident]
       
   413                 util.debug('disposing of obj with id %d', ident)
       
   414         finally:
       
   415             self.mutex.release()
       
   416 
       
   417 #
       
   418 # Class to represent state of a manager
       
   419 #
       
   420 
       
   421 class State(object):
       
   422     __slots__ = ['value']
       
   423     INITIAL = 0
       
   424     STARTED = 1
       
   425     SHUTDOWN = 2
       
   426 
       
   427 #
       
   428 # Mapping from serializer name to Listener and Client types
       
   429 #
       
   430 
       
   431 listener_client = {
       
   432     'pickle' : (connection.Listener, connection.Client),
       
   433     'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
       
   434     }
       
   435 
       
   436 #
       
   437 # Definition of BaseManager
       
   438 #
       
   439 
       
   440 class BaseManager(object):
       
   441     '''
       
   442     Base class for managers
       
   443     '''
       
   444     _registry = {}
       
   445     _Server = Server
       
   446 
       
   447     def __init__(self, address=None, authkey=None, serializer='pickle'):
       
   448         if authkey is None:
       
   449             authkey = current_process().authkey
       
   450         self._address = address     # XXX not final address if eg ('', 0)
       
   451         self._authkey = AuthenticationString(authkey)
       
   452         self._state = State()
       
   453         self._state.value = State.INITIAL
       
   454         self._serializer = serializer
       
   455         self._Listener, self._Client = listener_client[serializer]
       
   456 
       
   457     def __reduce__(self):
       
   458         return type(self).from_address, \
       
   459                (self._address, self._authkey, self._serializer)
       
   460 
       
   461     def get_server(self):
       
   462         '''
       
   463         Return server object with serve_forever() method and address attribute
       
   464         '''
       
   465         assert self._state.value == State.INITIAL
       
   466         return Server(self._registry, self._address,
       
   467                       self._authkey, self._serializer)
       
   468 
       
   469     def connect(self):
       
   470         '''
       
   471         Connect manager object to the server process
       
   472         '''
       
   473         Listener, Client = listener_client[self._serializer]
       
   474         conn = Client(self._address, authkey=self._authkey)
       
   475         dispatch(conn, None, 'dummy')
       
   476         self._state.value = State.STARTED
       
   477 
       
   478     def start(self):
       
   479         '''
       
   480         Spawn a server process for this manager object
       
   481         '''
       
   482         assert self._state.value == State.INITIAL
       
   483 
       
   484         # pipe over which we will retrieve address of server
       
   485         reader, writer = connection.Pipe(duplex=False)
       
   486 
       
   487         # spawn process which runs a server
       
   488         self._process = Process(
       
   489             target=type(self)._run_server,
       
   490             args=(self._registry, self._address, self._authkey,
       
   491                   self._serializer, writer),
       
   492             )
       
   493         ident = ':'.join(str(i) for i in self._process._identity)
       
   494         self._process.name = type(self).__name__  + '-' + ident
       
   495         self._process.start()
       
   496 
       
   497         # get address of server
       
   498         writer.close()
       
   499         self._address = reader.recv()
       
   500         reader.close()
       
   501 
       
   502         # register a finalizer
       
   503         self._state.value = State.STARTED
       
   504         self.shutdown = util.Finalize(
       
   505             self, type(self)._finalize_manager,
       
   506             args=(self._process, self._address, self._authkey,
       
   507                   self._state, self._Client),
       
   508             exitpriority=0
       
   509             )
       
   510 
       
   511     @classmethod
       
   512     def _run_server(cls, registry, address, authkey, serializer, writer):
       
   513         '''
       
   514         Create a server, report its address and run it
       
   515         '''
       
   516         # create server
       
   517         server = cls._Server(registry, address, authkey, serializer)
       
   518 
       
   519         # inform parent process of the server's address
       
   520         writer.send(server.address)
       
   521         writer.close()
       
   522 
       
   523         # run the manager
       
   524         util.info('manager serving at %r', server.address)
       
   525         server.serve_forever()
       
   526 
       
   527     def _create(self, typeid, *args, **kwds):
       
   528         '''
       
   529         Create a new shared object; return the token and exposed tuple
       
   530         '''
       
   531         assert self._state.value == State.STARTED, 'server not yet started'
       
   532         conn = self._Client(self._address, authkey=self._authkey)
       
   533         try:
       
   534             id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
       
   535         finally:
       
   536             conn.close()
       
   537         return Token(typeid, self._address, id), exposed
       
   538 
       
   539     def join(self, timeout=None):
       
   540         '''
       
   541         Join the manager process (if it has been spawned)
       
   542         '''
       
   543         self._process.join(timeout)
       
   544 
       
   545     def _debug_info(self):
       
   546         '''
       
   547         Return some info about the servers shared objects and connections
       
   548         '''
       
   549         conn = self._Client(self._address, authkey=self._authkey)
       
   550         try:
       
   551             return dispatch(conn, None, 'debug_info')
       
   552         finally:
       
   553             conn.close()
       
   554 
       
   555     def _number_of_objects(self):
       
   556         '''
       
   557         Return the number of shared objects
       
   558         '''
       
   559         conn = self._Client(self._address, authkey=self._authkey)
       
   560         try:
       
   561             return dispatch(conn, None, 'number_of_objects')
       
   562         finally:
       
   563             conn.close()
       
   564 
       
   565     def __enter__(self):
       
   566         return self
       
   567 
       
   568     def __exit__(self, exc_type, exc_val, exc_tb):
       
   569         self.shutdown()
       
   570 
       
   571     @staticmethod
       
   572     def _finalize_manager(process, address, authkey, state, _Client):
       
   573         '''
       
   574         Shutdown the manager process; will be registered as a finalizer
       
   575         '''
       
   576         if process.is_alive():
       
   577             util.info('sending shutdown message to manager')
       
   578             try:
       
   579                 conn = _Client(address, authkey=authkey)
       
   580                 try:
       
   581                     dispatch(conn, None, 'shutdown')
       
   582                 finally:
       
   583                     conn.close()
       
   584             except Exception:
       
   585                 pass
       
   586 
       
   587             process.join(timeout=0.2)
       
   588             if process.is_alive():
       
   589                 util.info('manager still alive')
       
   590                 if hasattr(process, 'terminate'):
       
   591                     util.info('trying to `terminate()` manager process')
       
   592                     process.terminate()
       
   593                     process.join(timeout=0.1)
       
   594                     if process.is_alive():
       
   595                         util.info('manager still alive after terminate')
       
   596 
       
   597         state.value = State.SHUTDOWN
       
   598         try:
       
   599             del BaseProxy._address_to_local[address]
       
   600         except KeyError:
       
   601             pass
       
   602 
       
   603     address = property(lambda self: self._address)
       
   604 
       
   605     @classmethod
       
   606     def register(cls, typeid, callable=None, proxytype=None, exposed=None,
       
   607                  method_to_typeid=None, create_method=True):
       
   608         '''
       
   609         Register a typeid with the manager type
       
   610         '''
       
   611         if '_registry' not in cls.__dict__:
       
   612             cls._registry = cls._registry.copy()
       
   613 
       
   614         if proxytype is None:
       
   615             proxytype = AutoProxy
       
   616 
       
   617         exposed = exposed or getattr(proxytype, '_exposed_', None)
       
   618 
       
   619         method_to_typeid = method_to_typeid or \
       
   620                            getattr(proxytype, '_method_to_typeid_', None)
       
   621 
       
   622         if method_to_typeid:
       
   623             for key, value in method_to_typeid.items():
       
   624                 assert type(key) is str, '%r is not a string' % key
       
   625                 assert type(value) is str, '%r is not a string' % value
       
   626 
       
   627         cls._registry[typeid] = (
       
   628             callable, exposed, method_to_typeid, proxytype
       
   629             )
       
   630 
       
   631         if create_method:
       
   632             def temp(self, *args, **kwds):
       
   633                 util.debug('requesting creation of a shared %r object', typeid)
       
   634                 token, exp = self._create(typeid, *args, **kwds)
       
   635                 proxy = proxytype(
       
   636                     token, self._serializer, manager=self,
       
   637                     authkey=self._authkey, exposed=exp
       
   638                     )
       
   639                 conn = self._Client(token.address, authkey=self._authkey)
       
   640                 dispatch(conn, None, 'decref', (token.id,))
       
   641                 return proxy
       
   642             temp.__name__ = typeid
       
   643             setattr(cls, typeid, temp)
       
   644 
       
   645 #
       
   646 # Subclass of set which get cleared after a fork
       
   647 #
       
   648 
       
   649 class ProcessLocalSet(set):
       
   650     def __init__(self):
       
   651         util.register_after_fork(self, lambda obj: obj.clear())
       
   652     def __reduce__(self):
       
   653         return type(self), ()
       
   654 
       
   655 #
       
   656 # Definition of BaseProxy
       
   657 #
       
   658 
       
   659 class BaseProxy(object):
       
   660     '''
       
   661     A base for proxies of shared objects
       
   662     '''
       
   663     _address_to_local = {}
       
   664     _mutex = util.ForkAwareThreadLock()
       
   665 
       
   666     def __init__(self, token, serializer, manager=None,
       
   667                  authkey=None, exposed=None, incref=True):
       
   668         BaseProxy._mutex.acquire()
       
   669         try:
       
   670             tls_idset = BaseProxy._address_to_local.get(token.address, None)
       
   671             if tls_idset is None:
       
   672                 tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
       
   673                 BaseProxy._address_to_local[token.address] = tls_idset
       
   674         finally:
       
   675             BaseProxy._mutex.release()
       
   676 
       
   677         # self._tls is used to record the connection used by this
       
   678         # thread to communicate with the manager at token.address
       
   679         self._tls = tls_idset[0]
       
   680 
       
   681         # self._idset is used to record the identities of all shared
       
   682         # objects for which the current process owns references and
       
   683         # which are in the manager at token.address
       
   684         self._idset = tls_idset[1]
       
   685 
       
   686         self._token = token
       
   687         self._id = self._token.id
       
   688         self._manager = manager
       
   689         self._serializer = serializer
       
   690         self._Client = listener_client[serializer][1]
       
   691 
       
   692         if authkey is not None:
       
   693             self._authkey = AuthenticationString(authkey)
       
   694         elif self._manager is not None:
       
   695             self._authkey = self._manager._authkey
       
   696         else:
       
   697             self._authkey = current_process().authkey
       
   698 
       
   699         if incref:
       
   700             self._incref()
       
   701 
       
   702         util.register_after_fork(self, BaseProxy._after_fork)
       
   703 
       
   704     def _connect(self):
       
   705         util.debug('making connection to manager')
       
   706         name = current_process().name
       
   707         if threading.current_thread().name != 'MainThread':
       
   708             name += '|' + threading.current_thread().name
       
   709         conn = self._Client(self._token.address, authkey=self._authkey)
       
   710         dispatch(conn, None, 'accept_connection', (name,))
       
   711         self._tls.connection = conn
       
   712 
       
   713     def _callmethod(self, methodname, args=(), kwds={}):
       
   714         '''
       
   715         Try to call a method of the referrent and return a copy of the result
       
   716         '''
       
   717         try:
       
   718             conn = self._tls.connection
       
   719         except AttributeError:
       
   720             util.debug('thread %r does not own a connection',
       
   721                        threading.current_thread().name)
       
   722             self._connect()
       
   723             conn = self._tls.connection
       
   724 
       
   725         conn.send((self._id, methodname, args, kwds))
       
   726         kind, result = conn.recv()
       
   727 
       
   728         if kind == '#RETURN':
       
   729             return result
       
   730         elif kind == '#PROXY':
       
   731             exposed, token = result
       
   732             proxytype = self._manager._registry[token.typeid][-1]
       
   733             proxy = proxytype(
       
   734                 token, self._serializer, manager=self._manager,
       
   735                 authkey=self._authkey, exposed=exposed
       
   736                 )
       
   737             conn = self._Client(token.address, authkey=self._authkey)
       
   738             dispatch(conn, None, 'decref', (token.id,))
       
   739             return proxy
       
   740         raise convert_to_error(kind, result)
       
   741 
       
   742     def _getvalue(self):
       
   743         '''
       
   744         Get a copy of the value of the referent
       
   745         '''
       
   746         return self._callmethod('#GETVALUE')
       
   747 
       
   748     def _incref(self):
       
   749         conn = self._Client(self._token.address, authkey=self._authkey)
       
   750         dispatch(conn, None, 'incref', (self._id,))
       
   751         util.debug('INCREF %r', self._token.id)
       
   752 
       
   753         self._idset.add(self._id)
       
   754 
       
   755         state = self._manager and self._manager._state
       
   756 
       
   757         self._close = util.Finalize(
       
   758             self, BaseProxy._decref,
       
   759             args=(self._token, self._authkey, state,
       
   760                   self._tls, self._idset, self._Client),
       
   761             exitpriority=10
       
   762             )
       
   763 
       
   764     @staticmethod
       
   765     def _decref(token, authkey, state, tls, idset, _Client):
       
   766         idset.discard(token.id)
       
   767 
       
   768         # check whether manager is still alive
       
   769         if state is None or state.value == State.STARTED:
       
   770             # tell manager this process no longer cares about referent
       
   771             try:
       
   772                 util.debug('DECREF %r', token.id)
       
   773                 conn = _Client(token.address, authkey=authkey)
       
   774                 dispatch(conn, None, 'decref', (token.id,))
       
   775             except Exception, e:
       
   776                 util.debug('... decref failed %s', e)
       
   777 
       
   778         else:
       
   779             util.debug('DECREF %r -- manager already shutdown', token.id)
       
   780 
       
   781         # check whether we can close this thread's connection because
       
   782         # the process owns no more references to objects for this manager
       
   783         if not idset and hasattr(tls, 'connection'):
       
   784             util.debug('thread %r has no more proxies so closing conn',
       
   785                        threading.current_thread().name)
       
   786             tls.connection.close()
       
   787             del tls.connection
       
   788 
       
   789     def _after_fork(self):
       
   790         self._manager = None
       
   791         try:
       
   792             self._incref()
       
   793         except Exception, e:
       
   794             # the proxy may just be for a manager which has shutdown
       
   795             util.info('incref failed: %s' % e)
       
   796 
       
   797     def __reduce__(self):
       
   798         kwds = {}
       
   799         if Popen.thread_is_spawning():
       
   800             kwds['authkey'] = self._authkey
       
   801 
       
   802         if getattr(self, '_isauto', False):
       
   803             kwds['exposed'] = self._exposed_
       
   804             return (RebuildProxy,
       
   805                     (AutoProxy, self._token, self._serializer, kwds))
       
   806         else:
       
   807             return (RebuildProxy,
       
   808                     (type(self), self._token, self._serializer, kwds))
       
   809 
       
   810     def __deepcopy__(self, memo):
       
   811         return self._getvalue()
       
   812 
       
   813     def __repr__(self):
       
   814         return '<%s object, typeid %r at %s>' % \
       
   815                (type(self).__name__, self._token.typeid, '0x%x' % id(self))
       
   816 
       
   817     def __str__(self):
       
   818         '''
       
   819         Return representation of the referent (or a fall-back if that fails)
       
   820         '''
       
   821         try:
       
   822             return self._callmethod('__repr__')
       
   823         except Exception:
       
   824             return repr(self)[:-1] + "; '__str__()' failed>"
       
   825 
       
   826 #
       
   827 # Function used for unpickling
       
   828 #
       
   829 
       
   830 def RebuildProxy(func, token, serializer, kwds):
       
   831     '''
       
   832     Function used for unpickling proxy objects.
       
   833 
       
   834     If possible the shared object is returned, or otherwise a proxy for it.
       
   835     '''
       
   836     server = getattr(current_process(), '_manager_server', None)
       
   837 
       
   838     if server and server.address == token.address:
       
   839         return server.id_to_obj[token.id][0]
       
   840     else:
       
   841         incref = (
       
   842             kwds.pop('incref', True) and
       
   843             not getattr(current_process(), '_inheriting', False)
       
   844             )
       
   845         return func(token, serializer, incref=incref, **kwds)
       
   846 
       
   847 #
       
   848 # Functions to create proxies and proxy types
       
   849 #
       
   850 
       
   851 def MakeProxyType(name, exposed, _cache={}):
       
   852     '''
       
   853     Return an proxy type whose methods are given by `exposed`
       
   854     '''
       
   855     exposed = tuple(exposed)
       
   856     try:
       
   857         return _cache[(name, exposed)]
       
   858     except KeyError:
       
   859         pass
       
   860 
       
   861     dic = {}
       
   862 
       
   863     for meth in exposed:
       
   864         exec '''def %s(self, *args, **kwds):
       
   865         return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
       
   866 
       
   867     ProxyType = type(name, (BaseProxy,), dic)
       
   868     ProxyType._exposed_ = exposed
       
   869     _cache[(name, exposed)] = ProxyType
       
   870     return ProxyType
       
   871 
       
   872 
       
   873 def AutoProxy(token, serializer, manager=None, authkey=None,
       
   874               exposed=None, incref=True):
       
   875     '''
       
   876     Return an auto-proxy for `token`
       
   877     '''
       
   878     _Client = listener_client[serializer][1]
       
   879 
       
   880     if exposed is None:
       
   881         conn = _Client(token.address, authkey=authkey)
       
   882         try:
       
   883             exposed = dispatch(conn, None, 'get_methods', (token,))
       
   884         finally:
       
   885             conn.close()
       
   886 
       
   887     if authkey is None and manager is not None:
       
   888         authkey = manager._authkey
       
   889     if authkey is None:
       
   890         authkey = current_process().authkey
       
   891 
       
   892     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
       
   893     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
       
   894                       incref=incref)
       
   895     proxy._isauto = True
       
   896     return proxy
       
   897 
       
   898 #
       
   899 # Types/callables which we will register with SyncManager
       
   900 #
       
   901 
       
   902 class Namespace(object):
       
   903     def __init__(self, **kwds):
       
   904         self.__dict__.update(kwds)
       
   905     def __repr__(self):
       
   906         items = self.__dict__.items()
       
   907         temp = []
       
   908         for name, value in items:
       
   909             if not name.startswith('_'):
       
   910                 temp.append('%s=%r' % (name, value))
       
   911         temp.sort()
       
   912         return 'Namespace(%s)' % str.join(', ', temp)
       
   913 
       
   914 class Value(object):
       
   915     def __init__(self, typecode, value, lock=True):
       
   916         self._typecode = typecode
       
   917         self._value = value
       
   918     def get(self):
       
   919         return self._value
       
   920     def set(self, value):
       
   921         self._value = value
       
   922     def __repr__(self):
       
   923         return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
       
   924     value = property(get, set)
       
   925 
       
   926 def Array(typecode, sequence, lock=True):
       
   927     return array.array(typecode, sequence)
       
   928 
       
   929 #
       
   930 # Proxy types used by SyncManager
       
   931 #
       
   932 
       
   933 class IteratorProxy(BaseProxy):
       
   934     # XXX remove methods for Py3.0 and Py2.6
       
   935     _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
       
   936     def __iter__(self):
       
   937         return self
       
   938     def __next__(self, *args):
       
   939         return self._callmethod('__next__', args)
       
   940     def next(self, *args):
       
   941         return self._callmethod('next', args)
       
   942     def send(self, *args):
       
   943         return self._callmethod('send', args)
       
   944     def throw(self, *args):
       
   945         return self._callmethod('throw', args)
       
   946     def close(self, *args):
       
   947         return self._callmethod('close', args)
       
   948 
       
   949 
       
   950 class AcquirerProxy(BaseProxy):
       
   951     _exposed_ = ('acquire', 'release')
       
   952     def acquire(self, blocking=True):
       
   953         return self._callmethod('acquire', (blocking,))
       
   954     def release(self):
       
   955         return self._callmethod('release')
       
   956     def __enter__(self):
       
   957         return self._callmethod('acquire')
       
   958     def __exit__(self, exc_type, exc_val, exc_tb):
       
   959         return self._callmethod('release')
       
   960 
       
   961 
       
   962 class ConditionProxy(AcquirerProxy):
       
   963     # XXX will Condition.notfyAll() name be available in Py3.0?
       
   964     _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
       
   965     def wait(self, timeout=None):
       
   966         return self._callmethod('wait', (timeout,))
       
   967     def notify(self):
       
   968         return self._callmethod('notify')
       
   969     def notify_all(self):
       
   970         return self._callmethod('notify_all')
       
   971 
       
   972 class EventProxy(BaseProxy):
       
   973     _exposed_ = ('is_set', 'set', 'clear', 'wait')
       
   974     def is_set(self):
       
   975         return self._callmethod('is_set')
       
   976     def set(self):
       
   977         return self._callmethod('set')
       
   978     def clear(self):
       
   979         return self._callmethod('clear')
       
   980     def wait(self, timeout=None):
       
   981         return self._callmethod('wait', (timeout,))
       
   982 
       
   983 class NamespaceProxy(BaseProxy):
       
   984     _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
       
   985     def __getattr__(self, key):
       
   986         if key[0] == '_':
       
   987             return object.__getattribute__(self, key)
       
   988         callmethod = object.__getattribute__(self, '_callmethod')
       
   989         return callmethod('__getattribute__', (key,))
       
   990     def __setattr__(self, key, value):
       
   991         if key[0] == '_':
       
   992             return object.__setattr__(self, key, value)
       
   993         callmethod = object.__getattribute__(self, '_callmethod')
       
   994         return callmethod('__setattr__', (key, value))
       
   995     def __delattr__(self, key):
       
   996         if key[0] == '_':
       
   997             return object.__delattr__(self, key)
       
   998         callmethod = object.__getattribute__(self, '_callmethod')
       
   999         return callmethod('__delattr__', (key,))
       
  1000 
       
  1001 
       
  1002 class ValueProxy(BaseProxy):
       
  1003     _exposed_ = ('get', 'set')
       
  1004     def get(self):
       
  1005         return self._callmethod('get')
       
  1006     def set(self, value):
       
  1007         return self._callmethod('set', (value,))
       
  1008     value = property(get, set)
       
  1009 
       
  1010 
       
  1011 BaseListProxy = MakeProxyType('BaseListProxy', (
       
  1012     '__add__', '__contains__', '__delitem__', '__delslice__',
       
  1013     '__getitem__', '__getslice__', '__len__', '__mul__',
       
  1014     '__reversed__', '__rmul__', '__setitem__', '__setslice__',
       
  1015     'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
       
  1016     'reverse', 'sort', '__imul__'
       
  1017     ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
       
  1018 class ListProxy(BaseListProxy):
       
  1019     def __iadd__(self, value):
       
  1020         self._callmethod('extend', (value,))
       
  1021         return self
       
  1022     def __imul__(self, value):
       
  1023         self._callmethod('__imul__', (value,))
       
  1024         return self
       
  1025 
       
  1026 
       
  1027 DictProxy = MakeProxyType('DictProxy', (
       
  1028     '__contains__', '__delitem__', '__getitem__', '__len__',
       
  1029     '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
       
  1030     'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
       
  1031     ))
       
  1032 
       
  1033 
       
  1034 ArrayProxy = MakeProxyType('ArrayProxy', (
       
  1035     '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
       
  1036     ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
       
  1037 
       
  1038 
       
  1039 PoolProxy = MakeProxyType('PoolProxy', (
       
  1040     'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
       
  1041     'map', 'map_async', 'terminate'
       
  1042     ))
       
  1043 PoolProxy._method_to_typeid_ = {
       
  1044     'apply_async': 'AsyncResult',
       
  1045     'map_async': 'AsyncResult',
       
  1046     'imap': 'Iterator',
       
  1047     'imap_unordered': 'Iterator'
       
  1048     }
       
  1049 
       
  1050 #
       
  1051 # Definition of SyncManager
       
  1052 #
       
  1053 
       
  1054 class SyncManager(BaseManager):
       
  1055     '''
       
  1056     Subclass of `BaseManager` which supports a number of shared object types.
       
  1057 
       
  1058     The types registered are those intended for the synchronization
       
  1059     of threads, plus `dict`, `list` and `Namespace`.
       
  1060 
       
  1061     The `multiprocessing.Manager()` function creates started instances of
       
  1062     this class.
       
  1063     '''
       
  1064 
       
  1065 SyncManager.register('Queue', Queue.Queue)
       
  1066 SyncManager.register('JoinableQueue', Queue.Queue)
       
  1067 SyncManager.register('Event', threading.Event, EventProxy)
       
  1068 SyncManager.register('Lock', threading.Lock, AcquirerProxy)
       
  1069 SyncManager.register('RLock', threading.RLock, AcquirerProxy)
       
  1070 SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
       
  1071 SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
       
  1072                      AcquirerProxy)
       
  1073 SyncManager.register('Condition', threading.Condition, ConditionProxy)
       
  1074 SyncManager.register('Pool', Pool, PoolProxy)
       
  1075 SyncManager.register('list', list, ListProxy)
       
  1076 SyncManager.register('dict', dict, DictProxy)
       
  1077 SyncManager.register('Value', Value, ValueProxy)
       
  1078 SyncManager.register('Array', Array, ArrayProxy)
       
  1079 SyncManager.register('Namespace', Namespace, NamespaceProxy)
       
  1080 
       
  1081 # types returned by methods of PoolProxy
       
  1082 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
       
  1083 SyncManager.register('AsyncResult', create_method=False)