|
1 # |
|
2 # Module for starting a process object using os.fork() or CreateProcess() |
|
3 # |
|
4 # multiprocessing/forking.py |
|
5 # |
|
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt |
|
7 # |
|
8 |
|
9 import os |
|
10 import sys |
|
11 import signal |
|
12 |
|
13 from multiprocessing import util, process |
|
14 |
|
15 __all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] |
|
16 |
|
17 # |
|
18 # Check that the current thread is spawning a child process |
|
19 # |
|
20 |
|
21 def assert_spawning(self): |
|
22 if not Popen.thread_is_spawning(): |
|
23 raise RuntimeError( |
|
24 '%s objects should only be shared between processes' |
|
25 ' through inheritance' % type(self).__name__ |
|
26 ) |
|
27 |
|
28 # |
|
29 # Try making some callable types picklable |
|
30 # |
|
31 |
|
32 from pickle import Pickler |
|
33 class ForkingPickler(Pickler): |
|
34 dispatch = Pickler.dispatch.copy() |
|
35 |
|
36 @classmethod |
|
37 def register(cls, type, reduce): |
|
38 def dispatcher(self, obj): |
|
39 rv = reduce(obj) |
|
40 self.save_reduce(obj=obj, *rv) |
|
41 cls.dispatch[type] = dispatcher |
|
42 |
|
43 def _reduce_method(m): |
|
44 if m.im_self is None: |
|
45 return getattr, (m.im_class, m.im_func.func_name) |
|
46 else: |
|
47 return getattr, (m.im_self, m.im_func.func_name) |
|
48 ForkingPickler.register(type(ForkingPickler.save), _reduce_method) |
|
49 |
|
50 def _reduce_method_descriptor(m): |
|
51 return getattr, (m.__objclass__, m.__name__) |
|
52 ForkingPickler.register(type(list.append), _reduce_method_descriptor) |
|
53 ForkingPickler.register(type(int.__add__), _reduce_method_descriptor) |
|
54 |
|
55 #def _reduce_builtin_function_or_method(m): |
|
56 # return getattr, (m.__self__, m.__name__) |
|
57 #ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method) |
|
58 #ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method) |
|
59 |
|
60 try: |
|
61 from functools import partial |
|
62 except ImportError: |
|
63 pass |
|
64 else: |
|
65 def _reduce_partial(p): |
|
66 return _rebuild_partial, (p.func, p.args, p.keywords or {}) |
|
67 def _rebuild_partial(func, args, keywords): |
|
68 return partial(func, *args, **keywords) |
|
69 ForkingPickler.register(partial, _reduce_partial) |
|
70 |
|
71 # |
|
72 # Unix |
|
73 # |
|
74 |
|
75 if sys.platform != 'win32': |
|
76 import time |
|
77 |
|
78 exit = os._exit |
|
79 duplicate = os.dup |
|
80 close = os.close |
|
81 |
|
82 # |
|
83 # We define a Popen class similar to the one from subprocess, but |
|
84 # whose constructor takes a process object as its argument. |
|
85 # |
|
86 |
|
87 class Popen(object): |
|
88 |
|
89 def __init__(self, process_obj): |
|
90 sys.stdout.flush() |
|
91 sys.stderr.flush() |
|
92 self.returncode = None |
|
93 |
|
94 self.pid = os.fork() |
|
95 if self.pid == 0: |
|
96 if 'random' in sys.modules: |
|
97 import random |
|
98 random.seed() |
|
99 code = process_obj._bootstrap() |
|
100 sys.stdout.flush() |
|
101 sys.stderr.flush() |
|
102 os._exit(code) |
|
103 |
|
104 def poll(self, flag=os.WNOHANG): |
|
105 if self.returncode is None: |
|
106 pid, sts = os.waitpid(self.pid, flag) |
|
107 if pid == self.pid: |
|
108 if os.WIFSIGNALED(sts): |
|
109 self.returncode = -os.WTERMSIG(sts) |
|
110 else: |
|
111 assert os.WIFEXITED(sts) |
|
112 self.returncode = os.WEXITSTATUS(sts) |
|
113 return self.returncode |
|
114 |
|
115 def wait(self, timeout=None): |
|
116 if timeout is None: |
|
117 return self.poll(0) |
|
118 deadline = time.time() + timeout |
|
119 delay = 0.0005 |
|
120 while 1: |
|
121 res = self.poll() |
|
122 if res is not None: |
|
123 break |
|
124 remaining = deadline - time.time() |
|
125 if remaining <= 0: |
|
126 break |
|
127 delay = min(delay * 2, remaining, 0.05) |
|
128 time.sleep(delay) |
|
129 return res |
|
130 |
|
131 def terminate(self): |
|
132 if self.returncode is None: |
|
133 try: |
|
134 os.kill(self.pid, signal.SIGTERM) |
|
135 except OSError, e: |
|
136 if self.wait(timeout=0.1) is None: |
|
137 raise |
|
138 |
|
139 @staticmethod |
|
140 def thread_is_spawning(): |
|
141 return False |
|
142 |
|
143 # |
|
144 # Windows |
|
145 # |
|
146 |
|
147 else: |
|
148 import thread |
|
149 import msvcrt |
|
150 import _subprocess |
|
151 import time |
|
152 |
|
153 from ._multiprocessing import win32, Connection, PipeConnection |
|
154 from .util import Finalize |
|
155 |
|
156 #try: |
|
157 # from cPickle import dump, load, HIGHEST_PROTOCOL |
|
158 #except ImportError: |
|
159 from pickle import load, HIGHEST_PROTOCOL |
|
160 |
|
161 def dump(obj, file, protocol=None): |
|
162 ForkingPickler(file, protocol).dump(obj) |
|
163 |
|
164 # |
|
165 # |
|
166 # |
|
167 |
|
168 TERMINATE = 0x10000 |
|
169 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) |
|
170 |
|
171 exit = win32.ExitProcess |
|
172 close = win32.CloseHandle |
|
173 |
|
174 # |
|
175 # _python_exe is the assumed path to the python executable. |
|
176 # People embedding Python want to modify it. |
|
177 # |
|
178 |
|
179 if sys.executable.lower().endswith('pythonservice.exe'): |
|
180 _python_exe = os.path.join(sys.exec_prefix, 'python.exe') |
|
181 else: |
|
182 _python_exe = sys.executable |
|
183 |
|
184 def set_executable(exe): |
|
185 global _python_exe |
|
186 _python_exe = exe |
|
187 |
|
188 # |
|
189 # |
|
190 # |
|
191 |
|
192 def duplicate(handle, target_process=None, inheritable=False): |
|
193 if target_process is None: |
|
194 target_process = _subprocess.GetCurrentProcess() |
|
195 return _subprocess.DuplicateHandle( |
|
196 _subprocess.GetCurrentProcess(), handle, target_process, |
|
197 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS |
|
198 ).Detach() |
|
199 |
|
200 # |
|
201 # We define a Popen class similar to the one from subprocess, but |
|
202 # whose constructor takes a process object as its argument. |
|
203 # |
|
204 |
|
205 class Popen(object): |
|
206 ''' |
|
207 Start a subprocess to run the code of a process object |
|
208 ''' |
|
209 _tls = thread._local() |
|
210 |
|
211 def __init__(self, process_obj): |
|
212 # create pipe for communication with child |
|
213 rfd, wfd = os.pipe() |
|
214 |
|
215 # get handle for read end of the pipe and make it inheritable |
|
216 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) |
|
217 os.close(rfd) |
|
218 |
|
219 # start process |
|
220 cmd = get_command_line() + [rhandle] |
|
221 cmd = ' '.join('"%s"' % x for x in cmd) |
|
222 hp, ht, pid, tid = _subprocess.CreateProcess( |
|
223 _python_exe, cmd, None, None, 1, 0, None, None, None |
|
224 ) |
|
225 ht.Close() |
|
226 close(rhandle) |
|
227 |
|
228 # set attributes of self |
|
229 self.pid = pid |
|
230 self.returncode = None |
|
231 self._handle = hp |
|
232 |
|
233 # send information to child |
|
234 prep_data = get_preparation_data(process_obj._name) |
|
235 to_child = os.fdopen(wfd, 'wb') |
|
236 Popen._tls.process_handle = int(hp) |
|
237 try: |
|
238 dump(prep_data, to_child, HIGHEST_PROTOCOL) |
|
239 dump(process_obj, to_child, HIGHEST_PROTOCOL) |
|
240 finally: |
|
241 del Popen._tls.process_handle |
|
242 to_child.close() |
|
243 |
|
244 @staticmethod |
|
245 def thread_is_spawning(): |
|
246 return getattr(Popen._tls, 'process_handle', None) is not None |
|
247 |
|
248 @staticmethod |
|
249 def duplicate_for_child(handle): |
|
250 return duplicate(handle, Popen._tls.process_handle) |
|
251 |
|
252 def wait(self, timeout=None): |
|
253 if self.returncode is None: |
|
254 if timeout is None: |
|
255 msecs = _subprocess.INFINITE |
|
256 else: |
|
257 msecs = max(0, int(timeout * 1000 + 0.5)) |
|
258 |
|
259 res = _subprocess.WaitForSingleObject(int(self._handle), msecs) |
|
260 if res == _subprocess.WAIT_OBJECT_0: |
|
261 code = _subprocess.GetExitCodeProcess(self._handle) |
|
262 if code == TERMINATE: |
|
263 code = -signal.SIGTERM |
|
264 self.returncode = code |
|
265 |
|
266 return self.returncode |
|
267 |
|
268 def poll(self): |
|
269 return self.wait(timeout=0) |
|
270 |
|
271 def terminate(self): |
|
272 if self.returncode is None: |
|
273 try: |
|
274 _subprocess.TerminateProcess(int(self._handle), TERMINATE) |
|
275 except WindowsError: |
|
276 if self.wait(timeout=0.1) is None: |
|
277 raise |
|
278 |
|
279 # |
|
280 # |
|
281 # |
|
282 |
|
283 def is_forking(argv): |
|
284 ''' |
|
285 Return whether commandline indicates we are forking |
|
286 ''' |
|
287 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork': |
|
288 assert len(argv) == 3 |
|
289 return True |
|
290 else: |
|
291 return False |
|
292 |
|
293 |
|
294 def freeze_support(): |
|
295 ''' |
|
296 Run code for process object if this in not the main process |
|
297 ''' |
|
298 if is_forking(sys.argv): |
|
299 main() |
|
300 sys.exit() |
|
301 |
|
302 |
|
303 def get_command_line(): |
|
304 ''' |
|
305 Returns prefix of command line used for spawning a child process |
|
306 ''' |
|
307 if process.current_process()._identity==() and is_forking(sys.argv): |
|
308 raise RuntimeError(''' |
|
309 Attempt to start a new process before the current process |
|
310 has finished its bootstrapping phase. |
|
311 |
|
312 This probably means that you are on Windows and you have |
|
313 forgotten to use the proper idiom in the main module: |
|
314 |
|
315 if __name__ == '__main__': |
|
316 freeze_support() |
|
317 ... |
|
318 |
|
319 The "freeze_support()" line can be omitted if the program |
|
320 is not going to be frozen to produce a Windows executable.''') |
|
321 |
|
322 if getattr(sys, 'frozen', False): |
|
323 return [sys.executable, '--multiprocessing-fork'] |
|
324 else: |
|
325 prog = 'from multiprocessing.forking import main; main()' |
|
326 return [_python_exe, '-c', prog, '--multiprocessing-fork'] |
|
327 |
|
328 |
|
329 def main(): |
|
330 ''' |
|
331 Run code specifed by data received over pipe |
|
332 ''' |
|
333 assert is_forking(sys.argv) |
|
334 |
|
335 handle = int(sys.argv[-1]) |
|
336 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) |
|
337 from_parent = os.fdopen(fd, 'rb') |
|
338 |
|
339 process.current_process()._inheriting = True |
|
340 preparation_data = load(from_parent) |
|
341 prepare(preparation_data) |
|
342 self = load(from_parent) |
|
343 process.current_process()._inheriting = False |
|
344 |
|
345 from_parent.close() |
|
346 |
|
347 exitcode = self._bootstrap() |
|
348 exit(exitcode) |
|
349 |
|
350 |
|
351 def get_preparation_data(name): |
|
352 ''' |
|
353 Return info about parent needed by child to unpickle process object |
|
354 ''' |
|
355 from .util import _logger, _log_to_stderr |
|
356 |
|
357 d = dict( |
|
358 name=name, |
|
359 sys_path=sys.path, |
|
360 sys_argv=sys.argv, |
|
361 log_to_stderr=_log_to_stderr, |
|
362 orig_dir=process.ORIGINAL_DIR, |
|
363 authkey=process.current_process().authkey, |
|
364 ) |
|
365 |
|
366 if _logger is not None: |
|
367 d['log_level'] = _logger.getEffectiveLevel() |
|
368 |
|
369 if not WINEXE: |
|
370 main_path = getattr(sys.modules['__main__'], '__file__', None) |
|
371 if not main_path and sys.argv[0] not in ('', '-c'): |
|
372 main_path = sys.argv[0] |
|
373 if main_path is not None: |
|
374 if not os.path.isabs(main_path) and \ |
|
375 process.ORIGINAL_DIR is not None: |
|
376 main_path = os.path.join(process.ORIGINAL_DIR, main_path) |
|
377 d['main_path'] = os.path.normpath(main_path) |
|
378 |
|
379 return d |
|
380 |
|
381 # |
|
382 # Make (Pipe)Connection picklable |
|
383 # |
|
384 |
|
385 def reduce_connection(conn): |
|
386 if not Popen.thread_is_spawning(): |
|
387 raise RuntimeError( |
|
388 'By default %s objects can only be shared between processes\n' |
|
389 'using inheritance' % type(conn).__name__ |
|
390 ) |
|
391 return type(conn), (Popen.duplicate_for_child(conn.fileno()), |
|
392 conn.readable, conn.writable) |
|
393 |
|
394 ForkingPickler.register(Connection, reduce_connection) |
|
395 ForkingPickler.register(PipeConnection, reduce_connection) |
|
396 |
|
397 # |
|
398 # Prepare current process |
|
399 # |
|
400 |
|
401 old_main_modules = [] |
|
402 |
|
403 def prepare(data): |
|
404 ''' |
|
405 Try to get current process ready to unpickle process object |
|
406 ''' |
|
407 old_main_modules.append(sys.modules['__main__']) |
|
408 |
|
409 if 'name' in data: |
|
410 process.current_process().name = data['name'] |
|
411 |
|
412 if 'authkey' in data: |
|
413 process.current_process()._authkey = data['authkey'] |
|
414 |
|
415 if 'log_to_stderr' in data and data['log_to_stderr']: |
|
416 util.log_to_stderr() |
|
417 |
|
418 if 'log_level' in data: |
|
419 util.get_logger().setLevel(data['log_level']) |
|
420 |
|
421 if 'sys_path' in data: |
|
422 sys.path = data['sys_path'] |
|
423 |
|
424 if 'sys_argv' in data: |
|
425 sys.argv = data['sys_argv'] |
|
426 |
|
427 if 'dir' in data: |
|
428 os.chdir(data['dir']) |
|
429 |
|
430 if 'orig_dir' in data: |
|
431 process.ORIGINAL_DIR = data['orig_dir'] |
|
432 |
|
433 if 'main_path' in data: |
|
434 main_path = data['main_path'] |
|
435 main_name = os.path.splitext(os.path.basename(main_path))[0] |
|
436 if main_name == '__init__': |
|
437 main_name = os.path.basename(os.path.dirname(main_path)) |
|
438 |
|
439 if main_name != 'ipython': |
|
440 import imp |
|
441 |
|
442 if main_path is None: |
|
443 dirs = None |
|
444 elif os.path.basename(main_path).startswith('__init__.py'): |
|
445 dirs = [os.path.dirname(os.path.dirname(main_path))] |
|
446 else: |
|
447 dirs = [os.path.dirname(main_path)] |
|
448 |
|
449 assert main_name not in sys.modules, main_name |
|
450 file, path_name, etc = imp.find_module(main_name, dirs) |
|
451 try: |
|
452 # We would like to do "imp.load_module('__main__', ...)" |
|
453 # here. However, that would cause 'if __name__ == |
|
454 # "__main__"' clauses to be executed. |
|
455 main_module = imp.load_module( |
|
456 '__parents_main__', file, path_name, etc |
|
457 ) |
|
458 finally: |
|
459 if file: |
|
460 file.close() |
|
461 |
|
462 sys.modules['__main__'] = main_module |
|
463 main_module.__name__ = '__main__' |
|
464 |
|
465 # Try to make the potentially picklable objects in |
|
466 # sys.modules['__main__'] realize they are in the main |
|
467 # module -- somewhat ugly. |
|
468 for obj in main_module.__dict__.values(): |
|
469 try: |
|
470 if obj.__module__ == '__parents_main__': |
|
471 obj.__module__ = '__main__' |
|
472 except Exception: |
|
473 pass |