|
1 # Very rudimentary test of threading module |
|
2 |
|
3 import test.test_support |
|
4 from test.test_support import verbose |
|
5 import random |
|
6 import sys |
|
7 import threading |
|
8 import thread |
|
9 import time |
|
10 import unittest |
|
11 |
|
12 # A trivial mutable counter. |
|
13 class Counter(object): |
|
14 def __init__(self): |
|
15 self.value = 0 |
|
16 def inc(self): |
|
17 self.value += 1 |
|
18 def dec(self): |
|
19 self.value -= 1 |
|
20 def get(self): |
|
21 return self.value |
|
22 |
|
23 class TestThread(threading.Thread): |
|
24 def __init__(self, name, testcase, sema, mutex, nrunning): |
|
25 threading.Thread.__init__(self, name=name) |
|
26 self.testcase = testcase |
|
27 self.sema = sema |
|
28 self.mutex = mutex |
|
29 self.nrunning = nrunning |
|
30 |
|
31 def run(self): |
|
32 delay = random.random() * 2 |
|
33 if verbose: |
|
34 print 'task', self.getName(), 'will run for', delay, 'sec' |
|
35 |
|
36 self.sema.acquire() |
|
37 |
|
38 self.mutex.acquire() |
|
39 self.nrunning.inc() |
|
40 if verbose: |
|
41 print self.nrunning.get(), 'tasks are running' |
|
42 self.testcase.assert_(self.nrunning.get() <= 3) |
|
43 self.mutex.release() |
|
44 |
|
45 time.sleep(delay) |
|
46 if verbose: |
|
47 print 'task', self.getName(), 'done' |
|
48 |
|
49 self.mutex.acquire() |
|
50 self.nrunning.dec() |
|
51 self.testcase.assert_(self.nrunning.get() >= 0) |
|
52 if verbose: |
|
53 print self.getName(), 'is finished.', self.nrunning.get(), \ |
|
54 'tasks are running' |
|
55 self.mutex.release() |
|
56 |
|
57 self.sema.release() |
|
58 |
|
59 class ThreadTests(unittest.TestCase): |
|
60 |
|
61 # Create a bunch of threads, let each do some work, wait until all are |
|
62 # done. |
|
63 def test_various_ops(self): |
|
64 # This takes about n/3 seconds to run (about n/3 clumps of tasks, |
|
65 # times about 1 second per clump). |
|
66 NUMTASKS = 10 |
|
67 |
|
68 # no more than 3 of the 10 can run at once |
|
69 sema = threading.BoundedSemaphore(value=3) |
|
70 mutex = threading.RLock() |
|
71 numrunning = Counter() |
|
72 |
|
73 threads = [] |
|
74 |
|
75 for i in range(NUMTASKS): |
|
76 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) |
|
77 threads.append(t) |
|
78 t.start() |
|
79 |
|
80 if verbose: |
|
81 print 'waiting for all tasks to complete' |
|
82 for t in threads: |
|
83 t.join(NUMTASKS) |
|
84 self.assert_(not t.isAlive()) |
|
85 if verbose: |
|
86 print 'all tasks done' |
|
87 self.assertEqual(numrunning.get(), 0) |
|
88 |
|
89 # run with a small(ish) thread stack size (256kB) |
|
90 def test_various_ops_small_stack(self): |
|
91 if verbose: |
|
92 print 'with 256kB thread stack size...' |
|
93 try: |
|
94 threading.stack_size(262144) |
|
95 except thread.error: |
|
96 if verbose: |
|
97 print 'platform does not support changing thread stack size' |
|
98 return |
|
99 self.test_various_ops() |
|
100 threading.stack_size(0) |
|
101 |
|
102 # run with a large thread stack size (1MB) |
|
103 def test_various_ops_large_stack(self): |
|
104 if verbose: |
|
105 print 'with 1MB thread stack size...' |
|
106 try: |
|
107 threading.stack_size(0x100000) |
|
108 except thread.error: |
|
109 if verbose: |
|
110 print 'platform does not support changing thread stack size' |
|
111 return |
|
112 self.test_various_ops() |
|
113 threading.stack_size(0) |
|
114 |
|
115 def test_foreign_thread(self): |
|
116 # Check that a "foreign" thread can use the threading module. |
|
117 def f(mutex): |
|
118 # Acquiring an RLock forces an entry for the foreign |
|
119 # thread to get made in the threading._active map. |
|
120 r = threading.RLock() |
|
121 r.acquire() |
|
122 r.release() |
|
123 mutex.release() |
|
124 |
|
125 mutex = threading.Lock() |
|
126 mutex.acquire() |
|
127 tid = thread.start_new_thread(f, (mutex,)) |
|
128 # Wait for the thread to finish. |
|
129 mutex.acquire() |
|
130 self.assert_(tid in threading._active) |
|
131 self.assert_(isinstance(threading._active[tid], |
|
132 threading._DummyThread)) |
|
133 del threading._active[tid] |
|
134 |
|
135 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) |
|
136 # exposed at the Python level. This test relies on ctypes to get at it. |
|
137 def test_PyThreadState_SetAsyncExc(self): |
|
138 try: |
|
139 import ctypes |
|
140 except ImportError: |
|
141 if verbose: |
|
142 print "test_PyThreadState_SetAsyncExc can't import ctypes" |
|
143 return # can't do anything |
|
144 |
|
145 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc |
|
146 |
|
147 class AsyncExc(Exception): |
|
148 pass |
|
149 |
|
150 exception = ctypes.py_object(AsyncExc) |
|
151 |
|
152 # `worker_started` is set by the thread when it's inside a try/except |
|
153 # block waiting to catch the asynchronously set AsyncExc exception. |
|
154 # `worker_saw_exception` is set by the thread upon catching that |
|
155 # exception. |
|
156 worker_started = threading.Event() |
|
157 worker_saw_exception = threading.Event() |
|
158 |
|
159 class Worker(threading.Thread): |
|
160 def run(self): |
|
161 self.id = thread.get_ident() |
|
162 self.finished = False |
|
163 |
|
164 try: |
|
165 while True: |
|
166 worker_started.set() |
|
167 time.sleep(0.1) |
|
168 except AsyncExc: |
|
169 self.finished = True |
|
170 worker_saw_exception.set() |
|
171 |
|
172 t = Worker() |
|
173 t.setDaemon(True) # so if this fails, we don't hang Python at shutdown |
|
174 t.start() |
|
175 if verbose: |
|
176 print " started worker thread" |
|
177 |
|
178 # Try a thread id that doesn't make sense. |
|
179 if verbose: |
|
180 print " trying nonsensical thread id" |
|
181 result = set_async_exc(ctypes.c_long(-1), exception) |
|
182 self.assertEqual(result, 0) # no thread states modified |
|
183 |
|
184 # Now raise an exception in the worker thread. |
|
185 if verbose: |
|
186 print " waiting for worker thread to get started" |
|
187 worker_started.wait() |
|
188 if verbose: |
|
189 print " verifying worker hasn't exited" |
|
190 self.assert_(not t.finished) |
|
191 if verbose: |
|
192 print " attempting to raise asynch exception in worker" |
|
193 result = set_async_exc(ctypes.c_long(t.id), exception) |
|
194 self.assertEqual(result, 1) # one thread state modified |
|
195 if verbose: |
|
196 print " waiting for worker to say it caught the exception" |
|
197 worker_saw_exception.wait(timeout=10) |
|
198 self.assert_(t.finished) |
|
199 if verbose: |
|
200 print " all OK -- joining worker" |
|
201 if t.finished: |
|
202 t.join() |
|
203 # else the thread is still running, and we have no way to kill it |
|
204 |
|
205 def test_enumerate_after_join(self): |
|
206 # Try hard to trigger #1703448: a thread is still returned in |
|
207 # threading.enumerate() after it has been join()ed. |
|
208 enum = threading.enumerate |
|
209 old_interval = sys.getcheckinterval() |
|
210 sys.setcheckinterval(1) |
|
211 try: |
|
212 for i in xrange(1, 1000): |
|
213 t = threading.Thread(target=lambda: None) |
|
214 t.start() |
|
215 t.join() |
|
216 l = enum() |
|
217 self.assertFalse(t in l, |
|
218 "#1703448 triggered after %d trials: %s" % (i, l)) |
|
219 finally: |
|
220 sys.setcheckinterval(old_interval) |
|
221 |
|
222 |
|
223 def test_main(): |
|
224 test.test_support.run_unittest(ThreadTests) |
|
225 |
|
226 if __name__ == "__main__": |
|
227 test_main() |