symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_asynchat.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 # test asynchat -- requires threading
       
     2 
       
     3 import thread # If this fails, we can't test this module
       
     4 import asyncore, asynchat, socket, threading, time
       
     5 import unittest
       
     6 import sys
       
     7 from test import test_support
       
     8 
       
     9 HOST = test_support.HOST
       
    10 SERVER_QUIT = 'QUIT\n'
       
    11 
       
    12 class echo_server(threading.Thread):
       
    13     # parameter to determine the number of bytes passed back to the
       
    14     # client each send
       
    15     chunk_size = 1
       
    16 
       
    17     def __init__(self, event):
       
    18         threading.Thread.__init__(self)
       
    19         self.event = event
       
    20         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       
    21         self.port = test_support.bind_port(self.sock)
       
    22 
       
    23     def run(self):
       
    24         self.sock.listen(1)
       
    25         self.event.set()
       
    26         conn, client = self.sock.accept()
       
    27         self.buffer = ""
       
    28         # collect data until quit message is seen
       
    29         while SERVER_QUIT not in self.buffer:
       
    30             data = conn.recv(1)
       
    31             if not data:
       
    32                 break
       
    33             self.buffer = self.buffer + data
       
    34 
       
    35         # remove the SERVER_QUIT message
       
    36         self.buffer = self.buffer.replace(SERVER_QUIT, '')
       
    37 
       
    38         # re-send entire set of collected data
       
    39         try:
       
    40             # this may fail on some tests, such as test_close_when_done, since
       
    41             # the client closes the channel when it's done sending
       
    42             while self.buffer:
       
    43                 n = conn.send(self.buffer[:self.chunk_size])
       
    44                 time.sleep(0.001)
       
    45                 self.buffer = self.buffer[n:]
       
    46         except:
       
    47             pass
       
    48 
       
    49         conn.close()
       
    50         self.sock.close()
       
    51 
       
    52 class echo_client(asynchat.async_chat):
       
    53 
       
    54     def __init__(self, terminator, server_port):
       
    55         asynchat.async_chat.__init__(self)
       
    56         self.contents = []
       
    57         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
       
    58         self.connect((HOST, server_port))
       
    59         self.set_terminator(terminator)
       
    60         self.buffer = ''
       
    61 
       
    62     def handle_connect(self):
       
    63         pass
       
    64 
       
    65     if sys.platform == 'darwin':
       
    66         # select.poll returns a select.POLLHUP at the end of the tests
       
    67         # on darwin, so just ignore it
       
    68         def handle_expt(self):
       
    69             pass
       
    70 
       
    71     def collect_incoming_data(self, data):
       
    72         self.buffer += data
       
    73 
       
    74     def found_terminator(self):
       
    75         self.contents.append(self.buffer)
       
    76         self.buffer = ""
       
    77 
       
    78 
       
    79 def start_echo_server():
       
    80     event = threading.Event()
       
    81     s = echo_server(event)
       
    82     s.start()
       
    83     event.wait()
       
    84     event.clear()
       
    85     time.sleep(0.01) # Give server time to start accepting.
       
    86     return s, event
       
    87 
       
    88 
       
    89 class TestAsynchat(unittest.TestCase):
       
    90     usepoll = False
       
    91 
       
    92     def setUp (self):
       
    93         pass
       
    94 
       
    95     def tearDown (self):
       
    96         pass
       
    97 
       
    98     def line_terminator_check(self, term, server_chunk):
       
    99         event = threading.Event()
       
   100         s = echo_server(event)
       
   101         s.chunk_size = server_chunk
       
   102         s.start()
       
   103         event.wait()
       
   104         event.clear()
       
   105         time.sleep(0.01) # Give server time to start accepting.
       
   106         c = echo_client(term, s.port)
       
   107         c.push("hello ")
       
   108         c.push("world%s" % term)
       
   109         c.push("I'm not dead yet!%s" % term)
       
   110         c.push(SERVER_QUIT)
       
   111         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
       
   112         s.join()
       
   113 
       
   114         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
       
   115 
       
   116     # the line terminator tests below check receiving variously-sized
       
   117     # chunks back from the server in order to exercise all branches of
       
   118     # async_chat.handle_read
       
   119 
       
   120     def test_line_terminator1(self):
       
   121         # test one-character terminator
       
   122         for l in (1,2,3):
       
   123             self.line_terminator_check('\n', l)
       
   124 
       
   125     def test_line_terminator2(self):
       
   126         # test two-character terminator
       
   127         for l in (1,2,3):
       
   128             self.line_terminator_check('\r\n', l)
       
   129 
       
   130     def test_line_terminator3(self):
       
   131         # test three-character terminator
       
   132         for l in (1,2,3):
       
   133             self.line_terminator_check('qqq', l)
       
   134 
       
   135     def numeric_terminator_check(self, termlen):
       
   136         # Try reading a fixed number of bytes
       
   137         s, event = start_echo_server()
       
   138         c = echo_client(termlen, s.port)
       
   139         data = "hello world, I'm not dead yet!\n"
       
   140         c.push(data)
       
   141         c.push(SERVER_QUIT)
       
   142         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
       
   143         s.join()
       
   144 
       
   145         self.assertEqual(c.contents, [data[:termlen]])
       
   146 
       
   147     def test_numeric_terminator1(self):
       
   148         # check that ints & longs both work (since type is
       
   149         # explicitly checked in async_chat.handle_read)
       
   150         self.numeric_terminator_check(1)
       
   151         self.numeric_terminator_check(1L)
       
   152 
       
   153     def test_numeric_terminator2(self):
       
   154         self.numeric_terminator_check(6L)
       
   155 
       
   156     def test_none_terminator(self):
       
   157         # Try reading a fixed number of bytes
       
   158         s, event = start_echo_server()
       
   159         c = echo_client(None, s.port)
       
   160         data = "hello world, I'm not dead yet!\n"
       
   161         c.push(data)
       
   162         c.push(SERVER_QUIT)
       
   163         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
       
   164         s.join()
       
   165 
       
   166         self.assertEqual(c.contents, [])
       
   167         self.assertEqual(c.buffer, data)
       
   168 
       
   169     def test_simple_producer(self):
       
   170         s, event = start_echo_server()
       
   171         c = echo_client('\n', s.port)
       
   172         data = "hello world\nI'm not dead yet!\n"
       
   173         p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
       
   174         c.push_with_producer(p)
       
   175         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
       
   176         s.join()
       
   177 
       
   178         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
       
   179 
       
   180     def test_string_producer(self):
       
   181         s, event = start_echo_server()
       
   182         c = echo_client('\n', s.port)
       
   183         data = "hello world\nI'm not dead yet!\n"
       
   184         c.push_with_producer(data+SERVER_QUIT)
       
   185         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
       
   186         s.join()
       
   187 
       
   188         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
       
   189 
       
   190     def test_empty_line(self):
       
   191         # checks that empty lines are handled correctly
       
   192         s, event = start_echo_server()
       
   193         c = echo_client('\n', s.port)
       
   194         c.push("hello world\n\nI'm not dead yet!\n")
       
   195         c.push(SERVER_QUIT)
       
   196         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
       
   197         s.join()
       
   198 
       
   199         self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
       
   200 
       
   201     def test_close_when_done(self):
       
   202         s, event = start_echo_server()
       
   203         c = echo_client('\n', s.port)
       
   204         c.push("hello world\nI'm not dead yet!\n")
       
   205         c.push(SERVER_QUIT)
       
   206         c.close_when_done()
       
   207         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
       
   208         s.join()
       
   209 
       
   210         self.assertEqual(c.contents, [])
       
   211         # the server might have been able to send a byte or two back, but this
       
   212         # at least checks that it received something and didn't just fail
       
   213         # (which could still result in the client not having received anything)
       
   214         self.assertTrue(len(s.buffer) > 0)
       
   215 
       
   216 
       
   217 class TestAsynchat_WithPoll(TestAsynchat):
       
   218     usepoll = True
       
   219 
       
   220 class TestHelperFunctions(unittest.TestCase):
       
   221     def test_find_prefix_at_end(self):
       
   222         self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
       
   223         self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
       
   224 
       
   225 class TestFifo(unittest.TestCase):
       
   226     def test_basic(self):
       
   227         f = asynchat.fifo()
       
   228         f.push(7)
       
   229         f.push('a')
       
   230         self.assertEqual(len(f), 2)
       
   231         self.assertEqual(f.first(), 7)
       
   232         self.assertEqual(f.pop(), (1, 7))
       
   233         self.assertEqual(len(f), 1)
       
   234         self.assertEqual(f.first(), 'a')
       
   235         self.assertEqual(f.is_empty(), False)
       
   236         self.assertEqual(f.pop(), (1, 'a'))
       
   237         self.assertEqual(len(f), 0)
       
   238         self.assertEqual(f.is_empty(), True)
       
   239         self.assertEqual(f.pop(), (0, None))
       
   240 
       
   241     def test_given_list(self):
       
   242         f = asynchat.fifo(['x', 17, 3])
       
   243         self.assertEqual(len(f), 3)
       
   244         self.assertEqual(f.pop(), (1, 'x'))
       
   245         self.assertEqual(f.pop(), (1, 17))
       
   246         self.assertEqual(f.pop(), (1, 3))
       
   247         self.assertEqual(f.pop(), (0, None))
       
   248 
       
   249 
       
   250 def test_main(verbose=None):
       
   251     test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
       
   252                               TestHelperFunctions, TestFifo)
       
   253 
       
   254 if __name__ == "__main__":
       
   255     test_main(verbose=True)