symbian-qemu-0.9.1-12/python-2.6.1/Lib/multiprocessing/reduction.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # Module to allow connection and socket objects to be transferred
       
     3 # between processes
       
     4 #
       
     5 # multiprocessing/reduction.py
       
     6 #
       
     7 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
       
     8 #
       
     9 
       
    10 __all__ = []
       
    11 
       
    12 import os
       
    13 import sys
       
    14 import socket
       
    15 import threading
       
    16 
       
    17 import _multiprocessing
       
    18 from multiprocessing import current_process
       
    19 from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
       
    20 from multiprocessing.util import register_after_fork, debug, sub_debug
       
    21 from multiprocessing.connection import Client, Listener
       
    22 
       
    23 
       
    24 #
       
    25 #
       
    26 #
       
    27 
       
    28 if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
       
    29     raise ImportError('pickling of connections not supported')
       
    30 
       
    31 #
       
    32 # Platform specific definitions
       
    33 #
       
    34 
       
    35 if sys.platform == 'win32':
       
    36     import _subprocess
       
    37     from ._multiprocessing import win32
       
    38 
       
    39     def send_handle(conn, handle, destination_pid):
       
    40         process_handle = win32.OpenProcess(
       
    41             win32.PROCESS_ALL_ACCESS, False, destination_pid
       
    42             )
       
    43         try:
       
    44             new_handle = duplicate(handle, process_handle)
       
    45             conn.send(new_handle)
       
    46         finally:
       
    47             close(process_handle)
       
    48 
       
    49     def recv_handle(conn):
       
    50         return conn.recv()
       
    51 
       
    52 else:
       
    53     def send_handle(conn, handle, destination_pid):
       
    54         _multiprocessing.sendfd(conn.fileno(), handle)
       
    55 
       
    56     def recv_handle(conn):
       
    57         return _multiprocessing.recvfd(conn.fileno())
       
    58 
       
    59 #
       
    60 # Support for a per-process server thread which caches pickled handles
       
    61 #
       
    62 
       
    63 _cache = set()
       
    64 
       
    65 def _reset(obj):
       
    66     global _lock, _listener, _cache
       
    67     for h in _cache:
       
    68         close(h)
       
    69     _cache.clear()
       
    70     _lock = threading.Lock()
       
    71     _listener = None
       
    72 
       
    73 _reset(None)
       
    74 register_after_fork(_reset, _reset)
       
    75 
       
    76 def _get_listener():
       
    77     global _listener
       
    78 
       
    79     if _listener is None:
       
    80         _lock.acquire()
       
    81         try:
       
    82             if _listener is None:
       
    83                 debug('starting listener and thread for sending handles')
       
    84                 _listener = Listener(authkey=current_process().authkey)
       
    85                 t = threading.Thread(target=_serve)
       
    86                 t.daemon = True
       
    87                 t.start()
       
    88         finally:
       
    89             _lock.release()
       
    90 
       
    91     return _listener
       
    92 
       
    93 def _serve():
       
    94     from .util import is_exiting, sub_warning
       
    95 
       
    96     while 1:
       
    97         try:
       
    98             conn = _listener.accept()
       
    99             handle_wanted, destination_pid = conn.recv()
       
   100             _cache.remove(handle_wanted)
       
   101             send_handle(conn, handle_wanted, destination_pid)
       
   102             close(handle_wanted)
       
   103             conn.close()
       
   104         except:
       
   105             if not is_exiting():
       
   106                 import traceback
       
   107                 sub_warning(
       
   108                     'thread for sharing handles raised exception :\n' +
       
   109                     '-'*79 + '\n' + traceback.format_exc() + '-'*79
       
   110                     )
       
   111 
       
   112 #
       
   113 # Functions to be used for pickling/unpickling objects with handles
       
   114 #
       
   115 
       
   116 def reduce_handle(handle):
       
   117     if Popen.thread_is_spawning():
       
   118         return (None, Popen.duplicate_for_child(handle), True)
       
   119     dup_handle = duplicate(handle)
       
   120     _cache.add(dup_handle)
       
   121     sub_debug('reducing handle %d', handle)
       
   122     return (_get_listener().address, dup_handle, False)
       
   123 
       
   124 def rebuild_handle(pickled_data):
       
   125     address, handle, inherited = pickled_data
       
   126     if inherited:
       
   127         return handle
       
   128     sub_debug('rebuilding handle %d', handle)
       
   129     conn = Client(address, authkey=current_process().authkey)
       
   130     conn.send((handle, os.getpid()))
       
   131     new_handle = recv_handle(conn)
       
   132     conn.close()
       
   133     return new_handle
       
   134 
       
   135 #
       
   136 # Register `_multiprocessing.Connection` with `ForkingPickler`
       
   137 #
       
   138 
       
   139 def reduce_connection(conn):
       
   140     rh = reduce_handle(conn.fileno())
       
   141     return rebuild_connection, (rh, conn.readable, conn.writable)
       
   142 
       
   143 def rebuild_connection(reduced_handle, readable, writable):
       
   144     handle = rebuild_handle(reduced_handle)
       
   145     return _multiprocessing.Connection(
       
   146         handle, readable=readable, writable=writable
       
   147         )
       
   148 
       
   149 ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
       
   150 
       
   151 #
       
   152 # Register `socket.socket` with `ForkingPickler`
       
   153 #
       
   154 
       
   155 def fromfd(fd, family, type_, proto=0):
       
   156     s = socket.fromfd(fd, family, type_, proto)
       
   157     if s.__class__ is not socket.socket:
       
   158         s = socket.socket(_sock=s)
       
   159     return s
       
   160 
       
   161 def reduce_socket(s):
       
   162     reduced_handle = reduce_handle(s.fileno())
       
   163     return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
       
   164 
       
   165 def rebuild_socket(reduced_handle, family, type_, proto):
       
   166     fd = rebuild_handle(reduced_handle)
       
   167     _sock = fromfd(fd, family, type_, proto)
       
   168     close(fd)
       
   169     return _sock
       
   170 
       
   171 ForkingPickler.register(socket.socket, reduce_socket)
       
   172 
       
   173 #
       
   174 # Register `_multiprocessing.PipeConnection` with `ForkingPickler`
       
   175 #
       
   176 
       
   177 if sys.platform == 'win32':
       
   178 
       
   179     def reduce_pipe_connection(conn):
       
   180         rh = reduce_handle(conn.fileno())
       
   181         return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
       
   182 
       
   183     def rebuild_pipe_connection(reduced_handle, readable, writable):
       
   184         handle = rebuild_handle(reduced_handle)
       
   185         return _multiprocessing.PipeConnection(
       
   186             handle, readable=readable, writable=writable
       
   187             )
       
   188 
       
   189     ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)