|
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
|
3 import Queue |
|
4 import sys |
|
5 import threading |
|
6 import time |
|
7 import unittest |
|
8 from test import test_support |
|
9 |
|
# Capacity passed to the bounded queues that the tests below create.
QUEUE_SIZE = 5
|
11 |
|
# A thread to run a function that unclogs a blocked Queue.
|
13 class _TriggerThread(threading.Thread): |
|
14 def __init__(self, fn, args): |
|
15 self.fn = fn |
|
16 self.args = args |
|
17 self.startedEvent = threading.Event() |
|
18 threading.Thread.__init__(self) |
|
19 |
|
20 def run(self): |
|
21 # The sleep isn't necessary, but is intended to give the blocking |
|
22 # function in the main thread a chance at actually blocking before |
|
23 # we unclog it. But if the sleep is longer than the timeout-based |
|
24 # tests wait in their blocking functions, those tests will fail. |
|
25 # So we give them much longer timeout values compared to the |
|
26 # sleep here (I aimed at 10 seconds for blocking functions -- |
|
27 # they should never actually wait that long - they should make |
|
28 # progress as soon as we call self.fn()). |
|
29 time.sleep(0.1) |
|
30 self.startedEvent.set() |
|
31 self.fn(*self.args) |
|
32 |
|
33 |
|
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures, and one of those went rarely seen but
# undiagnosed for years.  Now block_func must be unexceptional.  If block_func
# is supposed to raise an exception, call do_exceptional_blocking_test()
# instead.
|
44 |
|
class BlockingTestMixin:
    """Helpers that check a call really blocks until another thread acts.

    Each helper starts a _TriggerThread running ``trigger_func`` and runs
    ``block_func`` in the calling thread.  ``block_func`` must block until
    ``trigger_func`` has been called, and ``trigger_func`` must change the
    queue state enough for ``block_func`` to finish.  Failures are reported
    through ``self.fail``, so this is meant to be mixed into a
    unittest.TestCase.
    """

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Run ``block_func(*block_args)`` and return its result.

        ``trigger_func(*trigger_args)`` is called from a helper thread to
        release the blocked call.  The test fails if ``block_func`` returns
        before the trigger thread signalled that it started, or if the
        trigger thread is still alive after a 10 second join.

        ``block_func`` is expected not to raise; use
        do_exceptional_blocking_test() when it should.  If it raises anyway,
        the exception propagates, but only after the trigger thread has been
        joined.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            self.result = block_func(*block_args)
            # If block_func returned before our thread made the call, we failed!
            if not self.t.startedEvent.is_set():
                self.fail("blocking function '%r' appeared not to block" %
                          block_func)
        finally:
            # Always wait for the trigger thread, even when block_func raised
            # or the check above failed, so it cannot outlive this call.
            self.t.join(10)
        if self.t.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      trigger_func)
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
                                   trigger_args, expected_exception_class):
        """Run ``block_func(*block_args)``, which must raise.

        An exception of ``expected_exception_class`` is re-raised to the
        caller; returning normally is reported as a test failure.  In every
        case the trigger thread is joined (10 second limit) and checked to
        have terminated and to have set its started event.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            try:
                block_func(*block_args)
            except expected_exception_class:
                raise
            else:
                self.fail("expected exception of kind %r" %
                          expected_exception_class)
        finally:
            self.t.join(10) # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          trigger_func)
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
|
81 |
|
82 |
|
class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
    """Tests shared by every queue flavour.

    Subclasses set ``type2test`` to the queue class to exercise.
    """
    # NOTE(review): this base class is itself a TestCase but defines no
    # type2test, so its test_* methods only work when run through a subclass.

    def setUp(self):
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Exercise put/get, full/empty and blocking behaviour on ``q``.

        ``q`` must be empty and bounded to QUEUE_SIZE items; it is left
        empty again on return.  Raises RuntimeError if ``q`` is not empty.
        """
        if not q.empty():
            raise RuntimeError("Call this function with an empty queue")
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        target_order = dict(Queue = [111, 333, 222],
                            LifoQueue = [222, 333, 111],
                            PriorityQueue = [111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(not q.empty(), "Queue should not be empty")
        self.assertTrue(not q.full(), "Queue should not be full")
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        try:
            q.put("full", block=0)
        except Queue.Full:
            pass
        else:
            self.fail("Didn't appear to block with a full queue")
        try:
            q.put("full", timeout=0.01)
        except Queue.Full:
            pass
        else:
            self.fail("Didn't appear to time-out with a full queue")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        self.do_blocking_test(q.put, ("full", True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        try:
            q.get(block=0)
        except Queue.Empty:
            pass
        else:
            self.fail("Didn't appear to block with an empty queue")
        try:
            q.get(timeout=0.01)
        except Queue.Empty:
            pass
        else:
            self.fail("Didn't appear to time-out with an empty queue")
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add items taken from ``q`` to self.cum until None is received."""
        while True:
            x = q.get()
            if x is None:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() waits until two workers processed all items."""
        self.cum = 0
        workers = [threading.Thread(target=self.worker, args=(q,))
                   for _ in (0, 1)]
        for thread in workers:
            thread.start()
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for _ in workers:
            q.put(None)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Wait for the workers themselves so that no thread outlives the test.
        for thread in workers:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)
|
191 |
|
192 |
|
class QueueTest(BaseQueueTest):
    """Runs the shared queue tests against Queue.Queue."""

    type2test = Queue.Queue
|
195 |
|
class LifoQueueTest(BaseQueueTest):
    """Runs the shared queue tests against Queue.LifoQueue."""

    type2test = Queue.LifoQueue
|
198 |
|
class PriorityQueueTest(BaseQueueTest):
    """Runs the shared queue tests against Queue.PriorityQueue."""

    type2test = Queue.PriorityQueue
|
201 |
|
202 |
|
203 |
|
# A Queue subclass that can provoke failure at a moment's notice :)
|
class FailingQueueException(Exception):
    """Error raised on purpose by the failure-provoking queue in this file."""
|
207 |
|
class FailingQueue(Queue.Queue):
    """Queue whose next low-level put or get can be forced to raise.

    Set ``fail_next_put`` or ``fail_next_get`` to True to make the next
    ``_put``/``_get`` raise FailingQueueException.  The flag is cleared
    before raising, so only a single operation fails.
    """

    def __init__(self, *args):
        # The flags are set before the base initialiser runs.
        self.fail_next_put = False
        self.fail_next_get = False
        Queue.Queue.__init__(self, *args)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            # Call form of raise: valid on both Python 2 and Python 3.
            raise FailingQueueException("You Lose")
        return Queue.Queue._put(self, item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._get(self)
|
223 |
|
class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
    """Checks that a queue stays usable after its _put/_get hooks raise."""

    def failing_queue_test(self, q):
        """Provoke put/get failures on ``q`` and verify it is not damaged.

        ``q`` must be an empty queue bounded to QUEUE_SIZE items that
        exposes ``fail_next_put`` / ``fail_next_get``; it is left empty on
        return.  Raises RuntimeError if ``q`` is not empty.
        """
        if not q.empty():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        try:
            q.put("oops", block=0)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        q.fail_next_put = True
        try:
            q.put("oops", timeout=0.1)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        # Test a failing blocking put.  The put is expected to raise, so it
        # goes through do_exceptional_blocking_test(); do_blocking_test()
        # requires a block_func that does not raise.
        q.fail_next_put = True
        try:
            self.do_exceptional_blocking_test(q.put, ("full",), q.get, (),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        try:
            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        q.get()
        self.assertTrue(not q.full(), "Queue should not be full")
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        try:
            q.get()
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        self.assertTrue(not q.empty(), "Queue should not be empty")
        q.fail_next_get = True
        try:
            q.get(timeout=0.1)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        self.assertTrue(not q.empty(), "Queue should not be empty")
        q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        q.fail_next_get = True
        try:
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # put succeeded, but get failed.
        self.assertTrue(not q.empty(), "Queue should not be empty")
        q.get()
        self.assertTrue(q.empty(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
|
313 |
|
314 |
|
def test_main():
    """Run all four queue test cases via test_support.run_unittest()."""
    cases = (QueueTest, LifoQueueTest, PriorityQueueTest, FailingQueueTest)
    test_support.run_unittest(*cases)
|
318 |
|
319 |
|
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()