|
1 # Some simple Queue module tests, plus some failure conditions |
|
2 # to ensure the Queue locks remain stable. |
|
3 import Queue |
|
4 import sys |
|
5 import threading |
|
6 import time |
|
7 |
|
8 from test.test_support import verify, TestFailed, verbose |
|
9 |
|
# Capacity of the bounded queues that test() builds for SimpleQueueTest
# and FailingQueueTest; both fill their queue with QUEUE_SIZE items.
QUEUE_SIZE = 5
|
11 |
|
# A thread that calls fn(*args) after a short delay.  The blocking-test
# helpers use it to unclog a Queue operation that is blocked in the main
# thread.  startedEvent is set immediately before fn is called, so the
# main thread can tell whether its blocking call returned too early.
class _TriggerThread(threading.Thread):
    def __init__(self, fn, args):
        threading.Thread.__init__(self)
        self.startedEvent = threading.Event()
        self.fn = fn
        self.args = args

    def run(self):
        # The pause is not required for correctness; it only gives the
        # blocking call in the main thread a chance to really block before
        # it gets unclogged.  It must stay much shorter than the timeouts
        # the timeout-based tests pass to their blocking calls, otherwise
        # those tests fail.  The blocking calls therefore use timeouts of
        # about 10 seconds; they never really wait that long, because they
        # make progress as soon as self.fn() has been called.
        time.sleep(0.1)
        self.startedEvent.set()
        self.fn(*self.args)
|
32 |
|
# Run block_func(*block_args) in the calling thread while a _TriggerThread
# calls trigger_func(*trigger_args) shortly afterwards to release it.
# Returns whatever block_func returned.
#
# Requirements on the caller:
#   * block_func must really block until trigger_func has been called;
#   * trigger_func must change the queue state enough for block_func to
#     make progress and return;
#   * block_func must be unexceptional.  A block_func that raises no matter
#     whether trigger_func was called leads to timing-dependent, sporadic
#     failures (one of those went rarely seen but undiagnosed for years).
#     If block_func is supposed to raise, call _doExceptionalBlockingTest()
#     instead.
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
    trigger = _TriggerThread(trigger_func, trigger_args)
    trigger.start()
    outcome = block_func(*block_args)
    # The event is set right before trigger_func runs; if it is still clear
    # here, block_func came back without waiting for the trigger.
    returned_early = not trigger.startedEvent.isSet()
    if returned_early:
        raise TestFailed("blocking function '%r' appeared not to block" %
                         block_func)
    # Give the trigger thread up to 10 seconds to terminate.
    trigger.join(10)
    still_running = trigger.isAlive()
    if still_running:
        raise TestFailed("trigger function '%r' appeared to not return" %
                         trigger_func)
    return outcome
|
56 |
|
# Variant of the blocking test for a block_func that is supposed to raise.
# block_func(*block_args) runs in the calling thread while a _TriggerThread
# calls trigger_func(*trigger_args).  An exception of
# expected_exception_class is passed on to the caller; a normal return from
# block_func is reported as TestFailed.  Whatever happens, the trigger
# thread is joined and checked before this function is left.
def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
                               trigger_args, expected_exception_class):
    trigger = _TriggerThread(trigger_func, trigger_args)
    trigger.start()
    try:
        block_func(*block_args)
    except expected_exception_class:
        # The expected outcome: hand the exception to the caller untouched.
        raise
    else:
        raise TestFailed("expected exception of kind %r" %
                         expected_exception_class)
    finally:
        # Give the trigger thread up to 10 seconds to terminate.
        trigger.join(10)
        if trigger.isAlive():
            raise TestFailed("trigger function '%r' appeared to not return" %
                             trigger_func)
        if not trigger.startedEvent.isSet():
            raise TestFailed("trigger thread ended but event never set")
|
77 |
|
# A Queue subclass that can provoke failure at a moment's notice :)
#
# FailingQueueException is what the queue raises when it has been told to
# fail.  Setting fail_next_put (or fail_next_get) to True makes the next
# _put (or _get) clear the flag and raise instead of touching the queue;
# with the flag clear, the call is forwarded to Queue.Queue unchanged.
class FailingQueueException(Exception):
    pass


class FailingQueue(Queue.Queue):
    def __init__(self, *args):
        # Both failure switches start disarmed.
        self.fail_next_put = False
        self.fail_next_get = False
        Queue.Queue.__init__(self, *args)

    def _put(self, item):
        if not self.fail_next_put:
            return Queue.Queue._put(self, item)
        # One-shot: disarm before failing so the following put works again.
        self.fail_next_put = False
        raise FailingQueueException("You Lose")

    def _get(self):
        if not self.fail_next_get:
            return Queue.Queue._get(self)
        # One-shot: disarm before failing so the following get works again.
        self.fail_next_get = False
        raise FailingQueueException("You Lose")
|
97 |
|
# Helper for FailingQueueTest: call func(*args, **kwargs) and require it to
# raise FailingQueueException.  A normal return is reported as TestFailed;
# any other exception propagates unchanged.
def _expectQueueFailure(func, *args, **kwargs):
    try:
        func(*args, **kwargs)
    except FailingQueueException:
        return
    raise TestFailed("The queue didn't fail when it should have")


# Check that a FailingQueue stays usable after its _put/_get hooks raise.
#
# q must be an empty FailingQueue with room for QUEUE_SIZE items (a
# non-empty queue raises RuntimeError).  Each step arms fail_next_put or
# fail_next_get, performs one put/get that must raise
# FailingQueueException, and then verifies that the queue still reports the
# right full/empty state and still accepts ordinary operations.  Problems
# are reported through verify() or TestFailed.  The queue is empty again on
# return, so the test can be repeated on the same object.
def FailingQueueTest(q):
    if not q.empty():
        raise RuntimeError("Call this function with an empty queue")
    for i in range(QUEUE_SIZE-1):
        q.put(i)
    # Test a failing non-blocking put.
    q.fail_next_put = True
    _expectQueueFailure(q.put, "oops", block=0)
    q.fail_next_put = True
    _expectQueueFailure(q.put, "oops", timeout=0.1)
    q.put("last")
    verify(q.full(), "Queue should be full")
    # Test a failing blocking put.  q.put raises here, so this must go
    # through _doExceptionalBlockingTest (which also joins the trigger
    # thread) rather than _doBlockingTest, whose block_func must not raise.
    q.fail_next_put = True
    _expectQueueFailure(_doExceptionalBlockingTest,
                        q.put, ("full",), q.get, (), FailingQueueException)
    # Check the Queue isn't damaged.
    # put failed, but get succeeded - re-add
    q.put("last")
    # Test a failing timeout put
    q.fail_next_put = True
    _expectQueueFailure(_doExceptionalBlockingTest,
                        q.put, ("full", True, 10), q.get, (),
                        FailingQueueException)
    # Check the Queue isn't damaged.
    # put failed, but get succeeded - re-add
    q.put("last")
    verify(q.full(), "Queue should be full")
    q.get()
    verify(not q.full(), "Queue should not be full")
    q.put("last")
    verify(q.full(), "Queue should be full")
    # Test a blocking put
    _doBlockingTest(q.put, ("full",), q.get, ())
    # Empty it
    for i in range(QUEUE_SIZE):
        q.get()
    verify(q.empty(), "Queue should be empty")
    q.put("first")
    # Test a failing get, first without and then with a timeout.
    q.fail_next_get = True
    _expectQueueFailure(q.get)
    verify(not q.empty(), "Queue should not be empty")
    q.fail_next_get = True
    _expectQueueFailure(q.get, timeout=0.1)
    verify(not q.empty(), "Queue should not be empty")
    q.get()
    verify(q.empty(), "Queue should be empty")
    # Test a failing blocking get on an empty queue.
    q.fail_next_get = True
    _expectQueueFailure(_doExceptionalBlockingTest,
                        q.get, (), q.put, ('empty',), FailingQueueException)
    # put succeeded, but get failed.
    verify(not q.empty(), "Queue should not be empty")
    q.get()
    verify(q.empty(), "Queue should be empty")
|
178 |
|
# Exercise ordinary put/get behaviour on a bounded queue.
#
# q must be empty (otherwise RuntimeError is raised) and must hold exactly
# QUEUE_SIZE items.  The test checks FIFO ordering, the empty()/full()
# reports, that non-blocking and timed put/get raise Queue.Full and
# Queue.Empty at the limits, and that blocking put/get are released by the
# opposite operation from another thread.  Problems are reported through
# verify() or TestFailed.
def SimpleQueueTest(q):
    if not q.empty():
        raise RuntimeError("Call this function with an empty queue")
    # I guess we better check things actually queue correctly a little :)
    q.put(111)
    q.put(222)
    came_out_in_order = q.get() == 111 and q.get() == 222
    verify(came_out_in_order, "Didn't seem to queue the correct data!")
    # Fill the queue up to one short of its capacity ...
    for item in range(QUEUE_SIZE - 1):
        q.put(item)
    verify(not q.empty(), "Queue should not be empty")
    verify(not q.full(), "Queue should not be full")
    # ... and then add the final item.
    q.put("last")
    verify(q.full(), "Queue should be full")
    # A full queue must refuse a non-blocking put and time out a timed one.
    try:
        q.put("full", block=0)
    except Queue.Full:
        pass
    else:
        raise TestFailed("Didn't appear to block with a full queue")
    try:
        q.put("full", timeout=0.01)
    except Queue.Full:
        pass
    else:
        raise TestFailed("Didn't appear to time-out with a full queue")
    # Test a blocking put
    _doBlockingTest(q.put, ("full",), q.get, ())
    _doBlockingTest(q.put, ("full", True, 10), q.get, ())
    # Empty it
    for _ in range(QUEUE_SIZE):
        q.get()
    verify(q.empty(), "Queue should be empty")
    # An empty queue must refuse a non-blocking get and time out a timed one.
    try:
        q.get(block=0)
    except Queue.Empty:
        pass
    else:
        raise TestFailed("Didn't appear to block with an empty queue")
    try:
        q.get(timeout=0.01)
    except Queue.Empty:
        pass
    else:
        raise TestFailed("Didn't appear to time-out with an empty queue")
    # Test a blocking get
    _doBlockingTest(q.get, (), q.put, ('empty',))
    _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
|
223 |
|
# Running total of every item the worker threads have consumed.  Updates
# are made while holding cumlock.
cum = 0
cumlock = threading.Lock()

# Consumer loop run by the worker threads: take items from q, add each one
# to the global cum under cumlock and mark it done.  A None item tells the
# worker to stop; it is marked done as well, so q.join() also accounts for
# the stop requests.
def worker(q):
    global cum
    while True:
        item = q.get()
        if item is None:
            break
        cumlock.acquire()
        try:
            cum += item
        finally:
            cumlock.release()
        q.task_done()
    # Acknowledge the None that ended the loop.
    q.task_done()
|
240 |
|
# Check that q.join() blocks until every queued task has been marked done.
#
# Resets the global cum, starts two worker threads on q, queues the numbers
# 0..99 and joins the queue; cum must then equal their sum.  Afterwards one
# None per worker is queued to make the threads exit, and the queue is
# joined a second time.
def QueueJoinTest(q):
    global cum
    cum = 0
    worker_slots = (0, 1)
    for _ in worker_slots:
        threading.Thread(target=worker, args=(q,)).start()
    numbers = range(100)
    for number in numbers:
        q.put(number)
    q.join()
    expected_total = sum(numbers)
    verify(cum == expected_total,
           "q.join() did not block until all tasks were done")
    # Instruct the threads to close.
    for _ in worker_slots:
        q.put(None)
    # Verify that you can join twice.
    q.join()
|
253 |
|
# Check that q.task_done() raises ValueError when there is no unfinished
# task left to acknowledge.  Call it only on a queue whose tasks have all
# been marked done; a task_done() that succeeds is reported as TestFailed.
def QueueTaskDoneTest(q):
    try:
        q.task_done()
    except ValueError:
        # Expected: the task count must not be allowed to go negative.
        return
    raise TestFailed("Did not detect task count going negative")
|
261 |
|
# Run all queue tests.
#
# The task_done/join tests share one unbounded Queue.  The simple tests and
# the failure tests each get a fresh queue of QUEUE_SIZE items and are run
# twice on it, so the second pass starts from whatever state the first
# pass left behind.  When verbose is set, a progress line is printed after
# each of those two groups.
def test():
    unbounded = Queue.Queue()
    QueueTaskDoneTest(unbounded)
    QueueJoinTest(unbounded)
    QueueJoinTest(unbounded)
    QueueTaskDoneTest(unbounded)

    suites = (
        (Queue.Queue, SimpleQueueTest,
         "Simple Queue tests seemed to work"),
        (FailingQueue, FailingQueueTest,
         "Failing Queue tests seemed to work"),
    )
    for queue_class, run_suite, progress_message in suites:
        bounded = queue_class(QUEUE_SIZE)
        # Do it a couple of times on the same queue
        run_suite(bounded)
        run_suite(bounded)
        if verbose:
            print(progress_message)
|
280 |
|
# Run the whole suite as soon as this module is executed or imported.
test()