symbian-qemu-0.9.1-12/python-win32-2.6.1/lib/multiprocessing/connection.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # A higher level module for using sockets (or Windows named pipes)
       
     3 #
       
     4 # multiprocessing/connection.py
       
     5 #
       
     6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
       
     7 #
       
     8 
       
     9 __all__ = [ 'Client', 'Listener', 'Pipe' ]
       
    10 
       
    11 import os
       
    12 import sys
       
    13 import socket
       
    14 import errno
       
    15 import time
       
    16 import tempfile
       
    17 import itertools
       
    18 
       
    19 import _multiprocessing
       
    20 from multiprocessing import current_process, AuthenticationError
       
    21 from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
       
    22 from multiprocessing.forking import duplicate, close
       
    23 
       
    24 
       
    25 #
       
    26 #
       
    27 #
       
    28 
       
    29 BUFSIZE = 8192
       
    30 
       
    31 _mmap_counter = itertools.count()
       
    32 
       
    33 default_family = 'AF_INET'
       
    34 families = ['AF_INET']
       
    35 
       
    36 if hasattr(socket, 'AF_UNIX'):
       
    37     default_family = 'AF_UNIX'
       
    38     families += ['AF_UNIX']
       
    39 
       
    40 if sys.platform == 'win32':
       
    41     default_family = 'AF_PIPE'
       
    42     families += ['AF_PIPE']
       
    43 
       
    44 #
       
    45 #
       
    46 #
       
    47 
       
    48 def arbitrary_address(family):
       
    49     '''
       
    50     Return an arbitrary free address for the given family
       
    51     '''
       
    52     if family == 'AF_INET':
       
    53         return ('localhost', 0)
       
    54     elif family == 'AF_UNIX':
       
    55         return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
       
    56     elif family == 'AF_PIPE':
       
    57         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
       
    58                                (os.getpid(), _mmap_counter.next()))
       
    59     else:
       
    60         raise ValueError('unrecognized family')
       
    61 
       
    62 
       
    63 def address_type(address):
       
    64     '''
       
    65     Return the types of the address
       
    66 
       
    67     This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
       
    68     '''
       
    69     if type(address) == tuple:
       
    70         return 'AF_INET'
       
    71     elif type(address) is str and address.startswith('\\\\'):
       
    72         return 'AF_PIPE'
       
    73     elif type(address) is str:
       
    74         return 'AF_UNIX'
       
    75     else:
       
    76         raise ValueError('address type of %r unrecognized' % address)
       
    77 
       
    78 #
       
    79 # Public functions
       
    80 #
       
    81 
       
    82 class Listener(object):
       
    83     '''
       
    84     Returns a listener object.
       
    85 
       
    86     This is a wrapper for a bound socket which is 'listening' for
       
    87     connections, or for a Windows named pipe.
       
    88     '''
       
    89     def __init__(self, address=None, family=None, backlog=1, authkey=None):
       
    90         family = family or (address and address_type(address)) \
       
    91                  or default_family
       
    92         address = address or arbitrary_address(family)
       
    93 
       
    94         if family == 'AF_PIPE':
       
    95             self._listener = PipeListener(address, backlog)
       
    96         else:
       
    97             self._listener = SocketListener(address, family, backlog)
       
    98 
       
    99         if authkey is not None and not isinstance(authkey, bytes):
       
   100             raise TypeError, 'authkey should be a byte string'
       
   101 
       
   102         self._authkey = authkey
       
   103 
       
   104     def accept(self):
       
   105         '''
       
   106         Accept a connection on the bound socket or named pipe of `self`.
       
   107 
       
   108         Returns a `Connection` object.
       
   109         '''
       
   110         c = self._listener.accept()
       
   111         if self._authkey:
       
   112             deliver_challenge(c, self._authkey)
       
   113             answer_challenge(c, self._authkey)
       
   114         return c
       
   115 
       
   116     def close(self):
       
   117         '''
       
   118         Close the bound socket or named pipe of `self`.
       
   119         '''
       
   120         return self._listener.close()
       
   121 
       
   122     address = property(lambda self: self._listener._address)
       
   123     last_accepted = property(lambda self: self._listener._last_accepted)
       
   124 
       
   125 
       
   126 def Client(address, family=None, authkey=None):
       
   127     '''
       
   128     Returns a connection to the address of a `Listener`
       
   129     '''
       
   130     family = family or address_type(address)
       
   131     if family == 'AF_PIPE':
       
   132         c = PipeClient(address)
       
   133     else:
       
   134         c = SocketClient(address)
       
   135 
       
   136     if authkey is not None and not isinstance(authkey, bytes):
       
   137         raise TypeError, 'authkey should be a byte string'
       
   138 
       
   139     if authkey is not None:
       
   140         answer_challenge(c, authkey)
       
   141         deliver_challenge(c, authkey)
       
   142 
       
   143     return c
       
   144 
       
   145 
       
   146 if sys.platform != 'win32':
       
   147 
       
   148     def Pipe(duplex=True):
       
   149         '''
       
   150         Returns pair of connection objects at either end of a pipe
       
   151         '''
       
   152         if duplex:
       
   153             s1, s2 = socket.socketpair()
       
   154             c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
       
   155             c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
       
   156             s1.close()
       
   157             s2.close()
       
   158         else:
       
   159             fd1, fd2 = os.pipe()
       
   160             c1 = _multiprocessing.Connection(fd1, writable=False)
       
   161             c2 = _multiprocessing.Connection(fd2, readable=False)
       
   162 
       
   163         return c1, c2
       
   164 
       
   165 else:
       
   166 
       
   167     from ._multiprocessing import win32
       
   168 
       
   169     def Pipe(duplex=True):
       
   170         '''
       
   171         Returns pair of connection objects at either end of a pipe
       
   172         '''
       
   173         address = arbitrary_address('AF_PIPE')
       
   174         if duplex:
       
   175             openmode = win32.PIPE_ACCESS_DUPLEX
       
   176             access = win32.GENERIC_READ | win32.GENERIC_WRITE
       
   177             obsize, ibsize = BUFSIZE, BUFSIZE
       
   178         else:
       
   179             openmode = win32.PIPE_ACCESS_INBOUND
       
   180             access = win32.GENERIC_WRITE
       
   181             obsize, ibsize = 0, BUFSIZE
       
   182 
       
   183         h1 = win32.CreateNamedPipe(
       
   184             address, openmode,
       
   185             win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
       
   186             win32.PIPE_WAIT,
       
   187             1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
       
   188             )
       
   189         h2 = win32.CreateFile(
       
   190             address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
       
   191             )
       
   192         win32.SetNamedPipeHandleState(
       
   193             h2, win32.PIPE_READMODE_MESSAGE, None, None
       
   194             )
       
   195 
       
   196         try:
       
   197             win32.ConnectNamedPipe(h1, win32.NULL)
       
   198         except WindowsError, e:
       
   199             if e.args[0] != win32.ERROR_PIPE_CONNECTED:
       
   200                 raise
       
   201 
       
   202         c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
       
   203         c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
       
   204 
       
   205         return c1, c2
       
   206 
       
   207 #
       
   208 # Definitions for connections based on sockets
       
   209 #
       
   210 
       
   211 class SocketListener(object):
       
   212     '''
       
   213     Representation of a socket which is bound to an address and listening
       
   214     '''
       
   215     def __init__(self, address, family, backlog=1):
       
   216         self._socket = socket.socket(getattr(socket, family))
       
   217         self._socket.bind(address)
       
   218         self._socket.listen(backlog)
       
   219         self._address = self._socket.getsockname()
       
   220         self._family = family
       
   221         self._last_accepted = None
       
   222 
       
   223         if family == 'AF_UNIX':
       
   224             self._unlink = Finalize(
       
   225                 self, os.unlink, args=(address,), exitpriority=0
       
   226                 )
       
   227         else:
       
   228             self._unlink = None
       
   229 
       
   230     def accept(self):
       
   231         s, self._last_accepted = self._socket.accept()
       
   232         fd = duplicate(s.fileno())
       
   233         conn = _multiprocessing.Connection(fd)
       
   234         s.close()
       
   235         return conn
       
   236 
       
   237     def close(self):
       
   238         self._socket.close()
       
   239         if self._unlink is not None:
       
   240             self._unlink()
       
   241 
       
   242 
       
   243 def SocketClient(address):
       
   244     '''
       
   245     Return a connection object connected to the socket given by `address`
       
   246     '''
       
   247     family = address_type(address)
       
   248     s = socket.socket( getattr(socket, family) )
       
   249 
       
   250     while 1:
       
   251         try:
       
   252             s.connect(address)
       
   253         except socket.error, e:
       
   254             if e.args[0] != errno.ECONNREFUSED: # connection refused
       
   255                 debug('failed to connect to address %s', address)
       
   256                 raise
       
   257             time.sleep(0.01)
       
   258         else:
       
   259             break
       
   260     else:
       
   261         raise
       
   262 
       
   263     fd = duplicate(s.fileno())
       
   264     conn = _multiprocessing.Connection(fd)
       
   265     s.close()
       
   266     return conn
       
   267 
       
   268 #
       
   269 # Definitions for connections based on named pipes
       
   270 #
       
   271 
       
   272 if sys.platform == 'win32':
       
   273 
       
   274     class PipeListener(object):
       
   275         '''
       
   276         Representation of a named pipe
       
   277         '''
       
   278         def __init__(self, address, backlog=None):
       
   279             self._address = address
       
   280             handle = win32.CreateNamedPipe(
       
   281                 address, win32.PIPE_ACCESS_DUPLEX,
       
   282                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
       
   283                 win32.PIPE_WAIT,
       
   284                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
       
   285                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
       
   286                 )
       
   287             self._handle_queue = [handle]
       
   288             self._last_accepted = None
       
   289 
       
   290             sub_debug('listener created with address=%r', self._address)
       
   291 
       
   292             self.close = Finalize(
       
   293                 self, PipeListener._finalize_pipe_listener,
       
   294                 args=(self._handle_queue, self._address), exitpriority=0
       
   295                 )
       
   296 
       
   297         def accept(self):
       
   298             newhandle = win32.CreateNamedPipe(
       
   299                 self._address, win32.PIPE_ACCESS_DUPLEX,
       
   300                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
       
   301                 win32.PIPE_WAIT,
       
   302                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
       
   303                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
       
   304                 )
       
   305             self._handle_queue.append(newhandle)
       
   306             handle = self._handle_queue.pop(0)
       
   307             try:
       
   308                 win32.ConnectNamedPipe(handle, win32.NULL)
       
   309             except WindowsError, e:
       
   310                 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
       
   311                     raise
       
   312             return _multiprocessing.PipeConnection(handle)
       
   313 
       
   314         @staticmethod
       
   315         def _finalize_pipe_listener(queue, address):
       
   316             sub_debug('closing listener with address=%r', address)
       
   317             for handle in queue:
       
   318                 close(handle)
       
   319 
       
   320     def PipeClient(address):
       
   321         '''
       
   322         Return a connection object connected to the pipe given by `address`
       
   323         '''
       
   324         while 1:
       
   325             try:
       
   326                 win32.WaitNamedPipe(address, 1000)
       
   327                 h = win32.CreateFile(
       
   328                     address, win32.GENERIC_READ | win32.GENERIC_WRITE,
       
   329                     0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
       
   330                     )
       
   331             except WindowsError, e:
       
   332                 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
       
   333                                      win32.ERROR_PIPE_BUSY):
       
   334                     raise
       
   335             else:
       
   336                 break
       
   337         else:
       
   338             raise
       
   339 
       
   340         win32.SetNamedPipeHandleState(
       
   341             h, win32.PIPE_READMODE_MESSAGE, None, None
       
   342             )
       
   343         return _multiprocessing.PipeConnection(h)
       
   344 
       
   345 #
       
   346 # Authentication stuff
       
   347 #
       
   348 
       
   349 MESSAGE_LENGTH = 20
       
   350 
       
   351 CHALLENGE = b'#CHALLENGE#'
       
   352 WELCOME = b'#WELCOME#'
       
   353 FAILURE = b'#FAILURE#'
       
   354 
       
   355 def deliver_challenge(connection, authkey):
       
   356     import hmac
       
   357     assert isinstance(authkey, bytes)
       
   358     message = os.urandom(MESSAGE_LENGTH)
       
   359     connection.send_bytes(CHALLENGE + message)
       
   360     digest = hmac.new(authkey, message).digest()
       
   361     response = connection.recv_bytes(256)        # reject large message
       
   362     if response == digest:
       
   363         connection.send_bytes(WELCOME)
       
   364     else:
       
   365         connection.send_bytes(FAILURE)
       
   366         raise AuthenticationError('digest received was wrong')
       
   367 
       
   368 def answer_challenge(connection, authkey):
       
   369     import hmac
       
   370     assert isinstance(authkey, bytes)
       
   371     message = connection.recv_bytes(256)         # reject large message
       
   372     assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
       
   373     message = message[len(CHALLENGE):]
       
   374     digest = hmac.new(authkey, message).digest()
       
   375     connection.send_bytes(digest)
       
   376     response = connection.recv_bytes(256)        # reject large message
       
   377     if response != WELCOME:
       
   378         raise AuthenticationError('digest sent was rejected')
       
   379 
       
   380 #
       
   381 # Support for using xmlrpclib for serialization
       
   382 #
       
   383 
       
   384 class ConnectionWrapper(object):
       
   385     def __init__(self, conn, dumps, loads):
       
   386         self._conn = conn
       
   387         self._dumps = dumps
       
   388         self._loads = loads
       
   389         for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
       
   390             obj = getattr(conn, attr)
       
   391             setattr(self, attr, obj)
       
   392     def send(self, obj):
       
   393         s = self._dumps(obj)
       
   394         self._conn.send_bytes(s)
       
   395     def recv(self):
       
   396         s = self._conn.recv_bytes()
       
   397         return self._loads(s)
       
   398 
       
   399 def _xml_dumps(obj):
       
   400     return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
       
   401 
       
   402 def _xml_loads(s):
       
   403     (obj,), method = xmlrpclib.loads(s.decode('utf8'))
       
   404     return obj
       
   405 
       
   406 class XmlListener(Listener):
       
   407     def accept(self):
       
   408         global xmlrpclib
       
   409         import xmlrpclib
       
   410         obj = Listener.accept(self)
       
   411         return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
       
   412 
       
   413 def XmlClient(*args, **kwds):
       
   414     global xmlrpclib
       
   415     import xmlrpclib
       
   416     return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)