symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_multiprocessing.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #!/usr/bin/env python
       
     2 
       
     3 #
       
     4 # Unit tests for the multiprocessing package
       
     5 #
       
     6 
       
     7 import unittest
       
     8 import threading
       
     9 import Queue
       
    10 import time
       
    11 import sys
       
    12 import os
       
    13 import gc
       
    14 import signal
       
    15 import array
       
    16 import copy
       
    17 import socket
       
    18 import random
       
    19 import logging
       
    20 
       
    21 
       
    22 # Work around broken sem_open implementations
       
    23 try:
       
    24     import multiprocessing.synchronize
       
    25 except ImportError, e:
       
    26     from test.test_support import TestSkipped
       
    27     raise TestSkipped(e)
       
    28 
       
    29 import multiprocessing.dummy
       
    30 import multiprocessing.connection
       
    31 import multiprocessing.managers
       
    32 import multiprocessing.heap
       
    33 import multiprocessing.pool
       
    34 import _multiprocessing
       
    35 
       
    36 from multiprocessing import util
       
    37 
       
    38 #
       
    39 #
       
    40 #
       
    41 
       
    42 latin = str
       
    43 
       
    44 #
       
    45 # Constants
       
    46 #
       
    47 
       
    48 LOG_LEVEL = util.SUBWARNING
       
    49 #LOG_LEVEL = logging.WARNING
       
    50 
       
    51 DELTA = 0.1
       
    52 CHECK_TIMINGS = False     # making true makes tests take a lot longer
       
    53                           # and can sometimes cause some non-serious
       
    54                           # failures because some calls block a bit
       
    55                           # longer than expected
       
    56 if CHECK_TIMINGS:
       
    57     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
       
    58 else:
       
    59     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
       
    60 
       
    61 HAVE_GETVALUE = not getattr(_multiprocessing,
       
    62                             'HAVE_BROKEN_SEM_GETVALUE', False)
       
    63 
       
    64 #
       
    65 # Creates a wrapper for a function which records the time it takes to finish
       
    66 #
       
    67 
       
    68 class TimingWrapper(object):
       
    69 
       
    70     def __init__(self, func):
       
    71         self.func = func
       
    72         self.elapsed = None
       
    73 
       
    74     def __call__(self, *args, **kwds):
       
    75         t = time.time()
       
    76         try:
       
    77             return self.func(*args, **kwds)
       
    78         finally:
       
    79             self.elapsed = time.time() - t
       
    80 
       
    81 #
       
    82 # Base class for test cases
       
    83 #
       
    84 
       
    85 class BaseTestCase(object):
       
    86 
       
    87     ALLOWED_TYPES = ('processes', 'manager', 'threads')
       
    88 
       
    89     def assertTimingAlmostEqual(self, a, b):
       
    90         if CHECK_TIMINGS:
       
    91             self.assertAlmostEqual(a, b, 1)
       
    92 
       
    93     def assertReturnsIfImplemented(self, value, func, *args):
       
    94         try:
       
    95             res = func(*args)
       
    96         except NotImplementedError:
       
    97             pass
       
    98         else:
       
    99             return self.assertEqual(value, res)
       
   100 
       
   101 #
       
   102 # Return the value of a semaphore
       
   103 #
       
   104 
       
   105 def get_value(self):
       
   106     try:
       
   107         return self.get_value()
       
   108     except AttributeError:
       
   109         try:
       
   110             return self._Semaphore__value
       
   111         except AttributeError:
       
   112             try:
       
   113                 return self._value
       
   114             except AttributeError:
       
   115                 raise NotImplementedError
       
   116 
       
   117 #
       
   118 # Testcases
       
   119 #
       
   120 
       
   121 class _TestProcess(BaseTestCase):
       
   122 
       
   123     ALLOWED_TYPES = ('processes', 'threads')
       
   124 
       
   125     def test_current(self):
       
   126         if self.TYPE == 'threads':
       
   127             return
       
   128 
       
   129         current = self.current_process()
       
   130         authkey = current.authkey
       
   131 
       
   132         self.assertTrue(current.is_alive())
       
   133         self.assertTrue(not current.daemon)
       
   134         self.assertTrue(isinstance(authkey, bytes))
       
   135         self.assertTrue(len(authkey) > 0)
       
   136         self.assertEqual(current.ident, os.getpid())
       
   137         self.assertEqual(current.exitcode, None)
       
   138 
       
   139     def _test(self, q, *args, **kwds):
       
   140         current = self.current_process()
       
   141         q.put(args)
       
   142         q.put(kwds)
       
   143         q.put(current.name)
       
   144         if self.TYPE != 'threads':
       
   145             q.put(bytes(current.authkey))
       
   146             q.put(current.pid)
       
   147 
       
   148     def test_process(self):
       
   149         q = self.Queue(1)
       
   150         e = self.Event()
       
   151         args = (q, 1, 2)
       
   152         kwargs = {'hello':23, 'bye':2.54}
       
   153         name = 'SomeProcess'
       
   154         p = self.Process(
       
   155             target=self._test, args=args, kwargs=kwargs, name=name
       
   156             )
       
   157         p.daemon = True
       
   158         current = self.current_process()
       
   159 
       
   160         if self.TYPE != 'threads':
       
   161             self.assertEquals(p.authkey, current.authkey)
       
   162         self.assertEquals(p.is_alive(), False)
       
   163         self.assertEquals(p.daemon, True)
       
   164         self.assertTrue(p not in self.active_children())
       
   165         self.assertTrue(type(self.active_children()) is list)
       
   166         self.assertEqual(p.exitcode, None)
       
   167 
       
   168         p.start()
       
   169 
       
   170         self.assertEquals(p.exitcode, None)
       
   171         self.assertEquals(p.is_alive(), True)
       
   172         self.assertTrue(p in self.active_children())
       
   173 
       
   174         self.assertEquals(q.get(), args[1:])
       
   175         self.assertEquals(q.get(), kwargs)
       
   176         self.assertEquals(q.get(), p.name)
       
   177         if self.TYPE != 'threads':
       
   178             self.assertEquals(q.get(), current.authkey)
       
   179             self.assertEquals(q.get(), p.pid)
       
   180 
       
   181         p.join()
       
   182 
       
   183         self.assertEquals(p.exitcode, 0)
       
   184         self.assertEquals(p.is_alive(), False)
       
   185         self.assertTrue(p not in self.active_children())
       
   186 
       
   187     def _test_terminate(self):
       
   188         time.sleep(1000)
       
   189 
       
   190     def test_terminate(self):
       
   191         if self.TYPE == 'threads':
       
   192             return
       
   193 
       
   194         p = self.Process(target=self._test_terminate)
       
   195         p.daemon = True
       
   196         p.start()
       
   197 
       
   198         self.assertEqual(p.is_alive(), True)
       
   199         self.assertTrue(p in self.active_children())
       
   200         self.assertEqual(p.exitcode, None)
       
   201 
       
   202         p.terminate()
       
   203 
       
   204         join = TimingWrapper(p.join)
       
   205         self.assertEqual(join(), None)
       
   206         self.assertTimingAlmostEqual(join.elapsed, 0.0)
       
   207 
       
   208         self.assertEqual(p.is_alive(), False)
       
   209         self.assertTrue(p not in self.active_children())
       
   210 
       
   211         p.join()
       
   212 
       
   213         # XXX sometimes get p.exitcode == 0 on Windows ...
       
   214         #self.assertEqual(p.exitcode, -signal.SIGTERM)
       
   215 
       
   216     def test_cpu_count(self):
       
   217         try:
       
   218             cpus = multiprocessing.cpu_count()
       
   219         except NotImplementedError:
       
   220             cpus = 1
       
   221         self.assertTrue(type(cpus) is int)
       
   222         self.assertTrue(cpus >= 1)
       
   223 
       
   224     def test_active_children(self):
       
   225         self.assertEqual(type(self.active_children()), list)
       
   226 
       
   227         p = self.Process(target=time.sleep, args=(DELTA,))
       
   228         self.assertTrue(p not in self.active_children())
       
   229 
       
   230         p.start()
       
   231         self.assertTrue(p in self.active_children())
       
   232 
       
   233         p.join()
       
   234         self.assertTrue(p not in self.active_children())
       
   235 
       
   236     def _test_recursion(self, wconn, id):
       
   237         from multiprocessing import forking
       
   238         wconn.send(id)
       
   239         if len(id) < 2:
       
   240             for i in range(2):
       
   241                 p = self.Process(
       
   242                     target=self._test_recursion, args=(wconn, id+[i])
       
   243                     )
       
   244                 p.start()
       
   245                 p.join()
       
   246 
       
   247     def test_recursion(self):
       
   248         rconn, wconn = self.Pipe(duplex=False)
       
   249         self._test_recursion(wconn, [])
       
   250 
       
   251         time.sleep(DELTA)
       
   252         result = []
       
   253         while rconn.poll():
       
   254             result.append(rconn.recv())
       
   255 
       
   256         expected = [
       
   257             [],
       
   258               [0],
       
   259                 [0, 0],
       
   260                 [0, 1],
       
   261               [1],
       
   262                 [1, 0],
       
   263                 [1, 1]
       
   264             ]
       
   265         self.assertEqual(result, expected)
       
   266 
       
   267 #
       
   268 #
       
   269 #
       
   270 
       
   271 class _UpperCaser(multiprocessing.Process):
       
   272 
       
   273     def __init__(self):
       
   274         multiprocessing.Process.__init__(self)
       
   275         self.child_conn, self.parent_conn = multiprocessing.Pipe()
       
   276 
       
   277     def run(self):
       
   278         self.parent_conn.close()
       
   279         for s in iter(self.child_conn.recv, None):
       
   280             self.child_conn.send(s.upper())
       
   281         self.child_conn.close()
       
   282 
       
   283     def submit(self, s):
       
   284         assert type(s) is str
       
   285         self.parent_conn.send(s)
       
   286         return self.parent_conn.recv()
       
   287 
       
   288     def stop(self):
       
   289         self.parent_conn.send(None)
       
   290         self.parent_conn.close()
       
   291         self.child_conn.close()
       
   292 
       
   293 class _TestSubclassingProcess(BaseTestCase):
       
   294 
       
   295     ALLOWED_TYPES = ('processes',)
       
   296 
       
   297     def test_subclassing(self):
       
   298         uppercaser = _UpperCaser()
       
   299         uppercaser.start()
       
   300         self.assertEqual(uppercaser.submit('hello'), 'HELLO')
       
   301         self.assertEqual(uppercaser.submit('world'), 'WORLD')
       
   302         uppercaser.stop()
       
   303         uppercaser.join()
       
   304 
       
   305 #
       
   306 #
       
   307 #
       
   308 
       
   309 def queue_empty(q):
       
   310     if hasattr(q, 'empty'):
       
   311         return q.empty()
       
   312     else:
       
   313         return q.qsize() == 0
       
   314 
       
   315 def queue_full(q, maxsize):
       
   316     if hasattr(q, 'full'):
       
   317         return q.full()
       
   318     else:
       
   319         return q.qsize() == maxsize
       
   320 
       
   321 
       
   322 class _TestQueue(BaseTestCase):
       
   323 
       
   324 
       
   325     def _test_put(self, queue, child_can_start, parent_can_continue):
       
   326         child_can_start.wait()
       
   327         for i in range(6):
       
   328             queue.get()
       
   329         parent_can_continue.set()
       
   330 
       
   331     def test_put(self):
       
   332         MAXSIZE = 6
       
   333         queue = self.Queue(maxsize=MAXSIZE)
       
   334         child_can_start = self.Event()
       
   335         parent_can_continue = self.Event()
       
   336 
       
   337         proc = self.Process(
       
   338             target=self._test_put,
       
   339             args=(queue, child_can_start, parent_can_continue)
       
   340             )
       
   341         proc.daemon = True
       
   342         proc.start()
       
   343 
       
   344         self.assertEqual(queue_empty(queue), True)
       
   345         self.assertEqual(queue_full(queue, MAXSIZE), False)
       
   346 
       
   347         queue.put(1)
       
   348         queue.put(2, True)
       
   349         queue.put(3, True, None)
       
   350         queue.put(4, False)
       
   351         queue.put(5, False, None)
       
   352         queue.put_nowait(6)
       
   353 
       
   354         # the values may be in buffer but not yet in pipe so sleep a bit
       
   355         time.sleep(DELTA)
       
   356 
       
   357         self.assertEqual(queue_empty(queue), False)
       
   358         self.assertEqual(queue_full(queue, MAXSIZE), True)
       
   359 
       
   360         put = TimingWrapper(queue.put)
       
   361         put_nowait = TimingWrapper(queue.put_nowait)
       
   362 
       
   363         self.assertRaises(Queue.Full, put, 7, False)
       
   364         self.assertTimingAlmostEqual(put.elapsed, 0)
       
   365 
       
   366         self.assertRaises(Queue.Full, put, 7, False, None)
       
   367         self.assertTimingAlmostEqual(put.elapsed, 0)
       
   368 
       
   369         self.assertRaises(Queue.Full, put_nowait, 7)
       
   370         self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
       
   371 
       
   372         self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
       
   373         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
       
   374 
       
   375         self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
       
   376         self.assertTimingAlmostEqual(put.elapsed, 0)
       
   377 
       
   378         self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
       
   379         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
       
   380 
       
   381         child_can_start.set()
       
   382         parent_can_continue.wait()
       
   383 
       
   384         self.assertEqual(queue_empty(queue), True)
       
   385         self.assertEqual(queue_full(queue, MAXSIZE), False)
       
   386 
       
   387         proc.join()
       
   388 
       
   389     def _test_get(self, queue, child_can_start, parent_can_continue):
       
   390         child_can_start.wait()
       
   391         #queue.put(1)
       
   392         queue.put(2)
       
   393         queue.put(3)
       
   394         queue.put(4)
       
   395         queue.put(5)
       
   396         parent_can_continue.set()
       
   397 
       
   398     def test_get(self):
       
   399         queue = self.Queue()
       
   400         child_can_start = self.Event()
       
   401         parent_can_continue = self.Event()
       
   402 
       
   403         proc = self.Process(
       
   404             target=self._test_get,
       
   405             args=(queue, child_can_start, parent_can_continue)
       
   406             )
       
   407         proc.daemon = True
       
   408         proc.start()
       
   409 
       
   410         self.assertEqual(queue_empty(queue), True)
       
   411 
       
   412         child_can_start.set()
       
   413         parent_can_continue.wait()
       
   414 
       
   415         time.sleep(DELTA)
       
   416         self.assertEqual(queue_empty(queue), False)
       
   417 
       
   418         # Hangs unexpectedly, remove for now
       
   419         #self.assertEqual(queue.get(), 1)
       
   420         self.assertEqual(queue.get(True, None), 2)
       
   421         self.assertEqual(queue.get(True), 3)
       
   422         self.assertEqual(queue.get(timeout=1), 4)
       
   423         self.assertEqual(queue.get_nowait(), 5)
       
   424 
       
   425         self.assertEqual(queue_empty(queue), True)
       
   426 
       
   427         get = TimingWrapper(queue.get)
       
   428         get_nowait = TimingWrapper(queue.get_nowait)
       
   429 
       
   430         self.assertRaises(Queue.Empty, get, False)
       
   431         self.assertTimingAlmostEqual(get.elapsed, 0)
       
   432 
       
   433         self.assertRaises(Queue.Empty, get, False, None)
       
   434         self.assertTimingAlmostEqual(get.elapsed, 0)
       
   435 
       
   436         self.assertRaises(Queue.Empty, get_nowait)
       
   437         self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
       
   438 
       
   439         self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
       
   440         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
       
   441 
       
   442         self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
       
   443         self.assertTimingAlmostEqual(get.elapsed, 0)
       
   444 
       
   445         self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
       
   446         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
       
   447 
       
   448         proc.join()
       
   449 
       
   450     def _test_fork(self, queue):
       
   451         for i in range(10, 20):
       
   452             queue.put(i)
       
   453         # note that at this point the items may only be buffered, so the
       
   454         # process cannot shutdown until the feeder thread has finished
       
   455         # pushing items onto the pipe.
       
   456 
       
   457     def test_fork(self):
       
   458         # Old versions of Queue would fail to create a new feeder
       
   459         # thread for a forked process if the original process had its
       
   460         # own feeder thread.  This test checks that this no longer
       
   461         # happens.
       
   462 
       
   463         queue = self.Queue()
       
   464 
       
   465         # put items on queue so that main process starts a feeder thread
       
   466         for i in range(10):
       
   467             queue.put(i)
       
   468 
       
   469         # wait to make sure thread starts before we fork a new process
       
   470         time.sleep(DELTA)
       
   471 
       
   472         # fork process
       
   473         p = self.Process(target=self._test_fork, args=(queue,))
       
   474         p.start()
       
   475 
       
   476         # check that all expected items are in the queue
       
   477         for i in range(20):
       
   478             self.assertEqual(queue.get(), i)
       
   479         self.assertRaises(Queue.Empty, queue.get, False)
       
   480 
       
   481         p.join()
       
   482 
       
   483     def test_qsize(self):
       
   484         q = self.Queue()
       
   485         try:
       
   486             self.assertEqual(q.qsize(), 0)
       
   487         except NotImplementedError:
       
   488             return
       
   489         q.put(1)
       
   490         self.assertEqual(q.qsize(), 1)
       
   491         q.put(5)
       
   492         self.assertEqual(q.qsize(), 2)
       
   493         q.get()
       
   494         self.assertEqual(q.qsize(), 1)
       
   495         q.get()
       
   496         self.assertEqual(q.qsize(), 0)
       
   497 
       
   498     def _test_task_done(self, q):
       
   499         for obj in iter(q.get, None):
       
   500             time.sleep(DELTA)
       
   501             q.task_done()
       
   502 
       
   503     def test_task_done(self):
       
   504         queue = self.JoinableQueue()
       
   505 
       
   506         if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
       
   507             return
       
   508 
       
   509         workers = [self.Process(target=self._test_task_done, args=(queue,))
       
   510                    for i in xrange(4)]
       
   511 
       
   512         for p in workers:
       
   513             p.start()
       
   514 
       
   515         for i in xrange(10):
       
   516             queue.put(i)
       
   517 
       
   518         queue.join()
       
   519 
       
   520         for p in workers:
       
   521             queue.put(None)
       
   522 
       
   523         for p in workers:
       
   524             p.join()
       
   525 
       
   526 #
       
   527 #
       
   528 #
       
   529 
       
   530 class _TestLock(BaseTestCase):
       
   531 
       
   532     def test_lock(self):
       
   533         lock = self.Lock()
       
   534         self.assertEqual(lock.acquire(), True)
       
   535         self.assertEqual(lock.acquire(False), False)
       
   536         self.assertEqual(lock.release(), None)
       
   537         self.assertRaises((ValueError, threading.ThreadError), lock.release)
       
   538 
       
   539     def test_rlock(self):
       
   540         lock = self.RLock()
       
   541         self.assertEqual(lock.acquire(), True)
       
   542         self.assertEqual(lock.acquire(), True)
       
   543         self.assertEqual(lock.acquire(), True)
       
   544         self.assertEqual(lock.release(), None)
       
   545         self.assertEqual(lock.release(), None)
       
   546         self.assertEqual(lock.release(), None)
       
   547         self.assertRaises((AssertionError, RuntimeError), lock.release)
       
   548 
       
   549 
       
   550 class _TestSemaphore(BaseTestCase):
       
   551 
       
   552     def _test_semaphore(self, sem):
       
   553         self.assertReturnsIfImplemented(2, get_value, sem)
       
   554         self.assertEqual(sem.acquire(), True)
       
   555         self.assertReturnsIfImplemented(1, get_value, sem)
       
   556         self.assertEqual(sem.acquire(), True)
       
   557         self.assertReturnsIfImplemented(0, get_value, sem)
       
   558         self.assertEqual(sem.acquire(False), False)
       
   559         self.assertReturnsIfImplemented(0, get_value, sem)
       
   560         self.assertEqual(sem.release(), None)
       
   561         self.assertReturnsIfImplemented(1, get_value, sem)
       
   562         self.assertEqual(sem.release(), None)
       
   563         self.assertReturnsIfImplemented(2, get_value, sem)
       
   564 
       
   565     def test_semaphore(self):
       
   566         sem = self.Semaphore(2)
       
   567         self._test_semaphore(sem)
       
   568         self.assertEqual(sem.release(), None)
       
   569         self.assertReturnsIfImplemented(3, get_value, sem)
       
   570         self.assertEqual(sem.release(), None)
       
   571         self.assertReturnsIfImplemented(4, get_value, sem)
       
   572 
       
   573     def test_bounded_semaphore(self):
       
   574         sem = self.BoundedSemaphore(2)
       
   575         self._test_semaphore(sem)
       
   576         # Currently fails on OS/X
       
   577         #if HAVE_GETVALUE:
       
   578         #    self.assertRaises(ValueError, sem.release)
       
   579         #    self.assertReturnsIfImplemented(2, get_value, sem)
       
   580 
       
   581     def test_timeout(self):
       
   582         if self.TYPE != 'processes':
       
   583             return
       
   584 
       
   585         sem = self.Semaphore(0)
       
   586         acquire = TimingWrapper(sem.acquire)
       
   587 
       
   588         self.assertEqual(acquire(False), False)
       
   589         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
       
   590 
       
   591         self.assertEqual(acquire(False, None), False)
       
   592         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
       
   593 
       
   594         self.assertEqual(acquire(False, TIMEOUT1), False)
       
   595         self.assertTimingAlmostEqual(acquire.elapsed, 0)
       
   596 
       
   597         self.assertEqual(acquire(True, TIMEOUT2), False)
       
   598         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
       
   599 
       
   600         self.assertEqual(acquire(timeout=TIMEOUT3), False)
       
   601         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
       
   602 
       
   603 
       
   604 class _TestCondition(BaseTestCase):
       
   605 
       
   606     def f(self, cond, sleeping, woken, timeout=None):
       
   607         cond.acquire()
       
   608         sleeping.release()
       
   609         cond.wait(timeout)
       
   610         woken.release()
       
   611         cond.release()
       
   612 
       
   613     def check_invariant(self, cond):
       
   614         # this is only supposed to succeed when there are no sleepers
       
   615         if self.TYPE == 'processes':
       
   616             try:
       
   617                 sleepers = (cond._sleeping_count.get_value() -
       
   618                             cond._woken_count.get_value())
       
   619                 self.assertEqual(sleepers, 0)
       
   620                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
       
   621             except NotImplementedError:
       
   622                 pass
       
   623 
       
   624     def test_notify(self):
       
   625         cond = self.Condition()
       
   626         sleeping = self.Semaphore(0)
       
   627         woken = self.Semaphore(0)
       
   628 
       
   629         p = self.Process(target=self.f, args=(cond, sleeping, woken))
       
   630         p.daemon = True
       
   631         p.start()
       
   632 
       
   633         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
       
   634         p.daemon = True
       
   635         p.start()
       
   636 
       
   637         # wait for both children to start sleeping
       
   638         sleeping.acquire()
       
   639         sleeping.acquire()
       
   640 
       
   641         # check no process/thread has woken up
       
   642         time.sleep(DELTA)
       
   643         self.assertReturnsIfImplemented(0, get_value, woken)
       
   644 
       
   645         # wake up one process/thread
       
   646         cond.acquire()
       
   647         cond.notify()
       
   648         cond.release()
       
   649 
       
   650         # check one process/thread has woken up
       
   651         time.sleep(DELTA)
       
   652         self.assertReturnsIfImplemented(1, get_value, woken)
       
   653 
       
   654         # wake up another
       
   655         cond.acquire()
       
   656         cond.notify()
       
   657         cond.release()
       
   658 
       
   659         # check other has woken up
       
   660         time.sleep(DELTA)
       
   661         self.assertReturnsIfImplemented(2, get_value, woken)
       
   662 
       
   663         # check state is not mucked up
       
   664         self.check_invariant(cond)
       
   665         p.join()
       
   666 
       
   667     def test_notify_all(self):
       
   668         cond = self.Condition()
       
   669         sleeping = self.Semaphore(0)
       
   670         woken = self.Semaphore(0)
       
   671 
       
   672         # start some threads/processes which will timeout
       
   673         for i in range(3):
       
   674             p = self.Process(target=self.f,
       
   675                              args=(cond, sleeping, woken, TIMEOUT1))
       
   676             p.daemon = True
       
   677             p.start()
       
   678 
       
   679             t = threading.Thread(target=self.f,
       
   680                                  args=(cond, sleeping, woken, TIMEOUT1))
       
   681             t.daemon = True
       
   682             t.start()
       
   683 
       
   684         # wait for them all to sleep
       
   685         for i in xrange(6):
       
   686             sleeping.acquire()
       
   687 
       
   688         # check they have all timed out
       
   689         for i in xrange(6):
       
   690             woken.acquire()
       
   691         self.assertReturnsIfImplemented(0, get_value, woken)
       
   692 
       
   693         # check state is not mucked up
       
   694         self.check_invariant(cond)
       
   695 
       
   696         # start some more threads/processes
       
   697         for i in range(3):
       
   698             p = self.Process(target=self.f, args=(cond, sleeping, woken))
       
   699             p.daemon = True
       
   700             p.start()
       
   701 
       
   702             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
       
   703             t.daemon = True
       
   704             t.start()
       
   705 
       
   706         # wait for them to all sleep
       
   707         for i in xrange(6):
       
   708             sleeping.acquire()
       
   709 
       
   710         # check no process/thread has woken up
       
   711         time.sleep(DELTA)
       
   712         self.assertReturnsIfImplemented(0, get_value, woken)
       
   713 
       
   714         # wake them all up
       
   715         cond.acquire()
       
   716         cond.notify_all()
       
   717         cond.release()
       
   718 
       
   719         # check they have all woken
       
   720         time.sleep(DELTA)
       
   721         self.assertReturnsIfImplemented(6, get_value, woken)
       
   722 
       
   723         # check state is not mucked up
       
   724         self.check_invariant(cond)
       
   725 
       
   726     def test_timeout(self):
       
   727         cond = self.Condition()
       
   728         wait = TimingWrapper(cond.wait)
       
   729         cond.acquire()
       
   730         res = wait(TIMEOUT1)
       
   731         cond.release()
       
   732         self.assertEqual(res, None)
       
   733         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
       
   734 
       
   735 
       
   736 class _TestEvent(BaseTestCase):
       
   737 
       
   738     def _test_event(self, event):
       
   739         time.sleep(TIMEOUT2)
       
   740         event.set()
       
   741 
       
   742     def test_event(self):
       
   743         event = self.Event()
       
   744         wait = TimingWrapper(event.wait)
       
   745 
       
   746         # Removed temporaily, due to API shear, this does not
       
   747         # work with threading._Event objects. is_set == isSet
       
   748         #self.assertEqual(event.is_set(), False)
       
   749 
       
   750         self.assertEqual(wait(0.0), None)
       
   751         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
       
   752         self.assertEqual(wait(TIMEOUT1), None)
       
   753         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
       
   754 
       
   755         event.set()
       
   756 
       
   757         # See note above on the API differences
       
   758         # self.assertEqual(event.is_set(), True)
       
   759         self.assertEqual(wait(), None)
       
   760         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
       
   761         self.assertEqual(wait(TIMEOUT1), None)
       
   762         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
       
   763         # self.assertEqual(event.is_set(), True)
       
   764 
       
   765         event.clear()
       
   766 
       
   767         #self.assertEqual(event.is_set(), False)
       
   768 
       
   769         self.Process(target=self._test_event, args=(event,)).start()
       
   770         self.assertEqual(wait(), None)
       
   771 
       
   772 #
       
   773 #
       
   774 #
       
   775 
       
   776 class _TestValue(BaseTestCase):
       
   777 
       
   778     codes_values = [
       
   779         ('i', 4343, 24234),
       
   780         ('d', 3.625, -4.25),
       
   781         ('h', -232, 234),
       
   782         ('c', latin('x'), latin('y'))
       
   783         ]
       
   784 
       
   785     def _test(self, values):
       
   786         for sv, cv in zip(values, self.codes_values):
       
   787             sv.value = cv[2]
       
   788 
       
   789 
       
   790     def test_value(self, raw=False):
       
   791         if self.TYPE != 'processes':
       
   792             return
       
   793 
       
   794         if raw:
       
   795             values = [self.RawValue(code, value)
       
   796                       for code, value, _ in self.codes_values]
       
   797         else:
       
   798             values = [self.Value(code, value)
       
   799                       for code, value, _ in self.codes_values]
       
   800 
       
   801         for sv, cv in zip(values, self.codes_values):
       
   802             self.assertEqual(sv.value, cv[1])
       
   803 
       
   804         proc = self.Process(target=self._test, args=(values,))
       
   805         proc.start()
       
   806         proc.join()
       
   807 
       
   808         for sv, cv in zip(values, self.codes_values):
       
   809             self.assertEqual(sv.value, cv[2])
       
   810 
       
   811     def test_rawvalue(self):
       
   812         self.test_value(raw=True)
       
   813 
       
   814     def test_getobj_getlock(self):
       
   815         if self.TYPE != 'processes':
       
   816             return
       
   817 
       
   818         val1 = self.Value('i', 5)
       
   819         lock1 = val1.get_lock()
       
   820         obj1 = val1.get_obj()
       
   821 
       
   822         val2 = self.Value('i', 5, lock=None)
       
   823         lock2 = val2.get_lock()
       
   824         obj2 = val2.get_obj()
       
   825 
       
   826         lock = self.Lock()
       
   827         val3 = self.Value('i', 5, lock=lock)
       
   828         lock3 = val3.get_lock()
       
   829         obj3 = val3.get_obj()
       
   830         self.assertEqual(lock, lock3)
       
   831 
       
   832         arr4 = self.RawValue('i', 5)
       
   833         self.assertFalse(hasattr(arr4, 'get_lock'))
       
   834         self.assertFalse(hasattr(arr4, 'get_obj'))
       
   835 
       
   836 
       
   837 class _TestArray(BaseTestCase):
       
   838 
       
   839     def f(self, seq):
       
   840         for i in range(1, len(seq)):
       
   841             seq[i] += seq[i-1]
       
   842 
       
   843     def test_array(self, raw=False):
       
   844         if self.TYPE != 'processes':
       
   845             return
       
   846 
       
   847         seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
       
   848         if raw:
       
   849             arr = self.RawArray('i', seq)
       
   850         else:
       
   851             arr = self.Array('i', seq)
       
   852 
       
   853         self.assertEqual(len(arr), len(seq))
       
   854         self.assertEqual(arr[3], seq[3])
       
   855         self.assertEqual(list(arr[2:7]), list(seq[2:7]))
       
   856 
       
   857         arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
       
   858 
       
   859         self.assertEqual(list(arr[:]), seq)
       
   860 
       
   861         self.f(seq)
       
   862 
       
   863         p = self.Process(target=self.f, args=(arr,))
       
   864         p.start()
       
   865         p.join()
       
   866 
       
   867         self.assertEqual(list(arr[:]), seq)
       
   868 
       
   869     def test_rawarray(self):
       
   870         self.test_array(raw=True)
       
   871 
       
   872     def test_getobj_getlock_obj(self):
       
   873         if self.TYPE != 'processes':
       
   874             return
       
   875 
       
   876         arr1 = self.Array('i', range(10))
       
   877         lock1 = arr1.get_lock()
       
   878         obj1 = arr1.get_obj()
       
   879 
       
   880         arr2 = self.Array('i', range(10), lock=None)
       
   881         lock2 = arr2.get_lock()
       
   882         obj2 = arr2.get_obj()
       
   883 
       
   884         lock = self.Lock()
       
   885         arr3 = self.Array('i', range(10), lock=lock)
       
   886         lock3 = arr3.get_lock()
       
   887         obj3 = arr3.get_obj()
       
   888         self.assertEqual(lock, lock3)
       
   889 
       
   890         arr4 = self.RawArray('i', range(10))
       
   891         self.assertFalse(hasattr(arr4, 'get_lock'))
       
   892         self.assertFalse(hasattr(arr4, 'get_obj'))
       
   893 
       
   894 #
       
   895 #
       
   896 #
       
   897 
       
   898 class _TestContainers(BaseTestCase):
       
   899 
       
   900     ALLOWED_TYPES = ('manager',)
       
   901 
       
   902     def test_list(self):
       
   903         a = self.list(range(10))
       
   904         self.assertEqual(a[:], range(10))
       
   905 
       
   906         b = self.list()
       
   907         self.assertEqual(b[:], [])
       
   908 
       
   909         b.extend(range(5))
       
   910         self.assertEqual(b[:], range(5))
       
   911 
       
   912         self.assertEqual(b[2], 2)
       
   913         self.assertEqual(b[2:10], [2,3,4])
       
   914 
       
   915         b *= 2
       
   916         self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
       
   917 
       
   918         self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
       
   919 
       
   920         self.assertEqual(a[:], range(10))
       
   921 
       
   922         d = [a, b]
       
   923         e = self.list(d)
       
   924         self.assertEqual(
       
   925             e[:],
       
   926             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
       
   927             )
       
   928 
       
   929         f = self.list([a])
       
   930         a.append('hello')
       
   931         self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
       
   932 
       
   933     def test_dict(self):
       
   934         d = self.dict()
       
   935         indices = range(65, 70)
       
   936         for i in indices:
       
   937             d[i] = chr(i)
       
   938         self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
       
   939         self.assertEqual(sorted(d.keys()), indices)
       
   940         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
       
   941         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
       
   942 
       
   943     def test_namespace(self):
       
   944         n = self.Namespace()
       
   945         n.name = 'Bob'
       
   946         n.job = 'Builder'
       
   947         n._hidden = 'hidden'
       
   948         self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
       
   949         del n.job
       
   950         self.assertEqual(str(n), "Namespace(name='Bob')")
       
   951         self.assertTrue(hasattr(n, 'name'))
       
   952         self.assertTrue(not hasattr(n, 'job'))
       
   953 
       
   954 #
       
   955 #
       
   956 #
       
   957 
       
   958 def sqr(x, wait=0.0):
       
   959     time.sleep(wait)
       
   960     return x*x
       
   961 class _TestPool(BaseTestCase):
       
   962 
       
   963     def test_apply(self):
       
   964         papply = self.pool.apply
       
   965         self.assertEqual(papply(sqr, (5,)), sqr(5))
       
   966         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
       
   967 
       
   968     def test_map(self):
       
   969         pmap = self.pool.map
       
   970         self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
       
   971         self.assertEqual(pmap(sqr, range(100), chunksize=20),
       
   972                          map(sqr, range(100)))
       
   973 
       
   974     def test_async(self):
       
   975         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
       
   976         get = TimingWrapper(res.get)
       
   977         self.assertEqual(get(), 49)
       
   978         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
       
   979 
       
   980     def test_async_timeout(self):
       
   981         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
       
   982         get = TimingWrapper(res.get)
       
   983         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
       
   984         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
       
   985 
       
   986     def test_imap(self):
       
   987         it = self.pool.imap(sqr, range(10))
       
   988         self.assertEqual(list(it), map(sqr, range(10)))
       
   989 
       
   990         it = self.pool.imap(sqr, range(10))
       
   991         for i in range(10):
       
   992             self.assertEqual(it.next(), i*i)
       
   993         self.assertRaises(StopIteration, it.next)
       
   994 
       
   995         it = self.pool.imap(sqr, range(1000), chunksize=100)
       
   996         for i in range(1000):
       
   997             self.assertEqual(it.next(), i*i)
       
   998         self.assertRaises(StopIteration, it.next)
       
   999 
       
  1000     def test_imap_unordered(self):
       
  1001         it = self.pool.imap_unordered(sqr, range(1000))
       
  1002         self.assertEqual(sorted(it), map(sqr, range(1000)))
       
  1003 
       
  1004         it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
       
  1005         self.assertEqual(sorted(it), map(sqr, range(1000)))
       
  1006 
       
  1007     def test_make_pool(self):
       
  1008         p = multiprocessing.Pool(3)
       
  1009         self.assertEqual(3, len(p._pool))
       
  1010         p.close()
       
  1011         p.join()
       
  1012 
       
  1013     def test_terminate(self):
       
  1014         if self.TYPE == 'manager':
       
  1015             # On Unix a forked process increfs each shared object to
       
  1016             # which its parent process held a reference.  If the
       
  1017             # forked process gets terminated then there is likely to
       
  1018             # be a reference leak.  So to prevent
       
  1019             # _TestZZZNumberOfObjects from failing we skip this test
       
  1020             # when using a manager.
       
  1021             return
       
  1022 
       
  1023         result = self.pool.map_async(
       
  1024             time.sleep, [0.1 for i in range(10000)], chunksize=1
       
  1025             )
       
  1026         self.pool.terminate()
       
  1027         join = TimingWrapper(self.pool.join)
       
  1028         join()
       
  1029         self.assertTrue(join.elapsed < 0.2)
       
  1030 #
       
  1031 # Test that manager has expected number of shared objects left
       
  1032 #
       
  1033 
       
  1034 class _TestZZZNumberOfObjects(BaseTestCase):
       
  1035     # Because test cases are sorted alphabetically, this one will get
       
  1036     # run after all the other tests for the manager.  It tests that
       
  1037     # there have been no "reference leaks" for the manager's shared
       
  1038     # objects.  Note the comment in _TestPool.test_terminate().
       
  1039     ALLOWED_TYPES = ('manager',)
       
  1040 
       
  1041     def test_number_of_objects(self):
       
  1042         EXPECTED_NUMBER = 1                # the pool object is still alive
       
  1043         multiprocessing.active_children()  # discard dead process objs
       
  1044         gc.collect()                       # do garbage collection
       
  1045         refs = self.manager._number_of_objects()
       
  1046         if refs != EXPECTED_NUMBER:
       
  1047             print self.manager._debug_info()
       
  1048 
       
  1049         self.assertEqual(refs, EXPECTED_NUMBER)
       
  1050 
       
  1051 #
       
  1052 # Test of creating a customized manager class
       
  1053 #
       
  1054 
       
  1055 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
       
  1056 
       
  1057 class FooBar(object):
       
  1058     def f(self):
       
  1059         return 'f()'
       
  1060     def g(self):
       
  1061         raise ValueError
       
  1062     def _h(self):
       
  1063         return '_h()'
       
  1064 
       
  1065 def baz():
       
  1066     for i in xrange(10):
       
  1067         yield i*i
       
  1068 
       
  1069 class IteratorProxy(BaseProxy):
       
  1070     _exposed_ = ('next', '__next__')
       
  1071     def __iter__(self):
       
  1072         return self
       
  1073     def next(self):
       
  1074         return self._callmethod('next')
       
  1075     def __next__(self):
       
  1076         return self._callmethod('__next__')
       
  1077 
       
  1078 class MyManager(BaseManager):
       
  1079     pass
       
  1080 
       
  1081 MyManager.register('Foo', callable=FooBar)
       
  1082 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
       
  1083 MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
       
  1084 
       
  1085 
       
  1086 class _TestMyManager(BaseTestCase):
       
  1087 
       
  1088     ALLOWED_TYPES = ('manager',)
       
  1089 
       
  1090     def test_mymanager(self):
       
  1091         manager = MyManager()
       
  1092         manager.start()
       
  1093 
       
  1094         foo = manager.Foo()
       
  1095         bar = manager.Bar()
       
  1096         baz = manager.baz()
       
  1097 
       
  1098         foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
       
  1099         bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
       
  1100 
       
  1101         self.assertEqual(foo_methods, ['f', 'g'])
       
  1102         self.assertEqual(bar_methods, ['f', '_h'])
       
  1103 
       
  1104         self.assertEqual(foo.f(), 'f()')
       
  1105         self.assertRaises(ValueError, foo.g)
       
  1106         self.assertEqual(foo._callmethod('f'), 'f()')
       
  1107         self.assertRaises(RemoteError, foo._callmethod, '_h')
       
  1108 
       
  1109         self.assertEqual(bar.f(), 'f()')
       
  1110         self.assertEqual(bar._h(), '_h()')
       
  1111         self.assertEqual(bar._callmethod('f'), 'f()')
       
  1112         self.assertEqual(bar._callmethod('_h'), '_h()')
       
  1113 
       
  1114         self.assertEqual(list(baz), [i*i for i in range(10)])
       
  1115 
       
  1116         manager.shutdown()
       
  1117 
       
  1118 #
       
  1119 # Test of connecting to a remote server and using xmlrpclib for serialization
       
  1120 #
       
  1121 
       
  1122 _queue = Queue.Queue()
       
  1123 def get_queue():
       
  1124     return _queue
       
  1125 
       
  1126 class QueueManager(BaseManager):
       
  1127     '''manager class used by server process'''
       
  1128 QueueManager.register('get_queue', callable=get_queue)
       
  1129 
       
  1130 class QueueManager2(BaseManager):
       
  1131     '''manager class which specifies the same interface as QueueManager'''
       
  1132 QueueManager2.register('get_queue')
       
  1133 
       
  1134 
       
  1135 SERIALIZER = 'xmlrpclib'
       
  1136 
       
  1137 class _TestRemoteManager(BaseTestCase):
       
  1138 
       
  1139     ALLOWED_TYPES = ('manager',)
       
  1140 
       
  1141     def _putter(self, address, authkey):
       
  1142         manager = QueueManager2(
       
  1143             address=address, authkey=authkey, serializer=SERIALIZER
       
  1144             )
       
  1145         manager.connect()
       
  1146         queue = manager.get_queue()
       
  1147         queue.put(('hello world', None, True, 2.25))
       
  1148 
       
  1149     def test_remote(self):
       
  1150         authkey = os.urandom(32)
       
  1151 
       
  1152         manager = QueueManager(
       
  1153             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
       
  1154             )
       
  1155         manager.start()
       
  1156 
       
  1157         p = self.Process(target=self._putter, args=(manager.address, authkey))
       
  1158         p.start()
       
  1159 
       
  1160         manager2 = QueueManager2(
       
  1161             address=manager.address, authkey=authkey, serializer=SERIALIZER
       
  1162             )
       
  1163         manager2.connect()
       
  1164         queue = manager2.get_queue()
       
  1165 
       
  1166         # Note that xmlrpclib will deserialize object as a list not a tuple
       
  1167         self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
       
  1168 
       
  1169         # Because we are using xmlrpclib for serialization instead of
       
  1170         # pickle this will cause a serialization error.
       
  1171         self.assertRaises(Exception, queue.put, time.sleep)
       
  1172 
       
  1173         # Make queue finalizer run before the server is stopped
       
  1174         del queue
       
  1175         manager.shutdown()
       
  1176 
       
  1177 #
       
  1178 #
       
  1179 #
       
  1180 
       
  1181 SENTINEL = latin('')
       
  1182 
       
  1183 class _TestConnection(BaseTestCase):
       
  1184 
       
  1185     ALLOWED_TYPES = ('processes', 'threads')
       
  1186 
       
  1187     def _echo(self, conn):
       
  1188         for msg in iter(conn.recv_bytes, SENTINEL):
       
  1189             conn.send_bytes(msg)
       
  1190         conn.close()
       
  1191 
       
  1192     def test_connection(self):
       
  1193         conn, child_conn = self.Pipe()
       
  1194 
       
  1195         p = self.Process(target=self._echo, args=(child_conn,))
       
  1196         p.daemon = True
       
  1197         p.start()
       
  1198 
       
  1199         seq = [1, 2.25, None]
       
  1200         msg = latin('hello world')
       
  1201         longmsg = msg * 10
       
  1202         arr = array.array('i', range(4))
       
  1203 
       
  1204         if self.TYPE == 'processes':
       
  1205             self.assertEqual(type(conn.fileno()), int)
       
  1206 
       
  1207         self.assertEqual(conn.send(seq), None)
       
  1208         self.assertEqual(conn.recv(), seq)
       
  1209 
       
  1210         self.assertEqual(conn.send_bytes(msg), None)
       
  1211         self.assertEqual(conn.recv_bytes(), msg)
       
  1212 
       
  1213         if self.TYPE == 'processes':
       
  1214             buffer = array.array('i', [0]*10)
       
  1215             expected = list(arr) + [0] * (10 - len(arr))
       
  1216             self.assertEqual(conn.send_bytes(arr), None)
       
  1217             self.assertEqual(conn.recv_bytes_into(buffer),
       
  1218                              len(arr) * buffer.itemsize)
       
  1219             self.assertEqual(list(buffer), expected)
       
  1220 
       
  1221             buffer = array.array('i', [0]*10)
       
  1222             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
       
  1223             self.assertEqual(conn.send_bytes(arr), None)
       
  1224             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
       
  1225                              len(arr) * buffer.itemsize)
       
  1226             self.assertEqual(list(buffer), expected)
       
  1227 
       
  1228             buffer = bytearray(latin(' ' * 40))
       
  1229             self.assertEqual(conn.send_bytes(longmsg), None)
       
  1230             try:
       
  1231                 res = conn.recv_bytes_into(buffer)
       
  1232             except multiprocessing.BufferTooShort, e:
       
  1233                 self.assertEqual(e.args, (longmsg,))
       
  1234             else:
       
  1235                 self.fail('expected BufferTooShort, got %s' % res)
       
  1236 
       
  1237         poll = TimingWrapper(conn.poll)
       
  1238 
       
  1239         self.assertEqual(poll(), False)
       
  1240         self.assertTimingAlmostEqual(poll.elapsed, 0)
       
  1241 
       
  1242         self.assertEqual(poll(TIMEOUT1), False)
       
  1243         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
       
  1244 
       
  1245         conn.send(None)
       
  1246 
       
  1247         self.assertEqual(poll(TIMEOUT1), True)
       
  1248         self.assertTimingAlmostEqual(poll.elapsed, 0)
       
  1249 
       
  1250         self.assertEqual(conn.recv(), None)
       
  1251 
       
  1252         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
       
  1253         conn.send_bytes(really_big_msg)
       
  1254         self.assertEqual(conn.recv_bytes(), really_big_msg)
       
  1255 
       
  1256         conn.send_bytes(SENTINEL)                          # tell child to quit
       
  1257         child_conn.close()
       
  1258 
       
  1259         if self.TYPE == 'processes':
       
  1260             self.assertEqual(conn.readable, True)
       
  1261             self.assertEqual(conn.writable, True)
       
  1262             self.assertRaises(EOFError, conn.recv)
       
  1263             self.assertRaises(EOFError, conn.recv_bytes)
       
  1264 
       
  1265         p.join()
       
  1266 
       
  1267     def test_duplex_false(self):
       
  1268         reader, writer = self.Pipe(duplex=False)
       
  1269         self.assertEqual(writer.send(1), None)
       
  1270         self.assertEqual(reader.recv(), 1)
       
  1271         if self.TYPE == 'processes':
       
  1272             self.assertEqual(reader.readable, True)
       
  1273             self.assertEqual(reader.writable, False)
       
  1274             self.assertEqual(writer.readable, False)
       
  1275             self.assertEqual(writer.writable, True)
       
  1276             self.assertRaises(IOError, reader.send, 2)
       
  1277             self.assertRaises(IOError, writer.recv)
       
  1278             self.assertRaises(IOError, writer.poll)
       
  1279 
       
  1280     def test_spawn_close(self):
       
  1281         # We test that a pipe connection can be closed by parent
       
  1282         # process immediately after child is spawned.  On Windows this
       
  1283         # would have sometimes failed on old versions because
       
  1284         # child_conn would be closed before the child got a chance to
       
  1285         # duplicate it.
       
  1286         conn, child_conn = self.Pipe()
       
  1287 
       
  1288         p = self.Process(target=self._echo, args=(child_conn,))
       
  1289         p.start()
       
  1290         child_conn.close()    # this might complete before child initializes
       
  1291 
       
  1292         msg = latin('hello')
       
  1293         conn.send_bytes(msg)
       
  1294         self.assertEqual(conn.recv_bytes(), msg)
       
  1295 
       
  1296         conn.send_bytes(SENTINEL)
       
  1297         conn.close()
       
  1298         p.join()
       
  1299 
       
  1300     def test_sendbytes(self):
       
  1301         if self.TYPE != 'processes':
       
  1302             return
       
  1303 
       
  1304         msg = latin('abcdefghijklmnopqrstuvwxyz')
       
  1305         a, b = self.Pipe()
       
  1306 
       
  1307         a.send_bytes(msg)
       
  1308         self.assertEqual(b.recv_bytes(), msg)
       
  1309 
       
  1310         a.send_bytes(msg, 5)
       
  1311         self.assertEqual(b.recv_bytes(), msg[5:])
       
  1312 
       
  1313         a.send_bytes(msg, 7, 8)
       
  1314         self.assertEqual(b.recv_bytes(), msg[7:7+8])
       
  1315 
       
  1316         a.send_bytes(msg, 26)
       
  1317         self.assertEqual(b.recv_bytes(), latin(''))
       
  1318 
       
  1319         a.send_bytes(msg, 26, 0)
       
  1320         self.assertEqual(b.recv_bytes(), latin(''))
       
  1321 
       
  1322         self.assertRaises(ValueError, a.send_bytes, msg, 27)
       
  1323 
       
  1324         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
       
  1325 
       
  1326         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
       
  1327 
       
  1328         self.assertRaises(ValueError, a.send_bytes, msg, -1)
       
  1329 
       
  1330         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
       
  1331 
       
  1332 class _TestListenerClient(BaseTestCase):
       
  1333 
       
  1334     ALLOWED_TYPES = ('processes', 'threads')
       
  1335 
       
  1336     def _test(self, address):
       
  1337         conn = self.connection.Client(address)
       
  1338         conn.send('hello')
       
  1339         conn.close()
       
  1340 
       
  1341     def test_listener_client(self):
       
  1342         for family in self.connection.families:
       
  1343             l = self.connection.Listener(family=family)
       
  1344             p = self.Process(target=self._test, args=(l.address,))
       
  1345             p.daemon = True
       
  1346             p.start()
       
  1347             conn = l.accept()
       
  1348             self.assertEqual(conn.recv(), 'hello')
       
  1349             p.join()
       
  1350             l.close()
       
  1351 #
       
  1352 # Test of sending connection and socket objects between processes
       
  1353 #
       
  1354 """
       
  1355 class _TestPicklingConnections(BaseTestCase):
       
  1356 
       
  1357     ALLOWED_TYPES = ('processes',)
       
  1358 
       
  1359     def _listener(self, conn, families):
       
  1360         for fam in families:
       
  1361             l = self.connection.Listener(family=fam)
       
  1362             conn.send(l.address)
       
  1363             new_conn = l.accept()
       
  1364             conn.send(new_conn)
       
  1365 
       
  1366         if self.TYPE == 'processes':
       
  1367             l = socket.socket()
       
  1368             l.bind(('localhost', 0))
       
  1369             conn.send(l.getsockname())
       
  1370             l.listen(1)
       
  1371             new_conn, addr = l.accept()
       
  1372             conn.send(new_conn)
       
  1373 
       
  1374         conn.recv()
       
  1375 
       
  1376     def _remote(self, conn):
       
  1377         for (address, msg) in iter(conn.recv, None):
       
  1378             client = self.connection.Client(address)
       
  1379             client.send(msg.upper())
       
  1380             client.close()
       
  1381 
       
  1382         if self.TYPE == 'processes':
       
  1383             address, msg = conn.recv()
       
  1384             client = socket.socket()
       
  1385             client.connect(address)
       
  1386             client.sendall(msg.upper())
       
  1387             client.close()
       
  1388 
       
  1389         conn.close()
       
  1390 
       
  1391     def test_pickling(self):
       
  1392         try:
       
  1393             multiprocessing.allow_connection_pickling()
       
  1394         except ImportError:
       
  1395             return
       
  1396 
       
  1397         families = self.connection.families
       
  1398 
       
  1399         lconn, lconn0 = self.Pipe()
       
  1400         lp = self.Process(target=self._listener, args=(lconn0, families))
       
  1401         lp.start()
       
  1402         lconn0.close()
       
  1403 
       
  1404         rconn, rconn0 = self.Pipe()
       
  1405         rp = self.Process(target=self._remote, args=(rconn0,))
       
  1406         rp.start()
       
  1407         rconn0.close()
       
  1408 
       
  1409         for fam in families:
       
  1410             msg = ('This connection uses family %s' % fam).encode('ascii')
       
  1411             address = lconn.recv()
       
  1412             rconn.send((address, msg))
       
  1413             new_conn = lconn.recv()
       
  1414             self.assertEqual(new_conn.recv(), msg.upper())
       
  1415 
       
  1416         rconn.send(None)
       
  1417 
       
  1418         if self.TYPE == 'processes':
       
  1419             msg = latin('This connection uses a normal socket')
       
  1420             address = lconn.recv()
       
  1421             rconn.send((address, msg))
       
  1422             if hasattr(socket, 'fromfd'):
       
  1423                 new_conn = lconn.recv()
       
  1424                 self.assertEqual(new_conn.recv(100), msg.upper())
       
  1425             else:
       
  1426                 # XXX On Windows with Py2.6 need to backport fromfd()
       
  1427                 discard = lconn.recv_bytes()
       
  1428 
       
  1429         lconn.send(None)
       
  1430 
       
  1431         rconn.close()
       
  1432         lconn.close()
       
  1433 
       
  1434         lp.join()
       
  1435         rp.join()
       
  1436 """
       
  1437 #
       
  1438 #
       
  1439 #
       
  1440 
       
  1441 class _TestHeap(BaseTestCase):
       
  1442 
       
  1443     ALLOWED_TYPES = ('processes',)
       
  1444 
       
  1445     def test_heap(self):
       
  1446         iterations = 5000
       
  1447         maxblocks = 50
       
  1448         blocks = []
       
  1449 
       
  1450         # create and destroy lots of blocks of different sizes
       
  1451         for i in xrange(iterations):
       
  1452             size = int(random.lognormvariate(0, 1) * 1000)
       
  1453             b = multiprocessing.heap.BufferWrapper(size)
       
  1454             blocks.append(b)
       
  1455             if len(blocks) > maxblocks:
       
  1456                 i = random.randrange(maxblocks)
       
  1457                 del blocks[i]
       
  1458 
       
  1459         # get the heap object
       
  1460         heap = multiprocessing.heap.BufferWrapper._heap
       
  1461 
       
  1462         # verify the state of the heap
       
  1463         all = []
       
  1464         occupied = 0
       
  1465         for L in heap._len_to_seq.values():
       
  1466             for arena, start, stop in L:
       
  1467                 all.append((heap._arenas.index(arena), start, stop,
       
  1468                             stop-start, 'free'))
       
  1469         for arena, start, stop in heap._allocated_blocks:
       
  1470             all.append((heap._arenas.index(arena), start, stop,
       
  1471                         stop-start, 'occupied'))
       
  1472             occupied += (stop-start)
       
  1473 
       
  1474         all.sort()
       
  1475 
       
  1476         for i in range(len(all)-1):
       
  1477             (arena, start, stop) = all[i][:3]
       
  1478             (narena, nstart, nstop) = all[i+1][:3]
       
  1479             self.assertTrue((arena != narena and nstart == 0) or
       
  1480                             (stop == nstart))
       
  1481 
       
  1482 #
       
  1483 #
       
  1484 #
       
  1485 
       
  1486 try:
       
  1487     from ctypes import Structure, Value, copy, c_int, c_double
       
  1488 except ImportError:
       
  1489     Structure = object
       
  1490     c_int = c_double = None
       
  1491 
       
  1492 class _Foo(Structure):
       
  1493     _fields_ = [
       
  1494         ('x', c_int),
       
  1495         ('y', c_double)
       
  1496         ]
       
  1497 
       
  1498 class _TestSharedCTypes(BaseTestCase):
       
  1499 
       
  1500     ALLOWED_TYPES = ('processes',)
       
  1501 
       
  1502     def _double(self, x, y, foo, arr, string):
       
  1503         x.value *= 2
       
  1504         y.value *= 2
       
  1505         foo.x *= 2
       
  1506         foo.y *= 2
       
  1507         string.value *= 2
       
  1508         for i in range(len(arr)):
       
  1509             arr[i] *= 2
       
  1510 
       
  1511     def test_sharedctypes(self, lock=False):
       
  1512         if c_int is None:
       
  1513             return
       
  1514 
       
  1515         x = Value('i', 7, lock=lock)
       
  1516         y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
       
  1517         foo = Value(_Foo, 3, 2, lock=lock)
       
  1518         arr = Array('d', range(10), lock=lock)
       
  1519         string = Array('c', 20, lock=lock)
       
  1520         string.value = 'hello'
       
  1521 
       
  1522         p = self.Process(target=self._double, args=(x, y, foo, arr, string))
       
  1523         p.start()
       
  1524         p.join()
       
  1525 
       
  1526         self.assertEqual(x.value, 14)
       
  1527         self.assertAlmostEqual(y.value, 2.0/3.0)
       
  1528         self.assertEqual(foo.x, 6)
       
  1529         self.assertAlmostEqual(foo.y, 4.0)
       
  1530         for i in range(10):
       
  1531             self.assertAlmostEqual(arr[i], i*2)
       
  1532         self.assertEqual(string.value, latin('hellohello'))
       
  1533 
       
  1534     def test_synchronize(self):
       
  1535         self.test_sharedctypes(lock=True)
       
  1536 
       
  1537     def test_copy(self):
       
  1538         if c_int is None:
       
  1539             return
       
  1540 
       
  1541         foo = _Foo(2, 5.0)
       
  1542         bar = copy(foo)
       
  1543         foo.x = 0
       
  1544         foo.y = 0
       
  1545         self.assertEqual(bar.x, 2)
       
  1546         self.assertAlmostEqual(bar.y, 5.0)
       
  1547 
       
  1548 #
       
  1549 #
       
  1550 #
       
  1551 
       
  1552 class _TestFinalize(BaseTestCase):
       
  1553 
       
  1554     ALLOWED_TYPES = ('processes',)
       
  1555 
       
  1556     def _test_finalize(self, conn):
       
  1557         class Foo(object):
       
  1558             pass
       
  1559 
       
  1560         a = Foo()
       
  1561         util.Finalize(a, conn.send, args=('a',))
       
  1562         del a           # triggers callback for a
       
  1563 
       
  1564         b = Foo()
       
  1565         close_b = util.Finalize(b, conn.send, args=('b',))
       
  1566         close_b()       # triggers callback for b
       
  1567         close_b()       # does nothing because callback has already been called
       
  1568         del b           # does nothing because callback has already been called
       
  1569 
       
  1570         c = Foo()
       
  1571         util.Finalize(c, conn.send, args=('c',))
       
  1572 
       
  1573         d10 = Foo()
       
  1574         util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
       
  1575 
       
  1576         d01 = Foo()
       
  1577         util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
       
  1578         d02 = Foo()
       
  1579         util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
       
  1580         d03 = Foo()
       
  1581         util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
       
  1582 
       
  1583         util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
       
  1584 
       
  1585         util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
       
  1586 
       
  1587         # call mutliprocessing's cleanup function then exit process without
       
  1588         # garbage collecting locals
       
  1589         util._exit_function()
       
  1590         conn.close()
       
  1591         os._exit(0)
       
  1592 
       
  1593     def test_finalize(self):
       
  1594         conn, child_conn = self.Pipe()
       
  1595 
       
  1596         p = self.Process(target=self._test_finalize, args=(child_conn,))
       
  1597         p.start()
       
  1598         p.join()
       
  1599 
       
  1600         result = [obj for obj in iter(conn.recv, 'STOP')]
       
  1601         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
       
  1602 
       
  1603 #
       
  1604 # Test that from ... import * works for each module
       
  1605 #
       
  1606 
       
  1607 class _TestImportStar(BaseTestCase):
       
  1608 
       
  1609     ALLOWED_TYPES = ('processes',)
       
  1610 
       
  1611     def test_import(self):
       
  1612         modules = (
       
  1613             'multiprocessing', 'multiprocessing.connection',
       
  1614             'multiprocessing.heap', 'multiprocessing.managers',
       
  1615             'multiprocessing.pool', 'multiprocessing.process',
       
  1616             'multiprocessing.reduction', 'multiprocessing.sharedctypes',
       
  1617             'multiprocessing.synchronize', 'multiprocessing.util'
       
  1618             )
       
  1619 
       
  1620         for name in modules:
       
  1621             __import__(name)
       
  1622             mod = sys.modules[name]
       
  1623 
       
  1624             for attr in getattr(mod, '__all__', ()):
       
  1625                 self.assertTrue(
       
  1626                     hasattr(mod, attr),
       
  1627                     '%r does not have attribute %r' % (mod, attr)
       
  1628                     )
       
  1629 
       
  1630 #
       
  1631 # Quick test that logging works -- does not test logging output
       
  1632 #
       
  1633 
       
  1634 class _TestLogging(BaseTestCase):
       
  1635 
       
  1636     ALLOWED_TYPES = ('processes',)
       
  1637 
       
  1638     def test_enable_logging(self):
       
  1639         logger = multiprocessing.get_logger()
       
  1640         logger.setLevel(util.SUBWARNING)
       
  1641         self.assertTrue(logger is not None)
       
  1642         logger.debug('this will not be printed')
       
  1643         logger.info('nor will this')
       
  1644         logger.setLevel(LOG_LEVEL)
       
  1645 
       
  1646     def _test_level(self, conn):
       
  1647         logger = multiprocessing.get_logger()
       
  1648         conn.send(logger.getEffectiveLevel())
       
  1649 
       
  1650     def test_level(self):
       
  1651         LEVEL1 = 32
       
  1652         LEVEL2 = 37
       
  1653 
       
  1654         logger = multiprocessing.get_logger()
       
  1655         root_logger = logging.getLogger()
       
  1656         root_level = root_logger.level
       
  1657 
       
  1658         reader, writer = multiprocessing.Pipe(duplex=False)
       
  1659 
       
  1660         logger.setLevel(LEVEL1)
       
  1661         self.Process(target=self._test_level, args=(writer,)).start()
       
  1662         self.assertEqual(LEVEL1, reader.recv())
       
  1663 
       
  1664         logger.setLevel(logging.NOTSET)
       
  1665         root_logger.setLevel(LEVEL2)
       
  1666         self.Process(target=self._test_level, args=(writer,)).start()
       
  1667         self.assertEqual(LEVEL2, reader.recv())
       
  1668 
       
  1669         root_logger.setLevel(root_level)
       
  1670         logger.setLevel(level=LOG_LEVEL)
       
  1671 
       
  1672 #
       
  1673 # Functions used to create test cases from the base ones in this module
       
  1674 #
       
  1675 
       
  1676 def get_attributes(Source, names):
       
  1677     d = {}
       
  1678     for name in names:
       
  1679         obj = getattr(Source, name)
       
  1680         if type(obj) == type(get_attributes):
       
  1681             obj = staticmethod(obj)
       
  1682         d[name] = obj
       
  1683     return d
       
  1684 
       
  1685 def create_test_cases(Mixin, type):
       
  1686     result = {}
       
  1687     glob = globals()
       
  1688     Type = type[0].upper() + type[1:]
       
  1689 
       
  1690     for name in glob.keys():
       
  1691         if name.startswith('_Test'):
       
  1692             base = glob[name]
       
  1693             if type in base.ALLOWED_TYPES:
       
  1694                 newname = 'With' + Type + name[1:]
       
  1695                 class Temp(base, unittest.TestCase, Mixin):
       
  1696                     pass
       
  1697                 result[newname] = Temp
       
  1698                 Temp.__name__ = newname
       
  1699                 Temp.__module__ = Mixin.__module__
       
  1700     return result
       
  1701 
       
  1702 #
       
  1703 # Create test cases
       
  1704 #
       
  1705 
       
  1706 class ProcessesMixin(object):
       
  1707     TYPE = 'processes'
       
  1708     Process = multiprocessing.Process
       
  1709     locals().update(get_attributes(multiprocessing, (
       
  1710         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       
  1711         'Condition', 'Event', 'Value', 'Array', 'RawValue',
       
  1712         'RawArray', 'current_process', 'active_children', 'Pipe',
       
  1713         'connection', 'JoinableQueue'
       
  1714         )))
       
  1715 
       
  1716 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
       
  1717 globals().update(testcases_processes)
       
  1718 
       
  1719 
       
  1720 class ManagerMixin(object):
       
  1721     TYPE = 'manager'
       
  1722     Process = multiprocessing.Process
       
  1723     manager = object.__new__(multiprocessing.managers.SyncManager)
       
  1724     locals().update(get_attributes(manager, (
       
  1725         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       
  1726        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
       
  1727         'Namespace', 'JoinableQueue'
       
  1728         )))
       
  1729 
       
  1730 testcases_manager = create_test_cases(ManagerMixin, type='manager')
       
  1731 globals().update(testcases_manager)
       
  1732 
       
  1733 
       
  1734 class ThreadsMixin(object):
       
  1735     TYPE = 'threads'
       
  1736     Process = multiprocessing.dummy.Process
       
  1737     locals().update(get_attributes(multiprocessing.dummy, (
       
  1738         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       
  1739         'Condition', 'Event', 'Value', 'Array', 'current_process',
       
  1740         'active_children', 'Pipe', 'connection', 'dict', 'list',
       
  1741         'Namespace', 'JoinableQueue'
       
  1742         )))
       
  1743 
       
  1744 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
       
  1745 globals().update(testcases_threads)
       
  1746 
       
  1747 class OtherTest(unittest.TestCase):
       
  1748     # TODO: add more tests for deliver/answer challenge.
       
  1749     def test_deliver_challenge_auth_failure(self):
       
  1750         class _FakeConnection(object):
       
  1751             def recv_bytes(self, size):
       
  1752                 return b'something bogus'
       
  1753             def send_bytes(self, data):
       
  1754                 pass
       
  1755         self.assertRaises(multiprocessing.AuthenticationError,
       
  1756                           multiprocessing.connection.deliver_challenge,
       
  1757                           _FakeConnection(), b'abc')
       
  1758 
       
  1759     def test_answer_challenge_auth_failure(self):
       
  1760         class _FakeConnection(object):
       
  1761             def __init__(self):
       
  1762                 self.count = 0
       
  1763             def recv_bytes(self, size):
       
  1764                 self.count += 1
       
  1765                 if self.count == 1:
       
  1766                     return multiprocessing.connection.CHALLENGE
       
  1767                 elif self.count == 2:
       
  1768                     return b'something bogus'
       
  1769                 return b''
       
  1770             def send_bytes(self, data):
       
  1771                 pass
       
  1772         self.assertRaises(multiprocessing.AuthenticationError,
       
  1773                           multiprocessing.connection.answer_challenge,
       
  1774                           _FakeConnection(), b'abc')
       
  1775 
       
  1776 testcases_other = [OtherTest]
       
  1777 
       
  1778 #
       
  1779 #
       
  1780 #
       
  1781 
       
  1782 def test_main(run=None):
       
  1783     if sys.platform.startswith("linux"):
       
  1784         try:
       
  1785             lock = multiprocessing.RLock()
       
  1786         except OSError:
       
  1787             from test.test_support import TestSkipped
       
  1788             raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
       
  1789 
       
  1790     if run is None:
       
  1791         from test.test_support import run_unittest as run
       
  1792 
       
  1793     util.get_temp_dir()     # creates temp directory for use by all processes
       
  1794 
       
  1795     multiprocessing.get_logger().setLevel(LOG_LEVEL)
       
  1796 
       
  1797     ProcessesMixin.pool = multiprocessing.Pool(4)
       
  1798     ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
       
  1799     ManagerMixin.manager.__init__()
       
  1800     ManagerMixin.manager.start()
       
  1801     ManagerMixin.pool = ManagerMixin.manager.Pool(4)
       
  1802 
       
  1803     testcases = (
       
  1804         sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
       
  1805         sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
       
  1806         sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
       
  1807         testcases_other
       
  1808         )
       
  1809 
       
  1810     loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
       
  1811     suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
       
  1812     run(suite)
       
  1813 
       
  1814     ThreadsMixin.pool.terminate()
       
  1815     ProcessesMixin.pool.terminate()
       
  1816     ManagerMixin.pool.terminate()
       
  1817     ManagerMixin.manager.shutdown()
       
  1818 
       
  1819     del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
       
  1820 
       
  1821 def main():
       
  1822     test_main(unittest.TextTestRunner(verbosity=2).run)
       
  1823 
       
  1824 if __name__ == '__main__':
       
  1825     main()