|
1 """Generic thread tests. |
|
2 |
|
3 Meant to be used by dummy_thread and thread. To allow for different modules |
|
4 to be used, test_main() can be called with the module to use as the thread |
|
5 implementation as its sole argument. |
|
6 |
|
7 """ |
|
8 import dummy_thread as _thread |
|
9 import time |
|
10 import Queue |
|
11 import random |
|
12 import unittest |
|
13 from test import test_support |
|
14 |
|
15 DELAY = 0 # Set > 0 when testing a module other than dummy_thread, such as |
|
16 # the 'thread' module. |
|
17 |
|
18 class LockTests(unittest.TestCase): |
|
19 """Test lock objects.""" |
|
20 |
|
21 def setUp(self): |
|
22 # Create a lock |
|
23 self.lock = _thread.allocate_lock() |
|
24 |
|
25 def test_initlock(self): |
|
26 #Make sure locks start locked |
|
27 self.failUnless(not self.lock.locked(), |
|
28 "Lock object is not initialized unlocked.") |
|
29 |
|
30 def test_release(self): |
|
31 # Test self.lock.release() |
|
32 self.lock.acquire() |
|
33 self.lock.release() |
|
34 self.failUnless(not self.lock.locked(), |
|
35 "Lock object did not release properly.") |
|
36 |
|
37 def test_improper_release(self): |
|
38 #Make sure release of an unlocked thread raises _thread.error |
|
39 self.failUnlessRaises(_thread.error, self.lock.release) |
|
40 |
|
41 def test_cond_acquire_success(self): |
|
42 #Make sure the conditional acquiring of the lock works. |
|
43 self.failUnless(self.lock.acquire(0), |
|
44 "Conditional acquiring of the lock failed.") |
|
45 |
|
46 def test_cond_acquire_fail(self): |
|
47 #Test acquiring locked lock returns False |
|
48 self.lock.acquire(0) |
|
49 self.failUnless(not self.lock.acquire(0), |
|
50 "Conditional acquiring of a locked lock incorrectly " |
|
51 "succeeded.") |
|
52 |
|
53 def test_uncond_acquire_success(self): |
|
54 #Make sure unconditional acquiring of a lock works. |
|
55 self.lock.acquire() |
|
56 self.failUnless(self.lock.locked(), |
|
57 "Uncondional locking failed.") |
|
58 |
|
59 def test_uncond_acquire_return_val(self): |
|
60 #Make sure that an unconditional locking returns True. |
|
61 self.failUnless(self.lock.acquire(1) is True, |
|
62 "Unconditional locking did not return True.") |
|
63 self.failUnless(self.lock.acquire() is True) |
|
64 |
|
65 def test_uncond_acquire_blocking(self): |
|
66 #Make sure that unconditional acquiring of a locked lock blocks. |
|
67 def delay_unlock(to_unlock, delay): |
|
68 """Hold on to lock for a set amount of time before unlocking.""" |
|
69 time.sleep(delay) |
|
70 to_unlock.release() |
|
71 |
|
72 self.lock.acquire() |
|
73 start_time = int(time.time()) |
|
74 _thread.start_new_thread(delay_unlock,(self.lock, DELAY)) |
|
75 if test_support.verbose: |
|
76 print |
|
77 print "*** Waiting for thread to release the lock "\ |
|
78 "(approx. %s sec.) ***" % DELAY |
|
79 self.lock.acquire() |
|
80 end_time = int(time.time()) |
|
81 if test_support.verbose: |
|
82 print "done" |
|
83 self.failUnless((end_time - start_time) >= DELAY, |
|
84 "Blocking by unconditional acquiring failed.") |
|
85 |
|
86 class MiscTests(unittest.TestCase): |
|
87 """Miscellaneous tests.""" |
|
88 |
|
89 def test_exit(self): |
|
90 #Make sure _thread.exit() raises SystemExit |
|
91 self.failUnlessRaises(SystemExit, _thread.exit) |
|
92 |
|
93 def test_ident(self): |
|
94 #Test sanity of _thread.get_ident() |
|
95 self.failUnless(isinstance(_thread.get_ident(), int), |
|
96 "_thread.get_ident() returned a non-integer") |
|
97 self.failUnless(_thread.get_ident() != 0, |
|
98 "_thread.get_ident() returned 0") |
|
99 |
|
100 def test_LockType(self): |
|
101 #Make sure _thread.LockType is the same type as _thread.allocate_locke() |
|
102 self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType), |
|
103 "_thread.LockType is not an instance of what is " |
|
104 "returned by _thread.allocate_lock()") |
|
105 |
|
106 def test_interrupt_main(self): |
|
107 #Calling start_new_thread with a function that executes interrupt_main |
|
108 # should raise KeyboardInterrupt upon completion. |
|
109 def call_interrupt(): |
|
110 _thread.interrupt_main() |
|
111 self.failUnlessRaises(KeyboardInterrupt, _thread.start_new_thread, |
|
112 call_interrupt, tuple()) |
|
113 |
|
114 def test_interrupt_in_main(self): |
|
115 # Make sure that if interrupt_main is called in main threat that |
|
116 # KeyboardInterrupt is raised instantly. |
|
117 self.failUnlessRaises(KeyboardInterrupt, _thread.interrupt_main) |
|
118 |
|
119 class ThreadTests(unittest.TestCase): |
|
120 """Test thread creation.""" |
|
121 |
|
122 def test_arg_passing(self): |
|
123 #Make sure that parameter passing works. |
|
124 def arg_tester(queue, arg1=False, arg2=False): |
|
125 """Use to test _thread.start_new_thread() passes args properly.""" |
|
126 queue.put((arg1, arg2)) |
|
127 |
|
128 testing_queue = Queue.Queue(1) |
|
129 _thread.start_new_thread(arg_tester, (testing_queue, True, True)) |
|
130 result = testing_queue.get() |
|
131 self.failUnless(result[0] and result[1], |
|
132 "Argument passing for thread creation using tuple failed") |
|
133 _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue, |
|
134 'arg1':True, 'arg2':True}) |
|
135 result = testing_queue.get() |
|
136 self.failUnless(result[0] and result[1], |
|
137 "Argument passing for thread creation using kwargs failed") |
|
138 _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True}) |
|
139 result = testing_queue.get() |
|
140 self.failUnless(result[0] and result[1], |
|
141 "Argument passing for thread creation using both tuple" |
|
142 " and kwargs failed") |
|
143 |
|
144 def test_multi_creation(self): |
|
145 #Make sure multiple threads can be created. |
|
146 def queue_mark(queue, delay): |
|
147 """Wait for ``delay`` seconds and then put something into ``queue``""" |
|
148 time.sleep(delay) |
|
149 queue.put(_thread.get_ident()) |
|
150 |
|
151 thread_count = 5 |
|
152 testing_queue = Queue.Queue(thread_count) |
|
153 if test_support.verbose: |
|
154 print |
|
155 print "*** Testing multiple thread creation "\ |
|
156 "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count) |
|
157 for count in xrange(thread_count): |
|
158 if DELAY: |
|
159 local_delay = round(random.random(), 1) |
|
160 else: |
|
161 local_delay = 0 |
|
162 _thread.start_new_thread(queue_mark, |
|
163 (testing_queue, local_delay)) |
|
164 time.sleep(DELAY) |
|
165 if test_support.verbose: |
|
166 print 'done' |
|
167 self.failUnless(testing_queue.qsize() == thread_count, |
|
168 "Not all %s threads executed properly after %s sec." % |
|
169 (thread_count, DELAY)) |
|
170 |
|
171 def test_main(imported_module=None): |
|
172 global _thread, DELAY |
|
173 if imported_module: |
|
174 _thread = imported_module |
|
175 DELAY = 2 |
|
176 if test_support.verbose: |
|
177 print |
|
178 print "*** Using %s as _thread module ***" % _thread |
|
179 test_support.run_unittest(LockTests, MiscTests, ThreadTests) |
|
180 |
|
181 if __name__ == '__main__': |
|
182 test_main() |