|
1 # Very rudimentary test of threading module |
|
2 |
|
3 import test.test_support |
|
4 from test.test_support import verbose |
|
5 import random |
|
6 import re |
|
7 import sys |
|
8 import threading |
|
9 import thread |
|
10 import time |
|
11 import unittest |
|
12 import weakref |
|
13 |
|
14 # A trivial mutable counter. |
|
15 class Counter(object): |
|
16 def __init__(self): |
|
17 self.value = 0 |
|
18 def inc(self): |
|
19 self.value += 1 |
|
20 def dec(self): |
|
21 self.value -= 1 |
|
22 def get(self): |
|
23 return self.value |
|
24 |
|
25 class TestThread(threading.Thread): |
|
26 def __init__(self, name, testcase, sema, mutex, nrunning): |
|
27 threading.Thread.__init__(self, name=name) |
|
28 self.testcase = testcase |
|
29 self.sema = sema |
|
30 self.mutex = mutex |
|
31 self.nrunning = nrunning |
|
32 |
|
33 def run(self): |
|
34 delay = random.random() / 10000.0 |
|
35 if verbose: |
|
36 print 'task %s will run for %.1f usec' % ( |
|
37 self.name, delay * 1e6) |
|
38 |
|
39 with self.sema: |
|
40 with self.mutex: |
|
41 self.nrunning.inc() |
|
42 if verbose: |
|
43 print self.nrunning.get(), 'tasks are running' |
|
44 self.testcase.assert_(self.nrunning.get() <= 3) |
|
45 |
|
46 time.sleep(delay) |
|
47 if verbose: |
|
48 print 'task', self.name, 'done' |
|
49 |
|
50 with self.mutex: |
|
51 self.nrunning.dec() |
|
52 self.testcase.assert_(self.nrunning.get() >= 0) |
|
53 if verbose: |
|
54 print '%s is finished. %d tasks are running' % ( |
|
55 self.name, self.nrunning.get()) |
|
56 |
|
57 class ThreadTests(unittest.TestCase): |
|
58 |
|
59 # Create a bunch of threads, let each do some work, wait until all are |
|
60 # done. |
|
61 def test_various_ops(self): |
|
62 # This takes about n/3 seconds to run (about n/3 clumps of tasks, |
|
63 # times about 1 second per clump). |
|
64 NUMTASKS = 10 |
|
65 |
|
66 # no more than 3 of the 10 can run at once |
|
67 sema = threading.BoundedSemaphore(value=3) |
|
68 mutex = threading.RLock() |
|
69 numrunning = Counter() |
|
70 |
|
71 threads = [] |
|
72 |
|
73 for i in range(NUMTASKS): |
|
74 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) |
|
75 threads.append(t) |
|
76 self.failUnlessEqual(t.ident, None) |
|
77 self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t))) |
|
78 t.start() |
|
79 |
|
80 if verbose: |
|
81 print 'waiting for all tasks to complete' |
|
82 for t in threads: |
|
83 t.join(NUMTASKS) |
|
84 self.assert_(not t.is_alive()) |
|
85 self.failIfEqual(t.ident, 0) |
|
86 self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) |
|
87 if verbose: |
|
88 print 'all tasks done' |
|
89 self.assertEqual(numrunning.get(), 0) |
|
90 |
|
91 # run with a small(ish) thread stack size (256kB) |
|
92 def test_various_ops_small_stack(self): |
|
93 if verbose: |
|
94 print 'with 256kB thread stack size...' |
|
95 try: |
|
96 threading.stack_size(262144) |
|
97 except thread.error: |
|
98 if verbose: |
|
99 print 'platform does not support changing thread stack size' |
|
100 return |
|
101 self.test_various_ops() |
|
102 threading.stack_size(0) |
|
103 |
|
104 # run with a large thread stack size (1MB) |
|
105 def test_various_ops_large_stack(self): |
|
106 if verbose: |
|
107 print 'with 1MB thread stack size...' |
|
108 try: |
|
109 threading.stack_size(0x100000) |
|
110 except thread.error: |
|
111 if verbose: |
|
112 print 'platform does not support changing thread stack size' |
|
113 return |
|
114 self.test_various_ops() |
|
115 threading.stack_size(0) |
|
116 |
|
117 def test_foreign_thread(self): |
|
118 # Check that a "foreign" thread can use the threading module. |
|
119 def f(mutex): |
|
120 # Acquiring an RLock forces an entry for the foreign |
|
121 # thread to get made in the threading._active map. |
|
122 r = threading.RLock() |
|
123 r.acquire() |
|
124 r.release() |
|
125 mutex.release() |
|
126 |
|
127 mutex = threading.Lock() |
|
128 mutex.acquire() |
|
129 tid = thread.start_new_thread(f, (mutex,)) |
|
130 # Wait for the thread to finish. |
|
131 mutex.acquire() |
|
132 self.assert_(tid in threading._active) |
|
133 self.assert_(isinstance(threading._active[tid], |
|
134 threading._DummyThread)) |
|
135 del threading._active[tid] |
|
136 |
|
137 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) |
|
138 # exposed at the Python level. This test relies on ctypes to get at it. |
|
139 def test_PyThreadState_SetAsyncExc(self): |
|
140 try: |
|
141 import ctypes |
|
142 except ImportError: |
|
143 if verbose: |
|
144 print "test_PyThreadState_SetAsyncExc can't import ctypes" |
|
145 return # can't do anything |
|
146 |
|
147 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc |
|
148 |
|
149 class AsyncExc(Exception): |
|
150 pass |
|
151 |
|
152 exception = ctypes.py_object(AsyncExc) |
|
153 |
|
154 # `worker_started` is set by the thread when it's inside a try/except |
|
155 # block waiting to catch the asynchronously set AsyncExc exception. |
|
156 # `worker_saw_exception` is set by the thread upon catching that |
|
157 # exception. |
|
158 worker_started = threading.Event() |
|
159 worker_saw_exception = threading.Event() |
|
160 |
|
161 class Worker(threading.Thread): |
|
162 def run(self): |
|
163 self.id = thread.get_ident() |
|
164 self.finished = False |
|
165 |
|
166 try: |
|
167 while True: |
|
168 worker_started.set() |
|
169 time.sleep(0.1) |
|
170 except AsyncExc: |
|
171 self.finished = True |
|
172 worker_saw_exception.set() |
|
173 |
|
174 t = Worker() |
|
175 t.daemon = True # so if this fails, we don't hang Python at shutdown |
|
176 t.start() |
|
177 if verbose: |
|
178 print " started worker thread" |
|
179 |
|
180 # Try a thread id that doesn't make sense. |
|
181 if verbose: |
|
182 print " trying nonsensical thread id" |
|
183 result = set_async_exc(ctypes.c_long(-1), exception) |
|
184 self.assertEqual(result, 0) # no thread states modified |
|
185 |
|
186 # Now raise an exception in the worker thread. |
|
187 if verbose: |
|
188 print " waiting for worker thread to get started" |
|
189 worker_started.wait() |
|
190 if verbose: |
|
191 print " verifying worker hasn't exited" |
|
192 self.assert_(not t.finished) |
|
193 if verbose: |
|
194 print " attempting to raise asynch exception in worker" |
|
195 result = set_async_exc(ctypes.c_long(t.id), exception) |
|
196 self.assertEqual(result, 1) # one thread state modified |
|
197 if verbose: |
|
198 print " waiting for worker to say it caught the exception" |
|
199 worker_saw_exception.wait(timeout=10) |
|
200 self.assert_(t.finished) |
|
201 if verbose: |
|
202 print " all OK -- joining worker" |
|
203 if t.finished: |
|
204 t.join() |
|
205 # else the thread is still running, and we have no way to kill it |
|
206 |
|
207 def test_finalize_runnning_thread(self): |
|
208 # Issue 1402: the PyGILState_Ensure / _Release functions may be called |
|
209 # very late on python exit: on deallocation of a running thread for |
|
210 # example. |
|
211 try: |
|
212 import ctypes |
|
213 except ImportError: |
|
214 if verbose: |
|
215 print("test_finalize_with_runnning_thread can't import ctypes") |
|
216 return # can't do anything |
|
217 |
|
218 import subprocess |
|
219 rc = subprocess.call([sys.executable, "-c", """if 1: |
|
220 import ctypes, sys, time, thread |
|
221 |
|
222 # This lock is used as a simple event variable. |
|
223 ready = thread.allocate_lock() |
|
224 ready.acquire() |
|
225 |
|
226 # Module globals are cleared before __del__ is run |
|
227 # So we save the functions in class dict |
|
228 class C: |
|
229 ensure = ctypes.pythonapi.PyGILState_Ensure |
|
230 release = ctypes.pythonapi.PyGILState_Release |
|
231 def __del__(self): |
|
232 state = self.ensure() |
|
233 self.release(state) |
|
234 |
|
235 def waitingThread(): |
|
236 x = C() |
|
237 ready.release() |
|
238 time.sleep(100) |
|
239 |
|
240 thread.start_new_thread(waitingThread, ()) |
|
241 ready.acquire() # Be sure the other thread is waiting. |
|
242 sys.exit(42) |
|
243 """]) |
|
244 self.assertEqual(rc, 42) |
|
245 |
|
246 def test_finalize_with_trace(self): |
|
247 # Issue1733757 |
|
248 # Avoid a deadlock when sys.settrace steps into threading._shutdown |
|
249 import subprocess |
|
250 rc = subprocess.call([sys.executable, "-c", """if 1: |
|
251 import sys, threading |
|
252 |
|
253 # A deadlock-killer, to prevent the |
|
254 # testsuite to hang forever |
|
255 def killer(): |
|
256 import os, time |
|
257 time.sleep(2) |
|
258 print 'program blocked; aborting' |
|
259 os._exit(2) |
|
260 t = threading.Thread(target=killer) |
|
261 t.daemon = True |
|
262 t.start() |
|
263 |
|
264 # This is the trace function |
|
265 def func(frame, event, arg): |
|
266 threading.current_thread() |
|
267 return func |
|
268 |
|
269 sys.settrace(func) |
|
270 """]) |
|
271 self.failIf(rc == 2, "interpreted was blocked") |
|
272 self.failUnless(rc == 0, "Unexpected error") |
|
273 |
|
274 |
|
275 def test_enumerate_after_join(self): |
|
276 # Try hard to trigger #1703448: a thread is still returned in |
|
277 # threading.enumerate() after it has been join()ed. |
|
278 enum = threading.enumerate |
|
279 old_interval = sys.getcheckinterval() |
|
280 try: |
|
281 for i in xrange(1, 100): |
|
282 # Try a couple times at each thread-switching interval |
|
283 # to get more interleavings. |
|
284 sys.setcheckinterval(i // 5) |
|
285 t = threading.Thread(target=lambda: None) |
|
286 t.start() |
|
287 t.join() |
|
288 l = enum() |
|
289 self.assertFalse(t in l, |
|
290 "#1703448 triggered after %d trials: %s" % (i, l)) |
|
291 finally: |
|
292 sys.setcheckinterval(old_interval) |
|
293 |
|
294 def test_no_refcycle_through_target(self): |
|
295 class RunSelfFunction(object): |
|
296 def __init__(self, should_raise): |
|
297 # The links in this refcycle from Thread back to self |
|
298 # should be cleaned up when the thread completes. |
|
299 self.should_raise = should_raise |
|
300 self.thread = threading.Thread(target=self._run, |
|
301 args=(self,), |
|
302 kwargs={'yet_another':self}) |
|
303 self.thread.start() |
|
304 |
|
305 def _run(self, other_ref, yet_another): |
|
306 if self.should_raise: |
|
307 raise SystemExit |
|
308 |
|
309 cyclic_object = RunSelfFunction(should_raise=False) |
|
310 weak_cyclic_object = weakref.ref(cyclic_object) |
|
311 cyclic_object.thread.join() |
|
312 del cyclic_object |
|
313 self.assertEquals(None, weak_cyclic_object(), |
|
314 msg=('%d references still around' % |
|
315 sys.getrefcount(weak_cyclic_object()))) |
|
316 |
|
317 raising_cyclic_object = RunSelfFunction(should_raise=True) |
|
318 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) |
|
319 raising_cyclic_object.thread.join() |
|
320 del raising_cyclic_object |
|
321 self.assertEquals(None, weak_raising_cyclic_object(), |
|
322 msg=('%d references still around' % |
|
323 sys.getrefcount(weak_raising_cyclic_object()))) |
|
324 |
|
325 |
|
326 class ThreadJoinOnShutdown(unittest.TestCase): |
|
327 |
|
328 def _run_and_join(self, script): |
|
329 script = """if 1: |
|
330 import sys, os, time, threading |
|
331 |
|
332 # a thread, which waits for the main program to terminate |
|
333 def joiningfunc(mainthread): |
|
334 mainthread.join() |
|
335 print 'end of thread' |
|
336 \n""" + script |
|
337 |
|
338 import subprocess |
|
339 p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) |
|
340 rc = p.wait() |
|
341 data = p.stdout.read().replace('\r', '') |
|
342 self.assertEqual(data, "end of main\nend of thread\n") |
|
343 self.failIf(rc == 2, "interpreter was blocked") |
|
344 self.failUnless(rc == 0, "Unexpected error") |
|
345 |
|
346 def test_1_join_on_shutdown(self): |
|
347 # The usual case: on exit, wait for a non-daemon thread |
|
348 script = """if 1: |
|
349 import os |
|
350 t = threading.Thread(target=joiningfunc, |
|
351 args=(threading.current_thread(),)) |
|
352 t.start() |
|
353 time.sleep(0.1) |
|
354 print 'end of main' |
|
355 """ |
|
356 self._run_and_join(script) |
|
357 |
|
358 |
|
359 def test_2_join_in_forked_process(self): |
|
360 # Like the test above, but from a forked interpreter |
|
361 import os |
|
362 if not hasattr(os, 'fork'): |
|
363 return |
|
364 script = """if 1: |
|
365 childpid = os.fork() |
|
366 if childpid != 0: |
|
367 os.waitpid(childpid, 0) |
|
368 sys.exit(0) |
|
369 |
|
370 t = threading.Thread(target=joiningfunc, |
|
371 args=(threading.current_thread(),)) |
|
372 t.start() |
|
373 print 'end of main' |
|
374 """ |
|
375 self._run_and_join(script) |
|
376 |
|
377 def test_3_join_in_forked_from_thread(self): |
|
378 # Like the test above, but fork() was called from a worker thread |
|
379 # In the forked process, the main Thread object must be marked as stopped. |
|
380 import os |
|
381 if not hasattr(os, 'fork'): |
|
382 return |
|
383 # Skip platforms with known problems forking from a worker thread. |
|
384 # See http://bugs.python.org/issue3863. |
|
385 if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): |
|
386 print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread' |
|
387 ' due to known OS bugs on'), sys.platform |
|
388 return |
|
389 script = """if 1: |
|
390 main_thread = threading.current_thread() |
|
391 def worker(): |
|
392 childpid = os.fork() |
|
393 if childpid != 0: |
|
394 os.waitpid(childpid, 0) |
|
395 sys.exit(0) |
|
396 |
|
397 t = threading.Thread(target=joiningfunc, |
|
398 args=(main_thread,)) |
|
399 print 'end of main' |
|
400 t.start() |
|
401 t.join() # Should not block: main_thread is already stopped |
|
402 |
|
403 w = threading.Thread(target=worker) |
|
404 w.start() |
|
405 """ |
|
406 self._run_and_join(script) |
|
407 |
|
408 |
|
409 class ThreadingExceptionTests(unittest.TestCase): |
|
410 # A RuntimeError should be raised if Thread.start() is called |
|
411 # multiple times. |
|
412 def test_start_thread_again(self): |
|
413 thread = threading.Thread() |
|
414 thread.start() |
|
415 self.assertRaises(RuntimeError, thread.start) |
|
416 |
|
417 def test_releasing_unacquired_rlock(self): |
|
418 rlock = threading.RLock() |
|
419 self.assertRaises(RuntimeError, rlock.release) |
|
420 |
|
421 def test_waiting_on_unacquired_condition(self): |
|
422 cond = threading.Condition() |
|
423 self.assertRaises(RuntimeError, cond.wait) |
|
424 |
|
425 def test_notify_on_unacquired_condition(self): |
|
426 cond = threading.Condition() |
|
427 self.assertRaises(RuntimeError, cond.notify) |
|
428 |
|
429 def test_semaphore_with_negative_value(self): |
|
430 self.assertRaises(ValueError, threading.Semaphore, value = -1) |
|
431 self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxint) |
|
432 |
|
433 def test_joining_current_thread(self): |
|
434 current_thread = threading.current_thread() |
|
435 self.assertRaises(RuntimeError, current_thread.join); |
|
436 |
|
437 def test_joining_inactive_thread(self): |
|
438 thread = threading.Thread() |
|
439 self.assertRaises(RuntimeError, thread.join) |
|
440 |
|
441 def test_daemonize_active_thread(self): |
|
442 thread = threading.Thread() |
|
443 thread.start() |
|
444 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) |
|
445 |
|
446 |
|
447 def test_main(): |
|
448 test.test_support.run_unittest(ThreadTests, |
|
449 ThreadJoinOnShutdown, |
|
450 ThreadingExceptionTests, |
|
451 ) |
|
452 |
|
453 if __name__ == "__main__": |
|
454 test_main() |