|
1 # |
|
2 # Module providing various facilities to other parts of the package |
|
3 # |
|
4 # multiprocessing/util.py |
|
5 # |
|
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt |
|
7 # |
|
8 |
|
9 import itertools |
|
10 import weakref |
|
11 import atexit |
|
12 import threading # we want threading to install its |
|
13 # cleanup function before multiprocessing does |
|
14 |
|
15 from multiprocessing.process import current_process, active_children |
|
16 |
|
# Public names of this module
__all__ = [
    'sub_debug',
    'debug',
    'info',
    'sub_warning',
    'get_logger',
    'log_to_stderr',
    'get_temp_dir',
    'register_after_fork',
    'is_exiting',
    'Finalize',
    'ForkAwareThreadLock',
    'ForkAwareLocal',
]
|
22 |
|
23 # |
|
24 # Logging |
|
25 # |
|
26 |
|
# Numeric levels passed to Logger.log() by the helpers in this module.
# SUBDEBUG sits below DEBUG and SUBWARNING sits above INFO.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

# Name handed to logging.getLogger() by get_logger()
LOGGER_NAME = 'multiprocessing'
# Format installed on the stderr handler by log_to_stderr()
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# The logger itself; stays None until get_logger() is first called
_logger = None
# Set to True once log_to_stderr() has been called
_log_to_stderr = False
|
38 |
|
def sub_debug(msg, *args):
    '''
    Log msg (with its format args) at level SUBDEBUG if a logger exists
    '''
    if not _logger:
        # no logger has been created, so there is nothing to log to
        return
    _logger.log(SUBDEBUG, msg, *args)
|
42 |
|
def debug(msg, *args):
    '''
    Log msg (with its format args) at level DEBUG if a logger exists
    '''
    if not _logger:
        # no logger has been created, so there is nothing to log to
        return
    _logger.log(DEBUG, msg, *args)
|
46 |
|
def info(msg, *args):
    '''
    Log msg (with its format args) at level INFO if a logger exists
    '''
    if not _logger:
        # no logger has been created, so there is nothing to log to
        return
    _logger.log(INFO, msg, *args)
|
50 |
|
def sub_warning(msg, *args):
    '''
    Log msg (with its format args) at level SUBWARNING if a logger exists
    '''
    if not _logger:
        # no logger has been created, so there is nothing to log to
        return
    _logger.log(SUBWARNING, msg, *args)
|
54 |
|
def get_logger():
    '''
    Return the logger used by multiprocessing, creating it on first use
    '''
    global _logger

    if _logger:
        return _logger

    import logging
    import atexit

    # XXX multiprocessing should cleanup before logging
    # Now that logging has been imported, remove _exit_function from the
    # atexit handlers and add it again so that it becomes the newest entry.
    if not hasattr(atexit, 'unregister'):
        # no unregister() available: edit the handler list directly
        handlers = atexit._exithandlers
        entry = (_exit_function, (), {})
        handlers.remove(entry)
        handlers.append(entry)
    else:
        atexit.unregister(_exit_function)
        atexit.register(_exit_function)

    _check_logger_class()
    _logger = logging.getLogger(LOGGER_NAME)
    return _logger
|
76 |
|
def _check_logger_class():
    '''
    Make sure process name is recorded when loggers are used

    Installs, as the logging module's logger class, a subclass of the
    current logger class whose records carry a processName attribute.
    '''
    # XXX This function is unnecessary once logging is patched
    import logging

    if hasattr(logging, 'multiprocessing'):
        # NOTE(review): presumably such a logging module already records
        # the process name by itself -- confirm
        return

    logging._acquireLock()
    try:
        base_class = logging.getLoggerClass()
        if getattr(base_class, '_process_aware', False):
            # the installed logger class has already been wrapped
            return

        class ProcessAwareLogger(base_class):
            _process_aware = True

            def makeRecord(self, *args, **kwds):
                record = base_class.makeRecord(self, *args, **kwds)
                record.processName = current_process()._name
                return record

        logging.setLoggerClass(ProcessAwareLogger)
    finally:
        # runs on the early return above as well
        logging._releaseLock()
|
99 |
|
def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    The handler formats records with DEFAULT_LOGGING_FORMAT.  If level is
    not None the logger's level is set to it.  Returns the logger obtained
    from get_logger() so that the caller can configure it further.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()

    # StreamHandler() without a stream argument writes to sys.stderr
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
    logger.addHandler(handler)

    if level is not None:
        logger.setLevel(level)

    _log_to_stderr = True
    return logger
|
114 |
|
115 # |
|
116 # Function returning a temp directory which will be removed on exit |
|
117 # |
|
118 |
|
def get_temp_dir():
    '''
    Return the name of a temp directory which will be automatically cleaned up

    The directory is created on the first call and its name is stored on
    the current process object; later calls return the stored name.
    '''
    process = current_process()
    if process._tempdir is not None:
        return process._tempdir

    import shutil
    import tempfile

    tempdir = tempfile.mkdtemp(prefix='pymp-')
    info('created temp directory %s', tempdir)
    # There is no object to track, so removal is registered purely as a
    # finalizer with exit priority -100
    Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
    process._tempdir = tempdir
    return process._tempdir
|
128 |
|
129 # |
|
130 # Support for reinitialization of objects when bootstrapping a child process |
|
131 # |
|
132 |
|
# Maps (registration index, id(obj), func) -> obj.  The objects are only
# weakly referenced, so an entry vanishes once its object is collected.
_afterfork_registry = weakref.WeakValueDictionary()
# Source of the registration index used as the first element of each key
_afterfork_counter = itertools.count()
|
135 |
|
def _run_after_forkers():
    '''
    Call func(obj) for every entry in the after-fork registry

    Entries are processed in registration order.  An exception raised by
    one of the functions is logged with info() and the remaining functions
    are still run.
    '''
    # sorted() takes a snapshot before any function runs, so entries added
    # or dropped while the functions execute do not disturb the loop.  Keys
    # begin with the registration counter, hence sorting gives
    # registration order.
    for (index, ident, func), obj in sorted(_afterfork_registry.items()):
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)
|
144 |
|
def register_after_fork(obj, func):
    '''
    Register func so that _run_after_forkers() will call func(obj)

    obj is stored in a weak-value dictionary, so the registration lapses
    once obj has been garbage collected.
    '''
    # the builtin next() works on both Python 2.6+ and Python 3 iterators,
    # whereas the .next() method only exists on Python 2
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
|
147 |
|
148 # |
|
149 # Finalization using weakrefs |
|
150 # |
|
151 |
|
# Maps (exitpriority, creation index) -> Finalize instance for every
# finalizer that has been neither run nor cancelled
_finalizer_registry = {}
# Source of the creation index used as the second element of each key
_finalizer_counter = itertools.count()
|
154 |
|
155 |
|
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        Register callback(*args, **kwargs) as a finalizer

        If obj is not None the finalizer is also installed as the callback
        of a weak reference to obj.  If obj is None then exitpriority must
        be given.  exitpriority must be None or an int.
        '''
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            # self is the weakref callback, so __call__ is invoked with
            # the weak reference when obj goes away
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        # The counter makes every key unique.  The builtin next() is used
        # because the .next() method does not exist on Python 3 iterators.
        self._key = (exitpriority, next(_finalizer_counter))

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None):
        '''
        Run the callback unless it has already been called or cancelled

        Returns the callback's result, or None if the finalizer was no
        longer registered.  wr is accepted so that the instance can be
        used as a weakref callback; it is not used.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            sub_debug('finalizer calling %s with args %s and kwargs %s',
                      self._callback, self._args, self._kwargs)
            res = self._callback(*self._args, **self._kwargs)
            # drop all references held by this finalizer
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            # already run or cancelled
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            # _weakref was never set (obj was None) or has been reset to
            # None after the finalizer ran or was cancelled
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
|
227 |
|
228 |
|
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    def selected(entry):
        # entry is a (key, finalizer) pair and key[0] is the exit priority
        priority = entry[0][0]
        if priority is None:
            return False
        return minpriority is None or priority >= minpriority

    pending = sorted(
        (entry for entry in _finalizer_registry.items() if selected(entry)),
        reverse=True)

    for key, finalizer in pending:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            # report the failure and carry on with the other finalizers
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
|
254 |
|
255 # |
|
256 # Clean up on exit |
|
257 # |
|
258 |
|
def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    if _exiting:
        return _exiting
    # NOTE(review): a value of None is also treated as "exiting" --
    # presumably for when module globals have been reset at interpreter
    # shutdown; confirm
    return _exiting is None
|
264 |
|
# Module-level flag read by is_exiting()
_exiting = False
|
266 |
|
def _exit_function():
    '''
    Clean up when the process exits

    Marks the process as exiting, runs the finalizers with priority >= 0,
    terminates daemonic children, joins all remaining children and then
    runs the remaining finalizers.
    '''
    global _exiting

    # Record that shutdown has started so that is_exiting() reports it
    # while the clean-up below is in progress
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    for p in active_children():
        if p._daemonic:
            info('calling terminate() for daemon %s', p.name)
            p._popen.terminate()

    # active_children() is queried again, so this loop joins whatever
    # children are still reported after the terminate() calls
    for p in active_children():
        info('calling join() for process %s', p.name)
        p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()
|
285 |
|
# Have the interpreter call _exit_function() at exit
atexit.register(_exit_function)
|
287 |
|
288 # |
|
289 # Some fork aware types |
|
290 # |
|
291 |
|
class ForkAwareThreadLock(object):
    '''
    Wrapper around threading.Lock which is given a fresh lock by the
    after-fork hooks
    '''
    def __init__(self):
        self._reset()
        # Register _reset rather than __init__: running __init__ again from
        # the hook would register one more hook each time, so the number of
        # hooks for this object would double with every generation of fork.
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        '''
        Install a new threading.Lock and rebind acquire/release to it
        '''
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release
|
298 |
|
class ForkAwareLocal(threading.local):
    '''
    threading.local subclass whose attributes are cleared by the
    after-fork hooks
    '''
    def __init__(self):
        def _clear_attributes(obj):
            obj.__dict__.clear()
        register_after_fork(self, _clear_attributes)

    def __reduce__(self):
        # reduce to "call the type with no arguments", i.e. a new instance
        cls = type(self)
        return cls, ()