symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_socketserver.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """
       
     2 Test suite for SocketServer.py.
       
     3 """
       
     4 
       
     5 import contextlib
       
     6 import errno
       
     7 import imp
       
     8 import os
       
     9 import select
       
    10 import signal
       
    11 import socket
       
    12 import tempfile
       
    13 import threading
       
    14 import time
       
    15 import unittest
       
    16 import SocketServer
       
    17 
       
    18 import test.test_support
       
    19 from test.test_support import reap_children, verbose, TestSkipped
       
    20 from test.test_support import TESTFN as TEST_FILE
       
    21 
       
    22 test.test_support.requires("network")
       
    23 
       
    24 TEST_STR = "hello world\n"
       
    25 HOST = test.test_support.HOST
       
    26 
       
    27 HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
       
    28 HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
       
    29 
       
    30 def signal_alarm(n):
       
    31     """Call signal.alarm when it exists (i.e. not on Windows)."""
       
    32     if hasattr(signal, 'alarm'):
       
    33         signal.alarm(n)
       
    34 
       
    35 def receive(sock, n, timeout=20):
       
    36     r, w, x = select.select([sock], [], [], timeout)
       
    37     if sock in r:
       
    38         return sock.recv(n)
       
    39     else:
       
    40         raise RuntimeError, "timed out on %r" % (sock,)
       
    41 
       
    42 if HAVE_UNIX_SOCKETS:
       
    43     class ForkingUnixStreamServer(SocketServer.ForkingMixIn,
       
    44                                   SocketServer.UnixStreamServer):
       
    45         pass
       
    46 
       
    47     class ForkingUnixDatagramServer(SocketServer.ForkingMixIn,
       
    48                                     SocketServer.UnixDatagramServer):
       
    49         pass
       
    50 
       
    51 
       
    52 @contextlib.contextmanager
       
    53 def simple_subprocess(testcase):
       
    54     pid = os.fork()
       
    55     if pid == 0:
       
    56         # Don't throw an exception; it would be caught by the test harness.
       
    57         os._exit(72)
       
    58     yield None
       
    59     pid2, status = os.waitpid(pid, 0)
       
    60     testcase.assertEquals(pid2, pid)
       
    61     testcase.assertEquals(72 << 8, status)
       
    62 
       
    63 
       
    64 class SocketServerTest(unittest.TestCase):
       
    65     """Test all socket servers."""
       
    66 
       
    67     def setUp(self):
       
    68         signal_alarm(20)  # Kill deadlocks after 20 seconds.
       
    69         self.port_seed = 0
       
    70         self.test_files = []
       
    71 
       
    72     def tearDown(self):
       
    73         signal_alarm(0)  # Didn't deadlock.
       
    74         reap_children()
       
    75 
       
    76         for fn in self.test_files:
       
    77             try:
       
    78                 os.remove(fn)
       
    79             except os.error:
       
    80                 pass
       
    81         self.test_files[:] = []
       
    82 
       
    83     def pickaddr(self, proto):
       
    84         if proto == socket.AF_INET:
       
    85             return (HOST, 0)
       
    86         else:
       
    87             # XXX: We need a way to tell AF_UNIX to pick its own name
       
    88             # like AF_INET provides port==0.
       
    89             dir = None
       
    90             if os.name == 'os2':
       
    91                 dir = '\socket'
       
    92             fn = tempfile.mktemp(prefix='unix_socket.', dir=dir)
       
    93             if os.name == 'os2':
       
    94                 # AF_UNIX socket names on OS/2 require a specific prefix
       
    95                 # which can't include a drive letter and must also use
       
    96                 # backslashes as directory separators
       
    97                 if fn[1] == ':':
       
    98                     fn = fn[2:]
       
    99                 if fn[0] in (os.sep, os.altsep):
       
   100                     fn = fn[1:]
       
   101                 if os.sep == '/':
       
   102                     fn = fn.replace(os.sep, os.altsep)
       
   103                 else:
       
   104                     fn = fn.replace(os.altsep, os.sep)
       
   105             self.test_files.append(fn)
       
   106             return fn
       
   107 
       
   108     def make_server(self, addr, svrcls, hdlrbase):
       
   109         class MyServer(svrcls):
       
   110             def handle_error(self, request, client_address):
       
   111                 self.close_request(request)
       
   112                 self.server_close()
       
   113                 raise
       
   114 
       
   115         class MyHandler(hdlrbase):
       
   116             def handle(self):
       
   117                 line = self.rfile.readline()
       
   118                 self.wfile.write(line)
       
   119 
       
   120         if verbose: print "creating server"
       
   121         server = MyServer(addr, MyHandler)
       
   122         self.assertEquals(server.server_address, server.socket.getsockname())
       
   123         return server
       
   124 
       
   125     def run_server(self, svrcls, hdlrbase, testfunc):
       
   126         server = self.make_server(self.pickaddr(svrcls.address_family),
       
   127                                   svrcls, hdlrbase)
       
   128         # We had the OS pick a port, so pull the real address out of
       
   129         # the server.
       
   130         addr = server.server_address
       
   131         if verbose:
       
   132             print "server created"
       
   133             print "ADDR =", addr
       
   134             print "CLASS =", svrcls
       
   135         t = threading.Thread(
       
   136             name='%s serving' % svrcls,
       
   137             target=server.serve_forever,
       
   138             # Short poll interval to make the test finish quickly.
       
   139             # Time between requests is short enough that we won't wake
       
   140             # up spuriously too many times.
       
   141             kwargs={'poll_interval':0.01})
       
   142         t.daemon = True  # In case this function raises.
       
   143         t.start()
       
   144         if verbose: print "server running"
       
   145         for i in range(3):
       
   146             if verbose: print "test client", i
       
   147             testfunc(svrcls.address_family, addr)
       
   148         if verbose: print "waiting for server"
       
   149         server.shutdown()
       
   150         t.join()
       
   151         if verbose: print "done"
       
   152 
       
   153     def stream_examine(self, proto, addr):
       
   154         s = socket.socket(proto, socket.SOCK_STREAM)
       
   155         s.connect(addr)
       
   156         s.sendall(TEST_STR)
       
   157         buf = data = receive(s, 100)
       
   158         while data and '\n' not in buf:
       
   159             data = receive(s, 100)
       
   160             buf += data
       
   161         self.assertEquals(buf, TEST_STR)
       
   162         s.close()
       
   163 
       
   164     def dgram_examine(self, proto, addr):
       
   165         s = socket.socket(proto, socket.SOCK_DGRAM)
       
   166         s.sendto(TEST_STR, addr)
       
   167         buf = data = receive(s, 100)
       
   168         while data and '\n' not in buf:
       
   169             data = receive(s, 100)
       
   170             buf += data
       
   171         self.assertEquals(buf, TEST_STR)
       
   172         s.close()
       
   173 
       
   174     def test_TCPServer(self):
       
   175         self.run_server(SocketServer.TCPServer,
       
   176                         SocketServer.StreamRequestHandler,
       
   177                         self.stream_examine)
       
   178 
       
   179     def test_ThreadingTCPServer(self):
       
   180         self.run_server(SocketServer.ThreadingTCPServer,
       
   181                         SocketServer.StreamRequestHandler,
       
   182                         self.stream_examine)
       
   183 
       
   184     if HAVE_FORKING:
       
   185         def test_ForkingTCPServer(self):
       
   186             with simple_subprocess(self):
       
   187                 self.run_server(SocketServer.ForkingTCPServer,
       
   188                                 SocketServer.StreamRequestHandler,
       
   189                                 self.stream_examine)
       
   190 
       
   191     if HAVE_UNIX_SOCKETS:
       
   192         def test_UnixStreamServer(self):
       
   193             self.run_server(SocketServer.UnixStreamServer,
       
   194                             SocketServer.StreamRequestHandler,
       
   195                             self.stream_examine)
       
   196 
       
   197         def test_ThreadingUnixStreamServer(self):
       
   198             self.run_server(SocketServer.ThreadingUnixStreamServer,
       
   199                             SocketServer.StreamRequestHandler,
       
   200                             self.stream_examine)
       
   201 
       
   202         if HAVE_FORKING:
       
   203             def test_ForkingUnixStreamServer(self):
       
   204                 with simple_subprocess(self):
       
   205                     self.run_server(ForkingUnixStreamServer,
       
   206                                     SocketServer.StreamRequestHandler,
       
   207                                     self.stream_examine)
       
   208 
       
   209     def test_UDPServer(self):
       
   210         self.run_server(SocketServer.UDPServer,
       
   211                         SocketServer.DatagramRequestHandler,
       
   212                         self.dgram_examine)
       
   213 
       
   214     def test_ThreadingUDPServer(self):
       
   215         self.run_server(SocketServer.ThreadingUDPServer,
       
   216                         SocketServer.DatagramRequestHandler,
       
   217                         self.dgram_examine)
       
   218 
       
   219     if HAVE_FORKING:
       
   220         def test_ForkingUDPServer(self):
       
   221             with simple_subprocess(self):
       
   222                 self.run_server(SocketServer.ForkingUDPServer,
       
   223                                 SocketServer.DatagramRequestHandler,
       
   224                                 self.dgram_examine)
       
   225 
       
   226     # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
       
   227     # client address so this cannot work:
       
   228 
       
   229     # if HAVE_UNIX_SOCKETS:
       
   230     #     def test_UnixDatagramServer(self):
       
   231     #         self.run_server(SocketServer.UnixDatagramServer,
       
   232     #                         SocketServer.DatagramRequestHandler,
       
   233     #                         self.dgram_examine)
       
   234     #
       
   235     #     def test_ThreadingUnixDatagramServer(self):
       
   236     #         self.run_server(SocketServer.ThreadingUnixDatagramServer,
       
   237     #                         SocketServer.DatagramRequestHandler,
       
   238     #                         self.dgram_examine)
       
   239     #
       
   240     #     if HAVE_FORKING:
       
   241     #         def test_ForkingUnixDatagramServer(self):
       
   242     #             self.run_server(SocketServer.ForkingUnixDatagramServer,
       
   243     #                             SocketServer.DatagramRequestHandler,
       
   244     #                             self.dgram_examine)
       
   245 
       
   246 
       
   247 def test_main():
       
   248     if imp.lock_held():
       
   249         # If the import lock is held, the threads will hang
       
   250         raise TestSkipped("can't run when import lock is held")
       
   251 
       
   252     test.test_support.run_unittest(SocketServerTest)
       
   253 
       
   254 if __name__ == "__main__":
       
   255     test_main()
       
   256     signal_alarm(3)  # Shutdown shouldn't take more than 3 seconds.