|
1 # Very rudimentary test of thread module |
|
2 |
|
3 # Create a bunch of threads, let each do some work, wait until all are done |
|
4 |
|
5 from test.test_support import verbose |
|
6 import random |
|
7 import thread |
|
8 import time |
|
9 |
|
10 mutex = thread.allocate_lock() |
|
11 rmutex = thread.allocate_lock() # for calls to random |
|
12 running = 0 |
|
13 done = thread.allocate_lock() |
|
14 done.acquire() |
|
15 |
|
16 numtasks = 10 |
|
17 |
|
18 def task(ident): |
|
19 global running |
|
20 rmutex.acquire() |
|
21 delay = random.random() * numtasks |
|
22 rmutex.release() |
|
23 if verbose: |
|
24 print 'task', ident, 'will run for', round(delay, 1), 'sec' |
|
25 time.sleep(delay) |
|
26 if verbose: |
|
27 print 'task', ident, 'done' |
|
28 mutex.acquire() |
|
29 running = running - 1 |
|
30 if running == 0: |
|
31 done.release() |
|
32 mutex.release() |
|
33 |
|
34 next_ident = 0 |
|
35 def newtask(): |
|
36 global next_ident, running |
|
37 mutex.acquire() |
|
38 next_ident = next_ident + 1 |
|
39 if verbose: |
|
40 print 'creating task', next_ident |
|
41 thread.start_new_thread(task, (next_ident,)) |
|
42 running = running + 1 |
|
43 mutex.release() |
|
44 |
|
45 for i in range(numtasks): |
|
46 newtask() |
|
47 |
|
48 print 'waiting for all tasks to complete' |
|
49 done.acquire() |
|
50 print 'all tasks done' |
|
51 |
|
52 class barrier: |
|
53 def __init__(self, n): |
|
54 self.n = n |
|
55 self.waiting = 0 |
|
56 self.checkin = thread.allocate_lock() |
|
57 self.checkout = thread.allocate_lock() |
|
58 self.checkout.acquire() |
|
59 |
|
60 def enter(self): |
|
61 checkin, checkout = self.checkin, self.checkout |
|
62 |
|
63 checkin.acquire() |
|
64 self.waiting = self.waiting + 1 |
|
65 if self.waiting == self.n: |
|
66 self.waiting = self.n - 1 |
|
67 checkout.release() |
|
68 return |
|
69 checkin.release() |
|
70 |
|
71 checkout.acquire() |
|
72 self.waiting = self.waiting - 1 |
|
73 if self.waiting == 0: |
|
74 checkin.release() |
|
75 return |
|
76 checkout.release() |
|
77 |
|
78 numtrips = 3 |
|
79 def task2(ident): |
|
80 global running |
|
81 for i in range(numtrips): |
|
82 if ident == 0: |
|
83 # give it a good chance to enter the next |
|
84 # barrier before the others are all out |
|
85 # of the current one |
|
86 delay = 0.001 |
|
87 else: |
|
88 rmutex.acquire() |
|
89 delay = random.random() * numtasks |
|
90 rmutex.release() |
|
91 if verbose: |
|
92 print 'task', ident, 'will run for', round(delay, 1), 'sec' |
|
93 time.sleep(delay) |
|
94 if verbose: |
|
95 print 'task', ident, 'entering barrier', i |
|
96 bar.enter() |
|
97 if verbose: |
|
98 print 'task', ident, 'leaving barrier', i |
|
99 mutex.acquire() |
|
100 running -= 1 |
|
101 # Must release mutex before releasing done, else the main thread can |
|
102 # exit and set mutex to None as part of global teardown; then |
|
103 # mutex.release() raises AttributeError. |
|
104 finished = running == 0 |
|
105 mutex.release() |
|
106 if finished: |
|
107 done.release() |
|
108 |
|
109 print '\n*** Barrier Test ***' |
|
110 if done.acquire(0): |
|
111 raise ValueError, "'done' should have remained acquired" |
|
112 bar = barrier(numtasks) |
|
113 running = numtasks |
|
114 for i in range(numtasks): |
|
115 thread.start_new_thread(task2, (i,)) |
|
116 done.acquire() |
|
117 print 'all tasks done' |
|
118 |
|
119 # not all platforms support changing thread stack size |
|
120 print '\n*** Changing thread stack size ***' |
|
121 if thread.stack_size() != 0: |
|
122 raise ValueError, "initial stack_size not 0" |
|
123 |
|
124 thread.stack_size(0) |
|
125 if thread.stack_size() != 0: |
|
126 raise ValueError, "stack_size not reset to default" |
|
127 |
|
128 from os import name as os_name |
|
129 if os_name in ("nt", "os2", "posix"): |
|
130 |
|
131 tss_supported = 1 |
|
132 try: |
|
133 thread.stack_size(4096) |
|
134 except ValueError: |
|
135 print 'caught expected ValueError setting stack_size(4096)' |
|
136 except thread.error: |
|
137 tss_supported = 0 |
|
138 print 'platform does not support changing thread stack size' |
|
139 |
|
140 if tss_supported: |
|
141 failed = lambda s, e: s != e |
|
142 fail_msg = "stack_size(%d) failed - should succeed" |
|
143 for tss in (262144, 0x100000, 0): |
|
144 thread.stack_size(tss) |
|
145 if failed(thread.stack_size(), tss): |
|
146 raise ValueError, fail_msg % tss |
|
147 print 'successfully set stack_size(%d)' % tss |
|
148 |
|
149 for tss in (262144, 0x100000): |
|
150 print 'trying stack_size = %d' % tss |
|
151 next_ident = 0 |
|
152 for i in range(numtasks): |
|
153 newtask() |
|
154 |
|
155 print 'waiting for all tasks to complete' |
|
156 done.acquire() |
|
157 print 'all tasks done' |
|
158 |
|
159 # reset stack size to default |
|
160 thread.stack_size(0) |