python-2.5.2/win32/Lib/test/test_socketserver.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 # Test suite for SocketServer.py
       
     2 
       
     3 from test import test_support
       
     4 from test.test_support import (verbose, verify, TESTFN, TestSkipped,
       
     5                                reap_children)
       
     6 test_support.requires('network')
       
     7 
       
     8 from SocketServer import *
       
     9 import socket
       
    10 import errno
       
    11 import select
       
    12 import time
       
    13 import threading
       
    14 import os
       
    15 
       
    16 NREQ = 3
       
    17 DELAY = 0.5
       
    18 
       
    19 class MyMixinHandler:
       
    20     def handle(self):
       
    21         time.sleep(DELAY)
       
    22         line = self.rfile.readline()
       
    23         time.sleep(DELAY)
       
    24         self.wfile.write(line)
       
    25 
       
    26 class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
       
    27     pass
       
    28 
       
    29 class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
       
    30     pass
       
    31 
       
    32 class MyMixinServer:
       
    33     def serve_a_few(self):
       
    34         for i in range(NREQ):
       
    35             self.handle_request()
       
    36     def handle_error(self, request, client_address):
       
    37         self.close_request(request)
       
    38         self.server_close()
       
    39         raise
       
    40 
       
    41 teststring = "hello world\n"
       
    42 
       
    43 def receive(sock, n, timeout=20):
       
    44     r, w, x = select.select([sock], [], [], timeout)
       
    45     if sock in r:
       
    46         return sock.recv(n)
       
    47     else:
       
    48         raise RuntimeError, "timed out on %r" % (sock,)
       
    49 
       
    50 def testdgram(proto, addr):
       
    51     s = socket.socket(proto, socket.SOCK_DGRAM)
       
    52     s.sendto(teststring, addr)
       
    53     buf = data = receive(s, 100)
       
    54     while data and '\n' not in buf:
       
    55         data = receive(s, 100)
       
    56         buf += data
       
    57     verify(buf == teststring)
       
    58     s.close()
       
    59 
       
    60 def teststream(proto, addr):
       
    61     s = socket.socket(proto, socket.SOCK_STREAM)
       
    62     s.connect(addr)
       
    63     s.sendall(teststring)
       
    64     buf = data = receive(s, 100)
       
    65     while data and '\n' not in buf:
       
    66         data = receive(s, 100)
       
    67         buf += data
       
    68     verify(buf == teststring)
       
    69     s.close()
       
    70 
       
    71 class ServerThread(threading.Thread):
       
    72     def __init__(self, addr, svrcls, hdlrcls):
       
    73         threading.Thread.__init__(self)
       
    74         self.__addr = addr
       
    75         self.__svrcls = svrcls
       
    76         self.__hdlrcls = hdlrcls
       
    77     def run(self):
       
    78         class svrcls(MyMixinServer, self.__svrcls):
       
    79             pass
       
    80         if verbose: print "thread: creating server"
       
    81         svr = svrcls(self.__addr, self.__hdlrcls)
       
    82         # pull the address out of the server in case it changed
       
    83         # this can happen if another process is using the port
       
    84         addr = svr.server_address
       
    85         if addr:
       
    86             self.__addr = addr
       
    87             if self.__addr != svr.socket.getsockname():
       
    88                 raise RuntimeError('server_address was %s, expected %s' %
       
    89                                        (self.__addr, svr.socket.getsockname()))
       
    90         if verbose: print "thread: serving three times"
       
    91         svr.serve_a_few()
       
    92         if verbose: print "thread: done"
       
    93 
       
    94 seed = 0
       
    95 def pickport():
       
    96     global seed
       
    97     seed += 1
       
    98     return 10000 + (os.getpid() % 1000)*10 + seed
       
    99 
       
   100 host = "localhost"
       
   101 testfiles = []
       
   102 def pickaddr(proto):
       
   103     if proto == socket.AF_INET:
       
   104         return (host, pickport())
       
   105     else:
       
   106         fn = TESTFN + str(pickport())
       
   107         if os.name == 'os2':
       
   108             # AF_UNIX socket names on OS/2 require a specific prefix
       
   109             # which can't include a drive letter and must also use
       
   110             # backslashes as directory separators
       
   111             if fn[1] == ':':
       
   112                 fn = fn[2:]
       
   113             if fn[0] in (os.sep, os.altsep):
       
   114                 fn = fn[1:]
       
   115             fn = os.path.join('\socket', fn)
       
   116             if os.sep == '/':
       
   117                 fn = fn.replace(os.sep, os.altsep)
       
   118             else:
       
   119                 fn = fn.replace(os.altsep, os.sep)
       
   120         testfiles.append(fn)
       
   121         return fn
       
   122 
       
   123 def cleanup():
       
   124     for fn in testfiles:
       
   125         try:
       
   126             os.remove(fn)
       
   127         except os.error:
       
   128             pass
       
   129     testfiles[:] = []
       
   130 
       
   131 def testloop(proto, servers, hdlrcls, testfunc):
       
   132     for svrcls in servers:
       
   133         addr = pickaddr(proto)
       
   134         if verbose:
       
   135             print "ADDR =", addr
       
   136             print "CLASS =", svrcls
       
   137         t = ServerThread(addr, svrcls, hdlrcls)
       
   138         if verbose: print "server created"
       
   139         t.start()
       
   140         if verbose: print "server running"
       
   141         for i in range(NREQ):
       
   142             time.sleep(DELAY)
       
   143             if verbose: print "test client", i
       
   144             testfunc(proto, addr)
       
   145         if verbose: print "waiting for server"
       
   146         t.join()
       
   147         if verbose: print "done"
       
   148 
       
   149 class ForgivingTCPServer(TCPServer):
       
   150     # prevent errors if another process is using the port we want
       
   151     def server_bind(self):
       
   152         host, default_port = self.server_address
       
   153         # this code shamelessly stolen from test.test_support
       
   154         # the ports were changed to protect the innocent
       
   155         import sys
       
   156         for port in [default_port, 3434, 8798, 23833]:
       
   157             try:
       
   158                 self.server_address = host, port
       
   159                 TCPServer.server_bind(self)
       
   160                 break
       
   161             except socket.error, (err, msg):
       
   162                 if err != errno.EADDRINUSE:
       
   163                     raise
       
   164                 print >>sys.__stderr__, \
       
   165                     '  WARNING: failed to listen on port %d, trying another' % port
       
   166 
       
   167 tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
       
   168 if hasattr(os, 'fork') and os.name not in ('os2',):
       
   169     tcpservers.append(ForkingTCPServer)
       
   170 udpservers = [UDPServer, ThreadingUDPServer]
       
   171 if hasattr(os, 'fork') and os.name not in ('os2',):
       
   172     udpservers.append(ForkingUDPServer)
       
   173 
       
   174 if not hasattr(socket, 'AF_UNIX'):
       
   175     streamservers = []
       
   176     dgramservers = []
       
   177 else:
       
   178     class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
       
   179     streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
       
   180     if hasattr(os, 'fork') and os.name not in ('os2',):
       
   181         streamservers.append(ForkingUnixStreamServer)
       
   182     class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
       
   183     dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
       
   184     if hasattr(os, 'fork') and os.name not in ('os2',):
       
   185         dgramservers.append(ForkingUnixDatagramServer)
       
   186 
       
   187 def sloppy_cleanup():
       
   188     # See http://python.org/sf/1540386
       
   189     # We need to reap children here otherwise a child from one server
       
   190     # can be left running for the next server and cause a test failure.
       
   191     time.sleep(DELAY)
       
   192     reap_children()
       
   193 
       
   194 def testall():
       
   195     testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
       
   196     sloppy_cleanup()
       
   197     testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
       
   198     if hasattr(socket, 'AF_UNIX'):
       
   199         sloppy_cleanup()
       
   200         testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
       
   201         # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
       
   202         # client address so this cannot work:
       
   203         ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
       
   204 
       
   205 def test_main():
       
   206     import imp
       
   207     if imp.lock_held():
       
   208         # If the import lock is held, the threads will hang.
       
   209         raise TestSkipped("can't run when import lock is held")
       
   210 
       
   211     try:
       
   212         testall()
       
   213     finally:
       
   214         cleanup()
       
   215     reap_children()
       
   216 
       
   217 if __name__ == "__main__":
       
   218     test_main()