symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_signal.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 import unittest
       
     2 from test import test_support
       
     3 from contextlib import closing, nested
       
     4 import gc
       
     5 import pickle
       
     6 import select
       
     7 import signal
       
     8 import subprocess
       
     9 import traceback
       
    10 import sys, os, time, errno
       
    11 
       
    12 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
       
    13     raise test_support.TestSkipped("Can't test signal on %s" % \
       
    14                                    sys.platform)
       
    15 
       
    16 
       
    17 class HandlerBCalled(Exception):
       
    18     pass
       
    19 
       
    20 
       
    21 def exit_subprocess():
       
    22     """Use os._exit(0) to exit the current subprocess.
       
    23 
       
    24     Otherwise, the test catches the SystemExit and continues executing
       
    25     in parallel with the original test, so you wind up with an
       
    26     exponential number of tests running concurrently.
       
    27     """
       
    28     os._exit(0)
       
    29 
       
    30 
       
    31 def ignoring_eintr(__func, *args, **kwargs):
       
    32     try:
       
    33         return __func(*args, **kwargs)
       
    34     except EnvironmentError as e:
       
    35         if e.errno != errno.EINTR:
       
    36             raise
       
    37         return None
       
    38 
       
    39 
       
    40 class InterProcessSignalTests(unittest.TestCase):
       
    41     MAX_DURATION = 20   # Entire test should last at most 20 sec.
       
    42 
       
    43     def setUp(self):
       
    44         self.using_gc = gc.isenabled()
       
    45         gc.disable()
       
    46 
       
    47     def tearDown(self):
       
    48         if self.using_gc:
       
    49             gc.enable()
       
    50 
       
    51     def format_frame(self, frame, limit=None):
       
    52         return ''.join(traceback.format_stack(frame, limit=limit))
       
    53 
       
    54     def handlerA(self, signum, frame):
       
    55         self.a_called = True
       
    56         if test_support.verbose:
       
    57             print "handlerA invoked from signal %s at:\n%s" % (
       
    58                 signum, self.format_frame(frame, limit=1))
       
    59 
       
    60     def handlerB(self, signum, frame):
       
    61         self.b_called = True
       
    62         if test_support.verbose:
       
    63             print "handlerB invoked from signal %s at:\n%s" % (
       
    64                 signum, self.format_frame(frame, limit=1))
       
    65         raise HandlerBCalled(signum, self.format_frame(frame))
       
    66 
       
    67     def wait(self, child):
       
    68         """Wait for child to finish, ignoring EINTR."""
       
    69         while True:
       
    70             try:
       
    71                 child.wait()
       
    72                 return
       
    73             except OSError as e:
       
    74                 if e.errno != errno.EINTR:
       
    75                     raise
       
    76 
       
    77     def run_test(self):
       
    78         # Install handlers. This function runs in a sub-process, so we
       
    79         # don't worry about re-setting the default handlers.
       
    80         signal.signal(signal.SIGHUP, self.handlerA)
       
    81         signal.signal(signal.SIGUSR1, self.handlerB)
       
    82         signal.signal(signal.SIGUSR2, signal.SIG_IGN)
       
    83         signal.signal(signal.SIGALRM, signal.default_int_handler)
       
    84 
       
    85         # Variables the signals will modify:
       
    86         self.a_called = False
       
    87         self.b_called = False
       
    88 
       
    89         # Let the sub-processes know who to send signals to.
       
    90         pid = os.getpid()
       
    91         if test_support.verbose:
       
    92             print "test runner's pid is", pid
       
    93 
       
    94         child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
       
    95         if child:
       
    96             self.wait(child)
       
    97             if not self.a_called:
       
    98                 time.sleep(1)  # Give the signal time to be delivered.
       
    99         self.assertTrue(self.a_called)
       
   100         self.assertFalse(self.b_called)
       
   101         self.a_called = False
       
   102 
       
   103         # Make sure the signal isn't delivered while the previous
       
   104         # Popen object is being destroyed, because __del__ swallows
       
   105         # exceptions.
       
   106         del child
       
   107         try:
       
   108             child = subprocess.Popen(['kill', '-USR1', str(pid)])
       
   109             # This wait should be interrupted by the signal's exception.
       
   110             self.wait(child)
       
   111             time.sleep(1)  # Give the signal time to be delivered.
       
   112             self.fail('HandlerBCalled exception not thrown')
       
   113         except HandlerBCalled:
       
   114             self.assertTrue(self.b_called)
       
   115             self.assertFalse(self.a_called)
       
   116             if test_support.verbose:
       
   117                 print "HandlerBCalled exception caught"
       
   118 
       
   119         child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
       
   120         if child:
       
   121             self.wait(child)  # Nothing should happen.
       
   122 
       
   123         try:
       
   124             signal.alarm(1)
       
   125             # The race condition in pause doesn't matter in this case,
       
   126             # since alarm is going to raise a KeyboardException, which
       
   127             # will skip the call.
       
   128             signal.pause()
       
   129             # But if another signal arrives before the alarm, pause
       
   130             # may return early.
       
   131             time.sleep(1)
       
   132         except KeyboardInterrupt:
       
   133             if test_support.verbose:
       
   134                 print "KeyboardInterrupt (the alarm() went off)"
       
   135         except:
       
   136             self.fail("Some other exception woke us from pause: %s" %
       
   137                       traceback.format_exc())
       
   138         else:
       
   139             self.fail("pause returned of its own accord, and the signal"
       
   140                       " didn't arrive after another second.")
       
   141 
       
   142     def test_main(self):
       
   143         # This function spawns a child process to insulate the main
       
   144         # test-running process from all the signals. It then
       
   145         # communicates with that child process over a pipe and
       
   146         # re-raises information about any exceptions the child
       
   147         # throws. The real work happens in self.run_test().
       
   148         os_done_r, os_done_w = os.pipe()
       
   149         with nested(closing(os.fdopen(os_done_r)),
       
   150                     closing(os.fdopen(os_done_w, 'w'))) as (done_r, done_w):
       
   151             child = os.fork()
       
   152             if child == 0:
       
   153                 # In the child process; run the test and report results
       
   154                 # through the pipe.
       
   155                 try:
       
   156                     done_r.close()
       
   157                     # Have to close done_w again here because
       
   158                     # exit_subprocess() will skip the enclosing with block.
       
   159                     with closing(done_w):
       
   160                         try:
       
   161                             self.run_test()
       
   162                         except:
       
   163                             pickle.dump(traceback.format_exc(), done_w)
       
   164                         else:
       
   165                             pickle.dump(None, done_w)
       
   166                 except:
       
   167                     print 'Uh oh, raised from pickle.'
       
   168                     traceback.print_exc()
       
   169                 finally:
       
   170                     exit_subprocess()
       
   171 
       
   172             done_w.close()
       
   173             # Block for up to MAX_DURATION seconds for the test to finish.
       
   174             r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
       
   175             if done_r in r:
       
   176                 tb = pickle.load(done_r)
       
   177                 if tb:
       
   178                     self.fail(tb)
       
   179             else:
       
   180                 os.kill(child, signal.SIGKILL)
       
   181                 self.fail('Test deadlocked after %d seconds.' %
       
   182                           self.MAX_DURATION)
       
   183 
       
   184 
       
   185 class BasicSignalTests(unittest.TestCase):
       
   186     def trivial_signal_handler(self, *args):
       
   187         pass
       
   188 
       
   189     def test_out_of_range_signal_number_raises_error(self):
       
   190         self.assertRaises(ValueError, signal.getsignal, 4242)
       
   191 
       
   192         self.assertRaises(ValueError, signal.signal, 4242,
       
   193                           self.trivial_signal_handler)
       
   194 
       
   195     def test_setting_signal_handler_to_none_raises_error(self):
       
   196         self.assertRaises(TypeError, signal.signal,
       
   197                           signal.SIGUSR1, None)
       
   198 
       
   199     def test_getsignal(self):
       
   200         hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
       
   201         self.assertEquals(signal.getsignal(signal.SIGHUP),
       
   202                           self.trivial_signal_handler)
       
   203         signal.signal(signal.SIGHUP, hup)
       
   204         self.assertEquals(signal.getsignal(signal.SIGHUP), hup)
       
   205 
       
   206 
       
   207 class WakeupSignalTests(unittest.TestCase):
       
   208     TIMEOUT_FULL = 10
       
   209     TIMEOUT_HALF = 5
       
   210 
       
   211     def test_wakeup_fd_early(self):
       
   212         import select
       
   213 
       
   214         signal.alarm(1)
       
   215         before_time = time.time()
       
   216         # We attempt to get a signal during the sleep,
       
   217         # before select is called
       
   218         time.sleep(self.TIMEOUT_FULL)
       
   219         mid_time = time.time()
       
   220         self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
       
   221         select.select([self.read], [], [], self.TIMEOUT_FULL)
       
   222         after_time = time.time()
       
   223         self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
       
   224 
       
   225     def test_wakeup_fd_during(self):
       
   226         import select
       
   227 
       
   228         signal.alarm(1)
       
   229         before_time = time.time()
       
   230         # We attempt to get a signal during the select call
       
   231         self.assertRaises(select.error, select.select,
       
   232             [self.read], [], [], self.TIMEOUT_FULL)
       
   233         after_time = time.time()
       
   234         self.assert_(after_time - before_time < self.TIMEOUT_HALF)
       
   235 
       
   236     def setUp(self):
       
   237         import fcntl
       
   238 
       
   239         self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
       
   240         self.read, self.write = os.pipe()
       
   241         flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
       
   242         flags = flags | os.O_NONBLOCK
       
   243         fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
       
   244         self.old_wakeup = signal.set_wakeup_fd(self.write)
       
   245 
       
   246     def tearDown(self):
       
   247         signal.set_wakeup_fd(self.old_wakeup)
       
   248         os.close(self.read)
       
   249         os.close(self.write)
       
   250         signal.signal(signal.SIGALRM, self.alrm)
       
   251 
       
   252 class SiginterruptTest(unittest.TestCase):
       
   253     signum = signal.SIGUSR1
       
   254     def readpipe_interrupted(self, cb):
       
   255         r, w = os.pipe()
       
   256         ppid = os.getpid()
       
   257         pid = os.fork()
       
   258 
       
   259         oldhandler = signal.signal(self.signum, lambda x,y: None)
       
   260         cb()
       
   261         if pid==0:
       
   262             # child code: sleep, kill, sleep. and then exit,
       
   263             # which closes the pipe from which the parent process reads
       
   264             try:
       
   265                 time.sleep(0.2)
       
   266                 os.kill(ppid, self.signum)
       
   267                 time.sleep(0.2)
       
   268             finally:
       
   269                 exit_subprocess()
       
   270 
       
   271         try:
       
   272             os.close(w)
       
   273 
       
   274             try:
       
   275                 d=os.read(r, 1)
       
   276                 return False
       
   277             except OSError, err:
       
   278                 if err.errno != errno.EINTR:
       
   279                     raise
       
   280                 return True
       
   281         finally:
       
   282             signal.signal(self.signum, oldhandler)
       
   283             os.waitpid(pid, 0)
       
   284 
       
   285     def test_without_siginterrupt(self):
       
   286         i=self.readpipe_interrupted(lambda: None)
       
   287         self.assertEquals(i, True)
       
   288 
       
   289     def test_siginterrupt_on(self):
       
   290         i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
       
   291         self.assertEquals(i, True)
       
   292 
       
   293     def test_siginterrupt_off(self):
       
   294         i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
       
   295         self.assertEquals(i, False)
       
   296 
       
   297 class ItimerTest(unittest.TestCase):
       
   298     def setUp(self):
       
   299         self.hndl_called = False
       
   300         self.hndl_count = 0
       
   301         self.itimer = None
       
   302         self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
       
   303 
       
   304     def tearDown(self):
       
   305         signal.signal(signal.SIGALRM, self.old_alarm)
       
   306         if self.itimer is not None: # test_itimer_exc doesn't change this attr
       
   307             # just ensure that itimer is stopped
       
   308             signal.setitimer(self.itimer, 0)
       
   309 
       
   310     def sig_alrm(self, *args):
       
   311         self.hndl_called = True
       
   312         if test_support.verbose:
       
   313             print("SIGALRM handler invoked", args)
       
   314 
       
   315     def sig_vtalrm(self, *args):
       
   316         self.hndl_called = True
       
   317 
       
   318         if self.hndl_count > 3:
       
   319             # it shouldn't be here, because it should have been disabled.
       
   320             raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
       
   321                 "timer.")
       
   322         elif self.hndl_count == 3:
       
   323             # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
       
   324             signal.setitimer(signal.ITIMER_VIRTUAL, 0)
       
   325             if test_support.verbose:
       
   326                 print("last SIGVTALRM handler call")
       
   327 
       
   328         self.hndl_count += 1
       
   329 
       
   330         if test_support.verbose:
       
   331             print("SIGVTALRM handler invoked", args)
       
   332 
       
   333     def sig_prof(self, *args):
       
   334         self.hndl_called = True
       
   335         signal.setitimer(signal.ITIMER_PROF, 0)
       
   336 
       
   337         if test_support.verbose:
       
   338             print("SIGPROF handler invoked", args)
       
   339 
       
   340     def test_itimer_exc(self):
       
   341         # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
       
   342         # defines it ?
       
   343         self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
       
   344         # Negative times are treated as zero on some platforms.
       
   345         if 0:
       
   346             self.assertRaises(signal.ItimerError,
       
   347                               signal.setitimer, signal.ITIMER_REAL, -1)
       
   348 
       
   349     def test_itimer_real(self):
       
   350         self.itimer = signal.ITIMER_REAL
       
   351         signal.setitimer(self.itimer, 1.0)
       
   352         if test_support.verbose:
       
   353             print("\ncall pause()...")
       
   354         signal.pause()
       
   355 
       
   356         self.assertEqual(self.hndl_called, True)
       
   357 
       
   358     def test_itimer_virtual(self):
       
   359         self.itimer = signal.ITIMER_VIRTUAL
       
   360         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
       
   361         signal.setitimer(self.itimer, 0.3, 0.2)
       
   362 
       
   363         for i in xrange(100000000):
       
   364             if signal.getitimer(self.itimer) == (0.0, 0.0):
       
   365                 break # sig_vtalrm handler stopped this itimer
       
   366 
       
   367         # virtual itimer should be (0.0, 0.0) now
       
   368         self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
       
   369         # and the handler should have been called
       
   370         self.assertEquals(self.hndl_called, True)
       
   371 
       
   372     def test_itimer_prof(self):
       
   373         self.itimer = signal.ITIMER_PROF
       
   374         signal.signal(signal.SIGPROF, self.sig_prof)
       
   375         signal.setitimer(self.itimer, 0.2, 0.2)
       
   376 
       
   377         for i in xrange(100000000):
       
   378             if signal.getitimer(self.itimer) == (0.0, 0.0):
       
   379                 break # sig_prof handler stopped this itimer
       
   380 
       
   381         # profiling itimer should be (0.0, 0.0) now
       
   382         self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
       
   383         # and the handler should have been called
       
   384         self.assertEqual(self.hndl_called, True)
       
   385 
       
   386 def test_main():
       
   387     test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
       
   388         WakeupSignalTests, SiginterruptTest, ItimerTest)
       
   389 
       
   390 
       
   391 if __name__ == "__main__":
       
   392     test_main()