symbian-qemu-0.9.1-12/python-win32-2.6.1/lib/multiprocessing/util.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # Module providing various facilities to other parts of the package
       
     3 #
       
     4 # multiprocessing/util.py
       
     5 #
       
     6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
       
     7 #
       
     8 
       
     9 import itertools
       
    10 import weakref
       
    11 import atexit
       
    12 import threading        # we want threading to install its
       
    13                         # cleanup function before multiprocessing does
       
    14 
       
    15 from multiprocessing.process import current_process, active_children
       
    16 
       
    17 __all__ = [
       
    18     'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
       
    19     'log_to_stderr', 'get_temp_dir', 'register_after_fork',
       
    20     'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
       
    21     ]
       
    22 
       
    23 #
       
    24 # Logging
       
    25 #
       
    26 
       
    27 NOTSET = 0
       
    28 SUBDEBUG = 5
       
    29 DEBUG = 10
       
    30 INFO = 20
       
    31 SUBWARNING = 25
       
    32 
       
    33 LOGGER_NAME = 'multiprocessing'
       
    34 DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
       
    35 
       
    36 _logger = None
       
    37 _log_to_stderr = False
       
    38 
       
    39 def sub_debug(msg, *args):
       
    40     if _logger:
       
    41         _logger.log(SUBDEBUG, msg, *args)
       
    42 
       
    43 def debug(msg, *args):
       
    44     if _logger:
       
    45         _logger.log(DEBUG, msg, *args)
       
    46 
       
    47 def info(msg, *args):
       
    48     if _logger:
       
    49         _logger.log(INFO, msg, *args)
       
    50 
       
    51 def sub_warning(msg, *args):
       
    52     if _logger:
       
    53         _logger.log(SUBWARNING, msg, *args)
       
    54 
       
    55 def get_logger():
       
    56     '''
       
    57     Returns logger used by multiprocessing
       
    58     '''
       
    59     global _logger
       
    60 
       
    61     if not _logger:
       
    62         import logging, atexit
       
    63 
       
    64         # XXX multiprocessing should cleanup before logging
       
    65         if hasattr(atexit, 'unregister'):
       
    66             atexit.unregister(_exit_function)
       
    67             atexit.register(_exit_function)
       
    68         else:
       
    69             atexit._exithandlers.remove((_exit_function, (), {}))
       
    70             atexit._exithandlers.append((_exit_function, (), {}))
       
    71 
       
    72         _check_logger_class()
       
    73         _logger = logging.getLogger(LOGGER_NAME)
       
    74 
       
    75     return _logger
       
    76 
       
    77 def _check_logger_class():
       
    78     '''
       
    79     Make sure process name is recorded when loggers are used
       
    80     '''
       
    81     # XXX This function is unnecessary once logging is patched
       
    82     import logging
       
    83     if hasattr(logging, 'multiprocessing'):
       
    84         return
       
    85 
       
    86     logging._acquireLock()
       
    87     try:
       
    88         OldLoggerClass = logging.getLoggerClass()
       
    89         if not getattr(OldLoggerClass, '_process_aware', False):
       
    90             class ProcessAwareLogger(OldLoggerClass):
       
    91                 _process_aware = True
       
    92                 def makeRecord(self, *args, **kwds):
       
    93                     record = OldLoggerClass.makeRecord(self, *args, **kwds)
       
    94                     record.processName = current_process()._name
       
    95                     return record
       
    96             logging.setLoggerClass(ProcessAwareLogger)
       
    97     finally:
       
    98         logging._releaseLock()
       
    99 
       
   100 def log_to_stderr(level=None):
       
   101     '''
       
   102     Turn on logging and add a handler which prints to stderr
       
   103     '''
       
   104     global _log_to_stderr
       
   105     import logging
       
   106     logger = get_logger()
       
   107     formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
       
   108     handler = logging.StreamHandler()
       
   109     handler.setFormatter(formatter)
       
   110     logger.addHandler(handler)
       
   111     if level is not None:
       
   112         logger.setLevel(level)
       
   113     _log_to_stderr = True
       
   114 
       
   115 #
       
   116 # Function returning a temp directory which will be removed on exit
       
   117 #
       
   118 
       
   119 def get_temp_dir():
       
   120     # get name of a temp directory which will be automatically cleaned up
       
   121     if current_process()._tempdir is None:
       
   122         import shutil, tempfile
       
   123         tempdir = tempfile.mkdtemp(prefix='pymp-')
       
   124         info('created temp directory %s', tempdir)
       
   125         Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
       
   126         current_process()._tempdir = tempdir
       
   127     return current_process()._tempdir
       
   128 
       
   129 #
       
   130 # Support for reinitialization of objects when bootstrapping a child process
       
   131 #
       
   132 
       
   133 _afterfork_registry = weakref.WeakValueDictionary()
       
   134 _afterfork_counter = itertools.count()
       
   135 
       
   136 def _run_after_forkers():
       
   137     items = list(_afterfork_registry.items())
       
   138     items.sort()
       
   139     for (index, ident, func), obj in items:
       
   140         try:
       
   141             func(obj)
       
   142         except Exception, e:
       
   143             info('after forker raised exception %s', e)
       
   144 
       
   145 def register_after_fork(obj, func):
       
   146     _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
       
   147 
       
   148 #
       
   149 # Finalization using weakrefs
       
   150 #
       
   151 
       
   152 _finalizer_registry = {}
       
   153 _finalizer_counter = itertools.count()
       
   154 
       
   155 
       
   156 class Finalize(object):
       
   157     '''
       
   158     Class which supports object finalization using weakrefs
       
   159     '''
       
   160     def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
       
   161         assert exitpriority is None or type(exitpriority) is int
       
   162 
       
   163         if obj is not None:
       
   164             self._weakref = weakref.ref(obj, self)
       
   165         else:
       
   166             assert exitpriority is not None
       
   167 
       
   168         self._callback = callback
       
   169         self._args = args
       
   170         self._kwargs = kwargs or {}
       
   171         self._key = (exitpriority, _finalizer_counter.next())
       
   172 
       
   173         _finalizer_registry[self._key] = self
       
   174 
       
   175     def __call__(self, wr=None):
       
   176         '''
       
   177         Run the callback unless it has already been called or cancelled
       
   178         '''
       
   179         try:
       
   180             del _finalizer_registry[self._key]
       
   181         except KeyError:
       
   182             sub_debug('finalizer no longer registered')
       
   183         else:
       
   184             sub_debug('finalizer calling %s with args %s and kwargs %s',
       
   185                      self._callback, self._args, self._kwargs)
       
   186             res = self._callback(*self._args, **self._kwargs)
       
   187             self._weakref = self._callback = self._args = \
       
   188                             self._kwargs = self._key = None
       
   189             return res
       
   190 
       
   191     def cancel(self):
       
   192         '''
       
   193         Cancel finalization of the object
       
   194         '''
       
   195         try:
       
   196             del _finalizer_registry[self._key]
       
   197         except KeyError:
       
   198             pass
       
   199         else:
       
   200             self._weakref = self._callback = self._args = \
       
   201                             self._kwargs = self._key = None
       
   202 
       
   203     def still_active(self):
       
   204         '''
       
   205         Return whether this finalizer is still waiting to invoke callback
       
   206         '''
       
   207         return self._key in _finalizer_registry
       
   208 
       
   209     def __repr__(self):
       
   210         try:
       
   211             obj = self._weakref()
       
   212         except (AttributeError, TypeError):
       
   213             obj = None
       
   214 
       
   215         if obj is None:
       
   216             return '<Finalize object, dead>'
       
   217 
       
   218         x = '<Finalize object, callback=%s' % \
       
   219             getattr(self._callback, '__name__', self._callback)
       
   220         if self._args:
       
   221             x += ', args=' + str(self._args)
       
   222         if self._kwargs:
       
   223             x += ', kwargs=' + str(self._kwargs)
       
   224         if self._key[0] is not None:
       
   225             x += ', exitprority=' + str(self._key[0])
       
   226         return x + '>'
       
   227 
       
   228 
       
   229 def _run_finalizers(minpriority=None):
       
   230     '''
       
   231     Run all finalizers whose exit priority is not None and at least minpriority
       
   232 
       
   233     Finalizers with highest priority are called first; finalizers with
       
   234     the same priority will be called in reverse order of creation.
       
   235     '''
       
   236     if minpriority is None:
       
   237         f = lambda p : p[0][0] is not None
       
   238     else:
       
   239         f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
       
   240 
       
   241     items = [x for x in _finalizer_registry.items() if f(x)]
       
   242     items.sort(reverse=True)
       
   243 
       
   244     for key, finalizer in items:
       
   245         sub_debug('calling %s', finalizer)
       
   246         try:
       
   247             finalizer()
       
   248         except Exception:
       
   249             import traceback
       
   250             traceback.print_exc()
       
   251 
       
   252     if minpriority is None:
       
   253         _finalizer_registry.clear()
       
   254 
       
   255 #
       
   256 # Clean up on exit
       
   257 #
       
   258 
       
   259 def is_exiting():
       
   260     '''
       
   261     Returns true if the process is shutting down
       
   262     '''
       
   263     return _exiting or _exiting is None
       
   264 
       
   265 _exiting = False
       
   266 
       
   267 def _exit_function():
       
   268     global _exiting
       
   269 
       
   270     info('process shutting down')
       
   271     debug('running all "atexit" finalizers with priority >= 0')
       
   272     _run_finalizers(0)
       
   273 
       
   274     for p in active_children():
       
   275         if p._daemonic:
       
   276             info('calling terminate() for daemon %s', p.name)
       
   277             p._popen.terminate()
       
   278 
       
   279     for p in active_children():
       
   280         info('calling join() for process %s', p.name)
       
   281         p.join()
       
   282 
       
   283     debug('running the remaining "atexit" finalizers')
       
   284     _run_finalizers()
       
   285 
       
   286 atexit.register(_exit_function)
       
   287 
       
   288 #
       
   289 # Some fork aware types
       
   290 #
       
   291 
       
   292 class ForkAwareThreadLock(object):
       
   293     def __init__(self):
       
   294         self._lock = threading.Lock()
       
   295         self.acquire = self._lock.acquire
       
   296         self.release = self._lock.release
       
   297         register_after_fork(self, ForkAwareThreadLock.__init__)
       
   298 
       
   299 class ForkAwareLocal(threading.local):
       
   300     def __init__(self):
       
   301         register_after_fork(self, lambda obj : obj.__dict__.clear())
       
   302     def __reduce__(self):
       
   303         return type(self), ()