symbian-qemu-0.9.1-12/python-2.6.1/Doc/includes/mp_distributing.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
      2 # Module to allow spawning of processes on a foreign host
       
     3 #
       
     4 # Depends on `multiprocessing` package -- tested with `processing-0.60`
       
     5 #
       
     6 
       
     7 __all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
       
     8 
       
     9 #
       
    10 # Imports
       
    11 #
       
    12 
       
    13 import sys
       
    14 import os
       
    15 import tarfile
       
    16 import shutil
       
    17 import subprocess
       
    18 import logging
       
    19 import itertools
       
    20 import Queue
       
    21 
       
    22 try:
       
    23     import cPickle as pickle
       
    24 except ImportError:
       
    25     import pickle
       
    26 
       
    27 from multiprocessing import Process, current_process, cpu_count
       
    28 from multiprocessing import util, managers, connection, forking, pool
       
    29 
       
    30 #
       
    31 # Logging
       
    32 #
       
    33 
       
    34 def get_logger():
       
    35     return _logger
       
    36 
       
    37 _logger = logging.getLogger('distributing')
       
    38 _logger.propogate = 0
       
    39 
       
    40 _formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
       
    41 _handler = logging.StreamHandler()
       
    42 _handler.setFormatter(_formatter)
       
    43 _logger.addHandler(_handler)
       
    44 
       
    45 info = _logger.info
       
    46 debug = _logger.debug
       
    47 
       
    48 #
       
    49 # Get number of cpus
       
    50 #
       
    51 
       
    52 try:
       
    53     slot_count = cpu_count()
       
    54 except NotImplemented:
       
    55     slot_count = 1
       
    56         
       
    57 #
       
    58 # Manager type which spawns subprocesses
       
    59 #
       
    60 
       
    61 class HostManager(managers.SyncManager):
       
    62     '''
       
    63     Manager type used for spawning processes on a (presumably) foreign host
       
    64     '''    
       
    65     def __init__(self, address, authkey):
       
    66         managers.SyncManager.__init__(self, address, authkey)
       
    67         self._name = 'Host-unknown'
       
    68 
       
    69     def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
       
    70         if hasattr(sys.modules['__main__'], '__file__'):
       
    71             main_path = os.path.basename(sys.modules['__main__'].__file__)
       
    72         else:
       
    73             main_path = None
       
    74         data = pickle.dumps((target, args, kwargs))
       
    75         p = self._RemoteProcess(data, main_path)
       
    76         if name is None:
       
    77             temp = self._name.split('Host-')[-1] + '/Process-%s'
       
    78             name = temp % ':'.join(map(str, p.get_identity()))
       
    79         p.set_name(name)
       
    80         return p
       
    81 
       
    82     @classmethod
       
    83     def from_address(cls, address, authkey):
       
    84         manager = cls(address, authkey)
       
    85         managers.transact(address, authkey, 'dummy')
       
    86         manager._state.value = managers.State.STARTED
       
    87         manager._name = 'Host-%s:%s' % manager.address
       
    88         manager.shutdown = util.Finalize(
       
    89             manager, HostManager._finalize_host,
       
    90             args=(manager._address, manager._authkey, manager._name),
       
    91             exitpriority=-10
       
    92             )
       
    93         return manager
       
    94 
       
    95     @staticmethod
       
    96     def _finalize_host(address, authkey, name):
       
    97         managers.transact(address, authkey, 'shutdown')
       
    98         
       
    99     def __repr__(self):
       
   100         return '<Host(%s)>' % self._name
       
   101 
       
   102 #
       
   103 # Process subclass representing a process on (possibly) a remote machine
       
   104 #
       
   105 
       
   106 class RemoteProcess(Process):
       
   107     '''
       
   108     Represents a process started on a remote host
       
   109     '''
       
   110     def __init__(self, data, main_path):
       
   111         assert not main_path or os.path.basename(main_path) == main_path
       
   112         Process.__init__(self)
       
   113         self._data = data
       
   114         self._main_path = main_path
       
   115         
       
   116     def _bootstrap(self):
       
   117         forking.prepare({'main_path': self._main_path})
       
   118         self._target, self._args, self._kwargs = pickle.loads(self._data)
       
   119         return Process._bootstrap(self)
       
   120         
       
   121     def get_identity(self):
       
   122         return self._identity
       
   123 
       
   124 HostManager.register('_RemoteProcess', RemoteProcess)
       
   125 
       
   126 #
       
   127 # A Pool class that uses a cluster
       
   128 #
       
   129 
       
   130 class DistributedPool(pool.Pool):
       
   131     
       
   132     def __init__(self, cluster, processes=None, initializer=None, initargs=()):
       
   133         self._cluster = cluster
       
   134         self.Process = cluster.Process
       
   135         pool.Pool.__init__(self, processes or len(cluster),
       
   136                            initializer, initargs)
       
   137         
       
   138     def _setup_queues(self):
       
   139         self._inqueue = self._cluster._SettableQueue()
       
   140         self._outqueue = self._cluster._SettableQueue()
       
   141         self._quick_put = self._inqueue.put
       
   142         self._quick_get = self._outqueue.get
       
   143 
       
   144     @staticmethod
       
   145     def _help_stuff_finish(inqueue, task_handler, size):
       
   146         inqueue.set_contents([None] * size)
       
   147 
       
   148 #
       
   149 # Manager type which starts host managers on other machines
       
   150 #
       
   151 
       
   152 def LocalProcess(**kwds):
       
   153     p = Process(**kwds)
       
   154     p.set_name('localhost/' + p.name)
       
   155     return p
       
   156 
       
   157 class Cluster(managers.SyncManager):
       
   158     '''
       
   159     Represents collection of slots running on various hosts.
       
   160     
       
   161     `Cluster` is a subclass of `SyncManager` so it allows creation of
       
   162     various types of shared objects.
       
   163     '''
       
   164     def __init__(self, hostlist, modules):
       
   165         managers.SyncManager.__init__(self, address=('localhost', 0))
       
   166         self._hostlist = hostlist
       
   167         self._modules = modules
       
   168         if __name__ not in modules:
       
   169             modules.append(__name__)
       
   170         files = [sys.modules[name].__file__ for name in modules]
       
   171         for i, file in enumerate(files):
       
   172             if file.endswith('.pyc') or file.endswith('.pyo'):
       
   173                 files[i] = file[:-4] + '.py'
       
   174         self._files = [os.path.abspath(file) for file in files]
       
   175         
       
   176     def start(self):
       
   177         managers.SyncManager.start(self)
       
   178         
       
   179         l = connection.Listener(family='AF_INET', authkey=self._authkey)
       
   180         
       
   181         for i, host in enumerate(self._hostlist):
       
   182             host._start_manager(i, self._authkey, l.address, self._files)
       
   183 
       
   184         for host in self._hostlist:
       
   185             if host.hostname != 'localhost':
       
   186                 conn = l.accept()
       
   187                 i, address, cpus = conn.recv()
       
   188                 conn.close()
       
   189                 other_host = self._hostlist[i]
       
   190                 other_host.manager = HostManager.from_address(address,
       
   191                                                               self._authkey)
       
   192                 other_host.slots = other_host.slots or cpus
       
   193                 other_host.Process = other_host.manager.Process
       
   194             else:
       
   195                 host.slots = host.slots or slot_count
       
   196                 host.Process = LocalProcess
       
   197 
       
   198         self._slotlist = [
       
   199             Slot(host) for host in self._hostlist for i in range(host.slots)
       
   200             ]
       
   201         self._slot_iterator = itertools.cycle(self._slotlist)
       
   202         self._base_shutdown = self.shutdown
       
   203         del self.shutdown
       
   204         
       
   205     def shutdown(self):
       
   206         for host in self._hostlist:
       
   207             if host.hostname != 'localhost':
       
   208                 host.manager.shutdown()
       
   209         self._base_shutdown()
       
   210         
       
   211     def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
       
   212         slot = self._slot_iterator.next()
       
   213         return slot.Process(
       
   214             group=group, target=target, name=name, args=args, kwargs=kwargs
       
   215             )
       
   216 
       
   217     def Pool(self, processes=None, initializer=None, initargs=()):
       
   218         return DistributedPool(self, processes, initializer, initargs)
       
   219     
       
   220     def __getitem__(self, i):
       
   221         return self._slotlist[i]
       
   222 
       
   223     def __len__(self):
       
   224         return len(self._slotlist)
       
   225 
       
   226     def __iter__(self):
       
   227         return iter(self._slotlist)
       
   228 
       
   229 #
       
   230 # Queue subclass used by distributed pool
       
   231 #
       
   232 
       
   233 class SettableQueue(Queue.Queue):
       
   234     def empty(self):
       
   235         return not self.queue
       
   236     def full(self):
       
   237         return self.maxsize > 0 and len(self.queue) == self.maxsize
       
   238     def set_contents(self, contents):
       
   239         # length of contents must be at least as large as the number of
       
   240         # threads which have potentially called get()
       
   241         self.not_empty.acquire()
       
   242         try:
       
   243             self.queue.clear()
       
   244             self.queue.extend(contents)
       
   245             self.not_empty.notifyAll()
       
   246         finally:
       
   247             self.not_empty.release()
       
   248             
       
   249 Cluster.register('_SettableQueue', SettableQueue)
       
   250 
       
   251 #
       
   252 # Class representing a notional cpu in the cluster
       
   253 #
       
   254 
       
   255 class Slot(object):
       
   256     def __init__(self, host):
       
   257         self.host = host
       
   258         self.Process = host.Process
       
   259 
       
   260 #
       
   261 # Host
       
   262 #
       
   263 
       
   264 class Host(object):
       
   265     '''
       
   266     Represents a host to use as a node in a cluster.
       
   267 
       
   268     `hostname` gives the name of the host.  If hostname is not
       
   269     "localhost" then ssh is used to log in to the host.  To log in as
       
   270     a different user use a host name of the form
       
   271     "username@somewhere.org"
       
   272 
       
   273     `slots` is used to specify the number of slots for processes on
       
   274     the host.  This affects how often processes will be allocated to
       
   275     this host.  Normally this should be equal to the number of cpus on
       
   276     that host.
       
   277     '''
       
   278     def __init__(self, hostname, slots=None):
       
   279         self.hostname = hostname
       
   280         self.slots = slots
       
   281         
       
   282     def _start_manager(self, index, authkey, address, files):
       
   283         if self.hostname != 'localhost':
       
   284             tempdir = copy_to_remote_temporary_directory(self.hostname, files)
       
   285             debug('startup files copied to %s:%s', self.hostname, tempdir)
       
   286             p = subprocess.Popen(
       
   287                 ['ssh', self.hostname, 'python', '-c',
       
   288                  '"import os; os.chdir(%r); '
       
   289                  'from distributing import main; main()"' % tempdir],
       
   290                 stdin=subprocess.PIPE
       
   291                 )
       
   292             data = dict(
       
   293                 name='BoostrappingHost', index=index,
       
   294                 dist_log_level=_logger.getEffectiveLevel(),
       
   295                 dir=tempdir, authkey=str(authkey), parent_address=address
       
   296                 )
       
   297             pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
       
   298             p.stdin.close()
       
   299 
       
   300 #
       
   301 # Copy files to remote directory, returning name of directory
       
   302 #
       
   303 
       
   304 unzip_code = '''"
       
   305 import tempfile, os, sys, tarfile
       
   306 tempdir = tempfile.mkdtemp(prefix='distrib-')
       
   307 os.chdir(tempdir)
       
   308 tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
       
   309 for ti in tf:
       
   310     tf.extract(ti)
       
   311 print tempdir
       
   312 "'''
       
   313 
       
   314 def copy_to_remote_temporary_directory(host, files):
       
   315     p = subprocess.Popen(
       
   316         ['ssh', host, 'python', '-c', unzip_code],
       
   317         stdout=subprocess.PIPE, stdin=subprocess.PIPE
       
   318         )
       
   319     tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
       
   320     for name in files:
       
   321         tf.add(name, os.path.basename(name))
       
   322     tf.close()
       
   323     p.stdin.close()
       
   324     return p.stdout.read().rstrip()
       
   325 
       
   326 #
       
   327 # Code which runs a host manager
       
   328 #
       
   329 
       
   330 def main():   
       
   331     # get data from parent over stdin
       
   332     data = pickle.load(sys.stdin)
       
   333     sys.stdin.close()
       
   334 
       
   335     # set some stuff
       
   336     _logger.setLevel(data['dist_log_level'])
       
   337     forking.prepare(data)
       
   338     
       
   339     # create server for a `HostManager` object
       
   340     server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
       
   341     current_process()._server = server
       
   342     
       
   343     # report server address and number of cpus back to parent
       
   344     conn = connection.Client(data['parent_address'], authkey=data['authkey'])
       
   345     conn.send((data['index'], server.address, slot_count))
       
   346     conn.close()
       
   347     
       
   348     # set name etc
       
   349     current_process().set_name('Host-%s:%s' % server.address)
       
   350     util._run_after_forkers()
       
   351     
       
   352     # register a cleanup function
       
   353     def cleanup(directory):
       
   354         debug('removing directory %s', directory)
       
   355         shutil.rmtree(directory)
       
   356         debug('shutting down host manager')
       
   357     util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
       
   358     
       
   359     # start host manager
       
   360     debug('remote host manager starting in %s', data['dir'])
       
   361     server.serve_forever()