|
#
# A test of the `multiprocessing.Pool` class
#
|
4 |
|
5 import multiprocessing |
|
6 import time |
|
7 import random |
|
8 import sys |
|
9 |
|
#
# Functions used by test code
#
|
13 |
|
def calculate(func, args):
    """Call ``func(*args)`` and describe the outcome in one line.

    The returned string has the form
    ``'<process name> says that <func name><args> = <result>'``, where the
    process name is that of the process executing this call.
    """
    outcome = func(*args)
    worker_name = multiprocessing.current_process().name
    report = '%s says that %s%s = %s' % (
        worker_name, func.__name__, args, outcome)
    return report
|
20 |
|
def calculatestar(args):
    """Unpack ``args`` into positional arguments for ``calculate``.

    Lets a single-argument mapping call (e.g. ``pool.imap``) drive
    ``calculate``, which takes its inputs as separate parameters.
    """
    report = calculate(*args)
    return report
|
23 |
|
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    delay = random.random() * 0.5
    time.sleep(delay)
    product = a * b
    return product
|
27 |
|
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    delay = random.random() * 0.5
    time.sleep(delay)
    total = a + b
    return total
|
31 |
|
def f(x):
    """Return ``1.0 / (x - 5.0)``.

    Raises ZeroDivisionError when ``x`` equals 5; the error-handling checks
    in this file rely on that.
    """
    denominator = x - 5.0
    return 1.0 / denominator
|
34 |
|
def pow3(x):
    """Return ``x`` raised to the third power."""
    cube = pow(x, 3)
    return cube
|
37 |
|
def noop(x):
    """Ignore ``x`` and return None; used to measure pure dispatch overhead."""
    return None
|
40 |
|
#
# Test code
#
|
44 |
|
def _timed(label, action):
    """Run ``action()``, print its wall-clock duration under ``label``, return its result."""
    started = time.time()
    outcome = action()
    print('\t%s:\n\t\t%s seconds' % (label, time.time() - started))
    return outcome


def _expect_zero_division(source, action):
    """Check that ``action()`` raises ZeroDivisionError; ``source`` names the API being exercised.

    Raises AssertionError (after printing the unexpected result) if the call
    returns normally.
    """
    try:
        print(action())
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from %s' % source)
    else:
        raise AssertionError('expected ZeroDivisionError')


def _show_task_results(pool, tasks):
    """Run ``tasks`` through apply_async, imap, imap_unordered and map, printing every result."""
    # Submit all three asynchronous batches before consuming any of them so
    # that they are outstanding in the pool at the same time.
    async_results = [pool.apply_async(calculate, task) for task in tasks]
    ordered_it = pool.imap(calculatestar, tasks)
    unordered_it = pool.imap_unordered(calculatestar, tasks)

    print('Ordered results using pool.apply_async():')
    for pending in async_results:
        print('\t%s' % (pending.get(),))
    print('')

    print('Ordered results using pool.imap():')
    for line in ordered_it:
        print('\t%s' % (line,))
    print('')

    print('Unordered results using pool.imap_unordered():')
    for line in unordered_it:
        print('\t%s' % (line,))
    print('')

    print('Ordered results using pool.map() --- will block till complete:')
    for line in pool.map(calculatestar, tasks):
        print('\t%s' % (line,))
    print('')


def _run_benchmarks(pool):
    """Time builtin map against pool.map and pool.imap for ``pow3`` and ``noop``."""
    # Use the lazy integer range on Python 2 (xrange) and Python 3 (range).
    try:
        lazy_range = xrange
    except NameError:
        lazy_range = range

    N = 100000
    print('def pow3(x): return x**3')

    # list() is needed where map() returns an iterator; the three results
    # are compared as lists below.
    A = _timed('map(pow3, xrange(%d))' % N,
               lambda: list(map(pow3, lazy_range(N))))
    B = _timed('pool.map(pow3, xrange(%d))' % N,
               lambda: pool.map(pow3, lazy_range(N)))
    C = _timed('list(pool.imap(pow3, xrange(%d), chunksize=%d))' % (N, N // 8),
               lambda: list(pool.imap(pow3, lazy_range(N), chunksize=N // 8)))

    assert A == B == C, (len(A), len(B), len(C))
    print('')

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    A = _timed('map(noop, L)',
               lambda: list(map(noop, L)))
    B = _timed('pool.map(noop, L)',
               lambda: pool.map(noop, L))
    C = _timed('list(pool.imap(noop, L, chunksize=%d))' % (len(L) // 8),
               lambda: list(pool.imap(noop, L, chunksize=len(L) // 8)))

    assert A == B == C, (len(A), len(B), len(C))
    print('')
    # The large lists are released when this helper returns.


def _check_error_handling(pool):
    """Check that a ZeroDivisionError raised in a worker reaches the caller."""
    print('Testing error handling:')

    _expect_zero_division('pool.apply()',
                          lambda: pool.apply(f, (5,)))
    _expect_zero_division('pool.map()',
                          lambda: pool.map(f, range(10)))
    _expect_zero_division('list(pool.imap())',
                          lambda: list(pool.imap(f, range(10))))

    # f(5) is the only input that divides by zero, so exactly the item at
    # index 5 must raise and every other item must be delivered normally.
    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            it.next()
        except ZeroDivisionError:
            if i != 5:
                raise AssertionError(
                    'unexpected ZeroDivisionError at index %d' % i)
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The loop must have consumed all ten items without an early stop.
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print('')


def _check_timeouts(pool, tasks):
    """Poll ApplyResult.get() and IMapIterator.next() with a short timeout, printing a dot per timeout."""
    # Written without a trailing newline so the dots continue on this line.
    sys.stdout.write('Testing ApplyResult.get() with timeout:')
    res = pool.apply_async(calculate, tasks[0])
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    sys.stdout.write('Testing IMapIterator.next() with timeout:')
    it = pool.imap(calculatestar, tasks)
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')


def _check_callbacks(pool):
    """Check that apply_async/map_async callbacks receive the computed results.

    Raises AssertionError if the collected values differ from the expected
    ones, so a callback failure fails the run like every other check here.
    """
    print('Testing callback:')

    collected = []
    expected = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    pool.apply_async(mul, (7, 8), callback=collected.append).wait()
    pool.map_async(pow3, range(10), callback=collected.extend).wait()

    if collected != expected:
        raise AssertionError(
            'callbacks failed: %s != %s' % (collected, expected))
    print('\tcallbacks succeeded\n')


def _check_close(pool):
    """Check that close() + join() lets a pending task finish and stops all workers."""
    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    # Submitted before close(): it must still run to completion.
    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')


def _check_terminate():
    """Check that terminate() + join() stops the workers while tasks are still queued."""
    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    pool.apply(pow3, [2])
    # Queue 100 short sleeps on 2 workers, then terminate straight away.
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')


def _check_garbage_collection():
    """Check that the workers stop once the last references to a pool are dropped."""
    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    # Drop every reference held here to the pool and its pending results.
    # NOTE(review): presumably relies on the pool being finalized as soon as
    # it becomes unreachable -- confirm on the target interpreter.
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')


def test():
    """Exercise ``multiprocessing.Pool`` end to end, printing progress to stdout.

    Runs, in order: result ordering across the submission APIs, simple
    benchmarks, error propagation, timeouts, callbacks, a check that no
    tasks remain outstanding, and the close(), terminate() and
    garbage-collection shutdown paths.  Raises AssertionError on the first
    failed check.
    """
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % (pool,))
    print('')

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    _show_task_results(pool, TASKS)
    _run_benchmarks(pool)
    _check_error_handling(pool)
    _check_timeouts(pool, TASKS)
    _check_callbacks(pool)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    _check_close(pool)
    _check_terminate()
    _check_garbage_collection()
|
295 |
|
296 |
|
if __name__ == '__main__':
    multiprocessing.freeze_support()

    # Validate the command line explicitly: an ``assert`` would be stripped
    # under ``python -O`` and would show a traceback instead of the usage.
    mode = sys.argv[1] if len(sys.argv) == 2 else None
    if len(sys.argv) > 2 or mode not in (None, 'processes', 'threads'):
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    if mode == 'threads':
        print(' Using threads '.center(79, '-'))
        # Rebind the module-level name so that test() and the helper
        # functions use the thread-backed implementation of the same API.
        import multiprocessing.dummy as multiprocessing
    else:
        # No argument, or 'processes': run with real processes.
        print(' Using processes '.center(79, '-'))

    test()