symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_signal.py
changeset 1 2fb8b9db1c86
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_signal.py	Fri Jul 31 15:01:17 2009 +0100
@@ -0,0 +1,392 @@
+import unittest
+from test import test_support
+from contextlib import closing, nested
+import gc
+import pickle
+import select
+import signal
+import subprocess
+import traceback
+import sys, os, time, errno
+
+if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
+    raise test_support.TestSkipped("Can't test signal on %s" % \
+                                   sys.platform)
+
+
+class HandlerBCalled(Exception):
+    pass
+
+
+def exit_subprocess():
+    """Use os._exit(0) to exit the current subprocess.
+
+    Otherwise, the test catches the SystemExit and continues executing
+    in parallel with the original test, so you wind up with an
+    exponential number of tests running concurrently.
+    """
+    os._exit(0)
+
+
+def ignoring_eintr(__func, *args, **kwargs):
+    try:
+        return __func(*args, **kwargs)
+    except EnvironmentError as e:
+        if e.errno != errno.EINTR:
+            raise
+        return None
+
+
+class InterProcessSignalTests(unittest.TestCase):
+    MAX_DURATION = 20   # Entire test should last at most 20 sec.
+
+    def setUp(self):
+        self.using_gc = gc.isenabled()
+        gc.disable()
+
+    def tearDown(self):
+        if self.using_gc:
+            gc.enable()
+
+    def format_frame(self, frame, limit=None):
+        return ''.join(traceback.format_stack(frame, limit=limit))
+
+    def handlerA(self, signum, frame):
+        self.a_called = True
+        if test_support.verbose:
+            print "handlerA invoked from signal %s at:\n%s" % (
+                signum, self.format_frame(frame, limit=1))
+
+    def handlerB(self, signum, frame):
+        self.b_called = True
+        if test_support.verbose:
+            print "handlerB invoked from signal %s at:\n%s" % (
+                signum, self.format_frame(frame, limit=1))
+        raise HandlerBCalled(signum, self.format_frame(frame))
+
+    def wait(self, child):
+        """Wait for child to finish, ignoring EINTR."""
+        while True:
+            try:
+                child.wait()
+                return
+            except OSError as e:
+                if e.errno != errno.EINTR:
+                    raise
+
+    def run_test(self):
+        # Install handlers. This function runs in a sub-process, so we
+        # don't worry about re-setting the default handlers.
+        signal.signal(signal.SIGHUP, self.handlerA)
+        signal.signal(signal.SIGUSR1, self.handlerB)
+        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+        signal.signal(signal.SIGALRM, signal.default_int_handler)
+
+        # Variables the signals will modify:
+        self.a_called = False
+        self.b_called = False
+
+        # Let the sub-processes know who to send signals to.
+        pid = os.getpid()
+        if test_support.verbose:
+            print "test runner's pid is", pid
+
+        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
+        if child:
+            self.wait(child)
+            if not self.a_called:
+                time.sleep(1)  # Give the signal time to be delivered.
+        self.assertTrue(self.a_called)
+        self.assertFalse(self.b_called)
+        self.a_called = False
+
+        # Make sure the signal isn't delivered while the previous
+        # Popen object is being destroyed, because __del__ swallows
+        # exceptions.
+        del child
+        try:
+            child = subprocess.Popen(['kill', '-USR1', str(pid)])
+            # This wait should be interrupted by the signal's exception.
+            self.wait(child)
+            time.sleep(1)  # Give the signal time to be delivered.
+            self.fail('HandlerBCalled exception not thrown')
+        except HandlerBCalled:
+            self.assertTrue(self.b_called)
+            self.assertFalse(self.a_called)
+            if test_support.verbose:
+                print "HandlerBCalled exception caught"
+
+        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
+        if child:
+            self.wait(child)  # Nothing should happen.
+
+        try:
+            signal.alarm(1)
+            # The race condition in pause doesn't matter in this case,
+            # since alarm is going to raise a KeyboardException, which
+            # will skip the call.
+            signal.pause()
+            # But if another signal arrives before the alarm, pause
+            # may return early.
+            time.sleep(1)
+        except KeyboardInterrupt:
+            if test_support.verbose:
+                print "KeyboardInterrupt (the alarm() went off)"
+        except:
+            self.fail("Some other exception woke us from pause: %s" %
+                      traceback.format_exc())
+        else:
+            self.fail("pause returned of its own accord, and the signal"
+                      " didn't arrive after another second.")
+
+    def test_main(self):
+        # This function spawns a child process to insulate the main
+        # test-running process from all the signals. It then
+        # communicates with that child process over a pipe and
+        # re-raises information about any exceptions the child
+        # throws. The real work happens in self.run_test().
+        os_done_r, os_done_w = os.pipe()
+        with nested(closing(os.fdopen(os_done_r)),
+                    closing(os.fdopen(os_done_w, 'w'))) as (done_r, done_w):
+            child = os.fork()
+            if child == 0:
+                # In the child process; run the test and report results
+                # through the pipe.
+                try:
+                    done_r.close()
+                    # Have to close done_w again here because
+                    # exit_subprocess() will skip the enclosing with block.
+                    with closing(done_w):
+                        try:
+                            self.run_test()
+                        except:
+                            pickle.dump(traceback.format_exc(), done_w)
+                        else:
+                            pickle.dump(None, done_w)
+                except:
+                    print 'Uh oh, raised from pickle.'
+                    traceback.print_exc()
+                finally:
+                    exit_subprocess()
+
+            done_w.close()
+            # Block for up to MAX_DURATION seconds for the test to finish.
+            r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
+            if done_r in r:
+                tb = pickle.load(done_r)
+                if tb:
+                    self.fail(tb)
+            else:
+                os.kill(child, signal.SIGKILL)
+                self.fail('Test deadlocked after %d seconds.' %
+                          self.MAX_DURATION)
+
+
+class BasicSignalTests(unittest.TestCase):
+    def trivial_signal_handler(self, *args):
+        pass
+
+    def test_out_of_range_signal_number_raises_error(self):
+        self.assertRaises(ValueError, signal.getsignal, 4242)
+
+        self.assertRaises(ValueError, signal.signal, 4242,
+                          self.trivial_signal_handler)
+
+    def test_setting_signal_handler_to_none_raises_error(self):
+        self.assertRaises(TypeError, signal.signal,
+                          signal.SIGUSR1, None)
+
+    def test_getsignal(self):
+        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
+        self.assertEquals(signal.getsignal(signal.SIGHUP),
+                          self.trivial_signal_handler)
+        signal.signal(signal.SIGHUP, hup)
+        self.assertEquals(signal.getsignal(signal.SIGHUP), hup)
+
+
+class WakeupSignalTests(unittest.TestCase):
+    TIMEOUT_FULL = 10
+    TIMEOUT_HALF = 5
+
+    def test_wakeup_fd_early(self):
+        import select
+
+        signal.alarm(1)
+        before_time = time.time()
+        # We attempt to get a signal during the sleep,
+        # before select is called
+        time.sleep(self.TIMEOUT_FULL)
+        mid_time = time.time()
+        self.assert_(mid_time - before_time < self.TIMEOUT_HALF)
+        select.select([self.read], [], [], self.TIMEOUT_FULL)
+        after_time = time.time()
+        self.assert_(after_time - mid_time < self.TIMEOUT_HALF)
+
+    def test_wakeup_fd_during(self):
+        import select
+
+        signal.alarm(1)
+        before_time = time.time()
+        # We attempt to get a signal during the select call
+        self.assertRaises(select.error, select.select,
+            [self.read], [], [], self.TIMEOUT_FULL)
+        after_time = time.time()
+        self.assert_(after_time - before_time < self.TIMEOUT_HALF)
+
+    def setUp(self):
+        import fcntl
+
+        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
+        self.read, self.write = os.pipe()
+        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
+        flags = flags | os.O_NONBLOCK
+        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
+        self.old_wakeup = signal.set_wakeup_fd(self.write)
+
+    def tearDown(self):
+        signal.set_wakeup_fd(self.old_wakeup)
+        os.close(self.read)
+        os.close(self.write)
+        signal.signal(signal.SIGALRM, self.alrm)
+
+class SiginterruptTest(unittest.TestCase):
+    signum = signal.SIGUSR1
+    def readpipe_interrupted(self, cb):
+        r, w = os.pipe()
+        ppid = os.getpid()
+        pid = os.fork()
+
+        oldhandler = signal.signal(self.signum, lambda x,y: None)
+        cb()
+        if pid==0:
+            # child code: sleep, kill, sleep. and then exit,
+            # which closes the pipe from which the parent process reads
+            try:
+                time.sleep(0.2)
+                os.kill(ppid, self.signum)
+                time.sleep(0.2)
+            finally:
+                exit_subprocess()
+
+        try:
+            os.close(w)
+
+            try:
+                d=os.read(r, 1)
+                return False
+            except OSError, err:
+                if err.errno != errno.EINTR:
+                    raise
+                return True
+        finally:
+            signal.signal(self.signum, oldhandler)
+            os.waitpid(pid, 0)
+
+    def test_without_siginterrupt(self):
+        i=self.readpipe_interrupted(lambda: None)
+        self.assertEquals(i, True)
+
+    def test_siginterrupt_on(self):
+        i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
+        self.assertEquals(i, True)
+
+    def test_siginterrupt_off(self):
+        i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
+        self.assertEquals(i, False)
+
+class ItimerTest(unittest.TestCase):
+    def setUp(self):
+        self.hndl_called = False
+        self.hndl_count = 0
+        self.itimer = None
+        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
+
+    def tearDown(self):
+        signal.signal(signal.SIGALRM, self.old_alarm)
+        if self.itimer is not None: # test_itimer_exc doesn't change this attr
+            # just ensure that itimer is stopped
+            signal.setitimer(self.itimer, 0)
+
+    def sig_alrm(self, *args):
+        self.hndl_called = True
+        if test_support.verbose:
+            print("SIGALRM handler invoked", args)
+
+    def sig_vtalrm(self, *args):
+        self.hndl_called = True
+
+        if self.hndl_count > 3:
+            # it shouldn't be here, because it should have been disabled.
+            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
+                "timer.")
+        elif self.hndl_count == 3:
+            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
+            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
+            if test_support.verbose:
+                print("last SIGVTALRM handler call")
+
+        self.hndl_count += 1
+
+        if test_support.verbose:
+            print("SIGVTALRM handler invoked", args)
+
+    def sig_prof(self, *args):
+        self.hndl_called = True
+        signal.setitimer(signal.ITIMER_PROF, 0)
+
+        if test_support.verbose:
+            print("SIGPROF handler invoked", args)
+
+    def test_itimer_exc(self):
+        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
+        # defines it ?
+        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
+        # Negative times are treated as zero on some platforms.
+        if 0:
+            self.assertRaises(signal.ItimerError,
+                              signal.setitimer, signal.ITIMER_REAL, -1)
+
+    def test_itimer_real(self):
+        self.itimer = signal.ITIMER_REAL
+        signal.setitimer(self.itimer, 1.0)
+        if test_support.verbose:
+            print("\ncall pause()...")
+        signal.pause()
+
+        self.assertEqual(self.hndl_called, True)
+
+    def test_itimer_virtual(self):
+        self.itimer = signal.ITIMER_VIRTUAL
+        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
+        signal.setitimer(self.itimer, 0.3, 0.2)
+
+        for i in xrange(100000000):
+            if signal.getitimer(self.itimer) == (0.0, 0.0):
+                break # sig_vtalrm handler stopped this itimer
+
+        # virtual itimer should be (0.0, 0.0) now
+        self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
+        # and the handler should have been called
+        self.assertEquals(self.hndl_called, True)
+
+    def test_itimer_prof(self):
+        self.itimer = signal.ITIMER_PROF
+        signal.signal(signal.SIGPROF, self.sig_prof)
+        signal.setitimer(self.itimer, 0.2, 0.2)
+
+        for i in xrange(100000000):
+            if signal.getitimer(self.itimer) == (0.0, 0.0):
+                break # sig_prof handler stopped this itimer
+
+        # profiling itimer should be (0.0, 0.0) now
+        self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0))
+        # and the handler should have been called
+        self.assertEqual(self.hndl_called, True)
+
+def test_main():
+    test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
+        WakeupSignalTests, SiginterruptTest, ItimerTest)
+
+
+if __name__ == "__main__":
+    test_main()