|
1 # |
|
2 # A test file for the `multiprocessing` package |
|
3 # |
|
4 |
|
5 import time, sys, random |
|
6 from Queue import Empty |
|
7 |
|
8 import multiprocessing # may get overwritten |
|
9 |
|
10 |
|
11 #### TEST_VALUE |
|
12 |
|
13 def value_func(running, mutex): |
|
14 random.seed() |
|
15 time.sleep(random.random()*4) |
|
16 |
|
17 mutex.acquire() |
|
18 print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished' |
|
19 running.value -= 1 |
|
20 mutex.release() |
|
21 |
|
22 def test_value(): |
|
23 TASKS = 10 |
|
24 running = multiprocessing.Value('i', TASKS) |
|
25 mutex = multiprocessing.Lock() |
|
26 |
|
27 for i in range(TASKS): |
|
28 p = multiprocessing.Process(target=value_func, args=(running, mutex)) |
|
29 p.start() |
|
30 |
|
31 while running.value > 0: |
|
32 time.sleep(0.08) |
|
33 mutex.acquire() |
|
34 print running.value, |
|
35 sys.stdout.flush() |
|
36 mutex.release() |
|
37 |
|
38 print |
|
39 print 'No more running processes' |
|
40 |
|
41 |
|
42 #### TEST_QUEUE |
|
43 |
|
44 def queue_func(queue): |
|
45 for i in range(30): |
|
46 time.sleep(0.5 * random.random()) |
|
47 queue.put(i*i) |
|
48 queue.put('STOP') |
|
49 |
|
50 def test_queue(): |
|
51 q = multiprocessing.Queue() |
|
52 |
|
53 p = multiprocessing.Process(target=queue_func, args=(q,)) |
|
54 p.start() |
|
55 |
|
56 o = None |
|
57 while o != 'STOP': |
|
58 try: |
|
59 o = q.get(timeout=0.3) |
|
60 print o, |
|
61 sys.stdout.flush() |
|
62 except Empty: |
|
63 print 'TIMEOUT' |
|
64 |
|
65 print |
|
66 |
|
67 |
|
68 #### TEST_CONDITION |
|
69 |
|
70 def condition_func(cond): |
|
71 cond.acquire() |
|
72 print '\t' + str(cond) |
|
73 time.sleep(2) |
|
74 print '\tchild is notifying' |
|
75 print '\t' + str(cond) |
|
76 cond.notify() |
|
77 cond.release() |
|
78 |
|
79 def test_condition(): |
|
80 cond = multiprocessing.Condition() |
|
81 |
|
82 p = multiprocessing.Process(target=condition_func, args=(cond,)) |
|
83 print cond |
|
84 |
|
85 cond.acquire() |
|
86 print cond |
|
87 cond.acquire() |
|
88 print cond |
|
89 |
|
90 p.start() |
|
91 |
|
92 print 'main is waiting' |
|
93 cond.wait() |
|
94 print 'main has woken up' |
|
95 |
|
96 print cond |
|
97 cond.release() |
|
98 print cond |
|
99 cond.release() |
|
100 |
|
101 p.join() |
|
102 print cond |
|
103 |
|
104 |
|
105 #### TEST_SEMAPHORE |
|
106 |
|
107 def semaphore_func(sema, mutex, running): |
|
108 sema.acquire() |
|
109 |
|
110 mutex.acquire() |
|
111 running.value += 1 |
|
112 print running.value, 'tasks are running' |
|
113 mutex.release() |
|
114 |
|
115 random.seed() |
|
116 time.sleep(random.random()*2) |
|
117 |
|
118 mutex.acquire() |
|
119 running.value -= 1 |
|
120 print '%s has finished' % multiprocessing.current_process() |
|
121 mutex.release() |
|
122 |
|
123 sema.release() |
|
124 |
|
125 def test_semaphore(): |
|
126 sema = multiprocessing.Semaphore(3) |
|
127 mutex = multiprocessing.RLock() |
|
128 running = multiprocessing.Value('i', 0) |
|
129 |
|
130 processes = [ |
|
131 multiprocessing.Process(target=semaphore_func, |
|
132 args=(sema, mutex, running)) |
|
133 for i in range(10) |
|
134 ] |
|
135 |
|
136 for p in processes: |
|
137 p.start() |
|
138 |
|
139 for p in processes: |
|
140 p.join() |
|
141 |
|
142 |
|
143 #### TEST_JOIN_TIMEOUT |
|
144 |
|
145 def join_timeout_func(): |
|
146 print '\tchild sleeping' |
|
147 time.sleep(5.5) |
|
148 print '\n\tchild terminating' |
|
149 |
|
150 def test_join_timeout(): |
|
151 p = multiprocessing.Process(target=join_timeout_func) |
|
152 p.start() |
|
153 |
|
154 print 'waiting for process to finish' |
|
155 |
|
156 while 1: |
|
157 p.join(timeout=1) |
|
158 if not p.is_alive(): |
|
159 break |
|
160 print '.', |
|
161 sys.stdout.flush() |
|
162 |
|
163 |
|
164 #### TEST_EVENT |
|
165 |
|
166 def event_func(event): |
|
167 print '\t%r is waiting' % multiprocessing.current_process() |
|
168 event.wait() |
|
169 print '\t%r has woken up' % multiprocessing.current_process() |
|
170 |
|
171 def test_event(): |
|
172 event = multiprocessing.Event() |
|
173 |
|
174 processes = [multiprocessing.Process(target=event_func, args=(event,)) |
|
175 for i in range(5)] |
|
176 |
|
177 for p in processes: |
|
178 p.start() |
|
179 |
|
180 print 'main is sleeping' |
|
181 time.sleep(2) |
|
182 |
|
183 print 'main is setting event' |
|
184 event.set() |
|
185 |
|
186 for p in processes: |
|
187 p.join() |
|
188 |
|
189 |
|
190 #### TEST_SHAREDVALUES |
|
191 |
|
192 def sharedvalues_func(values, arrays, shared_values, shared_arrays): |
|
193 for i in range(len(values)): |
|
194 v = values[i][1] |
|
195 sv = shared_values[i].value |
|
196 assert v == sv |
|
197 |
|
198 for i in range(len(values)): |
|
199 a = arrays[i][1] |
|
200 sa = list(shared_arrays[i][:]) |
|
201 assert a == sa |
|
202 |
|
203 print 'Tests passed' |
|
204 |
|
205 def test_sharedvalues(): |
|
206 values = [ |
|
207 ('i', 10), |
|
208 ('h', -2), |
|
209 ('d', 1.25) |
|
210 ] |
|
211 arrays = [ |
|
212 ('i', range(100)), |
|
213 ('d', [0.25 * i for i in range(100)]), |
|
214 ('H', range(1000)) |
|
215 ] |
|
216 |
|
217 shared_values = [multiprocessing.Value(id, v) for id, v in values] |
|
218 shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays] |
|
219 |
|
220 p = multiprocessing.Process( |
|
221 target=sharedvalues_func, |
|
222 args=(values, arrays, shared_values, shared_arrays) |
|
223 ) |
|
224 p.start() |
|
225 p.join() |
|
226 |
|
227 assert p.exitcode == 0 |
|
228 |
|
229 |
|
230 #### |
|
231 |
|
232 def test(namespace=multiprocessing): |
|
233 global multiprocessing |
|
234 |
|
235 multiprocessing = namespace |
|
236 |
|
237 for func in [ test_value, test_queue, test_condition, |
|
238 test_semaphore, test_join_timeout, test_event, |
|
239 test_sharedvalues ]: |
|
240 |
|
241 print '\n\t######## %s\n' % func.__name__ |
|
242 func() |
|
243 |
|
244 ignore = multiprocessing.active_children() # cleanup any old processes |
|
245 if hasattr(multiprocessing, '_debug_info'): |
|
246 info = multiprocessing._debug_info() |
|
247 if info: |
|
248 print info |
|
249 raise ValueError, 'there should be no positive refcounts left' |
|
250 |
|
251 |
|
252 if __name__ == '__main__': |
|
253 multiprocessing.freeze_support() |
|
254 |
|
255 assert len(sys.argv) in (1, 2) |
|
256 |
|
257 if len(sys.argv) == 1 or sys.argv[1] == 'processes': |
|
258 print ' Using processes '.center(79, '-') |
|
259 namespace = multiprocessing |
|
260 elif sys.argv[1] == 'manager': |
|
261 print ' Using processes and a manager '.center(79, '-') |
|
262 namespace = multiprocessing.Manager() |
|
263 namespace.Process = multiprocessing.Process |
|
264 namespace.current_process = multiprocessing.current_process |
|
265 namespace.active_children = multiprocessing.active_children |
|
266 elif sys.argv[1] == 'threads': |
|
267 print ' Using threads '.center(79, '-') |
|
268 import multiprocessing.dummy as namespace |
|
269 else: |
|
270 print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0] |
|
271 raise SystemExit, 2 |
|
272 |
|
273 test(namespace) |