|
1 # |
|
2 # Simple example which uses a pool of workers to carry out some tasks. |
|
3 # |
|
4 # Notice that the results will probably not come out of the output |
|
5 # queue in the same in the same order as the corresponding tasks were |
|
6 # put on the input queue. If it is important to get the results back |
|
7 # in the original order then consider using `Pool.map()` or |
|
8 # `Pool.imap()` (which will save on the amount of code needed anyway). |
|
9 # |
|
10 |
|
11 import time |
|
12 import random |
|
13 |
|
14 from multiprocessing import Process, Queue, current_process, freeze_support |
|
15 |
|
16 # |
|
17 # Function run by worker processes |
|
18 # |
|
19 |
|
20 def worker(input, output): |
|
21 for func, args in iter(input.get, 'STOP'): |
|
22 result = calculate(func, args) |
|
23 output.put(result) |
|
24 |
|
25 # |
|
26 # Function used to calculate result |
|
27 # |
|
28 |
|
29 def calculate(func, args): |
|
30 result = func(*args) |
|
31 return '%s says that %s%s = %s' % \ |
|
32 (current_process().name, func.__name__, args, result) |
|
33 |
|
34 # |
|
35 # Functions referenced by tasks |
|
36 # |
|
37 |
|
38 def mul(a, b): |
|
39 time.sleep(0.5*random.random()) |
|
40 return a * b |
|
41 |
|
42 def plus(a, b): |
|
43 time.sleep(0.5*random.random()) |
|
44 return a + b |
|
45 |
|
46 # |
|
47 # |
|
48 # |
|
49 |
|
50 def test(): |
|
51 NUMBER_OF_PROCESSES = 4 |
|
52 TASKS1 = [(mul, (i, 7)) for i in range(20)] |
|
53 TASKS2 = [(plus, (i, 8)) for i in range(10)] |
|
54 |
|
55 # Create queues |
|
56 task_queue = Queue() |
|
57 done_queue = Queue() |
|
58 |
|
59 # Submit tasks |
|
60 for task in TASKS1: |
|
61 task_queue.put(task) |
|
62 |
|
63 # Start worker processes |
|
64 for i in range(NUMBER_OF_PROCESSES): |
|
65 Process(target=worker, args=(task_queue, done_queue)).start() |
|
66 |
|
67 # Get and print results |
|
68 print 'Unordered results:' |
|
69 for i in range(len(TASKS1)): |
|
70 print '\t', done_queue.get() |
|
71 |
|
72 # Add more tasks using `put()` |
|
73 for task in TASKS2: |
|
74 task_queue.put(task) |
|
75 |
|
76 # Get and print some more results |
|
77 for i in range(len(TASKS2)): |
|
78 print '\t', done_queue.get() |
|
79 |
|
80 # Tell child processes to stop |
|
81 for i in range(NUMBER_OF_PROCESSES): |
|
82 task_queue.put('STOP') |
|
83 |
|
84 |
|
85 if __name__ == '__main__': |
|
86 freeze_support() |
|
87 test() |