symbian-qemu-0.9.1-12/python-2.6.1/Doc/includes/mp_pool.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # A test of `multiprocessing.Pool` class
       
     3 #
       
     4 
       
     5 import multiprocessing
       
     6 import time
       
     7 import random
       
     8 import sys
       
     9 
       
    10 #
       
    11 # Functions used by test code
       
    12 #
       
    13 
       
    14 def calculate(func, args):
       
    15     result = func(*args)
       
    16     return '%s says that %s%s = %s' % (
       
    17         multiprocessing.current_process().name,
       
    18         func.__name__, args, result
       
    19         )
       
    20 
       
    21 def calculatestar(args):
       
    22     return calculate(*args)
       
    23 
       
    24 def mul(a, b):
       
    25     time.sleep(0.5*random.random())
       
    26     return a * b
       
    27 
       
    28 def plus(a, b):
       
    29     time.sleep(0.5*random.random())
       
    30     return a + b
       
    31 
       
    32 def f(x):
       
    33     return 1.0 / (x-5.0)
       
    34 
       
    35 def pow3(x):
       
    36     return x**3
       
    37 
       
    38 def noop(x):
       
    39     pass
       
    40 
       
    41 #
       
    42 # Test code
       
    43 #
       
    44 
       
    45 def test():
       
    46     print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
       
    47 
       
    48     #
       
    49     # Create pool
       
    50     #
       
    51 
       
    52     PROCESSES = 4
       
    53     print 'Creating pool with %d processes\n' % PROCESSES
       
    54     pool = multiprocessing.Pool(PROCESSES)
       
    55     print 'pool = %s' % pool
       
    56     print
       
    57 
       
    58     #
       
    59     # Tests
       
    60     #
       
    61 
       
    62     TASKS = [(mul, (i, 7)) for i in range(10)] + \
       
    63             [(plus, (i, 8)) for i in range(10)]
       
    64 
       
    65     results = [pool.apply_async(calculate, t) for t in TASKS]
       
    66     imap_it = pool.imap(calculatestar, TASKS)
       
    67     imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
       
    68 
       
    69     print 'Ordered results using pool.apply_async():'
       
    70     for r in results:
       
    71         print '\t', r.get()
       
    72     print
       
    73 
       
    74     print 'Ordered results using pool.imap():'
       
    75     for x in imap_it:
       
    76         print '\t', x
       
    77     print
       
    78 
       
    79     print 'Unordered results using pool.imap_unordered():'
       
    80     for x in imap_unordered_it:
       
    81         print '\t', x
       
    82     print
       
    83 
       
    84     print 'Ordered results using pool.map() --- will block till complete:'
       
    85     for x in pool.map(calculatestar, TASKS):
       
    86         print '\t', x
       
    87     print
       
    88 
       
    89     #
       
    90     # Simple benchmarks
       
    91     #
       
    92 
       
    93     N = 100000
       
    94     print 'def pow3(x): return x**3'
       
    95 
       
    96     t = time.time()
       
    97     A = map(pow3, xrange(N))
       
    98     print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
       
    99           (N, time.time() - t)
       
   100 
       
   101     t = time.time()
       
   102     B = pool.map(pow3, xrange(N))
       
   103     print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
       
   104           (N, time.time() - t)
       
   105 
       
   106     t = time.time()
       
   107     C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
       
   108     print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
       
   109           ' seconds' % (N, N//8, time.time() - t)
       
   110 
       
   111     assert A == B == C, (len(A), len(B), len(C))
       
   112     print
       
   113 
       
   114     L = [None] * 1000000
       
   115     print 'def noop(x): pass'
       
   116     print 'L = [None] * 1000000'
       
   117 
       
   118     t = time.time()
       
   119     A = map(noop, L)
       
   120     print '\tmap(noop, L):\n\t\t%s seconds' % \
       
   121           (time.time() - t)
       
   122 
       
   123     t = time.time()
       
   124     B = pool.map(noop, L)
       
   125     print '\tpool.map(noop, L):\n\t\t%s seconds' % \
       
   126           (time.time() - t)
       
   127 
       
   128     t = time.time()
       
   129     C = list(pool.imap(noop, L, chunksize=len(L)//8))
       
   130     print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
       
   131           (len(L)//8, time.time() - t)
       
   132 
       
   133     assert A == B == C, (len(A), len(B), len(C))
       
   134     print
       
   135 
       
   136     del A, B, C, L
       
   137 
       
   138     #
       
   139     # Test error handling
       
   140     #
       
   141 
       
   142     print 'Testing error handling:'
       
   143 
       
   144     try:
       
   145         print pool.apply(f, (5,))
       
   146     except ZeroDivisionError:
       
   147         print '\tGot ZeroDivisionError as expected from pool.apply()'
       
   148     else:
       
   149         raise AssertionError, 'expected ZeroDivisionError'
       
   150 
       
   151     try:
       
   152         print pool.map(f, range(10))
       
   153     except ZeroDivisionError:
       
   154         print '\tGot ZeroDivisionError as expected from pool.map()'
       
   155     else:
       
   156         raise AssertionError, 'expected ZeroDivisionError'
       
   157 
       
   158     try:
       
   159         print list(pool.imap(f, range(10)))
       
   160     except ZeroDivisionError:
       
   161         print '\tGot ZeroDivisionError as expected from list(pool.imap())'
       
   162     else:
       
   163         raise AssertionError, 'expected ZeroDivisionError'
       
   164 
       
   165     it = pool.imap(f, range(10))
       
   166     for i in range(10):
       
   167         try:
       
   168             x = it.next()
       
   169         except ZeroDivisionError:
       
   170             if i == 5:
       
   171                 pass
       
   172         except StopIteration:
       
   173             break
       
   174         else:
       
   175             if i == 5:
       
   176                 raise AssertionError, 'expected ZeroDivisionError'
       
   177 
       
   178     assert i == 9
       
   179     print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
       
   180     print
       
   181 
       
   182     #
       
   183     # Testing timeouts
       
   184     #
       
   185 
       
   186     print 'Testing ApplyResult.get() with timeout:',
       
   187     res = pool.apply_async(calculate, TASKS[0])
       
   188     while 1:
       
   189         sys.stdout.flush()
       
   190         try:
       
   191             sys.stdout.write('\n\t%s' % res.get(0.02))
       
   192             break
       
   193         except multiprocessing.TimeoutError:
       
   194             sys.stdout.write('.')
       
   195     print
       
   196     print
       
   197 
       
   198     print 'Testing IMapIterator.next() with timeout:',
       
   199     it = pool.imap(calculatestar, TASKS)
       
   200     while 1:
       
   201         sys.stdout.flush()
       
   202         try:
       
   203             sys.stdout.write('\n\t%s' % it.next(0.02))
       
   204         except StopIteration:
       
   205             break
       
   206         except multiprocessing.TimeoutError:
       
   207             sys.stdout.write('.')
       
   208     print
       
   209     print
       
   210 
       
   211     #
       
   212     # Testing callback
       
   213     #
       
   214 
       
   215     print 'Testing callback:'
       
   216 
       
   217     A = []
       
   218     B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
       
   219 
       
   220     r = pool.apply_async(mul, (7, 8), callback=A.append)
       
   221     r.wait()
       
   222 
       
   223     r = pool.map_async(pow3, range(10), callback=A.extend)
       
   224     r.wait()
       
   225 
       
   226     if A == B:
       
   227         print '\tcallbacks succeeded\n'
       
   228     else:
       
   229         print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
       
   230 
       
   231     #
       
   232     # Check there are no outstanding tasks
       
   233     #
       
   234 
       
   235     assert not pool._cache, 'cache = %r' % pool._cache
       
   236 
       
   237     #
       
   238     # Check close() methods
       
   239     #
       
   240 
       
   241     print 'Testing close():'
       
   242 
       
   243     for worker in pool._pool:
       
   244         assert worker.is_alive()
       
   245 
       
   246     result = pool.apply_async(time.sleep, [0.5])
       
   247     pool.close()
       
   248     pool.join()
       
   249 
       
   250     assert result.get() is None
       
   251 
       
   252     for worker in pool._pool:
       
   253         assert not worker.is_alive()
       
   254 
       
   255     print '\tclose() succeeded\n'
       
   256 
       
   257     #
       
   258     # Check terminate() method
       
   259     #
       
   260 
       
   261     print 'Testing terminate():'
       
   262 
       
   263     pool = multiprocessing.Pool(2)
       
   264     DELTA = 0.1
       
   265     ignore = pool.apply(pow3, [2])
       
   266     results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
       
   267     pool.terminate()
       
   268     pool.join()
       
   269 
       
   270     for worker in pool._pool:
       
   271         assert not worker.is_alive()
       
   272 
       
   273     print '\tterminate() succeeded\n'
       
   274 
       
   275     #
       
   276     # Check garbage collection
       
   277     #
       
   278 
       
   279     print 'Testing garbage collection:'
       
   280 
       
   281     pool = multiprocessing.Pool(2)
       
   282     DELTA = 0.1
       
   283     processes = pool._pool
       
   284     ignore = pool.apply(pow3, [2])
       
   285     results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
       
   286 
       
   287     results = pool = None
       
   288 
       
   289     time.sleep(DELTA * 2)
       
   290 
       
   291     for worker in processes:
       
   292         assert not worker.is_alive()
       
   293 
       
   294     print '\tgarbage collection succeeded\n'
       
   295 
       
   296 
       
   297 if __name__ == '__main__':
       
   298     multiprocessing.freeze_support()
       
   299 
       
   300     assert len(sys.argv) in (1, 2)
       
   301 
       
   302     if len(sys.argv) == 1 or sys.argv[1] == 'processes':
       
   303         print ' Using processes '.center(79, '-')
       
   304     elif sys.argv[1] == 'threads':
       
   305         print ' Using threads '.center(79, '-')
       
   306         import multiprocessing.dummy as multiprocessing
       
   307     else:
       
   308         print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
       
   309         raise SystemExit(2)
       
   310 
       
   311     test()