|
1 # test asynchat -- requires threading |
|
2 |
|
3 import thread # If this fails, we can't test this module |
|
4 import asyncore, asynchat, socket, threading, time |
|
5 import unittest |
|
6 import sys |
|
7 from test import test_support |
|
8 |
|
9 HOST = test_support.HOST |
|
10 SERVER_QUIT = 'QUIT\n' |
|
11 |
|
12 class echo_server(threading.Thread): |
|
13 # parameter to determine the number of bytes passed back to the |
|
14 # client each send |
|
15 chunk_size = 1 |
|
16 |
|
17 def __init__(self, event): |
|
18 threading.Thread.__init__(self) |
|
19 self.event = event |
|
20 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
21 self.port = test_support.bind_port(self.sock) |
|
22 |
|
23 def run(self): |
|
24 self.sock.listen(1) |
|
25 self.event.set() |
|
26 conn, client = self.sock.accept() |
|
27 self.buffer = "" |
|
28 # collect data until quit message is seen |
|
29 while SERVER_QUIT not in self.buffer: |
|
30 data = conn.recv(1) |
|
31 if not data: |
|
32 break |
|
33 self.buffer = self.buffer + data |
|
34 |
|
35 # remove the SERVER_QUIT message |
|
36 self.buffer = self.buffer.replace(SERVER_QUIT, '') |
|
37 |
|
38 # re-send entire set of collected data |
|
39 try: |
|
40 # this may fail on some tests, such as test_close_when_done, since |
|
41 # the client closes the channel when it's done sending |
|
42 while self.buffer: |
|
43 n = conn.send(self.buffer[:self.chunk_size]) |
|
44 time.sleep(0.001) |
|
45 self.buffer = self.buffer[n:] |
|
46 except: |
|
47 pass |
|
48 |
|
49 conn.close() |
|
50 self.sock.close() |
|
51 |
|
52 class echo_client(asynchat.async_chat): |
|
53 |
|
54 def __init__(self, terminator, server_port): |
|
55 asynchat.async_chat.__init__(self) |
|
56 self.contents = [] |
|
57 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) |
|
58 self.connect((HOST, server_port)) |
|
59 self.set_terminator(terminator) |
|
60 self.buffer = '' |
|
61 |
|
62 def handle_connect(self): |
|
63 pass |
|
64 |
|
65 if sys.platform == 'darwin': |
|
66 # select.poll returns a select.POLLHUP at the end of the tests |
|
67 # on darwin, so just ignore it |
|
68 def handle_expt(self): |
|
69 pass |
|
70 |
|
71 def collect_incoming_data(self, data): |
|
72 self.buffer += data |
|
73 |
|
74 def found_terminator(self): |
|
75 self.contents.append(self.buffer) |
|
76 self.buffer = "" |
|
77 |
|
78 |
|
79 def start_echo_server(): |
|
80 event = threading.Event() |
|
81 s = echo_server(event) |
|
82 s.start() |
|
83 event.wait() |
|
84 event.clear() |
|
85 time.sleep(0.01) # Give server time to start accepting. |
|
86 return s, event |
|
87 |
|
88 |
|
89 class TestAsynchat(unittest.TestCase): |
|
90 usepoll = False |
|
91 |
|
92 def setUp (self): |
|
93 pass |
|
94 |
|
95 def tearDown (self): |
|
96 pass |
|
97 |
|
98 def line_terminator_check(self, term, server_chunk): |
|
99 event = threading.Event() |
|
100 s = echo_server(event) |
|
101 s.chunk_size = server_chunk |
|
102 s.start() |
|
103 event.wait() |
|
104 event.clear() |
|
105 time.sleep(0.01) # Give server time to start accepting. |
|
106 c = echo_client(term, s.port) |
|
107 c.push("hello ") |
|
108 c.push("world%s" % term) |
|
109 c.push("I'm not dead yet!%s" % term) |
|
110 c.push(SERVER_QUIT) |
|
111 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) |
|
112 s.join() |
|
113 |
|
114 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) |
|
115 |
|
116 # the line terminator tests below check receiving variously-sized |
|
117 # chunks back from the server in order to exercise all branches of |
|
118 # async_chat.handle_read |
|
119 |
|
120 def test_line_terminator1(self): |
|
121 # test one-character terminator |
|
122 for l in (1,2,3): |
|
123 self.line_terminator_check('\n', l) |
|
124 |
|
125 def test_line_terminator2(self): |
|
126 # test two-character terminator |
|
127 for l in (1,2,3): |
|
128 self.line_terminator_check('\r\n', l) |
|
129 |
|
130 def test_line_terminator3(self): |
|
131 # test three-character terminator |
|
132 for l in (1,2,3): |
|
133 self.line_terminator_check('qqq', l) |
|
134 |
|
135 def numeric_terminator_check(self, termlen): |
|
136 # Try reading a fixed number of bytes |
|
137 s, event = start_echo_server() |
|
138 c = echo_client(termlen, s.port) |
|
139 data = "hello world, I'm not dead yet!\n" |
|
140 c.push(data) |
|
141 c.push(SERVER_QUIT) |
|
142 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) |
|
143 s.join() |
|
144 |
|
145 self.assertEqual(c.contents, [data[:termlen]]) |
|
146 |
|
147 def test_numeric_terminator1(self): |
|
148 # check that ints & longs both work (since type is |
|
149 # explicitly checked in async_chat.handle_read) |
|
150 self.numeric_terminator_check(1) |
|
151 self.numeric_terminator_check(1L) |
|
152 |
|
153 def test_numeric_terminator2(self): |
|
154 self.numeric_terminator_check(6L) |
|
155 |
|
156 def test_none_terminator(self): |
|
157 # Try reading a fixed number of bytes |
|
158 s, event = start_echo_server() |
|
159 c = echo_client(None, s.port) |
|
160 data = "hello world, I'm not dead yet!\n" |
|
161 c.push(data) |
|
162 c.push(SERVER_QUIT) |
|
163 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) |
|
164 s.join() |
|
165 |
|
166 self.assertEqual(c.contents, []) |
|
167 self.assertEqual(c.buffer, data) |
|
168 |
|
169 def test_simple_producer(self): |
|
170 s, event = start_echo_server() |
|
171 c = echo_client('\n', s.port) |
|
172 data = "hello world\nI'm not dead yet!\n" |
|
173 p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) |
|
174 c.push_with_producer(p) |
|
175 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) |
|
176 s.join() |
|
177 |
|
178 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) |
|
179 |
|
180 def test_string_producer(self): |
|
181 s, event = start_echo_server() |
|
182 c = echo_client('\n', s.port) |
|
183 data = "hello world\nI'm not dead yet!\n" |
|
184 c.push_with_producer(data+SERVER_QUIT) |
|
185 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) |
|
186 s.join() |
|
187 |
|
188 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) |
|
189 |
|
190 def test_empty_line(self): |
|
191 # checks that empty lines are handled correctly |
|
192 s, event = start_echo_server() |
|
193 c = echo_client('\n', s.port) |
|
194 c.push("hello world\n\nI'm not dead yet!\n") |
|
195 c.push(SERVER_QUIT) |
|
196 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) |
|
197 s.join() |
|
198 |
|
199 self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"]) |
|
200 |
|
201 def test_close_when_done(self): |
|
202 s, event = start_echo_server() |
|
203 c = echo_client('\n', s.port) |
|
204 c.push("hello world\nI'm not dead yet!\n") |
|
205 c.push(SERVER_QUIT) |
|
206 c.close_when_done() |
|
207 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) |
|
208 s.join() |
|
209 |
|
210 self.assertEqual(c.contents, []) |
|
211 # the server might have been able to send a byte or two back, but this |
|
212 # at least checks that it received something and didn't just fail |
|
213 # (which could still result in the client not having received anything) |
|
214 self.assertTrue(len(s.buffer) > 0) |
|
215 |
|
216 |
|
217 class TestAsynchat_WithPoll(TestAsynchat): |
|
218 usepoll = True |
|
219 |
|
220 class TestHelperFunctions(unittest.TestCase): |
|
221 def test_find_prefix_at_end(self): |
|
222 self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1) |
|
223 self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0) |
|
224 |
|
225 class TestFifo(unittest.TestCase): |
|
226 def test_basic(self): |
|
227 f = asynchat.fifo() |
|
228 f.push(7) |
|
229 f.push('a') |
|
230 self.assertEqual(len(f), 2) |
|
231 self.assertEqual(f.first(), 7) |
|
232 self.assertEqual(f.pop(), (1, 7)) |
|
233 self.assertEqual(len(f), 1) |
|
234 self.assertEqual(f.first(), 'a') |
|
235 self.assertEqual(f.is_empty(), False) |
|
236 self.assertEqual(f.pop(), (1, 'a')) |
|
237 self.assertEqual(len(f), 0) |
|
238 self.assertEqual(f.is_empty(), True) |
|
239 self.assertEqual(f.pop(), (0, None)) |
|
240 |
|
241 def test_given_list(self): |
|
242 f = asynchat.fifo(['x', 17, 3]) |
|
243 self.assertEqual(len(f), 3) |
|
244 self.assertEqual(f.pop(), (1, 'x')) |
|
245 self.assertEqual(f.pop(), (1, 17)) |
|
246 self.assertEqual(f.pop(), (1, 3)) |
|
247 self.assertEqual(f.pop(), (0, None)) |
|
248 |
|
249 |
|
250 def test_main(verbose=None): |
|
251 test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll, |
|
252 TestHelperFunctions, TestFifo) |
|
253 |
|
254 if __name__ == "__main__": |
|
255 test_main(verbose=True) |