|
1 # |
|
2 # A higher level module for using sockets (or Windows named pipes) |
|
3 # |
|
4 # multiprocessing/connection.py |
|
5 # |
|
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt |
|
7 # |
|
8 |
|
9 __all__ = [ 'Client', 'Listener', 'Pipe' ] |
|
10 |
|
11 import os |
|
12 import sys |
|
13 import socket |
|
14 import errno |
|
15 import time |
|
16 import tempfile |
|
17 import itertools |
|
18 |
|
19 import _multiprocessing |
|
20 from multiprocessing import current_process, AuthenticationError |
|
21 from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug |
|
22 from multiprocessing.forking import duplicate, close |
|
23 |
|
24 |
|
25 # |
|
26 # |
|
27 # |
|
28 |
|
29 BUFSIZE = 8192 |
|
30 |
|
31 _mmap_counter = itertools.count() |
|
32 |
|
33 default_family = 'AF_INET' |
|
34 families = ['AF_INET'] |
|
35 |
|
36 if hasattr(socket, 'AF_UNIX'): |
|
37 default_family = 'AF_UNIX' |
|
38 families += ['AF_UNIX'] |
|
39 |
|
40 if sys.platform == 'win32': |
|
41 default_family = 'AF_PIPE' |
|
42 families += ['AF_PIPE'] |
|
43 |
|
44 # |
|
45 # |
|
46 # |
|
47 |
|
48 def arbitrary_address(family): |
|
49 ''' |
|
50 Return an arbitrary free address for the given family |
|
51 ''' |
|
52 if family == 'AF_INET': |
|
53 return ('localhost', 0) |
|
54 elif family == 'AF_UNIX': |
|
55 return tempfile.mktemp(prefix='listener-', dir=get_temp_dir()) |
|
56 elif family == 'AF_PIPE': |
|
57 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % |
|
58 (os.getpid(), _mmap_counter.next())) |
|
59 else: |
|
60 raise ValueError('unrecognized family') |
|
61 |
|
62 |
|
63 def address_type(address): |
|
64 ''' |
|
65 Return the types of the address |
|
66 |
|
67 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' |
|
68 ''' |
|
69 if type(address) == tuple: |
|
70 return 'AF_INET' |
|
71 elif type(address) is str and address.startswith('\\\\'): |
|
72 return 'AF_PIPE' |
|
73 elif type(address) is str: |
|
74 return 'AF_UNIX' |
|
75 else: |
|
76 raise ValueError('address type of %r unrecognized' % address) |
|
77 |
|
78 # |
|
79 # Public functions |
|
80 # |
|
81 |
|
82 class Listener(object): |
|
83 ''' |
|
84 Returns a listener object. |
|
85 |
|
86 This is a wrapper for a bound socket which is 'listening' for |
|
87 connections, or for a Windows named pipe. |
|
88 ''' |
|
89 def __init__(self, address=None, family=None, backlog=1, authkey=None): |
|
90 family = family or (address and address_type(address)) \ |
|
91 or default_family |
|
92 address = address or arbitrary_address(family) |
|
93 |
|
94 if family == 'AF_PIPE': |
|
95 self._listener = PipeListener(address, backlog) |
|
96 else: |
|
97 self._listener = SocketListener(address, family, backlog) |
|
98 |
|
99 if authkey is not None and not isinstance(authkey, bytes): |
|
100 raise TypeError, 'authkey should be a byte string' |
|
101 |
|
102 self._authkey = authkey |
|
103 |
|
104 def accept(self): |
|
105 ''' |
|
106 Accept a connection on the bound socket or named pipe of `self`. |
|
107 |
|
108 Returns a `Connection` object. |
|
109 ''' |
|
110 c = self._listener.accept() |
|
111 if self._authkey: |
|
112 deliver_challenge(c, self._authkey) |
|
113 answer_challenge(c, self._authkey) |
|
114 return c |
|
115 |
|
116 def close(self): |
|
117 ''' |
|
118 Close the bound socket or named pipe of `self`. |
|
119 ''' |
|
120 return self._listener.close() |
|
121 |
|
122 address = property(lambda self: self._listener._address) |
|
123 last_accepted = property(lambda self: self._listener._last_accepted) |
|
124 |
|
125 |
|
126 def Client(address, family=None, authkey=None): |
|
127 ''' |
|
128 Returns a connection to the address of a `Listener` |
|
129 ''' |
|
130 family = family or address_type(address) |
|
131 if family == 'AF_PIPE': |
|
132 c = PipeClient(address) |
|
133 else: |
|
134 c = SocketClient(address) |
|
135 |
|
136 if authkey is not None and not isinstance(authkey, bytes): |
|
137 raise TypeError, 'authkey should be a byte string' |
|
138 |
|
139 if authkey is not None: |
|
140 answer_challenge(c, authkey) |
|
141 deliver_challenge(c, authkey) |
|
142 |
|
143 return c |
|
144 |
|
145 |
|
146 if sys.platform != 'win32': |
|
147 |
|
148 def Pipe(duplex=True): |
|
149 ''' |
|
150 Returns pair of connection objects at either end of a pipe |
|
151 ''' |
|
152 if duplex: |
|
153 s1, s2 = socket.socketpair() |
|
154 c1 = _multiprocessing.Connection(os.dup(s1.fileno())) |
|
155 c2 = _multiprocessing.Connection(os.dup(s2.fileno())) |
|
156 s1.close() |
|
157 s2.close() |
|
158 else: |
|
159 fd1, fd2 = os.pipe() |
|
160 c1 = _multiprocessing.Connection(fd1, writable=False) |
|
161 c2 = _multiprocessing.Connection(fd2, readable=False) |
|
162 |
|
163 return c1, c2 |
|
164 |
|
165 else: |
|
166 |
|
167 from ._multiprocessing import win32 |
|
168 |
|
169 def Pipe(duplex=True): |
|
170 ''' |
|
171 Returns pair of connection objects at either end of a pipe |
|
172 ''' |
|
173 address = arbitrary_address('AF_PIPE') |
|
174 if duplex: |
|
175 openmode = win32.PIPE_ACCESS_DUPLEX |
|
176 access = win32.GENERIC_READ | win32.GENERIC_WRITE |
|
177 obsize, ibsize = BUFSIZE, BUFSIZE |
|
178 else: |
|
179 openmode = win32.PIPE_ACCESS_INBOUND |
|
180 access = win32.GENERIC_WRITE |
|
181 obsize, ibsize = 0, BUFSIZE |
|
182 |
|
183 h1 = win32.CreateNamedPipe( |
|
184 address, openmode, |
|
185 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | |
|
186 win32.PIPE_WAIT, |
|
187 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL |
|
188 ) |
|
189 h2 = win32.CreateFile( |
|
190 address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL |
|
191 ) |
|
192 win32.SetNamedPipeHandleState( |
|
193 h2, win32.PIPE_READMODE_MESSAGE, None, None |
|
194 ) |
|
195 |
|
196 try: |
|
197 win32.ConnectNamedPipe(h1, win32.NULL) |
|
198 except WindowsError, e: |
|
199 if e.args[0] != win32.ERROR_PIPE_CONNECTED: |
|
200 raise |
|
201 |
|
202 c1 = _multiprocessing.PipeConnection(h1, writable=duplex) |
|
203 c2 = _multiprocessing.PipeConnection(h2, readable=duplex) |
|
204 |
|
205 return c1, c2 |
|
206 |
|
207 # |
|
208 # Definitions for connections based on sockets |
|
209 # |
|
210 |
|
211 class SocketListener(object): |
|
212 ''' |
|
213 Representation of a socket which is bound to an address and listening |
|
214 ''' |
|
215 def __init__(self, address, family, backlog=1): |
|
216 self._socket = socket.socket(getattr(socket, family)) |
|
217 self._socket.bind(address) |
|
218 self._socket.listen(backlog) |
|
219 self._address = self._socket.getsockname() |
|
220 self._family = family |
|
221 self._last_accepted = None |
|
222 |
|
223 if family == 'AF_UNIX': |
|
224 self._unlink = Finalize( |
|
225 self, os.unlink, args=(address,), exitpriority=0 |
|
226 ) |
|
227 else: |
|
228 self._unlink = None |
|
229 |
|
230 def accept(self): |
|
231 s, self._last_accepted = self._socket.accept() |
|
232 fd = duplicate(s.fileno()) |
|
233 conn = _multiprocessing.Connection(fd) |
|
234 s.close() |
|
235 return conn |
|
236 |
|
237 def close(self): |
|
238 self._socket.close() |
|
239 if self._unlink is not None: |
|
240 self._unlink() |
|
241 |
|
242 |
|
243 def SocketClient(address): |
|
244 ''' |
|
245 Return a connection object connected to the socket given by `address` |
|
246 ''' |
|
247 family = address_type(address) |
|
248 s = socket.socket( getattr(socket, family) ) |
|
249 |
|
250 while 1: |
|
251 try: |
|
252 s.connect(address) |
|
253 except socket.error, e: |
|
254 if e.args[0] != errno.ECONNREFUSED: # connection refused |
|
255 debug('failed to connect to address %s', address) |
|
256 raise |
|
257 time.sleep(0.01) |
|
258 else: |
|
259 break |
|
260 else: |
|
261 raise |
|
262 |
|
263 fd = duplicate(s.fileno()) |
|
264 conn = _multiprocessing.Connection(fd) |
|
265 s.close() |
|
266 return conn |
|
267 |
|
268 # |
|
269 # Definitions for connections based on named pipes |
|
270 # |
|
271 |
|
272 if sys.platform == 'win32': |
|
273 |
|
274 class PipeListener(object): |
|
275 ''' |
|
276 Representation of a named pipe |
|
277 ''' |
|
278 def __init__(self, address, backlog=None): |
|
279 self._address = address |
|
280 handle = win32.CreateNamedPipe( |
|
281 address, win32.PIPE_ACCESS_DUPLEX, |
|
282 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | |
|
283 win32.PIPE_WAIT, |
|
284 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, |
|
285 win32.NMPWAIT_WAIT_FOREVER, win32.NULL |
|
286 ) |
|
287 self._handle_queue = [handle] |
|
288 self._last_accepted = None |
|
289 |
|
290 sub_debug('listener created with address=%r', self._address) |
|
291 |
|
292 self.close = Finalize( |
|
293 self, PipeListener._finalize_pipe_listener, |
|
294 args=(self._handle_queue, self._address), exitpriority=0 |
|
295 ) |
|
296 |
|
297 def accept(self): |
|
298 newhandle = win32.CreateNamedPipe( |
|
299 self._address, win32.PIPE_ACCESS_DUPLEX, |
|
300 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | |
|
301 win32.PIPE_WAIT, |
|
302 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, |
|
303 win32.NMPWAIT_WAIT_FOREVER, win32.NULL |
|
304 ) |
|
305 self._handle_queue.append(newhandle) |
|
306 handle = self._handle_queue.pop(0) |
|
307 try: |
|
308 win32.ConnectNamedPipe(handle, win32.NULL) |
|
309 except WindowsError, e: |
|
310 if e.args[0] != win32.ERROR_PIPE_CONNECTED: |
|
311 raise |
|
312 return _multiprocessing.PipeConnection(handle) |
|
313 |
|
314 @staticmethod |
|
315 def _finalize_pipe_listener(queue, address): |
|
316 sub_debug('closing listener with address=%r', address) |
|
317 for handle in queue: |
|
318 close(handle) |
|
319 |
|
320 def PipeClient(address): |
|
321 ''' |
|
322 Return a connection object connected to the pipe given by `address` |
|
323 ''' |
|
324 while 1: |
|
325 try: |
|
326 win32.WaitNamedPipe(address, 1000) |
|
327 h = win32.CreateFile( |
|
328 address, win32.GENERIC_READ | win32.GENERIC_WRITE, |
|
329 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL |
|
330 ) |
|
331 except WindowsError, e: |
|
332 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, |
|
333 win32.ERROR_PIPE_BUSY): |
|
334 raise |
|
335 else: |
|
336 break |
|
337 else: |
|
338 raise |
|
339 |
|
340 win32.SetNamedPipeHandleState( |
|
341 h, win32.PIPE_READMODE_MESSAGE, None, None |
|
342 ) |
|
343 return _multiprocessing.PipeConnection(h) |
|
344 |
|
345 # |
|
346 # Authentication stuff |
|
347 # |
|
348 |
|
349 MESSAGE_LENGTH = 20 |
|
350 |
|
351 CHALLENGE = b'#CHALLENGE#' |
|
352 WELCOME = b'#WELCOME#' |
|
353 FAILURE = b'#FAILURE#' |
|
354 |
|
355 def deliver_challenge(connection, authkey): |
|
356 import hmac |
|
357 assert isinstance(authkey, bytes) |
|
358 message = os.urandom(MESSAGE_LENGTH) |
|
359 connection.send_bytes(CHALLENGE + message) |
|
360 digest = hmac.new(authkey, message).digest() |
|
361 response = connection.recv_bytes(256) # reject large message |
|
362 if response == digest: |
|
363 connection.send_bytes(WELCOME) |
|
364 else: |
|
365 connection.send_bytes(FAILURE) |
|
366 raise AuthenticationError('digest received was wrong') |
|
367 |
|
368 def answer_challenge(connection, authkey): |
|
369 import hmac |
|
370 assert isinstance(authkey, bytes) |
|
371 message = connection.recv_bytes(256) # reject large message |
|
372 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message |
|
373 message = message[len(CHALLENGE):] |
|
374 digest = hmac.new(authkey, message).digest() |
|
375 connection.send_bytes(digest) |
|
376 response = connection.recv_bytes(256) # reject large message |
|
377 if response != WELCOME: |
|
378 raise AuthenticationError('digest sent was rejected') |
|
379 |
|
380 # |
|
381 # Support for using xmlrpclib for serialization |
|
382 # |
|
383 |
|
384 class ConnectionWrapper(object): |
|
385 def __init__(self, conn, dumps, loads): |
|
386 self._conn = conn |
|
387 self._dumps = dumps |
|
388 self._loads = loads |
|
389 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): |
|
390 obj = getattr(conn, attr) |
|
391 setattr(self, attr, obj) |
|
392 def send(self, obj): |
|
393 s = self._dumps(obj) |
|
394 self._conn.send_bytes(s) |
|
395 def recv(self): |
|
396 s = self._conn.recv_bytes() |
|
397 return self._loads(s) |
|
398 |
|
399 def _xml_dumps(obj): |
|
400 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') |
|
401 |
|
402 def _xml_loads(s): |
|
403 (obj,), method = xmlrpclib.loads(s.decode('utf8')) |
|
404 return obj |
|
405 |
|
406 class XmlListener(Listener): |
|
407 def accept(self): |
|
408 global xmlrpclib |
|
409 import xmlrpclib |
|
410 obj = Listener.accept(self) |
|
411 return ConnectionWrapper(obj, _xml_dumps, _xml_loads) |
|
412 |
|
413 def XmlClient(*args, **kwds): |
|
414 global xmlrpclib |
|
415 import xmlrpclib |
|
416 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) |