symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_socket.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #!/usr/bin/env python
       
     2 
       
     3 import unittest
       
     4 from test import test_support
       
     5 
       
     6 import errno
       
     7 import socket
       
     8 import select
       
     9 import thread, threading
       
    10 import time
       
    11 import traceback
       
    12 import Queue
       
    13 import sys
       
    14 import os
       
    15 import array
       
    16 from weakref import proxy
       
    17 import signal
       
    18 
       
    19 HOST = test_support.HOST
       
    20 MSG = 'Michael Gilfix was here\n'
       
    21 
       
    22 class SocketTCPTest(unittest.TestCase):
       
    23 
       
    24     def setUp(self):
       
    25         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
    26         self.port = test_support.bind_port(self.serv)
       
    27         self.serv.listen(1)
       
    28 
       
    29     def tearDown(self):
       
    30         self.serv.close()
       
    31         self.serv = None
       
    32 
       
    33 class SocketUDPTest(unittest.TestCase):
       
    34 
       
    35     def setUp(self):
       
    36         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
       
    37         self.port = test_support.bind_port(self.serv)
       
    38 
       
    39     def tearDown(self):
       
    40         self.serv.close()
       
    41         self.serv = None
       
    42 
       
    43 class ThreadableTest:
       
    44     """Threadable Test class
       
    45 
       
    46     The ThreadableTest class makes it easy to create a threaded
       
    47     client/server pair from an existing unit test. To create a
       
    48     new threaded class from an existing unit test, use multiple
       
    49     inheritance:
       
    50 
       
    51         class NewClass (OldClass, ThreadableTest):
       
    52             pass
       
    53 
       
    54     This class defines two new fixture functions with obvious
       
    55     purposes for overriding:
       
    56 
       
    57         clientSetUp ()
       
    58         clientTearDown ()
       
    59 
       
    60     Any new test functions within the class must then define
       
    61     tests in pairs, where the test name is preceeded with a
       
    62     '_' to indicate the client portion of the test. Ex:
       
    63 
       
    64         def testFoo(self):
       
    65             # Server portion
       
    66 
       
    67         def _testFoo(self):
       
    68             # Client portion
       
    69 
       
    70     Any exceptions raised by the clients during their tests
       
    71     are caught and transferred to the main thread to alert
       
    72     the testing framework.
       
    73 
       
    74     Note, the server setup function cannot call any blocking
       
    75     functions that rely on the client thread during setup,
       
    76     unless serverExplicitReady() is called just before
       
    77     the blocking call (such as in setting up a client/server
       
    78     connection and performing the accept() in setUp().
       
    79     """
       
    80 
       
    81     def __init__(self):
       
    82         # Swap the true setup function
       
    83         self.__setUp = self.setUp
       
    84         self.__tearDown = self.tearDown
       
    85         self.setUp = self._setUp
       
    86         self.tearDown = self._tearDown
       
    87 
       
    88     def serverExplicitReady(self):
       
    89         """This method allows the server to explicitly indicate that
       
    90         it wants the client thread to proceed. This is useful if the
       
    91         server is about to execute a blocking routine that is
       
    92         dependent upon the client thread during its setup routine."""
       
    93         self.server_ready.set()
       
    94 
       
    95     def _setUp(self):
       
    96         self.server_ready = threading.Event()
       
    97         self.client_ready = threading.Event()
       
    98         self.done = threading.Event()
       
    99         self.queue = Queue.Queue(1)
       
   100 
       
   101         # Do some munging to start the client test.
       
   102         methodname = self.id()
       
   103         i = methodname.rfind('.')
       
   104         methodname = methodname[i+1:]
       
   105         test_method = getattr(self, '_' + methodname)
       
   106         self.client_thread = thread.start_new_thread(
       
   107             self.clientRun, (test_method,))
       
   108 
       
   109         self.__setUp()
       
   110         if not self.server_ready.is_set():
       
   111             self.server_ready.set()
       
   112         self.client_ready.wait()
       
   113 
       
   114     def _tearDown(self):
       
   115         self.__tearDown()
       
   116         self.done.wait()
       
   117 
       
   118         if not self.queue.empty():
       
   119             msg = self.queue.get()
       
   120             self.fail(msg)
       
   121 
       
   122     def clientRun(self, test_func):
       
   123         self.server_ready.wait()
       
   124         self.client_ready.set()
       
   125         self.clientSetUp()
       
   126         if not callable(test_func):
       
   127             raise TypeError, "test_func must be a callable function"
       
   128         try:
       
   129             test_func()
       
   130         except Exception, strerror:
       
   131             self.queue.put(strerror)
       
   132         self.clientTearDown()
       
   133 
       
   134     def clientSetUp(self):
       
   135         raise NotImplementedError, "clientSetUp must be implemented."
       
   136 
       
   137     def clientTearDown(self):
       
   138         self.done.set()
       
   139         thread.exit()
       
   140 
       
   141 class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
       
   142 
       
   143     def __init__(self, methodName='runTest'):
       
   144         SocketTCPTest.__init__(self, methodName=methodName)
       
   145         ThreadableTest.__init__(self)
       
   146 
       
   147     def clientSetUp(self):
       
   148         self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   149 
       
   150     def clientTearDown(self):
       
   151         self.cli.close()
       
   152         self.cli = None
       
   153         ThreadableTest.clientTearDown(self)
       
   154 
       
   155 class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
       
   156 
       
   157     def __init__(self, methodName='runTest'):
       
   158         SocketUDPTest.__init__(self, methodName=methodName)
       
   159         ThreadableTest.__init__(self)
       
   160 
       
   161     def clientSetUp(self):
       
   162         self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
       
   163 
       
   164 class SocketConnectedTest(ThreadedTCPSocketTest):
       
   165 
       
   166     def __init__(self, methodName='runTest'):
       
   167         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
       
   168 
       
   169     def setUp(self):
       
   170         ThreadedTCPSocketTest.setUp(self)
       
   171         # Indicate explicitly we're ready for the client thread to
       
   172         # proceed and then perform the blocking call to accept
       
   173         self.serverExplicitReady()
       
   174         conn, addr = self.serv.accept()
       
   175         self.cli_conn = conn
       
   176 
       
   177     def tearDown(self):
       
   178         self.cli_conn.close()
       
   179         self.cli_conn = None
       
   180         ThreadedTCPSocketTest.tearDown(self)
       
   181 
       
   182     def clientSetUp(self):
       
   183         ThreadedTCPSocketTest.clientSetUp(self)
       
   184         self.cli.connect((HOST, self.port))
       
   185         self.serv_conn = self.cli
       
   186 
       
   187     def clientTearDown(self):
       
   188         self.serv_conn.close()
       
   189         self.serv_conn = None
       
   190         ThreadedTCPSocketTest.clientTearDown(self)
       
   191 
       
   192 class SocketPairTest(unittest.TestCase, ThreadableTest):
       
   193 
       
   194     def __init__(self, methodName='runTest'):
       
   195         unittest.TestCase.__init__(self, methodName=methodName)
       
   196         ThreadableTest.__init__(self)
       
   197 
       
   198     def setUp(self):
       
   199         self.serv, self.cli = socket.socketpair()
       
   200 
       
   201     def tearDown(self):
       
   202         self.serv.close()
       
   203         self.serv = None
       
   204 
       
   205     def clientSetUp(self):
       
   206         pass
       
   207 
       
   208     def clientTearDown(self):
       
   209         self.cli.close()
       
   210         self.cli = None
       
   211         ThreadableTest.clientTearDown(self)
       
   212 
       
   213 
       
   214 #######################################################################
       
   215 ## Begin Tests
       
   216 
       
   217 class GeneralModuleTests(unittest.TestCase):
       
   218 
       
   219     def test_weakref(self):
       
   220         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   221         p = proxy(s)
       
   222         self.assertEqual(p.fileno(), s.fileno())
       
   223         s.close()
       
   224         s = None
       
   225         try:
       
   226             p.fileno()
       
   227         except ReferenceError:
       
   228             pass
       
   229         else:
       
   230             self.fail('Socket proxy still exists')
       
   231 
       
   232     def testSocketError(self):
       
   233         # Testing socket module exceptions
       
   234         def raise_error(*args, **kwargs):
       
   235             raise socket.error
       
   236         def raise_herror(*args, **kwargs):
       
   237             raise socket.herror
       
   238         def raise_gaierror(*args, **kwargs):
       
   239             raise socket.gaierror
       
   240         self.failUnlessRaises(socket.error, raise_error,
       
   241                               "Error raising socket exception.")
       
   242         self.failUnlessRaises(socket.error, raise_herror,
       
   243                               "Error raising socket exception.")
       
   244         self.failUnlessRaises(socket.error, raise_gaierror,
       
   245                               "Error raising socket exception.")
       
   246 
       
   247     def testCrucialConstants(self):
       
   248         # Testing for mission critical constants
       
   249         socket.AF_INET
       
   250         socket.SOCK_STREAM
       
   251         socket.SOCK_DGRAM
       
   252         socket.SOCK_RAW
       
   253         socket.SOCK_RDM
       
   254         socket.SOCK_SEQPACKET
       
   255         socket.SOL_SOCKET
       
   256         socket.SO_REUSEADDR
       
   257 
       
   258     def testHostnameRes(self):
       
   259         # Testing hostname resolution mechanisms
       
   260         hostname = socket.gethostname()
       
   261         try:
       
   262             ip = socket.gethostbyname(hostname)
       
   263         except socket.error:
       
   264             # Probably name lookup wasn't set up right; skip this test
       
   265             return
       
   266         self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
       
   267         try:
       
   268             hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
       
   269         except socket.error:
       
   270             # Probably a similar problem as above; skip this test
       
   271             return
       
   272         all_host_names = [hostname, hname] + aliases
       
   273         fqhn = socket.getfqdn(ip)
       
   274         if not fqhn in all_host_names:
       
   275             self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
       
   276 
       
   277     def testRefCountGetNameInfo(self):
       
   278         # Testing reference count for getnameinfo
       
   279         if hasattr(sys, "getrefcount"):
       
   280             try:
       
   281                 # On some versions, this loses a reference
       
   282                 orig = sys.getrefcount(__name__)
       
   283                 socket.getnameinfo(__name__,0)
       
   284             except SystemError:
       
   285                 if sys.getrefcount(__name__) <> orig:
       
   286                     self.fail("socket.getnameinfo loses a reference")
       
   287 
       
   288     def testInterpreterCrash(self):
       
   289         # Making sure getnameinfo doesn't crash the interpreter
       
   290         try:
       
   291             # On some versions, this crashes the interpreter.
       
   292             socket.getnameinfo(('x', 0, 0, 0), 0)
       
   293         except socket.error:
       
   294             pass
       
   295 
       
   296     def testNtoH(self):
       
   297         # This just checks that htons etc. are their own inverse,
       
   298         # when looking at the lower 16 or 32 bits.
       
   299         sizes = {socket.htonl: 32, socket.ntohl: 32,
       
   300                  socket.htons: 16, socket.ntohs: 16}
       
   301         for func, size in sizes.items():
       
   302             mask = (1L<<size) - 1
       
   303             for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
       
   304                 self.assertEqual(i & mask, func(func(i&mask)) & mask)
       
   305 
       
   306             swapped = func(mask)
       
   307             self.assertEqual(swapped & mask, mask)
       
   308             self.assertRaises(OverflowError, func, 1L<<34)
       
   309 
       
   310     def testNtoHErrors(self):
       
   311         good_values = [ 1, 2, 3, 1L, 2L, 3L ]
       
   312         bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
       
   313         for k in good_values:
       
   314             socket.ntohl(k)
       
   315             socket.ntohs(k)
       
   316             socket.htonl(k)
       
   317             socket.htons(k)
       
   318         for k in bad_values:
       
   319             self.assertRaises(OverflowError, socket.ntohl, k)
       
   320             self.assertRaises(OverflowError, socket.ntohs, k)
       
   321             self.assertRaises(OverflowError, socket.htonl, k)
       
   322             self.assertRaises(OverflowError, socket.htons, k)
       
   323 
       
   324     def testGetServBy(self):
       
   325         eq = self.assertEqual
       
   326         # Find one service that exists, then check all the related interfaces.
       
   327         # I've ordered this by protocols that have both a tcp and udp
       
   328         # protocol, at least for modern Linuxes.
       
   329         if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
       
   330                             'freebsd7', 'freebsd8', 'darwin'):
       
   331             # avoid the 'echo' service on this platform, as there is an
       
   332             # assumption breaking non-standard port/protocol entry
       
   333             services = ('daytime', 'qotd', 'domain')
       
   334         else:
       
   335             services = ('echo', 'daytime', 'domain')
       
   336         for service in services:
       
   337             try:
       
   338                 port = socket.getservbyname(service, 'tcp')
       
   339                 break
       
   340             except socket.error:
       
   341                 pass
       
   342         else:
       
   343             raise socket.error
       
   344         # Try same call with optional protocol omitted
       
   345         port2 = socket.getservbyname(service)
       
   346         eq(port, port2)
       
   347         # Try udp, but don't barf it it doesn't exist
       
   348         try:
       
   349             udpport = socket.getservbyname(service, 'udp')
       
   350         except socket.error:
       
   351             udpport = None
       
   352         else:
       
   353             eq(udpport, port)
       
   354         # Now make sure the lookup by port returns the same service name
       
   355         eq(socket.getservbyport(port2), service)
       
   356         eq(socket.getservbyport(port, 'tcp'), service)
       
   357         if udpport is not None:
       
   358             eq(socket.getservbyport(udpport, 'udp'), service)
       
   359 
       
   360     def testDefaultTimeout(self):
       
   361         # Testing default timeout
       
   362         # The default timeout should initially be None
       
   363         self.assertEqual(socket.getdefaulttimeout(), None)
       
   364         s = socket.socket()
       
   365         self.assertEqual(s.gettimeout(), None)
       
   366         s.close()
       
   367 
       
   368         # Set the default timeout to 10, and see if it propagates
       
   369         socket.setdefaulttimeout(10)
       
   370         self.assertEqual(socket.getdefaulttimeout(), 10)
       
   371         s = socket.socket()
       
   372         self.assertEqual(s.gettimeout(), 10)
       
   373         s.close()
       
   374 
       
   375         # Reset the default timeout to None, and see if it propagates
       
   376         socket.setdefaulttimeout(None)
       
   377         self.assertEqual(socket.getdefaulttimeout(), None)
       
   378         s = socket.socket()
       
   379         self.assertEqual(s.gettimeout(), None)
       
   380         s.close()
       
   381 
       
   382         # Check that setting it to an invalid value raises ValueError
       
   383         self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
       
   384 
       
   385         # Check that setting it to an invalid type raises TypeError
       
   386         self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
       
   387 
       
   388     def testIPv4toString(self):
       
   389         if not hasattr(socket, 'inet_pton'):
       
   390             return # No inet_pton() on this platform
       
   391         from socket import inet_aton as f, inet_pton, AF_INET
       
   392         g = lambda a: inet_pton(AF_INET, a)
       
   393 
       
   394         self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
       
   395         self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
       
   396         self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
       
   397         self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
       
   398         self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
       
   399 
       
   400         self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
       
   401         self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
       
   402         self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
       
   403         self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
       
   404 
       
   405     def testIPv6toString(self):
       
   406         if not hasattr(socket, 'inet_pton'):
       
   407             return # No inet_pton() on this platform
       
   408         try:
       
   409             from socket import inet_pton, AF_INET6, has_ipv6
       
   410             if not has_ipv6:
       
   411                 return
       
   412         except ImportError:
       
   413             return
       
   414         f = lambda a: inet_pton(AF_INET6, a)
       
   415 
       
   416         self.assertEquals('\x00' * 16, f('::'))
       
   417         self.assertEquals('\x00' * 16, f('0::0'))
       
   418         self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
       
   419         self.assertEquals(
       
   420             '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
       
   421             f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
       
   422         )
       
   423 
       
   424     def testStringToIPv4(self):
       
   425         if not hasattr(socket, 'inet_ntop'):
       
   426             return # No inet_ntop() on this platform
       
   427         from socket import inet_ntoa as f, inet_ntop, AF_INET
       
   428         g = lambda a: inet_ntop(AF_INET, a)
       
   429 
       
   430         self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
       
   431         self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
       
   432         self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
       
   433         self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
       
   434 
       
   435         self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
       
   436         self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
       
   437         self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
       
   438 
       
   439     def testStringToIPv6(self):
       
   440         if not hasattr(socket, 'inet_ntop'):
       
   441             return # No inet_ntop() on this platform
       
   442         try:
       
   443             from socket import inet_ntop, AF_INET6, has_ipv6
       
   444             if not has_ipv6:
       
   445                 return
       
   446         except ImportError:
       
   447             return
       
   448         f = lambda a: inet_ntop(AF_INET6, a)
       
   449 
       
   450         self.assertEquals('::', f('\x00' * 16))
       
   451         self.assertEquals('::1', f('\x00' * 15 + '\x01'))
       
   452         self.assertEquals(
       
   453             'aef:b01:506:1001:ffff:9997:55:170',
       
   454             f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
       
   455         )
       
   456 
       
   457     # XXX The following don't test module-level functionality...
       
   458 
       
   459     def testSockName(self):
       
   460         # Testing getsockname().  Use a temporary socket to elicit an unused
       
   461         # ephemeral port that we can use later in the test.
       
   462         tempsock = socket.socket()
       
   463         tempsock.bind(("0.0.0.0", 0))
       
   464         (host, port) = tempsock.getsockname()
       
   465         tempsock.close()
       
   466         del tempsock
       
   467 
       
   468         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   469         sock.bind(("0.0.0.0", port))
       
   470         name = sock.getsockname()
       
   471         # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
       
   472         # it reasonable to get the host's addr in addition to 0.0.0.0.
       
   473         # At least for eCos.  This is required for the S/390 to pass.
       
   474         my_ip_addr = socket.gethostbyname(socket.gethostname())
       
   475         self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
       
   476         self.assertEqual(name[1], port)
       
   477 
       
   478     def testGetSockOpt(self):
       
   479         # Testing getsockopt()
       
   480         # We know a socket should start without reuse==0
       
   481         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   482         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
       
   483         self.failIf(reuse != 0, "initial mode is reuse")
       
   484 
       
   485     def testSetSockOpt(self):
       
   486         # Testing setsockopt()
       
   487         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   488         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
       
   489         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
       
   490         self.failIf(reuse == 0, "failed to set reuse mode")
       
   491 
       
   492     def testSendAfterClose(self):
       
   493         # testing send() after close() with timeout
       
   494         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   495         sock.settimeout(1)
       
   496         sock.close()
       
   497         self.assertRaises(socket.error, sock.send, "spam")
       
   498 
       
   499     def testNewAttributes(self):
       
   500         # testing .family, .type and .protocol
       
   501         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
   502         self.assertEqual(sock.family, socket.AF_INET)
       
   503         self.assertEqual(sock.type, socket.SOCK_STREAM)
       
   504         self.assertEqual(sock.proto, 0)
       
   505         sock.close()
       
   506 
       
   507     def test_sock_ioctl(self):
       
   508         if os.name != "nt":
       
   509             return
       
   510         self.assert_(hasattr(socket.socket, 'ioctl'))
       
   511         self.assert_(hasattr(socket, 'SIO_RCVALL'))
       
   512         self.assert_(hasattr(socket, 'RCVALL_ON'))
       
   513         self.assert_(hasattr(socket, 'RCVALL_OFF'))
       
   514 
       
   515 
       
   516 class BasicTCPTest(SocketConnectedTest):
       
   517 
       
   518     def __init__(self, methodName='runTest'):
       
   519         SocketConnectedTest.__init__(self, methodName=methodName)
       
   520 
       
   521     def testRecv(self):
       
   522         # Testing large receive over TCP
       
   523         msg = self.cli_conn.recv(1024)
       
   524         self.assertEqual(msg, MSG)
       
   525 
       
   526     def _testRecv(self):
       
   527         self.serv_conn.send(MSG)
       
   528 
       
   529     def testOverFlowRecv(self):
       
   530         # Testing receive in chunks over TCP
       
   531         seg1 = self.cli_conn.recv(len(MSG) - 3)
       
   532         seg2 = self.cli_conn.recv(1024)
       
   533         msg = seg1 + seg2
       
   534         self.assertEqual(msg, MSG)
       
   535 
       
   536     def _testOverFlowRecv(self):
       
   537         self.serv_conn.send(MSG)
       
   538 
       
   539     def testRecvFrom(self):
       
   540         # Testing large recvfrom() over TCP
       
   541         msg, addr = self.cli_conn.recvfrom(1024)
       
   542         self.assertEqual(msg, MSG)
       
   543 
       
   544     def _testRecvFrom(self):
       
   545         self.serv_conn.send(MSG)
       
   546 
       
   547     def testOverFlowRecvFrom(self):
       
   548         # Testing recvfrom() in chunks over TCP
       
   549         seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
       
   550         seg2, addr = self.cli_conn.recvfrom(1024)
       
   551         msg = seg1 + seg2
       
   552         self.assertEqual(msg, MSG)
       
   553 
       
   554     def _testOverFlowRecvFrom(self):
       
   555         self.serv_conn.send(MSG)
       
   556 
       
   557     def testSendAll(self):
       
   558         # Testing sendall() with a 2048 byte string over TCP
       
   559         msg = ''
       
   560         while 1:
       
   561             read = self.cli_conn.recv(1024)
       
   562             if not read:
       
   563                 break
       
   564             msg += read
       
   565         self.assertEqual(msg, 'f' * 2048)
       
   566 
       
   567     def _testSendAll(self):
       
   568         big_chunk = 'f' * 2048
       
   569         self.serv_conn.sendall(big_chunk)
       
   570 
       
   571     def testFromFd(self):
       
   572         # Testing fromfd()
       
   573         if not hasattr(socket, "fromfd"):
       
   574             return # On Windows, this doesn't exist
       
   575         fd = self.cli_conn.fileno()
       
   576         sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
       
   577         msg = sock.recv(1024)
       
   578         self.assertEqual(msg, MSG)
       
   579 
       
   580     def _testFromFd(self):
       
   581         self.serv_conn.send(MSG)
       
   582 
       
   583     def testShutdown(self):
       
   584         # Testing shutdown()
       
   585         msg = self.cli_conn.recv(1024)
       
   586         self.assertEqual(msg, MSG)
       
   587 
       
   588     def _testShutdown(self):
       
   589         self.serv_conn.send(MSG)
       
   590         self.serv_conn.shutdown(2)
       
   591 
       
   592 class BasicUDPTest(ThreadedUDPSocketTest):
       
   593 
       
   594     def __init__(self, methodName='runTest'):
       
   595         ThreadedUDPSocketTest.__init__(self, methodName=methodName)
       
   596 
       
   597     def testSendtoAndRecv(self):
       
   598         # Testing sendto() and Recv() over UDP
       
   599         msg = self.serv.recv(len(MSG))
       
   600         self.assertEqual(msg, MSG)
       
   601 
       
   602     def _testSendtoAndRecv(self):
       
   603         self.cli.sendto(MSG, 0, (HOST, self.port))
       
   604 
       
   605     def testRecvFrom(self):
       
   606         # Testing recvfrom() over UDP
       
   607         msg, addr = self.serv.recvfrom(len(MSG))
       
   608         self.assertEqual(msg, MSG)
       
   609 
       
   610     def _testRecvFrom(self):
       
   611         self.cli.sendto(MSG, 0, (HOST, self.port))
       
   612 
       
   613     def testRecvFromNegative(self):
       
   614         # Negative lengths passed to recvfrom should give ValueError.
       
   615         self.assertRaises(ValueError, self.serv.recvfrom, -1)
       
   616 
       
   617     def _testRecvFromNegative(self):
       
   618         self.cli.sendto(MSG, 0, (HOST, self.port))
       
   619 
       
   620 class TCPCloserTest(ThreadedTCPSocketTest):
       
   621 
       
   622     def testClose(self):
       
   623         conn, addr = self.serv.accept()
       
   624         conn.close()
       
   625 
       
   626         sd = self.cli
       
   627         read, write, err = select.select([sd], [], [], 1.0)
       
   628         self.assertEqual(read, [sd])
       
   629         self.assertEqual(sd.recv(1), '')
       
   630 
       
   631     def _testClose(self):
       
   632         self.cli.connect((HOST, self.port))
       
   633         time.sleep(1.0)
       
   634 
       
   635 class BasicSocketPairTest(SocketPairTest):
       
   636 
       
   637     def __init__(self, methodName='runTest'):
       
   638         SocketPairTest.__init__(self, methodName=methodName)
       
   639 
       
   640     def testRecv(self):
       
   641         msg = self.serv.recv(1024)
       
   642         self.assertEqual(msg, MSG)
       
   643 
       
   644     def _testRecv(self):
       
   645         self.cli.send(MSG)
       
   646 
       
   647     def testSend(self):
       
   648         self.serv.send(MSG)
       
   649 
       
   650     def _testSend(self):
       
   651         msg = self.cli.recv(1024)
       
   652         self.assertEqual(msg, MSG)
       
   653 
       
   654 class NonBlockingTCPTests(ThreadedTCPSocketTest):
       
   655 
       
   656     def __init__(self, methodName='runTest'):
       
   657         ThreadedTCPSocketTest.__init__(self, methodName=methodName)
       
   658 
       
   659     def testSetBlocking(self):
       
   660         # Testing whether set blocking works
       
   661         self.serv.setblocking(0)
       
   662         start = time.time()
       
   663         try:
       
   664             self.serv.accept()
       
   665         except socket.error:
       
   666             pass
       
   667         end = time.time()
       
   668         self.assert_((end - start) < 1.0, "Error setting non-blocking mode.")
       
   669 
       
   670     def _testSetBlocking(self):
       
   671         pass
       
   672 
       
   673     def testAccept(self):
       
   674         # Testing non-blocking accept
       
   675         self.serv.setblocking(0)
       
   676         try:
       
   677             conn, addr = self.serv.accept()
       
   678         except socket.error:
       
   679             pass
       
   680         else:
       
   681             self.fail("Error trying to do non-blocking accept.")
       
   682         read, write, err = select.select([self.serv], [], [])
       
   683         if self.serv in read:
       
   684             conn, addr = self.serv.accept()
       
   685         else:
       
   686             self.fail("Error trying to do accept after select.")
       
   687 
       
   688     def _testAccept(self):
       
   689         time.sleep(0.1)
       
   690         self.cli.connect((HOST, self.port))
       
   691 
       
   692     def testConnect(self):
       
   693         # Testing non-blocking connect
       
   694         conn, addr = self.serv.accept()
       
   695 
       
   696     def _testConnect(self):
       
   697         self.cli.settimeout(10)
       
   698         self.cli.connect((HOST, self.port))
       
   699 
       
   700     def testRecv(self):
       
   701         # Testing non-blocking recv
       
   702         conn, addr = self.serv.accept()
       
   703         conn.setblocking(0)
       
   704         try:
       
   705             msg = conn.recv(len(MSG))
       
   706         except socket.error:
       
   707             pass
       
   708         else:
       
   709             self.fail("Error trying to do non-blocking recv.")
       
   710         read, write, err = select.select([conn], [], [])
       
   711         if conn in read:
       
   712             msg = conn.recv(len(MSG))
       
   713             self.assertEqual(msg, MSG)
       
   714         else:
       
   715             self.fail("Error during select call to non-blocking socket.")
       
   716 
       
   717     def _testRecv(self):
       
   718         self.cli.connect((HOST, self.port))
       
   719         time.sleep(0.1)
       
   720         self.cli.send(MSG)
       
   721 
       
   722 class FileObjectClassTestCase(SocketConnectedTest):
       
   723 
       
   724     bufsize = -1 # Use default buffer size
       
   725 
       
   726     def __init__(self, methodName='runTest'):
       
   727         SocketConnectedTest.__init__(self, methodName=methodName)
       
   728 
       
   729     def setUp(self):
       
   730         SocketConnectedTest.setUp(self)
       
   731         self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
       
   732 
       
   733     def tearDown(self):
       
   734         self.serv_file.close()
       
   735         self.assert_(self.serv_file.closed)
       
   736         self.serv_file = None
       
   737         SocketConnectedTest.tearDown(self)
       
   738 
       
   739     def clientSetUp(self):
       
   740         SocketConnectedTest.clientSetUp(self)
       
   741         self.cli_file = self.serv_conn.makefile('wb')
       
   742 
       
   743     def clientTearDown(self):
       
   744         self.cli_file.close()
       
   745         self.assert_(self.cli_file.closed)
       
   746         self.cli_file = None
       
   747         SocketConnectedTest.clientTearDown(self)
       
   748 
       
   749     def testSmallRead(self):
       
   750         # Performing small file read test
       
   751         first_seg = self.serv_file.read(len(MSG)-3)
       
   752         second_seg = self.serv_file.read(3)
       
   753         msg = first_seg + second_seg
       
   754         self.assertEqual(msg, MSG)
       
   755 
       
   756     def _testSmallRead(self):
       
   757         self.cli_file.write(MSG)
       
   758         self.cli_file.flush()
       
   759 
       
   760     def testFullRead(self):
       
   761         # read until EOF
       
   762         msg = self.serv_file.read()
       
   763         self.assertEqual(msg, MSG)
       
   764 
       
   765     def _testFullRead(self):
       
   766         self.cli_file.write(MSG)
       
   767         self.cli_file.close()
       
   768 
       
   769     def testUnbufferedRead(self):
       
   770         # Performing unbuffered file read test
       
   771         buf = ''
       
   772         while 1:
       
   773             char = self.serv_file.read(1)
       
   774             if not char:
       
   775                 break
       
   776             buf += char
       
   777         self.assertEqual(buf, MSG)
       
   778 
       
   779     def _testUnbufferedRead(self):
       
   780         self.cli_file.write(MSG)
       
   781         self.cli_file.flush()
       
   782 
       
   783     def testReadline(self):
       
   784         # Performing file readline test
       
   785         line = self.serv_file.readline()
       
   786         self.assertEqual(line, MSG)
       
   787 
       
   788     def _testReadline(self):
       
   789         self.cli_file.write(MSG)
       
   790         self.cli_file.flush()
       
   791 
       
   792     def testReadlineAfterRead(self):
       
   793         a_baloo_is = self.serv_file.read(len("A baloo is"))
       
   794         self.assertEqual("A baloo is", a_baloo_is)
       
   795         _a_bear = self.serv_file.read(len(" a bear"))
       
   796         self.assertEqual(" a bear", _a_bear)
       
   797         line = self.serv_file.readline()
       
   798         self.assertEqual("\n", line)
       
   799         line = self.serv_file.readline()
       
   800         self.assertEqual("A BALOO IS A BEAR.\n", line)
       
   801         line = self.serv_file.readline()
       
   802         self.assertEqual(MSG, line)
       
   803 
       
   804     def _testReadlineAfterRead(self):
       
   805         self.cli_file.write("A baloo is a bear\n")
       
   806         self.cli_file.write("A BALOO IS A BEAR.\n")
       
   807         self.cli_file.write(MSG)
       
   808         self.cli_file.flush()
       
   809 
       
   810     def testReadlineAfterReadNoNewline(self):
       
   811         end_of_ = self.serv_file.read(len("End Of "))
       
   812         self.assertEqual("End Of ", end_of_)
       
   813         line = self.serv_file.readline()
       
   814         self.assertEqual("Line", line)
       
   815 
       
   816     def _testReadlineAfterReadNoNewline(self):
       
   817         self.cli_file.write("End Of Line")
       
   818 
       
   819     def testClosedAttr(self):
       
   820         self.assert_(not self.serv_file.closed)
       
   821 
       
   822     def _testClosedAttr(self):
       
   823         self.assert_(not self.cli_file.closed)
       
   824 
       
   825 class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
       
   826 
       
   827     """Repeat the tests from FileObjectClassTestCase with bufsize==0.
       
   828 
       
   829     In this case (and in this case only), it should be possible to
       
   830     create a file object, read a line from it, create another file
       
   831     object, read another line from it, without loss of data in the
       
   832     first file object's buffer.  Note that httplib relies on this
       
   833     when reading multiple requests from the same socket."""
       
   834 
       
   835     bufsize = 0 # Use unbuffered mode
       
   836 
       
   837     def testUnbufferedReadline(self):
       
   838         # Read a line, create a new file object, read another line with it
       
   839         line = self.serv_file.readline() # first line
       
   840         self.assertEqual(line, "A. " + MSG) # first line
       
   841         self.serv_file = self.cli_conn.makefile('rb', 0)
       
   842         line = self.serv_file.readline() # second line
       
   843         self.assertEqual(line, "B. " + MSG) # second line
       
   844 
       
   845     def _testUnbufferedReadline(self):
       
   846         self.cli_file.write("A. " + MSG)
       
   847         self.cli_file.write("B. " + MSG)
       
   848         self.cli_file.flush()
       
   849 
       
   850 class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
       
   851 
       
   852     bufsize = 1 # Default-buffered for reading; line-buffered for writing
       
   853 
       
   854 
       
   855 class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
       
   856 
       
   857     bufsize = 2 # Exercise the buffering code
       
   858 
       
   859 
       
   860 class NetworkConnectionTest(object):
       
   861     """Prove network connection."""
       
   862     def clientSetUp(self):
       
   863         # We're inherited below by BasicTCPTest2, which also inherits
       
   864         # BasicTCPTest, which defines self.port referenced below.
       
   865         self.cli = socket.create_connection((HOST, self.port))
       
   866         self.serv_conn = self.cli
       
   867 
       
   868 class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
       
   869     """Tests that NetworkConnection does not break existing TCP functionality.
       
   870     """
       
   871 
       
   872 class NetworkConnectionNoServer(unittest.TestCase):
       
   873     def testWithoutServer(self):
       
   874         port = test_support.find_unused_port()
       
   875         self.failUnlessRaises(
       
   876             socket.error,
       
   877             lambda: socket.create_connection((HOST, port))
       
   878         )
       
   879 
       
   880 class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
       
   881 
       
   882     def __init__(self, methodName='runTest'):
       
   883         SocketTCPTest.__init__(self, methodName=methodName)
       
   884         ThreadableTest.__init__(self)
       
   885 
       
   886     def clientSetUp(self):
       
   887         pass
       
   888 
       
   889     def clientTearDown(self):
       
   890         self.cli.close()
       
   891         self.cli = None
       
   892         ThreadableTest.clientTearDown(self)
       
   893 
       
   894     def _justAccept(self):
       
   895         conn, addr = self.serv.accept()
       
   896 
       
   897     testFamily = _justAccept
       
   898     def _testFamily(self):
       
   899         self.cli = socket.create_connection((HOST, self.port), timeout=30)
       
   900         self.assertEqual(self.cli.family, 2)
       
   901 
       
   902     testTimeoutDefault = _justAccept
       
   903     def _testTimeoutDefault(self):
       
   904         # passing no explicit timeout uses socket's global default
       
   905         self.assert_(socket.getdefaulttimeout() is None)
       
   906         socket.setdefaulttimeout(42)
       
   907         try:
       
   908             self.cli = socket.create_connection((HOST, self.port))
       
   909         finally:
       
   910             socket.setdefaulttimeout(None)
       
   911         self.assertEquals(self.cli.gettimeout(), 42)
       
   912 
       
   913     testTimeoutNone = _justAccept
       
   914     def _testTimeoutNone(self):
       
   915         # None timeout means the same as sock.settimeout(None)
       
   916         self.assert_(socket.getdefaulttimeout() is None)
       
   917         socket.setdefaulttimeout(30)
       
   918         try:
       
   919             self.cli = socket.create_connection((HOST, self.port), timeout=None)
       
   920         finally:
       
   921             socket.setdefaulttimeout(None)
       
   922         self.assertEqual(self.cli.gettimeout(), None)
       
   923 
       
   924     testTimeoutValueNamed = _justAccept
       
   925     def _testTimeoutValueNamed(self):
       
   926         self.cli = socket.create_connection((HOST, self.port), timeout=30)
       
   927         self.assertEqual(self.cli.gettimeout(), 30)
       
   928 
       
   929     testTimeoutValueNonamed = _justAccept
       
   930     def _testTimeoutValueNonamed(self):
       
   931         self.cli = socket.create_connection((HOST, self.port), 30)
       
   932         self.assertEqual(self.cli.gettimeout(), 30)
       
   933 
       
   934 class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
       
   935 
       
   936     def __init__(self, methodName='runTest'):
       
   937         SocketTCPTest.__init__(self, methodName=methodName)
       
   938         ThreadableTest.__init__(self)
       
   939 
       
   940     def clientSetUp(self):
       
   941         pass
       
   942 
       
   943     def clientTearDown(self):
       
   944         self.cli.close()
       
   945         self.cli = None
       
   946         ThreadableTest.clientTearDown(self)
       
   947 
       
   948     def testInsideTimeout(self):
       
   949         conn, addr = self.serv.accept()
       
   950         time.sleep(3)
       
   951         conn.send("done!")
       
   952     testOutsideTimeout = testInsideTimeout
       
   953 
       
   954     def _testInsideTimeout(self):
       
   955         self.cli = sock = socket.create_connection((HOST, self.port))
       
   956         data = sock.recv(5)
       
   957         self.assertEqual(data, "done!")
       
   958 
       
   959     def _testOutsideTimeout(self):
       
   960         self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
       
   961         self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
       
   962 
       
   963 
       
   964 class Urllib2FileobjectTest(unittest.TestCase):
       
   965 
       
   966     # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
       
   967     # it close the socket if the close c'tor argument is true
       
   968 
       
   969     def testClose(self):
       
   970         class MockSocket:
       
   971             closed = False
       
   972             def flush(self): pass
       
   973             def close(self): self.closed = True
       
   974 
       
   975         # must not close unless we request it: the original use of _fileobject
       
   976         # by module socket requires that the underlying socket not be closed until
       
   977         # the _socketobject that created the _fileobject is closed
       
   978         s = MockSocket()
       
   979         f = socket._fileobject(s)
       
   980         f.close()
       
   981         self.assert_(not s.closed)
       
   982 
       
   983         s = MockSocket()
       
   984         f = socket._fileobject(s, close=True)
       
   985         f.close()
       
   986         self.assert_(s.closed)
       
   987 
       
   988 class TCPTimeoutTest(SocketTCPTest):
       
   989 
       
   990     def testTCPTimeout(self):
       
   991         def raise_timeout(*args, **kwargs):
       
   992             self.serv.settimeout(1.0)
       
   993             self.serv.accept()
       
   994         self.failUnlessRaises(socket.timeout, raise_timeout,
       
   995                               "Error generating a timeout exception (TCP)")
       
   996 
       
   997     def testTimeoutZero(self):
       
   998         ok = False
       
   999         try:
       
  1000             self.serv.settimeout(0.0)
       
  1001             foo = self.serv.accept()
       
  1002         except socket.timeout:
       
  1003             self.fail("caught timeout instead of error (TCP)")
       
  1004         except socket.error:
       
  1005             ok = True
       
  1006         except:
       
  1007             self.fail("caught unexpected exception (TCP)")
       
  1008         if not ok:
       
  1009             self.fail("accept() returned success when we did not expect it")
       
  1010 
       
  1011     def testInterruptedTimeout(self):
       
  1012         # XXX I don't know how to do this test on MSWindows or any other
       
  1013         # plaform that doesn't support signal.alarm() or os.kill(), though
       
  1014         # the bug should have existed on all platforms.
       
  1015         if not hasattr(signal, "alarm"):
       
  1016             return                  # can only test on *nix
       
  1017         self.serv.settimeout(5.0)   # must be longer than alarm
       
  1018         class Alarm(Exception):
       
  1019             pass
       
  1020         def alarm_handler(signal, frame):
       
  1021             raise Alarm
       
  1022         old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
       
  1023         try:
       
  1024             signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
       
  1025             try:
       
  1026                 foo = self.serv.accept()
       
  1027             except socket.timeout:
       
  1028                 self.fail("caught timeout instead of Alarm")
       
  1029             except Alarm:
       
  1030                 pass
       
  1031             except:
       
  1032                 self.fail("caught other exception instead of Alarm:"
       
  1033                           " %s(%s):\n%s" %
       
  1034                           (sys.exc_info()[:2] + (traceback.format_exc(),)))
       
  1035             else:
       
  1036                 self.fail("nothing caught")
       
  1037             finally:
       
  1038                 signal.alarm(0)         # shut off alarm
       
  1039         except Alarm:
       
  1040             self.fail("got Alarm in wrong place")
       
  1041         finally:
       
  1042             # no alarm can be pending.  Safe to restore old handler.
       
  1043             signal.signal(signal.SIGALRM, old_alarm)
       
  1044 
       
  1045 class UDPTimeoutTest(SocketTCPTest):
       
  1046 
       
  1047     def testUDPTimeout(self):
       
  1048         def raise_timeout(*args, **kwargs):
       
  1049             self.serv.settimeout(1.0)
       
  1050             self.serv.recv(1024)
       
  1051         self.failUnlessRaises(socket.timeout, raise_timeout,
       
  1052                               "Error generating a timeout exception (UDP)")
       
  1053 
       
  1054     def testTimeoutZero(self):
       
  1055         ok = False
       
  1056         try:
       
  1057             self.serv.settimeout(0.0)
       
  1058             foo = self.serv.recv(1024)
       
  1059         except socket.timeout:
       
  1060             self.fail("caught timeout instead of error (UDP)")
       
  1061         except socket.error:
       
  1062             ok = True
       
  1063         except:
       
  1064             self.fail("caught unexpected exception (UDP)")
       
  1065         if not ok:
       
  1066             self.fail("recv() returned success when we did not expect it")
       
  1067 
       
  1068 class TestExceptions(unittest.TestCase):
       
  1069 
       
  1070     def testExceptionTree(self):
       
  1071         self.assert_(issubclass(socket.error, Exception))
       
  1072         self.assert_(issubclass(socket.herror, socket.error))
       
  1073         self.assert_(issubclass(socket.gaierror, socket.error))
       
  1074         self.assert_(issubclass(socket.timeout, socket.error))
       
  1075 
       
  1076 class TestLinuxAbstractNamespace(unittest.TestCase):
       
  1077 
       
  1078     UNIX_PATH_MAX = 108
       
  1079 
       
  1080     def testLinuxAbstractNamespace(self):
       
  1081         address = "\x00python-test-hello\x00\xff"
       
  1082         s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
       
  1083         s1.bind(address)
       
  1084         s1.listen(1)
       
  1085         s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
       
  1086         s2.connect(s1.getsockname())
       
  1087         s1.accept()
       
  1088         self.assertEqual(s1.getsockname(), address)
       
  1089         self.assertEqual(s2.getpeername(), address)
       
  1090 
       
  1091     def testMaxName(self):
       
  1092         address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
       
  1093         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
       
  1094         s.bind(address)
       
  1095         self.assertEqual(s.getsockname(), address)
       
  1096 
       
  1097     def testNameOverflow(self):
       
  1098         address = "\x00" + "h" * self.UNIX_PATH_MAX
       
  1099         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
       
  1100         self.assertRaises(socket.error, s.bind, address)
       
  1101 
       
  1102 
       
  1103 class BufferIOTest(SocketConnectedTest):
       
  1104     """
       
  1105     Test the buffer versions of socket.recv() and socket.send().
       
  1106     """
       
  1107     def __init__(self, methodName='runTest'):
       
  1108         SocketConnectedTest.__init__(self, methodName=methodName)
       
  1109 
       
  1110     def testRecvInto(self):
       
  1111         buf = array.array('c', ' '*1024)
       
  1112         nbytes = self.cli_conn.recv_into(buf)
       
  1113         self.assertEqual(nbytes, len(MSG))
       
  1114         msg = buf.tostring()[:len(MSG)]
       
  1115         self.assertEqual(msg, MSG)
       
  1116 
       
  1117     def _testRecvInto(self):
       
  1118         buf = buffer(MSG)
       
  1119         self.serv_conn.send(buf)
       
  1120 
       
  1121     def testRecvFromInto(self):
       
  1122         buf = array.array('c', ' '*1024)
       
  1123         nbytes, addr = self.cli_conn.recvfrom_into(buf)
       
  1124         self.assertEqual(nbytes, len(MSG))
       
  1125         msg = buf.tostring()[:len(MSG)]
       
  1126         self.assertEqual(msg, MSG)
       
  1127 
       
  1128     def _testRecvFromInto(self):
       
  1129         buf = buffer(MSG)
       
  1130         self.serv_conn.send(buf)
       
  1131 
       
  1132 
       
  1133 TIPC_STYPE = 2000
       
  1134 TIPC_LOWER = 200
       
  1135 TIPC_UPPER = 210
       
  1136 
       
  1137 def isTipcAvailable():
       
  1138     """Check if the TIPC module is loaded
       
  1139 
       
  1140     The TIPC module is not loaded automatically on Ubuntu and probably
       
  1141     other Linux distros.
       
  1142     """
       
  1143     if not hasattr(socket, "AF_TIPC"):
       
  1144         return False
       
  1145     if not os.path.isfile("/proc/modules"):
       
  1146         return False
       
  1147     with open("/proc/modules") as f:
       
  1148         for line in f:
       
  1149             if line.startswith("tipc "):
       
  1150                 return True
       
  1151     if test_support.verbose:
       
  1152         print "TIPC module is not loaded, please 'sudo modprobe tipc'"
       
  1153     return False
       
  1154 
       
  1155 class TIPCTest (unittest.TestCase):
       
  1156     def testRDM(self):
       
  1157         srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
       
  1158         cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
       
  1159 
       
  1160         srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
       
  1161         srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
       
  1162                 TIPC_LOWER, TIPC_UPPER)
       
  1163         srv.bind(srvaddr)
       
  1164 
       
  1165         sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
       
  1166                 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
       
  1167         cli.sendto(MSG, sendaddr)
       
  1168 
       
  1169         msg, recvaddr = srv.recvfrom(1024)
       
  1170 
       
  1171         self.assertEqual(cli.getsockname(), recvaddr)
       
  1172         self.assertEqual(msg, MSG)
       
  1173 
       
  1174 
       
  1175 class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
       
  1176     def __init__(self, methodName = 'runTest'):
       
  1177         unittest.TestCase.__init__(self, methodName = methodName)
       
  1178         ThreadableTest.__init__(self)
       
  1179 
       
  1180     def setUp(self):
       
  1181         self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
       
  1182         self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
       
  1183         srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
       
  1184                 TIPC_LOWER, TIPC_UPPER)
       
  1185         self.srv.bind(srvaddr)
       
  1186         self.srv.listen(5)
       
  1187         self.serverExplicitReady()
       
  1188         self.conn, self.connaddr = self.srv.accept()
       
  1189 
       
  1190     def clientSetUp(self):
       
  1191         # The is a hittable race between serverExplicitReady() and the
       
  1192         # accept() call; sleep a little while to avoid it, otherwise
       
  1193         # we could get an exception
       
  1194         time.sleep(0.1)
       
  1195         self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
       
  1196         addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
       
  1197                 TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
       
  1198         self.cli.connect(addr)
       
  1199         self.cliaddr = self.cli.getsockname()
       
  1200 
       
  1201     def testStream(self):
       
  1202         msg = self.conn.recv(1024)
       
  1203         self.assertEqual(msg, MSG)
       
  1204         self.assertEqual(self.cliaddr, self.connaddr)
       
  1205 
       
  1206     def _testStream(self):
       
  1207         self.cli.send(MSG)
       
  1208         self.cli.close()
       
  1209 
       
  1210 
       
  1211 def test_main():
       
  1212     tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
       
  1213              TestExceptions, BufferIOTest, BasicTCPTest2]
       
  1214     if sys.platform != 'mac':
       
  1215         tests.extend([ BasicUDPTest, UDPTimeoutTest ])
       
  1216 
       
  1217     tests.extend([
       
  1218         NonBlockingTCPTests,
       
  1219         FileObjectClassTestCase,
       
  1220         UnbufferedFileObjectClassTestCase,
       
  1221         LineBufferedFileObjectClassTestCase,
       
  1222         SmallBufferedFileObjectClassTestCase,
       
  1223         Urllib2FileobjectTest,
       
  1224         NetworkConnectionNoServer,
       
  1225         NetworkConnectionAttributesTest,
       
  1226         NetworkConnectionBehaviourTest,
       
  1227     ])
       
  1228     if hasattr(socket, "socketpair"):
       
  1229         tests.append(BasicSocketPairTest)
       
  1230     if sys.platform == 'linux2':
       
  1231         tests.append(TestLinuxAbstractNamespace)
       
  1232     if isTipcAvailable():
       
  1233         tests.append(TIPCTest)
       
  1234         tests.append(TIPCThreadableTest)
       
  1235 
       
  1236     thread_info = test_support.threading_setup()
       
  1237     test_support.run_unittest(*tests)
       
  1238     test_support.threading_cleanup(*thread_info)
       
  1239 
       
  1240 if __name__ == "__main__":
       
  1241     test_main()