python-2.5.2/win32/Lib/idlelib/rpc.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 """RPC Implementation, originally written for the Python Idle IDE
       
     2 
       
     3 For security reasons, GvR requested that Idle's Python execution server process
       
     4 connect to the Idle process, which listens for the connection.  Since Idle has
       
     5 only one client per server, this was not a limitation.
       
     6 
       
     7    +---------------------------------+ +-------------+
       
     8    | SocketServer.BaseRequestHandler | | SocketIO    |
       
     9    +---------------------------------+ +-------------+
       
    10                    ^                   | register()  |
       
    11                    |                   | unregister()|
       
    12                    |                   +-------------+
       
    13                    |                      ^  ^
       
    14                    |                      |  |
       
    15                    | + -------------------+  |
       
    16                    | |                       |
       
    17    +-------------------------+        +-----------------+
       
    18    | RPCHandler              |        | RPCClient       |
       
    19    | [attribute of RPCServer]|        |                 |
       
    20    +-------------------------+        +-----------------+
       
    21 
       
    22 The RPCServer handler class is expected to provide register/unregister methods.
       
    23 RPCHandler inherits the mix-in class SocketIO, which provides these methods.
       
    24 
       
    25 See the Idle run.main() docstring for further information on how this was
       
    26 accomplished in Idle.
       
    27 
       
    28 """
       
    29 
       
    30 import sys
       
    31 import os
       
    32 import socket
       
    33 import select
       
    34 import SocketServer
       
    35 import struct
       
    36 import cPickle as pickle
       
    37 import threading
       
    38 import Queue
       
    39 import traceback
       
    40 import copy_reg
       
    41 import types
       
    42 import marshal
       
    43 
       
    44 
       
    45 def unpickle_code(ms):
       
    46     co = marshal.loads(ms)
       
    47     assert isinstance(co, types.CodeType)
       
    48     return co
       
    49 
       
    50 def pickle_code(co):
       
    51     assert isinstance(co, types.CodeType)
       
    52     ms = marshal.dumps(co)
       
    53     return unpickle_code, (ms,)
       
    54 
       
    55 # XXX KBK 24Aug02 function pickling capability not used in Idle
       
    56 #  def unpickle_function(ms):
       
    57 #      return ms
       
    58 
       
    59 #  def pickle_function(fn):
       
    60 #      assert isinstance(fn, type.FunctionType)
       
    61 #      return repr(fn)
       
    62 
       
# Teach pickle how to handle code objects: pickle_code is registered as the
# reduction function for types.CodeType (it marshals the code object and
# names unpickle_code as the function that rebuilds it).
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
       
    64 # copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
       
    65 
       
    66 BUFSIZE = 8*1024
       
    67 LOCALHOST = '127.0.0.1'
       
    68 
       
    69 class RPCServer(SocketServer.TCPServer):
       
    70 
       
    71     def __init__(self, addr, handlerclass=None):
       
    72         if handlerclass is None:
       
    73             handlerclass = RPCHandler
       
    74         SocketServer.TCPServer.__init__(self, addr, handlerclass)
       
    75 
       
    76     def server_bind(self):
       
    77         "Override TCPServer method, no bind() phase for connecting entity"
       
    78         pass
       
    79 
       
    80     def server_activate(self):
       
    81         """Override TCPServer method, connect() instead of listen()
       
    82 
       
    83         Due to the reversed connection, self.server_address is actually the
       
    84         address of the Idle Client to which we are connecting.
       
    85 
       
    86         """
       
    87         self.socket.connect(self.server_address)
       
    88 
       
    89     def get_request(self):
       
    90         "Override TCPServer method, return already connected socket"
       
    91         return self.socket, self.server_address
       
    92 
       
    93     def handle_error(self, request, client_address):
       
    94         """Override TCPServer method
       
    95 
       
    96         Error message goes to __stderr__.  No error message if exiting
       
    97         normally or socket raised EOF.  Other exceptions not handled in
       
    98         server code will cause os._exit.
       
    99 
       
   100         """
       
   101         try:
       
   102             raise
       
   103         except SystemExit:
       
   104             raise
       
   105         except:
       
   106             erf = sys.__stderr__
       
   107             print>>erf, '\n' + '-'*40
       
   108             print>>erf, 'Unhandled server exception!'
       
   109             print>>erf, 'Thread: %s' % threading.currentThread().getName()
       
   110             print>>erf, 'Client Address: ', client_address
       
   111             print>>erf, 'Request: ', repr(request)
       
   112             traceback.print_exc(file=erf)
       
   113             print>>erf, '\n*** Unrecoverable, server exiting!'
       
   114             print>>erf, '-'*40
       
   115             os._exit(0)
       
   116 
       
   117 #----------------- end class RPCServer --------------------
       
   118 
       
# Default oid -> object registry: SocketIO uses it when no objtable is
# passed in, and remoteref() stores objects here.
objecttable = {}
# (seq, (method, args, kwargs)) tuples put here by SocketIO.localcall() for
# 'QUEUE' requests.
request_queue = Queue.Queue(0)
# (seq, response) tuples; SocketIO.pollresponse() takes them from here and
# sends them across the link.
response_queue = Queue.Queue(0)
       
   122 
       
   123 
       
   124 class SocketIO(object):
       
   125 
       
   126     nextseq = 0
       
   127 
       
   128     def __init__(self, sock, objtable=None, debugging=None):
       
   129         self.sockthread = threading.currentThread()
       
   130         if debugging is not None:
       
   131             self.debugging = debugging
       
   132         self.sock = sock
       
   133         if objtable is None:
       
   134             objtable = objecttable
       
   135         self.objtable = objtable
       
   136         self.responses = {}
       
   137         self.cvars = {}
       
   138 
       
   139     def close(self):
       
   140         sock = self.sock
       
   141         self.sock = None
       
   142         if sock is not None:
       
   143             sock.close()
       
   144 
       
   145     def exithook(self):
       
   146         "override for specific exit action"
       
   147         os._exit()
       
   148 
       
   149     def debug(self, *args):
       
   150         if not self.debugging:
       
   151             return
       
   152         s = self.location + " " + str(threading.currentThread().getName())
       
   153         for a in args:
       
   154             s = s + " " + str(a)
       
   155         print>>sys.__stderr__, s
       
   156 
       
   157     def register(self, oid, object):
       
   158         self.objtable[oid] = object
       
   159 
       
   160     def unregister(self, oid):
       
   161         try:
       
   162             del self.objtable[oid]
       
   163         except KeyError:
       
   164             pass
       
   165 
       
   166     def localcall(self, seq, request):
       
   167         self.debug("localcall:", request)
       
   168         try:
       
   169             how, (oid, methodname, args, kwargs) = request
       
   170         except TypeError:
       
   171             return ("ERROR", "Bad request format")
       
   172         if not self.objtable.has_key(oid):
       
   173             return ("ERROR", "Unknown object id: %r" % (oid,))
       
   174         obj = self.objtable[oid]
       
   175         if methodname == "__methods__":
       
   176             methods = {}
       
   177             _getmethods(obj, methods)
       
   178             return ("OK", methods)
       
   179         if methodname == "__attributes__":
       
   180             attributes = {}
       
   181             _getattributes(obj, attributes)
       
   182             return ("OK", attributes)
       
   183         if not hasattr(obj, methodname):
       
   184             return ("ERROR", "Unsupported method name: %r" % (methodname,))
       
   185         method = getattr(obj, methodname)
       
   186         try:
       
   187             if how == 'CALL':
       
   188                 ret = method(*args, **kwargs)
       
   189                 if isinstance(ret, RemoteObject):
       
   190                     ret = remoteref(ret)
       
   191                 return ("OK", ret)
       
   192             elif how == 'QUEUE':
       
   193                 request_queue.put((seq, (method, args, kwargs)))
       
   194                 return("QUEUED", None)
       
   195             else:
       
   196                 return ("ERROR", "Unsupported message type: %s" % how)
       
   197         except SystemExit:
       
   198             raise
       
   199         except socket.error:
       
   200             raise
       
   201         except:
       
   202             msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
       
   203                   " Object: %s \n Method: %s \n Args: %s\n"
       
   204             print>>sys.__stderr__, msg % (oid, method, args)
       
   205             traceback.print_exc(file=sys.__stderr__)
       
   206             return ("EXCEPTION", None)
       
   207 
       
   208     def remotecall(self, oid, methodname, args, kwargs):
       
   209         self.debug("remotecall:asynccall: ", oid, methodname)
       
   210         seq = self.asynccall(oid, methodname, args, kwargs)
       
   211         return self.asyncreturn(seq)
       
   212 
       
   213     def remotequeue(self, oid, methodname, args, kwargs):
       
   214         self.debug("remotequeue:asyncqueue: ", oid, methodname)
       
   215         seq = self.asyncqueue(oid, methodname, args, kwargs)
       
   216         return self.asyncreturn(seq)
       
   217 
       
   218     def asynccall(self, oid, methodname, args, kwargs):
       
   219         request = ("CALL", (oid, methodname, args, kwargs))
       
   220         seq = self.newseq()
       
   221         if threading.currentThread() != self.sockthread:
       
   222             cvar = threading.Condition()
       
   223             self.cvars[seq] = cvar
       
   224         self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
       
   225         self.putmessage((seq, request))
       
   226         return seq
       
   227 
       
   228     def asyncqueue(self, oid, methodname, args, kwargs):
       
   229         request = ("QUEUE", (oid, methodname, args, kwargs))
       
   230         seq = self.newseq()
       
   231         if threading.currentThread() != self.sockthread:
       
   232             cvar = threading.Condition()
       
   233             self.cvars[seq] = cvar
       
   234         self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
       
   235         self.putmessage((seq, request))
       
   236         return seq
       
   237 
       
   238     def asyncreturn(self, seq):
       
   239         self.debug("asyncreturn:%d:call getresponse(): " % seq)
       
   240         response = self.getresponse(seq, wait=0.05)
       
   241         self.debug(("asyncreturn:%d:response: " % seq), response)
       
   242         return self.decoderesponse(response)
       
   243 
       
   244     def decoderesponse(self, response):
       
   245         how, what = response
       
   246         if how == "OK":
       
   247             return what
       
   248         if how == "QUEUED":
       
   249             return None
       
   250         if how == "EXCEPTION":
       
   251             self.debug("decoderesponse: EXCEPTION")
       
   252             return None
       
   253         if how == "EOF":
       
   254             self.debug("decoderesponse: EOF")
       
   255             self.decode_interrupthook()
       
   256             return None
       
   257         if how == "ERROR":
       
   258             self.debug("decoderesponse: Internal ERROR:", what)
       
   259             raise RuntimeError, what
       
   260         raise SystemError, (how, what)
       
   261 
       
   262     def decode_interrupthook(self):
       
   263         ""
       
   264         raise EOFError
       
   265 
       
   266     def mainloop(self):
       
   267         """Listen on socket until I/O not ready or EOF
       
   268 
       
   269         pollresponse() will loop looking for seq number None, which
       
   270         never comes, and exit on EOFError.
       
   271 
       
   272         """
       
   273         try:
       
   274             self.getresponse(myseq=None, wait=0.05)
       
   275         except EOFError:
       
   276             self.debug("mainloop:return")
       
   277             return
       
   278 
       
   279     def getresponse(self, myseq, wait):
       
   280         response = self._getresponse(myseq, wait)
       
   281         if response is not None:
       
   282             how, what = response
       
   283             if how == "OK":
       
   284                 response = how, self._proxify(what)
       
   285         return response
       
   286 
       
   287     def _proxify(self, obj):
       
   288         if isinstance(obj, RemoteProxy):
       
   289             return RPCProxy(self, obj.oid)
       
   290         if isinstance(obj, types.ListType):
       
   291             return map(self._proxify, obj)
       
   292         # XXX Check for other types -- not currently needed
       
   293         return obj
       
   294 
       
   295     def _getresponse(self, myseq, wait):
       
   296         self.debug("_getresponse:myseq:", myseq)
       
   297         if threading.currentThread() is self.sockthread:
       
   298             # this thread does all reading of requests or responses
       
   299             while 1:
       
   300                 response = self.pollresponse(myseq, wait)
       
   301                 if response is not None:
       
   302                     return response
       
   303         else:
       
   304             # wait for notification from socket handling thread
       
   305             cvar = self.cvars[myseq]
       
   306             cvar.acquire()
       
   307             while not self.responses.has_key(myseq):
       
   308                 cvar.wait()
       
   309             response = self.responses[myseq]
       
   310             self.debug("_getresponse:%s: thread woke up: response: %s" %
       
   311                        (myseq, response))
       
   312             del self.responses[myseq]
       
   313             del self.cvars[myseq]
       
   314             cvar.release()
       
   315             return response
       
   316 
       
   317     def newseq(self):
       
   318         self.nextseq = seq = self.nextseq + 2
       
   319         return seq
       
   320 
       
   321     def putmessage(self, message):
       
   322         self.debug("putmessage:%d:" % message[0])
       
   323         try:
       
   324             s = pickle.dumps(message)
       
   325         except pickle.PicklingError:
       
   326             print >>sys.__stderr__, "Cannot pickle:", repr(message)
       
   327             raise
       
   328         s = struct.pack("<i", len(s)) + s
       
   329         while len(s) > 0:
       
   330             try:
       
   331                 r, w, x = select.select([], [self.sock], [])
       
   332                 n = self.sock.send(s[:BUFSIZE])
       
   333             except (AttributeError, TypeError):
       
   334                 raise IOError, "socket no longer exists"
       
   335             except socket.error:
       
   336                 raise
       
   337             else:
       
   338                 s = s[n:]
       
   339 
       
   340     buffer = ""
       
   341     bufneed = 4
       
   342     bufstate = 0 # meaning: 0 => reading count; 1 => reading data
       
   343 
       
   344     def pollpacket(self, wait):
       
   345         self._stage0()
       
   346         if len(self.buffer) < self.bufneed:
       
   347             r, w, x = select.select([self.sock.fileno()], [], [], wait)
       
   348             if len(r) == 0:
       
   349                 return None
       
   350             try:
       
   351                 s = self.sock.recv(BUFSIZE)
       
   352             except socket.error:
       
   353                 raise EOFError
       
   354             if len(s) == 0:
       
   355                 raise EOFError
       
   356             self.buffer += s
       
   357             self._stage0()
       
   358         return self._stage1()
       
   359 
       
   360     def _stage0(self):
       
   361         if self.bufstate == 0 and len(self.buffer) >= 4:
       
   362             s = self.buffer[:4]
       
   363             self.buffer = self.buffer[4:]
       
   364             self.bufneed = struct.unpack("<i", s)[0]
       
   365             self.bufstate = 1
       
   366 
       
   367     def _stage1(self):
       
   368         if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
       
   369             packet = self.buffer[:self.bufneed]
       
   370             self.buffer = self.buffer[self.bufneed:]
       
   371             self.bufneed = 4
       
   372             self.bufstate = 0
       
   373             return packet
       
   374 
       
   375     def pollmessage(self, wait):
       
   376         packet = self.pollpacket(wait)
       
   377         if packet is None:
       
   378             return None
       
   379         try:
       
   380             message = pickle.loads(packet)
       
   381         except pickle.UnpicklingError:
       
   382             print >>sys.__stderr__, "-----------------------"
       
   383             print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
       
   384             traceback.print_stack(file=sys.__stderr__)
       
   385             print >>sys.__stderr__, "-----------------------"
       
   386             raise
       
   387         return message
       
   388 
       
   389     def pollresponse(self, myseq, wait):
       
   390         """Handle messages received on the socket.
       
   391 
       
   392         Some messages received may be asynchronous 'call' or 'queue' requests,
       
   393         and some may be responses for other threads.
       
   394 
       
   395         'call' requests are passed to self.localcall() with the expectation of
       
   396         immediate execution, during which time the socket is not serviced.
       
   397 
       
   398         'queue' requests are used for tasks (which may block or hang) to be
       
   399         processed in a different thread.  These requests are fed into
       
   400         request_queue by self.localcall().  Responses to queued requests are
       
   401         taken from response_queue and sent across the link with the associated
       
   402         sequence numbers.  Messages in the queues are (sequence_number,
       
   403         request/response) tuples and code using this module removing messages
       
   404         from the request_queue is responsible for returning the correct
       
   405         sequence number in the response_queue.
       
   406 
       
   407         pollresponse() will loop until a response message with the myseq
       
   408         sequence number is received, and will save other responses in
       
   409         self.responses and notify the owning thread.
       
   410 
       
   411         """
       
   412         while 1:
       
   413             # send queued response if there is one available
       
   414             try:
       
   415                 qmsg = response_queue.get(0)
       
   416             except Queue.Empty:
       
   417                 pass
       
   418             else:
       
   419                 seq, response = qmsg
       
   420                 message = (seq, ('OK', response))
       
   421                 self.putmessage(message)
       
   422             # poll for message on link
       
   423             try:
       
   424                 message = self.pollmessage(wait)
       
   425                 if message is None:  # socket not ready
       
   426                     return None
       
   427             except EOFError:
       
   428                 self.handle_EOF()
       
   429                 return None
       
   430             except AttributeError:
       
   431                 return None
       
   432             seq, resq = message
       
   433             how = resq[0]
       
   434             self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
       
   435             # process or queue a request
       
   436             if how in ("CALL", "QUEUE"):
       
   437                 self.debug("pollresponse:%d:localcall:call:" % seq)
       
   438                 response = self.localcall(seq, resq)
       
   439                 self.debug("pollresponse:%d:localcall:response:%s"
       
   440                            % (seq, response))
       
   441                 if how == "CALL":
       
   442                     self.putmessage((seq, response))
       
   443                 elif how == "QUEUE":
       
   444                     # don't acknowledge the 'queue' request!
       
   445                     pass
       
   446                 continue
       
   447             # return if completed message transaction
       
   448             elif seq == myseq:
       
   449                 return resq
       
   450             # must be a response for a different thread:
       
   451             else:
       
   452                 cv = self.cvars.get(seq, None)
       
   453                 # response involving unknown sequence number is discarded,
       
   454                 # probably intended for prior incarnation of server
       
   455                 if cv is not None:
       
   456                     cv.acquire()
       
   457                     self.responses[seq] = resq
       
   458                     cv.notify()
       
   459                     cv.release()
       
   460                 continue
       
   461 
       
   462     def handle_EOF(self):
       
   463         "action taken upon link being closed by peer"
       
   464         self.EOFhook()
       
   465         self.debug("handle_EOF")
       
   466         for key in self.cvars:
       
   467             cv = self.cvars[key]
       
   468             cv.acquire()
       
   469             self.responses[key] = ('EOF', None)
       
   470             cv.notify()
       
   471             cv.release()
       
   472         # call our (possibly overridden) exit function
       
   473         self.exithook()
       
   474 
       
   475     def EOFhook(self):
       
   476         "Classes using rpc client/server can override to augment EOF action"
       
   477         pass
       
   478 
       
   479 #----------------- end class SocketIO --------------------
       
   480 
       
   481 class RemoteObject(object):
       
   482     # Token mix-in class
       
   483     pass
       
   484 
       
   485 def remoteref(obj):
       
   486     oid = id(obj)
       
   487     objecttable[oid] = obj
       
   488     return RemoteProxy(oid)
       
   489 
       
   490 class RemoteProxy(object):
       
   491 
       
   492     def __init__(self, oid):
       
   493         self.oid = oid
       
   494 
       
   495 class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
       
   496 
       
   497     debugging = False
       
   498     location = "#S"  # Server
       
   499 
       
   500     def __init__(self, sock, addr, svr):
       
   501         svr.current_handler = self ## cgt xxx
       
   502         SocketIO.__init__(self, sock)
       
   503         SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
       
   504 
       
   505     def handle(self):
       
   506         "handle() method required by SocketServer"
       
   507         self.mainloop()
       
   508 
       
   509     def get_remote_proxy(self, oid):
       
   510         return RPCProxy(self, oid)
       
   511 
       
   512 class RPCClient(SocketIO):
       
   513 
       
   514     debugging = False
       
   515     location = "#C"  # Client
       
   516 
       
   517     nextseq = 1 # Requests coming from the client are odd numbered
       
   518 
       
   519     def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
       
   520         self.listening_sock = socket.socket(family, type)
       
   521         self.listening_sock.setsockopt(socket.SOL_SOCKET,
       
   522                                        socket.SO_REUSEADDR, 1)
       
   523         self.listening_sock.bind(address)
       
   524         self.listening_sock.listen(1)
       
   525 
       
   526     def accept(self):
       
   527         working_sock, address = self.listening_sock.accept()
       
   528         if self.debugging:
       
   529             print>>sys.__stderr__, "****** Connection request from ", address
       
   530         if address[0] == LOCALHOST:
       
   531             SocketIO.__init__(self, working_sock)
       
   532         else:
       
   533             print>>sys.__stderr__, "** Invalid host: ", address
       
   534             raise socket.error
       
   535 
       
   536     def get_remote_proxy(self, oid):
       
   537         return RPCProxy(self, oid)
       
   538 
       
   539 class RPCProxy(object):
       
   540 
       
   541     __methods = None
       
   542     __attributes = None
       
   543 
       
   544     def __init__(self, sockio, oid):
       
   545         self.sockio = sockio
       
   546         self.oid = oid
       
   547 
       
   548     def __getattr__(self, name):
       
   549         if self.__methods is None:
       
   550             self.__getmethods()
       
   551         if self.__methods.get(name):
       
   552             return MethodProxy(self.sockio, self.oid, name)
       
   553         if self.__attributes is None:
       
   554             self.__getattributes()
       
   555         if self.__attributes.has_key(name):
       
   556             value = self.sockio.remotecall(self.oid, '__getattribute__',
       
   557                                            (name,), {})
       
   558             return value
       
   559         else:
       
   560             raise AttributeError, name
       
   561 
       
   562     def __getattributes(self):
       
   563         self.__attributes = self.sockio.remotecall(self.oid,
       
   564                                                 "__attributes__", (), {})
       
   565 
       
   566     def __getmethods(self):
       
   567         self.__methods = self.sockio.remotecall(self.oid,
       
   568                                                 "__methods__", (), {})
       
   569 
       
   570 def _getmethods(obj, methods):
       
   571     # Helper to get a list of methods from an object
       
   572     # Adds names to dictionary argument 'methods'
       
   573     for name in dir(obj):
       
   574         attr = getattr(obj, name)
       
   575         if callable(attr):
       
   576             methods[name] = 1
       
   577     if type(obj) == types.InstanceType:
       
   578         _getmethods(obj.__class__, methods)
       
   579     if type(obj) == types.ClassType:
       
   580         for super in obj.__bases__:
       
   581             _getmethods(super, methods)
       
   582 
       
   583 def _getattributes(obj, attributes):
       
   584     for name in dir(obj):
       
   585         attr = getattr(obj, name)
       
   586         if not callable(attr):
       
   587             attributes[name] = 1
       
   588 
       
   589 class MethodProxy(object):
       
   590 
       
   591     def __init__(self, sockio, oid, name):
       
   592         self.sockio = sockio
       
   593         self.oid = oid
       
   594         self.name = name
       
   595 
       
   596     def __call__(self, *args, **kwargs):
       
   597         value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
       
   598         return value
       
   599 
       
   600 
       
   601 # XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
       
   602 #                  existing test code was removed at Rev 1.27.