symbian-qemu-0.9.1-12/python-2.6.1/Doc/includes/mp_synchronize.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #
       
     2 # A test file for the `multiprocessing` package
       
     3 #
       
     4 
       
     5 import time, sys, random
       
     6 from Queue import Empty
       
     7 
       
     8 import multiprocessing               # may get overwritten
       
     9 
       
    10 
       
    11 #### TEST_VALUE
       
    12 
       
    13 def value_func(running, mutex):
       
    14     random.seed()
       
    15     time.sleep(random.random()*4)
       
    16 
       
    17     mutex.acquire()
       
    18     print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
       
    19     running.value -= 1
       
    20     mutex.release()
       
    21 
       
    22 def test_value():
       
    23     TASKS = 10
       
    24     running = multiprocessing.Value('i', TASKS)
       
    25     mutex = multiprocessing.Lock()
       
    26 
       
    27     for i in range(TASKS):
       
    28         p = multiprocessing.Process(target=value_func, args=(running, mutex))
       
    29         p.start()
       
    30 
       
    31     while running.value > 0:
       
    32         time.sleep(0.08)
       
    33         mutex.acquire()
       
    34         print running.value,
       
    35         sys.stdout.flush()
       
    36         mutex.release()
       
    37 
       
    38     print
       
    39     print 'No more running processes'
       
    40 
       
    41 
       
    42 #### TEST_QUEUE
       
    43 
       
    44 def queue_func(queue):
       
    45     for i in range(30):
       
    46         time.sleep(0.5 * random.random())
       
    47         queue.put(i*i)
       
    48     queue.put('STOP')
       
    49 
       
    50 def test_queue():
       
    51     q = multiprocessing.Queue()
       
    52 
       
    53     p = multiprocessing.Process(target=queue_func, args=(q,))
       
    54     p.start()
       
    55 
       
    56     o = None
       
    57     while o != 'STOP':
       
    58         try:
       
    59             o = q.get(timeout=0.3)
       
    60             print o,
       
    61             sys.stdout.flush()
       
    62         except Empty:
       
    63             print 'TIMEOUT'
       
    64 
       
    65     print
       
    66 
       
    67 
       
    68 #### TEST_CONDITION
       
    69 
       
    70 def condition_func(cond):
       
    71     cond.acquire()
       
    72     print '\t' + str(cond)
       
    73     time.sleep(2)
       
    74     print '\tchild is notifying'
       
    75     print '\t' + str(cond)
       
    76     cond.notify()
       
    77     cond.release()
       
    78 
       
    79 def test_condition():
       
    80     cond = multiprocessing.Condition()
       
    81 
       
    82     p = multiprocessing.Process(target=condition_func, args=(cond,))
       
    83     print cond
       
    84 
       
    85     cond.acquire()
       
    86     print cond
       
    87     cond.acquire()
       
    88     print cond
       
    89 
       
    90     p.start()
       
    91 
       
    92     print 'main is waiting'
       
    93     cond.wait()
       
    94     print 'main has woken up'
       
    95 
       
    96     print cond
       
    97     cond.release()
       
    98     print cond
       
    99     cond.release()
       
   100 
       
   101     p.join()
       
   102     print cond
       
   103 
       
   104 
       
   105 #### TEST_SEMAPHORE
       
   106 
       
   107 def semaphore_func(sema, mutex, running):
       
   108     sema.acquire()
       
   109 
       
   110     mutex.acquire()
       
   111     running.value += 1
       
   112     print running.value, 'tasks are running'
       
   113     mutex.release()
       
   114 
       
   115     random.seed()
       
   116     time.sleep(random.random()*2)
       
   117 
       
   118     mutex.acquire()
       
   119     running.value -= 1
       
   120     print '%s has finished' % multiprocessing.current_process()
       
   121     mutex.release()
       
   122 
       
   123     sema.release()
       
   124 
       
   125 def test_semaphore():
       
   126     sema = multiprocessing.Semaphore(3)
       
   127     mutex = multiprocessing.RLock()
       
   128     running = multiprocessing.Value('i', 0)
       
   129 
       
   130     processes = [
       
   131         multiprocessing.Process(target=semaphore_func,
       
   132                                 args=(sema, mutex, running))
       
   133         for i in range(10)
       
   134         ]
       
   135 
       
   136     for p in processes:
       
   137         p.start()
       
   138 
       
   139     for p in processes:
       
   140         p.join()
       
   141 
       
   142 
       
   143 #### TEST_JOIN_TIMEOUT
       
   144 
       
   145 def join_timeout_func():
       
   146     print '\tchild sleeping'
       
   147     time.sleep(5.5)
       
   148     print '\n\tchild terminating'
       
   149 
       
   150 def test_join_timeout():
       
   151     p = multiprocessing.Process(target=join_timeout_func)
       
   152     p.start()
       
   153 
       
   154     print 'waiting for process to finish'
       
   155 
       
   156     while 1:
       
   157         p.join(timeout=1)
       
   158         if not p.is_alive():
       
   159             break
       
   160         print '.',
       
   161         sys.stdout.flush()
       
   162 
       
   163 
       
   164 #### TEST_EVENT
       
   165 
       
   166 def event_func(event):
       
   167     print '\t%r is waiting' % multiprocessing.current_process()
       
   168     event.wait()
       
   169     print '\t%r has woken up' % multiprocessing.current_process()
       
   170 
       
   171 def test_event():
       
   172     event = multiprocessing.Event()
       
   173 
       
   174     processes = [multiprocessing.Process(target=event_func, args=(event,))
       
   175                  for i in range(5)]
       
   176 
       
   177     for p in processes:
       
   178         p.start()
       
   179 
       
   180     print 'main is sleeping'
       
   181     time.sleep(2)
       
   182 
       
   183     print 'main is setting event'
       
   184     event.set()
       
   185 
       
   186     for p in processes:
       
   187         p.join()
       
   188 
       
   189 
       
   190 #### TEST_SHAREDVALUES
       
   191 
       
   192 def sharedvalues_func(values, arrays, shared_values, shared_arrays):
       
   193     for i in range(len(values)):
       
   194         v = values[i][1]
       
   195         sv = shared_values[i].value
       
   196         assert v == sv
       
   197 
       
   198     for i in range(len(values)):
       
   199         a = arrays[i][1]
       
   200         sa = list(shared_arrays[i][:])
       
   201         assert a == sa
       
   202 
       
   203     print 'Tests passed'
       
   204 
       
   205 def test_sharedvalues():
       
   206     values = [
       
   207         ('i', 10),
       
   208         ('h', -2),
       
   209         ('d', 1.25)
       
   210         ]
       
   211     arrays = [
       
   212         ('i', range(100)),
       
   213         ('d', [0.25 * i for i in range(100)]),
       
   214         ('H', range(1000))
       
   215         ]
       
   216 
       
   217     shared_values = [multiprocessing.Value(id, v) for id, v in values]
       
   218     shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
       
   219 
       
   220     p = multiprocessing.Process(
       
   221         target=sharedvalues_func,
       
   222         args=(values, arrays, shared_values, shared_arrays)
       
   223         )
       
   224     p.start()
       
   225     p.join()
       
   226 
       
   227     assert p.exitcode == 0
       
   228 
       
   229 
       
   230 ####
       
   231 
       
   232 def test(namespace=multiprocessing):
       
   233     global multiprocessing
       
   234 
       
   235     multiprocessing = namespace
       
   236 
       
   237     for func in [ test_value, test_queue, test_condition,
       
   238                   test_semaphore, test_join_timeout, test_event,
       
   239                   test_sharedvalues ]:
       
   240 
       
   241         print '\n\t######## %s\n' % func.__name__
       
   242         func()
       
   243 
       
   244     ignore = multiprocessing.active_children()      # cleanup any old processes
       
   245     if hasattr(multiprocessing, '_debug_info'):
       
   246         info = multiprocessing._debug_info()
       
   247         if info:
       
   248             print info
       
   249             raise ValueError, 'there should be no positive refcounts left'
       
   250 
       
   251 
       
   252 if __name__ == '__main__':
       
   253     multiprocessing.freeze_support()
       
   254 
       
   255     assert len(sys.argv) in (1, 2)
       
   256 
       
   257     if len(sys.argv) == 1 or sys.argv[1] == 'processes':
       
   258         print ' Using processes '.center(79, '-')
       
   259         namespace = multiprocessing
       
   260     elif sys.argv[1] == 'manager':
       
   261         print ' Using processes and a manager '.center(79, '-')
       
   262         namespace = multiprocessing.Manager()
       
   263         namespace.Process = multiprocessing.Process
       
   264         namespace.current_process = multiprocessing.current_process
       
   265         namespace.active_children = multiprocessing.active_children
       
   266     elif sys.argv[1] == 'threads':
       
   267         print ' Using threads '.center(79, '-')
       
   268         import multiprocessing.dummy as namespace
       
   269     else:
       
   270         print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
       
   271         raise SystemExit, 2
       
   272 
       
   273     test(namespace)