|
1 """RPC Implemention, originally written for the Python Idle IDE |
|
2 |
|
3 For security reasons, GvR requested that Idle's Python execution server process |
|
4 connect to the Idle process, which listens for the connection. Since Idle has |
|
5 has only one client per server, this was not a limitation. |
|
6 |
|
7 +---------------------------------+ +-------------+ |
|
8 | SocketServer.BaseRequestHandler | | SocketIO | |
|
9 +---------------------------------+ +-------------+ |
|
10 ^ | register() | |
|
11 | | unregister()| |
|
12 | +-------------+ |
|
13 | ^ ^ |
|
14 | | | |
|
15 | + -------------------+ | |
|
16 | | | |
|
17 +-------------------------+ +-----------------+ |
|
18 | RPCHandler | | RPCClient | |
|
19 | [attribute of RPCServer]| | | |
|
20 +-------------------------+ +-----------------+ |
|
21 |
|
22 The RPCServer handler class is expected to provide register/unregister methods. |
|
23 RPCHandler inherits the mix-in class SocketIO, which provides these methods. |
|
24 |
|
25 See the Idle run.main() docstring for further information on how this was |
|
26 accomplished in Idle. |
|
27 |
|
28 """ |
|
29 |
|
30 import sys |
|
31 import os |
|
32 import socket |
|
33 import select |
|
34 import SocketServer |
|
35 import struct |
|
36 import cPickle as pickle |
|
37 import threading |
|
38 import Queue |
|
39 import traceback |
|
40 import copy_reg |
|
41 import types |
|
42 import marshal |
|
43 |
|
44 |
|
45 def unpickle_code(ms): |
|
46 co = marshal.loads(ms) |
|
47 assert isinstance(co, types.CodeType) |
|
48 return co |
|
49 |
|
50 def pickle_code(co): |
|
51 assert isinstance(co, types.CodeType) |
|
52 ms = marshal.dumps(co) |
|
53 return unpickle_code, (ms,) |
|
54 |
|
55 # XXX KBK 24Aug02 function pickling capability not used in Idle |
|
56 # def unpickle_function(ms): |
|
57 # return ms |
|
58 |
|
59 # def pickle_function(fn): |
|
60 # assert isinstance(fn, type.FunctionType) |
|
61 # return repr(fn) |
|
62 |
|
63 copy_reg.pickle(types.CodeType, pickle_code, unpickle_code) |
|
64 # copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function) |
|
65 |
|
66 BUFSIZE = 8*1024 |
|
67 LOCALHOST = '127.0.0.1' |
|
68 |
|
69 class RPCServer(SocketServer.TCPServer): |
|
70 |
|
71 def __init__(self, addr, handlerclass=None): |
|
72 if handlerclass is None: |
|
73 handlerclass = RPCHandler |
|
74 SocketServer.TCPServer.__init__(self, addr, handlerclass) |
|
75 |
|
76 def server_bind(self): |
|
77 "Override TCPServer method, no bind() phase for connecting entity" |
|
78 pass |
|
79 |
|
80 def server_activate(self): |
|
81 """Override TCPServer method, connect() instead of listen() |
|
82 |
|
83 Due to the reversed connection, self.server_address is actually the |
|
84 address of the Idle Client to which we are connecting. |
|
85 |
|
86 """ |
|
87 self.socket.connect(self.server_address) |
|
88 |
|
89 def get_request(self): |
|
90 "Override TCPServer method, return already connected socket" |
|
91 return self.socket, self.server_address |
|
92 |
|
93 def handle_error(self, request, client_address): |
|
94 """Override TCPServer method |
|
95 |
|
96 Error message goes to __stderr__. No error message if exiting |
|
97 normally or socket raised EOF. Other exceptions not handled in |
|
98 server code will cause os._exit. |
|
99 |
|
100 """ |
|
101 try: |
|
102 raise |
|
103 except SystemExit: |
|
104 raise |
|
105 except: |
|
106 erf = sys.__stderr__ |
|
107 print>>erf, '\n' + '-'*40 |
|
108 print>>erf, 'Unhandled server exception!' |
|
109 print>>erf, 'Thread: %s' % threading.currentThread().getName() |
|
110 print>>erf, 'Client Address: ', client_address |
|
111 print>>erf, 'Request: ', repr(request) |
|
112 traceback.print_exc(file=erf) |
|
113 print>>erf, '\n*** Unrecoverable, server exiting!' |
|
114 print>>erf, '-'*40 |
|
115 os._exit(0) |
|
116 |
|
117 #----------------- end class RPCServer -------------------- |
|
118 |
|
119 objecttable = {} |
|
120 request_queue = Queue.Queue(0) |
|
121 response_queue = Queue.Queue(0) |
|
122 |
|
123 |
|
124 class SocketIO(object): |
|
125 |
|
126 nextseq = 0 |
|
127 |
|
128 def __init__(self, sock, objtable=None, debugging=None): |
|
129 self.sockthread = threading.currentThread() |
|
130 if debugging is not None: |
|
131 self.debugging = debugging |
|
132 self.sock = sock |
|
133 if objtable is None: |
|
134 objtable = objecttable |
|
135 self.objtable = objtable |
|
136 self.responses = {} |
|
137 self.cvars = {} |
|
138 |
|
139 def close(self): |
|
140 sock = self.sock |
|
141 self.sock = None |
|
142 if sock is not None: |
|
143 sock.close() |
|
144 |
|
145 def exithook(self): |
|
146 "override for specific exit action" |
|
147 os._exit() |
|
148 |
|
149 def debug(self, *args): |
|
150 if not self.debugging: |
|
151 return |
|
152 s = self.location + " " + str(threading.currentThread().getName()) |
|
153 for a in args: |
|
154 s = s + " " + str(a) |
|
155 print>>sys.__stderr__, s |
|
156 |
|
157 def register(self, oid, object): |
|
158 self.objtable[oid] = object |
|
159 |
|
160 def unregister(self, oid): |
|
161 try: |
|
162 del self.objtable[oid] |
|
163 except KeyError: |
|
164 pass |
|
165 |
|
166 def localcall(self, seq, request): |
|
167 self.debug("localcall:", request) |
|
168 try: |
|
169 how, (oid, methodname, args, kwargs) = request |
|
170 except TypeError: |
|
171 return ("ERROR", "Bad request format") |
|
172 if not self.objtable.has_key(oid): |
|
173 return ("ERROR", "Unknown object id: %r" % (oid,)) |
|
174 obj = self.objtable[oid] |
|
175 if methodname == "__methods__": |
|
176 methods = {} |
|
177 _getmethods(obj, methods) |
|
178 return ("OK", methods) |
|
179 if methodname == "__attributes__": |
|
180 attributes = {} |
|
181 _getattributes(obj, attributes) |
|
182 return ("OK", attributes) |
|
183 if not hasattr(obj, methodname): |
|
184 return ("ERROR", "Unsupported method name: %r" % (methodname,)) |
|
185 method = getattr(obj, methodname) |
|
186 try: |
|
187 if how == 'CALL': |
|
188 ret = method(*args, **kwargs) |
|
189 if isinstance(ret, RemoteObject): |
|
190 ret = remoteref(ret) |
|
191 return ("OK", ret) |
|
192 elif how == 'QUEUE': |
|
193 request_queue.put((seq, (method, args, kwargs))) |
|
194 return("QUEUED", None) |
|
195 else: |
|
196 return ("ERROR", "Unsupported message type: %s" % how) |
|
197 except SystemExit: |
|
198 raise |
|
199 except socket.error: |
|
200 raise |
|
201 except: |
|
202 msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\ |
|
203 " Object: %s \n Method: %s \n Args: %s\n" |
|
204 print>>sys.__stderr__, msg % (oid, method, args) |
|
205 traceback.print_exc(file=sys.__stderr__) |
|
206 return ("EXCEPTION", None) |
|
207 |
|
208 def remotecall(self, oid, methodname, args, kwargs): |
|
209 self.debug("remotecall:asynccall: ", oid, methodname) |
|
210 seq = self.asynccall(oid, methodname, args, kwargs) |
|
211 return self.asyncreturn(seq) |
|
212 |
|
213 def remotequeue(self, oid, methodname, args, kwargs): |
|
214 self.debug("remotequeue:asyncqueue: ", oid, methodname) |
|
215 seq = self.asyncqueue(oid, methodname, args, kwargs) |
|
216 return self.asyncreturn(seq) |
|
217 |
|
218 def asynccall(self, oid, methodname, args, kwargs): |
|
219 request = ("CALL", (oid, methodname, args, kwargs)) |
|
220 seq = self.newseq() |
|
221 if threading.currentThread() != self.sockthread: |
|
222 cvar = threading.Condition() |
|
223 self.cvars[seq] = cvar |
|
224 self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs) |
|
225 self.putmessage((seq, request)) |
|
226 return seq |
|
227 |
|
228 def asyncqueue(self, oid, methodname, args, kwargs): |
|
229 request = ("QUEUE", (oid, methodname, args, kwargs)) |
|
230 seq = self.newseq() |
|
231 if threading.currentThread() != self.sockthread: |
|
232 cvar = threading.Condition() |
|
233 self.cvars[seq] = cvar |
|
234 self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs) |
|
235 self.putmessage((seq, request)) |
|
236 return seq |
|
237 |
|
238 def asyncreturn(self, seq): |
|
239 self.debug("asyncreturn:%d:call getresponse(): " % seq) |
|
240 response = self.getresponse(seq, wait=0.05) |
|
241 self.debug(("asyncreturn:%d:response: " % seq), response) |
|
242 return self.decoderesponse(response) |
|
243 |
|
244 def decoderesponse(self, response): |
|
245 how, what = response |
|
246 if how == "OK": |
|
247 return what |
|
248 if how == "QUEUED": |
|
249 return None |
|
250 if how == "EXCEPTION": |
|
251 self.debug("decoderesponse: EXCEPTION") |
|
252 return None |
|
253 if how == "EOF": |
|
254 self.debug("decoderesponse: EOF") |
|
255 self.decode_interrupthook() |
|
256 return None |
|
257 if how == "ERROR": |
|
258 self.debug("decoderesponse: Internal ERROR:", what) |
|
259 raise RuntimeError, what |
|
260 raise SystemError, (how, what) |
|
261 |
|
262 def decode_interrupthook(self): |
|
263 "" |
|
264 raise EOFError |
|
265 |
|
266 def mainloop(self): |
|
267 """Listen on socket until I/O not ready or EOF |
|
268 |
|
269 pollresponse() will loop looking for seq number None, which |
|
270 never comes, and exit on EOFError. |
|
271 |
|
272 """ |
|
273 try: |
|
274 self.getresponse(myseq=None, wait=0.05) |
|
275 except EOFError: |
|
276 self.debug("mainloop:return") |
|
277 return |
|
278 |
|
279 def getresponse(self, myseq, wait): |
|
280 response = self._getresponse(myseq, wait) |
|
281 if response is not None: |
|
282 how, what = response |
|
283 if how == "OK": |
|
284 response = how, self._proxify(what) |
|
285 return response |
|
286 |
|
287 def _proxify(self, obj): |
|
288 if isinstance(obj, RemoteProxy): |
|
289 return RPCProxy(self, obj.oid) |
|
290 if isinstance(obj, types.ListType): |
|
291 return map(self._proxify, obj) |
|
292 # XXX Check for other types -- not currently needed |
|
293 return obj |
|
294 |
|
295 def _getresponse(self, myseq, wait): |
|
296 self.debug("_getresponse:myseq:", myseq) |
|
297 if threading.currentThread() is self.sockthread: |
|
298 # this thread does all reading of requests or responses |
|
299 while 1: |
|
300 response = self.pollresponse(myseq, wait) |
|
301 if response is not None: |
|
302 return response |
|
303 else: |
|
304 # wait for notification from socket handling thread |
|
305 cvar = self.cvars[myseq] |
|
306 cvar.acquire() |
|
307 while not self.responses.has_key(myseq): |
|
308 cvar.wait() |
|
309 response = self.responses[myseq] |
|
310 self.debug("_getresponse:%s: thread woke up: response: %s" % |
|
311 (myseq, response)) |
|
312 del self.responses[myseq] |
|
313 del self.cvars[myseq] |
|
314 cvar.release() |
|
315 return response |
|
316 |
|
317 def newseq(self): |
|
318 self.nextseq = seq = self.nextseq + 2 |
|
319 return seq |
|
320 |
|
321 def putmessage(self, message): |
|
322 self.debug("putmessage:%d:" % message[0]) |
|
323 try: |
|
324 s = pickle.dumps(message) |
|
325 except pickle.PicklingError: |
|
326 print >>sys.__stderr__, "Cannot pickle:", repr(message) |
|
327 raise |
|
328 s = struct.pack("<i", len(s)) + s |
|
329 while len(s) > 0: |
|
330 try: |
|
331 r, w, x = select.select([], [self.sock], []) |
|
332 n = self.sock.send(s[:BUFSIZE]) |
|
333 except (AttributeError, TypeError): |
|
334 raise IOError, "socket no longer exists" |
|
335 except socket.error: |
|
336 raise |
|
337 else: |
|
338 s = s[n:] |
|
339 |
|
340 buffer = "" |
|
341 bufneed = 4 |
|
342 bufstate = 0 # meaning: 0 => reading count; 1 => reading data |
|
343 |
|
344 def pollpacket(self, wait): |
|
345 self._stage0() |
|
346 if len(self.buffer) < self.bufneed: |
|
347 r, w, x = select.select([self.sock.fileno()], [], [], wait) |
|
348 if len(r) == 0: |
|
349 return None |
|
350 try: |
|
351 s = self.sock.recv(BUFSIZE) |
|
352 except socket.error: |
|
353 raise EOFError |
|
354 if len(s) == 0: |
|
355 raise EOFError |
|
356 self.buffer += s |
|
357 self._stage0() |
|
358 return self._stage1() |
|
359 |
|
360 def _stage0(self): |
|
361 if self.bufstate == 0 and len(self.buffer) >= 4: |
|
362 s = self.buffer[:4] |
|
363 self.buffer = self.buffer[4:] |
|
364 self.bufneed = struct.unpack("<i", s)[0] |
|
365 self.bufstate = 1 |
|
366 |
|
367 def _stage1(self): |
|
368 if self.bufstate == 1 and len(self.buffer) >= self.bufneed: |
|
369 packet = self.buffer[:self.bufneed] |
|
370 self.buffer = self.buffer[self.bufneed:] |
|
371 self.bufneed = 4 |
|
372 self.bufstate = 0 |
|
373 return packet |
|
374 |
|
375 def pollmessage(self, wait): |
|
376 packet = self.pollpacket(wait) |
|
377 if packet is None: |
|
378 return None |
|
379 try: |
|
380 message = pickle.loads(packet) |
|
381 except pickle.UnpicklingError: |
|
382 print >>sys.__stderr__, "-----------------------" |
|
383 print >>sys.__stderr__, "cannot unpickle packet:", repr(packet) |
|
384 traceback.print_stack(file=sys.__stderr__) |
|
385 print >>sys.__stderr__, "-----------------------" |
|
386 raise |
|
387 return message |
|
388 |
|
389 def pollresponse(self, myseq, wait): |
|
390 """Handle messages received on the socket. |
|
391 |
|
392 Some messages received may be asynchronous 'call' or 'queue' requests, |
|
393 and some may be responses for other threads. |
|
394 |
|
395 'call' requests are passed to self.localcall() with the expectation of |
|
396 immediate execution, during which time the socket is not serviced. |
|
397 |
|
398 'queue' requests are used for tasks (which may block or hang) to be |
|
399 processed in a different thread. These requests are fed into |
|
400 request_queue by self.localcall(). Responses to queued requests are |
|
401 taken from response_queue and sent across the link with the associated |
|
402 sequence numbers. Messages in the queues are (sequence_number, |
|
403 request/response) tuples and code using this module removing messages |
|
404 from the request_queue is responsible for returning the correct |
|
405 sequence number in the response_queue. |
|
406 |
|
407 pollresponse() will loop until a response message with the myseq |
|
408 sequence number is received, and will save other responses in |
|
409 self.responses and notify the owning thread. |
|
410 |
|
411 """ |
|
412 while 1: |
|
413 # send queued response if there is one available |
|
414 try: |
|
415 qmsg = response_queue.get(0) |
|
416 except Queue.Empty: |
|
417 pass |
|
418 else: |
|
419 seq, response = qmsg |
|
420 message = (seq, ('OK', response)) |
|
421 self.putmessage(message) |
|
422 # poll for message on link |
|
423 try: |
|
424 message = self.pollmessage(wait) |
|
425 if message is None: # socket not ready |
|
426 return None |
|
427 except EOFError: |
|
428 self.handle_EOF() |
|
429 return None |
|
430 except AttributeError: |
|
431 return None |
|
432 seq, resq = message |
|
433 how = resq[0] |
|
434 self.debug("pollresponse:%d:myseq:%s" % (seq, myseq)) |
|
435 # process or queue a request |
|
436 if how in ("CALL", "QUEUE"): |
|
437 self.debug("pollresponse:%d:localcall:call:" % seq) |
|
438 response = self.localcall(seq, resq) |
|
439 self.debug("pollresponse:%d:localcall:response:%s" |
|
440 % (seq, response)) |
|
441 if how == "CALL": |
|
442 self.putmessage((seq, response)) |
|
443 elif how == "QUEUE": |
|
444 # don't acknowledge the 'queue' request! |
|
445 pass |
|
446 continue |
|
447 # return if completed message transaction |
|
448 elif seq == myseq: |
|
449 return resq |
|
450 # must be a response for a different thread: |
|
451 else: |
|
452 cv = self.cvars.get(seq, None) |
|
453 # response involving unknown sequence number is discarded, |
|
454 # probably intended for prior incarnation of server |
|
455 if cv is not None: |
|
456 cv.acquire() |
|
457 self.responses[seq] = resq |
|
458 cv.notify() |
|
459 cv.release() |
|
460 continue |
|
461 |
|
462 def handle_EOF(self): |
|
463 "action taken upon link being closed by peer" |
|
464 self.EOFhook() |
|
465 self.debug("handle_EOF") |
|
466 for key in self.cvars: |
|
467 cv = self.cvars[key] |
|
468 cv.acquire() |
|
469 self.responses[key] = ('EOF', None) |
|
470 cv.notify() |
|
471 cv.release() |
|
472 # call our (possibly overridden) exit function |
|
473 self.exithook() |
|
474 |
|
475 def EOFhook(self): |
|
476 "Classes using rpc client/server can override to augment EOF action" |
|
477 pass |
|
478 |
|
479 #----------------- end class SocketIO -------------------- |
|
480 |
|
481 class RemoteObject(object): |
|
482 # Token mix-in class |
|
483 pass |
|
484 |
|
485 def remoteref(obj): |
|
486 oid = id(obj) |
|
487 objecttable[oid] = obj |
|
488 return RemoteProxy(oid) |
|
489 |
|
490 class RemoteProxy(object): |
|
491 |
|
492 def __init__(self, oid): |
|
493 self.oid = oid |
|
494 |
|
495 class RPCHandler(SocketServer.BaseRequestHandler, SocketIO): |
|
496 |
|
497 debugging = False |
|
498 location = "#S" # Server |
|
499 |
|
500 def __init__(self, sock, addr, svr): |
|
501 svr.current_handler = self ## cgt xxx |
|
502 SocketIO.__init__(self, sock) |
|
503 SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr) |
|
504 |
|
505 def handle(self): |
|
506 "handle() method required by SocketServer" |
|
507 self.mainloop() |
|
508 |
|
509 def get_remote_proxy(self, oid): |
|
510 return RPCProxy(self, oid) |
|
511 |
|
512 class RPCClient(SocketIO): |
|
513 |
|
514 debugging = False |
|
515 location = "#C" # Client |
|
516 |
|
517 nextseq = 1 # Requests coming from the client are odd numbered |
|
518 |
|
519 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM): |
|
520 self.listening_sock = socket.socket(family, type) |
|
521 self.listening_sock.setsockopt(socket.SOL_SOCKET, |
|
522 socket.SO_REUSEADDR, 1) |
|
523 self.listening_sock.bind(address) |
|
524 self.listening_sock.listen(1) |
|
525 |
|
526 def accept(self): |
|
527 working_sock, address = self.listening_sock.accept() |
|
528 if self.debugging: |
|
529 print>>sys.__stderr__, "****** Connection request from ", address |
|
530 if address[0] == LOCALHOST: |
|
531 SocketIO.__init__(self, working_sock) |
|
532 else: |
|
533 print>>sys.__stderr__, "** Invalid host: ", address |
|
534 raise socket.error |
|
535 |
|
536 def get_remote_proxy(self, oid): |
|
537 return RPCProxy(self, oid) |
|
538 |
|
539 class RPCProxy(object): |
|
540 |
|
541 __methods = None |
|
542 __attributes = None |
|
543 |
|
544 def __init__(self, sockio, oid): |
|
545 self.sockio = sockio |
|
546 self.oid = oid |
|
547 |
|
548 def __getattr__(self, name): |
|
549 if self.__methods is None: |
|
550 self.__getmethods() |
|
551 if self.__methods.get(name): |
|
552 return MethodProxy(self.sockio, self.oid, name) |
|
553 if self.__attributes is None: |
|
554 self.__getattributes() |
|
555 if self.__attributes.has_key(name): |
|
556 value = self.sockio.remotecall(self.oid, '__getattribute__', |
|
557 (name,), {}) |
|
558 return value |
|
559 else: |
|
560 raise AttributeError, name |
|
561 |
|
562 def __getattributes(self): |
|
563 self.__attributes = self.sockio.remotecall(self.oid, |
|
564 "__attributes__", (), {}) |
|
565 |
|
566 def __getmethods(self): |
|
567 self.__methods = self.sockio.remotecall(self.oid, |
|
568 "__methods__", (), {}) |
|
569 |
|
570 def _getmethods(obj, methods): |
|
571 # Helper to get a list of methods from an object |
|
572 # Adds names to dictionary argument 'methods' |
|
573 for name in dir(obj): |
|
574 attr = getattr(obj, name) |
|
575 if callable(attr): |
|
576 methods[name] = 1 |
|
577 if type(obj) == types.InstanceType: |
|
578 _getmethods(obj.__class__, methods) |
|
579 if type(obj) == types.ClassType: |
|
580 for super in obj.__bases__: |
|
581 _getmethods(super, methods) |
|
582 |
|
583 def _getattributes(obj, attributes): |
|
584 for name in dir(obj): |
|
585 attr = getattr(obj, name) |
|
586 if not callable(attr): |
|
587 attributes[name] = 1 |
|
588 |
|
589 class MethodProxy(object): |
|
590 |
|
591 def __init__(self, sockio, oid, name): |
|
592 self.sockio = sockio |
|
593 self.oid = oid |
|
594 self.name = name |
|
595 |
|
596 def __call__(self, *args, **kwargs): |
|
597 value = self.sockio.remotecall(self.oid, self.name, args, kwargs) |
|
598 return value |
|
599 |
|
600 |
|
601 # XXX KBK 09Sep03 We need a proper unit test for this module. Previously |
|
602 # existing test code was removed at Rev 1.27. |