python-2.5.2/win32/Lib/test/test_pty.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 import pty, os, sys, signal
       
     2 from test.test_support import verbose, TestFailed, TestSkipped
       
     3 
       
     4 TEST_STRING_1 = "I wish to buy a fish license.\n"
       
     5 TEST_STRING_2 = "For my pet fish, Eric.\n"
       
     6 
       
     7 if verbose:
       
     8     def debug(msg):
       
     9         print msg
       
    10 else:
       
    11     def debug(msg):
       
    12         pass
       
    13 
       
    14 def normalize_output(data):
       
    15     # Some operating systems do conversions on newline.  We could possibly
       
    16     # fix that by doing the appropriate termios.tcsetattr()s.  I couldn't
       
    17     # figure out the right combo on Tru64 and I don't have an IRIX box.
       
    18     # So just normalize the output and doc the problem O/Ses by allowing
       
    19     # certain combinations for some platforms, but avoid allowing other
       
    20     # differences (like extra whitespace, trailing garbage, etc.)
       
    21 
       
    22     # This is about the best we can do without getting some feedback
       
    23     # from someone more knowledgable.
       
    24 
       
    25     # OSF/1 (Tru64) apparently turns \n into \r\r\n.
       
    26     if data.endswith('\r\r\n'):
       
    27         return data.replace('\r\r\n', '\n')
       
    28 
       
    29     # IRIX apparently turns \n into \r\n.
       
    30     if data.endswith('\r\n'):
       
    31         return data.replace('\r\n', '\n')
       
    32 
       
    33     return data
       
    34 
       
    35 # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
       
    36 # because pty code is not too portable.
       
    37 
       
    38 def test_basic_pty():
       
    39     try:
       
    40         debug("Calling master_open()")
       
    41         master_fd, slave_name = pty.master_open()
       
    42         debug("Got master_fd '%d', slave_name '%s'"%(master_fd, slave_name))
       
    43         debug("Calling slave_open(%r)"%(slave_name,))
       
    44         slave_fd = pty.slave_open(slave_name)
       
    45         debug("Got slave_fd '%d'"%slave_fd)
       
    46     except OSError:
       
    47         # " An optional feature could not be imported " ... ?
       
    48         raise TestSkipped, "Pseudo-terminals (seemingly) not functional."
       
    49 
       
    50     if not os.isatty(slave_fd):
       
    51         raise TestFailed, "slave_fd is not a tty"
       
    52 
       
    53     debug("Writing to slave_fd")
       
    54     os.write(slave_fd, TEST_STRING_1)
       
    55     s1 = os.read(master_fd, 1024)
       
    56     sys.stdout.write(normalize_output(s1))
       
    57 
       
    58     debug("Writing chunked output")
       
    59     os.write(slave_fd, TEST_STRING_2[:5])
       
    60     os.write(slave_fd, TEST_STRING_2[5:])
       
    61     s2 = os.read(master_fd, 1024)
       
    62     sys.stdout.write(normalize_output(s2))
       
    63 
       
    64     os.close(slave_fd)
       
    65     os.close(master_fd)
       
    66 
       
    67 def handle_sig(sig, frame):
       
    68     raise TestFailed, "isatty hung"
       
    69 
       
    70 # isatty() and close() can hang on some platforms
       
    71 # set an alarm before running the test to make sure we don't hang forever
       
    72 old_alarm = signal.signal(signal.SIGALRM, handle_sig)
       
    73 signal.alarm(10)
       
    74 
       
    75 try:
       
    76     test_basic_pty()
       
    77 finally:
       
    78     # remove alarm, restore old alarm handler
       
    79     signal.alarm(0)
       
    80     signal.signal(signal.SIGALRM, old_alarm)
       
    81 
       
    82 # basic pty passed.
       
    83 
       
    84 debug("calling pty.fork()")
       
    85 pid, master_fd = pty.fork()
       
    86 if pid == pty.CHILD:
       
    87     # stdout should be connected to a tty.
       
    88     if not os.isatty(1):
       
    89         debug("Child's fd 1 is not a tty?!")
       
    90         os._exit(3)
       
    91 
       
    92     # After pty.fork(), the child should already be a session leader.
       
    93     # (on those systems that have that concept.)
       
    94     debug("In child, calling os.setsid()")
       
    95     try:
       
    96         os.setsid()
       
    97     except OSError:
       
    98         # Good, we already were session leader
       
    99         debug("Good: OSError was raised.")
       
   100         pass
       
   101     except AttributeError:
       
   102         # Have pty, but not setsid() ?
       
   103         debug("No setsid() available ?")
       
   104         pass
       
   105     except:
       
   106         # We don't want this error to propagate, escaping the call to
       
   107         # os._exit() and causing very peculiar behavior in the calling
       
   108         # regrtest.py !
       
   109         # Note: could add traceback printing here.
       
   110         debug("An unexpected error was raised.")
       
   111         os._exit(1)
       
   112     else:
       
   113         debug("os.setsid() succeeded! (bad!)")
       
   114         os._exit(2)
       
   115     os._exit(4)
       
   116 else:
       
   117     debug("Waiting for child (%d) to finish."%pid)
       
   118     # In verbose mode, we have to consume the debug output from the child or
       
   119     # the child will block, causing this test to hang in the parent's
       
   120     # waitpid() call.  The child blocks after a platform-dependent amount of
       
   121     # data is written to its fd.  On Linux 2.6, it's 4000 bytes and the child
       
   122     # won't block, but on OS X even the small writes in the child above will
       
   123     # block it.  Also on Linux, the read() will throw an OSError (input/output
       
   124     # error) when it tries to read past the end of the buffer but the child's
       
   125     # already exited, so catch and discard those exceptions.  It's not worth
       
   126     # checking for EIO.
       
   127     while True:
       
   128         try:
       
   129             data = os.read(master_fd, 80)
       
   130         except OSError:
       
   131             break
       
   132         if not data:
       
   133             break
       
   134         sys.stdout.write(data.replace('\r\n', '\n'))
       
   135 
       
   136     ##line = os.read(master_fd, 80)
       
   137     ##lines = line.replace('\r\n', '\n').split('\n')
       
   138     ##if False and lines != ['In child, calling os.setsid()',
       
   139     ##             'Good: OSError was raised.', '']:
       
   140     ##    raise TestFailed("Unexpected output from child: %r" % line)
       
   141 
       
   142     (pid, status) = os.waitpid(pid, 0)
       
   143     res = status >> 8
       
   144     debug("Child (%d) exited with status %d (%d)."%(pid, res, status))
       
   145     if res == 1:
       
   146         raise TestFailed, "Child raised an unexpected exception in os.setsid()"
       
   147     elif res == 2:
       
   148         raise TestFailed, "pty.fork() failed to make child a session leader."
       
   149     elif res == 3:
       
   150         raise TestFailed, "Child spawned by pty.fork() did not have a tty as stdout"
       
   151     elif res != 4:
       
   152         raise TestFailed, "pty.fork() failed for unknown reasons."
       
   153 
       
   154     ##debug("Reading from master_fd now that the child has exited")
       
   155     ##try:
       
   156     ##    s1 = os.read(master_fd, 1024)
       
   157     ##except os.error:
       
   158     ##    pass
       
   159     ##else:
       
   160     ##    raise TestFailed("Read from master_fd did not raise exception")
       
   161 
       
   162 
       
   163 os.close(master_fd)
       
   164 
       
   165 # pty.fork() passed.