symbian-qemu-0.9.1-12/python-2.6.1/Doc/includes/mp_distributing.py
changeset 1 2fb8b9db1c86
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/symbian-qemu-0.9.1-12/python-2.6.1/Doc/includes/mp_distributing.py	Fri Jul 31 15:01:17 2009 +0100
@@ -0,0 +1,361 @@
+#
+# Module to allow spawning of processes on foreign hosts
+#
+# Depends on `multiprocessing` package -- tested with `processing-0.60`
+#
+
+__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
+
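+# A rough usage sketch (host names, slot counts and the function are
+# illustrative only; the remote bootstrap in main() below expects this file to
+# be importable on the remote side as `distributing`):
+#
+#   from distributing import Cluster, Host
+#
+#   def f(x):
+#       return x * x
+#
+#   if __name__ == '__main__':
+#       cluster = Cluster([Host('localhost'),
+#                          Host('user@node1.example.org', 4)], ['__main__'])
+#       cluster.start()
+#       p = cluster.Process(target=f, args=(10,))
+#       p.start(); p.join()
+#       cluster.shutdown()
+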
+#
+# Imports
+#
+
+import sys
+import os
+import tarfile
+import shutil
+import subprocess
+import logging
+import itertools
+import Queue
+
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+from multiprocessing import Process, current_process, cpu_count
+from multiprocessing import util, managers, connection, forking, pool
+
+#
+# Logging
+#
+
+def get_logger():
+    return _logger
+
+_logger = logging.getLogger('distributing')
+_logger.propagate = 0
+
+_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
+_handler = logging.StreamHandler()
+_handler.setFormatter(_formatter)
+_logger.addHandler(_handler)
+
+info = _logger.info
+debug = _logger.debug
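+
+# Callers can adjust verbosity through the standard logging API, for example:
+#   get_logger().setLevel(logging.DEBUG)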
+
+#
+# Get number of cpus
+#
+
+try:
+    slot_count = cpu_count()
+except NotImplementedError:
+    slot_count = 1
+        
+#
+# Manager type which spawns subprocesses
+#
+
+class HostManager(managers.SyncManager):
+    '''
+    Manager type used for spawning processes on a (presumably) foreign host
+    '''    
+    def __init__(self, address, authkey):
+        managers.SyncManager.__init__(self, address, authkey)
+        self._name = 'Host-unknown'
+
+    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+        if hasattr(sys.modules['__main__'], '__file__'):
+            main_path = os.path.basename(sys.modules['__main__'].__file__)
+        else:
+            main_path = None
+        data = pickle.dumps((target, args, kwargs))
+        p = self._RemoteProcess(data, main_path)
+        if name is None:
+            temp = self._name.split('Host-')[-1] + '/Process-%s'
+            name = temp % ':'.join(map(str, p.get_identity()))
+        p.set_name(name)
+        return p
+
+    @classmethod
+    def from_address(cls, address, authkey):
+        manager = cls(address, authkey)
+        managers.transact(address, authkey, 'dummy')
+        manager._state.value = managers.State.STARTED
+        manager._name = 'Host-%s:%s' % manager.address
+        manager.shutdown = util.Finalize(
+            manager, HostManager._finalize_host,
+            args=(manager._address, manager._authkey, manager._name),
+            exitpriority=-10
+            )
+        return manager
+
+    @staticmethod
+    def _finalize_host(address, authkey, name):
+        managers.transact(address, authkey, 'shutdown')
+        
+    def __repr__(self):
+        return '<Host(%s)>' % self._name
+
+#
+# Process subclass representing a process on (possibly) a remote machine
+#
+
+class RemoteProcess(Process):
+    '''
+    Represents a process started on a remote host
+    '''
+    def __init__(self, data, main_path):
+        assert not main_path or os.path.basename(main_path) == main_path
+        Process.__init__(self)
+        self._data = data
+        self._main_path = main_path
+        
+    def _bootstrap(self):
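+        # Recreate the parent's __main__ module here (via forking.prepare),
+        # then unpickle the target callable and its arguments before handing
+        # over to the normal Process bootstrap.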
+        forking.prepare({'main_path': self._main_path})
+        self._target, self._args, self._kwargs = pickle.loads(self._data)
+        return Process._bootstrap(self)
+        
+    def get_identity(self):
+        return self._identity
+
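+# Registering RemoteProcess with HostManager means HostManager.Process() above
+# creates the process object inside the remote manager and returns a proxy.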
+HostManager.register('_RemoteProcess', RemoteProcess)
+
+#
+# A Pool class that uses a cluster
+#
+
+class DistributedPool(pool.Pool):
+    
+    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
+        self._cluster = cluster
+        self.Process = cluster.Process
+        pool.Pool.__init__(self, processes or len(cluster),
+                           initializer, initargs)
+        
+    def _setup_queues(self):
+        self._inqueue = self._cluster._SettableQueue()
+        self._outqueue = self._cluster._SettableQueue()
+        self._quick_put = self._inqueue.put
+        self._quick_get = self._outqueue.get
+
+    @staticmethod
+    def _help_stuff_finish(inqueue, task_handler, size):
+        inqueue.set_contents([None] * size)
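+
+# A rough sketch of pool use (assumes a started `cluster` and a function `f`,
+# as in the module-level sketch near the top of this file):
+#
+#   pool = cluster.Pool(processes=8)
+#   results = pool.map(f, range(100))
+#   pool.close(); pool.join()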
+
+#
+# Manager type which starts host managers on other machines
+#
+
+def LocalProcess(**kwds):
+    p = Process(**kwds)
+    p.set_name('localhost/' + p.name)
+    return p
+
+class Cluster(managers.SyncManager):
+    '''
+    Represents a collection of slots running on various hosts.
+    
+    `Cluster` is a subclass of `SyncManager` so it allows creation of
+    various types of shared objects.
+    '''
+    def __init__(self, hostlist, modules):
+        managers.SyncManager.__init__(self, address=('localhost', 0))
+        self._hostlist = hostlist
+        self._modules = modules
+        if __name__ not in modules:
+            modules.append(__name__)
+        files = [sys.modules[name].__file__ for name in modules]
+        for i, file in enumerate(files):
+            if file.endswith('.pyc') or file.endswith('.pyo'):
+                files[i] = file[:-4] + '.py'
+        self._files = [os.path.abspath(file) for file in files]
+        
+    def start(self):
+        managers.SyncManager.start(self)
+        
+        l = connection.Listener(family='AF_INET', authkey=self._authkey)
+        
+        for i, host in enumerate(self._hostlist):
+            host._start_manager(i, self._authkey, l.address, self._files)
+
+        for host in self._hostlist:
+            if host.hostname != 'localhost':
+                conn = l.accept()
+                i, address, cpus = conn.recv()
+                conn.close()
+                other_host = self._hostlist[i]
+                other_host.manager = HostManager.from_address(address,
+                                                              self._authkey)
+                other_host.slots = other_host.slots or cpus
+                other_host.Process = other_host.manager.Process
+            else:
+                host.slots = host.slots or slot_count
+                host.Process = LocalProcess
+
+        self._slotlist = [
+            Slot(host) for host in self._hostlist for i in range(host.slots)
+            ]
+        self._slot_iterator = itertools.cycle(self._slotlist)
+        self._base_shutdown = self.shutdown
+        del self.shutdown
+        
+    def shutdown(self):
+        for host in self._hostlist:
+            if host.hostname != 'localhost':
+                host.manager.shutdown()
+        self._base_shutdown()
+        
+    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
+        slot = self._slot_iterator.next()
+        return slot.Process(
+            group=group, target=target, name=name, args=args, kwargs=kwargs
+            )
+
+    def Pool(self, processes=None, initializer=None, initargs=()):
+        return DistributedPool(self, processes, initializer, initargs)
+    
+    def __getitem__(self, i):
+        return self._slotlist[i]
+
+    def __len__(self):
+        return len(self._slotlist)
+
+    def __iter__(self):
+        return iter(self._slotlist)
+
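+# Because `Cluster` subclasses `SyncManager`, the usual shared objects can be
+# created once the cluster has started, for example (illustrative only):
+#
+#   queue = cluster.Queue()   # proxy for a queue held by the manager process
+#   lock = cluster.Lock()
+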
+#
+# Queue subclass used by distributed pool
+#
+
+class SettableQueue(Queue.Queue):
+    def empty(self):
+        return not self.queue
+    def full(self):
+        return self.maxsize > 0 and len(self.queue) == self.maxsize
+    def set_contents(self, contents):
+        # length of contents must be at least as large as the number of
+        # threads which have potentially called get()
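+        # (otherwise a worker thread blocked in get() could wait forever)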
+        self.not_empty.acquire()
+        try:
+            self.queue.clear()
+            self.queue.extend(contents)
+            self.not_empty.notifyAll()
+        finally:
+            self.not_empty.release()
+            
+Cluster.register('_SettableQueue', SettableQueue)
+
+#
+# Class representing a notional cpu in the cluster
+#
+
+class Slot(object):
+    def __init__(self, host):
+        self.host = host
+        self.Process = host.Process
+
+#
+# Host
+#
+
+class Host(object):
+    '''
+    Represents a host to use as a node in a cluster.
+
+    `hostname` gives the name of the host.  If hostname is not
+    "localhost" then ssh is used to log in to the host.  To log in as
+    a different user, use a host name of the form
+    "username@somewhere.org".
+
+    `slots` is used to specify the number of slots for processes on
+    the host.  This affects how often processes will be allocated to
+    this host.  Normally this should be equal to the number of cpus on
+    that host.
+    '''
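+    # For example (host names below are purely illustrative):
+    #   Host('localhost')                        # use cpus on this machine
+    #   Host('fred@bigbox.example.org', slots=8) # ssh in as fred, 8 slots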
+    def __init__(self, hostname, slots=None):
+        self.hostname = hostname
+        self.slots = slots
+        
+    def _start_manager(self, index, authkey, address, files):
+        if self.hostname != 'localhost':
+            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
+            debug('startup files copied to %s:%s', self.hostname, tempdir)
+            p = subprocess.Popen(
+                ['ssh', self.hostname, 'python', '-c',
+                 '"import os; os.chdir(%r); '
+                 'from distributing import main; main()"' % tempdir],
+                stdin=subprocess.PIPE
+                )
+            data = dict(
+                name='BootstrappingHost', index=index,
+                dist_log_level=_logger.getEffectiveLevel(),
+                dir=tempdir, authkey=str(authkey), parent_address=address
+                )
+            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
+            p.stdin.close()
+
+#
+# Copy files to remote directory, returning name of directory
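+# (the double quotes embedded in `unzip_code` below make the script a single
+# shell-quoted argument when it is passed through ssh to the remote python -c)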
+#
+
+unzip_code = '''"
+import tempfile, os, sys, tarfile
+tempdir = tempfile.mkdtemp(prefix='distrib-')
+os.chdir(tempdir)
+tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
+for ti in tf:
+    tf.extract(ti)
+print tempdir
+"'''
+
+def copy_to_remote_temporary_directory(host, files):
+    p = subprocess.Popen(
+        ['ssh', host, 'python', '-c', unzip_code],
+        stdout=subprocess.PIPE, stdin=subprocess.PIPE
+        )
+    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
+    for name in files:
+        tf.add(name, os.path.basename(name))
+    tf.close()
+    p.stdin.close()
+    return p.stdout.read().rstrip()
+
+#
+# Code which runs a host manager
+#
+
+def main():   
+    # get data from parent over stdin
+    data = pickle.load(sys.stdin)
+    sys.stdin.close()
+
+    # configure logging and prepare this process using the parent's data
+    _logger.setLevel(data['dist_log_level'])
+    forking.prepare(data)
+    
+    # create server for a `HostManager` object
+    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
+    current_process()._server = server
+    
+    # report server address and number of cpus back to parent
+    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
+    conn.send((data['index'], server.address, slot_count))
+    conn.close()
+    
+    # set the process name and run any registered after-fork callbacks
+    current_process().set_name('Host-%s:%s' % server.address)
+    util._run_after_forkers()
+    
+    # register a cleanup function
+    def cleanup(directory):
+        debug('removing directory %s', directory)
+        shutil.rmtree(directory)
+        debug('shutting down host manager')
+    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
+    
+    # start host manager
+    debug('remote host manager starting in %s', data['dir'])
+    server.serve_forever()