symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/fork_wait.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """This test case provides support for checking forking and wait behavior.
       
     2 
       
      3 To test different wait behavior, override the wait_impl method.
       
     4 
       
     5 We want fork1() semantics -- only the forking thread survives in the
       
     6 child after a fork().
       
     7 
       
     8 On some systems (e.g. Solaris without posix threads) we find that all
       
     9 active threads survive in the child after a fork(); this is an error.
       
    10 
       
    11 While BeOS doesn't officially support fork and native threading in
       
    12 the same application, the present example should work just fine.  DC
       
    13 """
       
    14 
       
    15 import os, sys, time, thread, unittest
       
    16 
       
     17 LONGSLEEP = 2  # seconds test_wait() sleeps before forking, and again inside the child
       
     18 SHORTSLEEP = 0.5  # seconds each worker thread sleeps per loop iteration in f()
       
     19 NUM_THREADS = 4  # number of worker threads test_wait() starts
       
    20 
       
    21 class ForkWait(unittest.TestCase):
       
    22 
       
    23     def setUp(self):
       
    24         self.alive = {}
       
    25         self.stop = 0
       
    26 
       
    27     def f(self, id):
       
    28         while not self.stop:
       
    29             self.alive[id] = os.getpid()
       
    30             try:
       
    31                 time.sleep(SHORTSLEEP)
       
    32             except IOError:
       
    33                 pass
       
    34 
       
    35     def wait_impl(self, cpid):
       
    36         for i in range(10):
       
    37             # waitpid() shouldn't hang, but some of the buildbots seem to hang
       
    38             # in the forking tests.  This is an attempt to fix the problem.
       
    39             spid, status = os.waitpid(cpid, os.WNOHANG)
       
    40             if spid == cpid:
       
    41                 break
       
    42             time.sleep(2 * SHORTSLEEP)
       
    43 
       
    44         self.assertEquals(spid, cpid)
       
    45         self.assertEquals(status, 0, "cause = %d, exit = %d" % (status&0xff, status>>8))
       
    46 
       
    47     def test_wait(self):
       
    48         for i in range(NUM_THREADS):
       
    49             thread.start_new(self.f, (i,))
       
    50 
       
    51         time.sleep(LONGSLEEP)
       
    52 
       
    53         a = self.alive.keys()
       
    54         a.sort()
       
    55         self.assertEquals(a, range(NUM_THREADS))
       
    56 
       
    57         prefork_lives = self.alive.copy()
       
    58 
       
    59         if sys.platform in ['unixware7']:
       
    60             cpid = os.fork1()
       
    61         else:
       
    62             cpid = os.fork()
       
    63 
       
    64         if cpid == 0:
       
    65             # Child
       
    66             time.sleep(LONGSLEEP)
       
    67             n = 0
       
    68             for key in self.alive:
       
    69                 if self.alive[key] != prefork_lives[key]:
       
    70                     n += 1
       
    71             os._exit(n)
       
    72         else:
       
    73             # Parent
       
    74             self.wait_impl(cpid)
       
    75             # Tell threads to die
       
    76             self.stop = 1
       
    77             time.sleep(2*SHORTSLEEP) # Wait for threads to die