|
1 import pty, os, sys, signal |
|
2 from test.test_support import verbose, TestFailed, TestSkipped |
|
3 |
|
# Strings written to the slave side of the pty by test_basic_pty(); the
# second one is written in two separate chunks.
TEST_STRING_1 = "I wish to buy a fish license.\n"
TEST_STRING_2 = "For my pet fish, Eric.\n"
|
6 |
|
def debug(msg):
    """Print msg when the test run is verbose; otherwise do nothing."""
    if verbose:
        print(msg)
|
13 |
|
def normalize_output(data):
    """Return data with known platform newline conversions undone.

    Some operating systems convert the newline written to a pty.  That
    could possibly be fixed with the appropriate termios.tcsetattr()
    calls, but the right combination is not known for every platform, so
    the output is normalized instead.  Only the documented conversions
    are accepted; other differences (extra whitespace, trailing garbage,
    etc.) are deliberately left untouched.

    Only the end of data is inspected: when data ends with one of the
    known sequences, every occurrence of that sequence is replaced by a
    plain newline.  Otherwise data is returned unchanged.
    """
    # Order matters, because a string ending in '\r\r\n' also ends in '\r\n':
    #   '\r\r\n' -- OSF/1 (Tru64) apparently turns \n into this.
    #   '\r\n'   -- IRIX apparently turns \n into this.
    for converted_newline in ('\r\r\n', '\r\n'):
        if data.endswith(converted_newline):
            return data.replace(converted_newline, '\n')
    return data
|
34 |
|
35 # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing |
|
36 # because pty code is not too portable. |
|
37 |
|
def test_basic_pty():
    """Open a master/slave pty pair by hand and pass data through it.

    Raises TestSkipped when opening the pair fails with OSError, and
    TestFailed when the slave descriptor is not a tty.  Whatever is read
    back from the master side is newline-normalized and written to
    sys.stdout.
    """
    try:
        debug("Calling master_open()")
        master_fd, slave_name = pty.master_open()
        debug("Got master_fd '%d', slave_name '%s'" % (master_fd, slave_name))
        debug("Calling slave_open(%r)" % (slave_name,))
        slave_fd = pty.slave_open(slave_name)
        debug("Got slave_fd '%d'" % slave_fd)
    except OSError:
        # " An optional feature could not be imported " ... ?
        raise TestSkipped("Pseudo-terminals (seemingly) not functional.")

    slave_is_tty = os.isatty(slave_fd)
    if not slave_is_tty:
        raise TestFailed("slave_fd is not a tty")

    # One write on the slave side, one read on the master side.
    debug("Writing to slave_fd")
    os.write(slave_fd, TEST_STRING_1)
    first_reply = os.read(master_fd, 1024)
    sys.stdout.write(normalize_output(first_reply))

    # The second string goes out in two writes before a single read.
    debug("Writing chunked output")
    head, tail = TEST_STRING_2[:5], TEST_STRING_2[5:]
    os.write(slave_fd, head)
    os.write(slave_fd, tail)
    second_reply = os.read(master_fd, 1024)
    sys.stdout.write(normalize_output(second_reply))

    os.close(slave_fd)
    os.close(master_fd)
|
66 |
|
def handle_sig(sig, frame):
    """Signal handler that fails the test instead of letting it hang."""
    raise TestFailed("isatty hung")
|
69 |
|
# isatty() and close() can hang on some platforms, so arm a 10 second alarm
# before running the test to make sure we don't hang forever.
old_alarm = signal.signal(signal.SIGALRM, handle_sig)
signal.alarm(10)

try:
    test_basic_pty()
finally:
    # Cancel the pending alarm, then restore the previous SIGALRM handler.
    signal.alarm(0)
    signal.signal(signal.SIGALRM, old_alarm)

# basic pty passed.
|
83 |
|
# The child reports its outcome through its exit code:
#   1 - os.setsid() raised an unexpected exception
#   2 - os.setsid() succeeded, so the child was not a session leader
#   3 - the child's fd 1 is not a tty
#   4 - everything went as expected
debug("calling pty.fork()")
pid, master_fd = pty.fork()
if pid == pty.CHILD:
    # stdout should be connected to a tty.
    if not os.isatty(1):
        debug("Child's fd 1 is not a tty?!")
        os._exit(3)

    # After pty.fork(), the child should already be a session leader
    # (on those systems that have that concept), so setsid() should fail.
    debug("In child, calling os.setsid()")
    try:
        os.setsid()
    except OSError:
        # Good, we already were session leader
        debug("Good: OSError was raised.")
    except AttributeError:
        # Have pty, but not setsid() ?
        debug("No setsid() available ?")
    except:
        # Deliberately bare: we don't want this error to propagate,
        # escaping the call to os._exit() and causing very peculiar
        # behavior in the calling regrtest.py !
        # Note: could add traceback printing here.
        debug("An unexpected error was raised.")
        os._exit(1)
    else:
        debug("os.setsid() succeeded! (bad!)")
        os._exit(2)
    os._exit(4)
else:
    debug("Waiting for child (%d) to finish." % pid)
    # In verbose mode, we have to consume the debug output from the child or
    # the child will block, causing this test to hang in the parent's
    # waitpid() call.  The child blocks after a platform-dependent amount of
    # data is written to its fd.  On Linux 2.6, it's 4000 bytes and the child
    # won't block, but on OS X even the small writes in the child above will
    # block it.  Also on Linux, the read() will throw an OSError (input/output
    # error) when it tries to read past the end of the buffer but the child's
    # already exited, so catch and discard those exceptions.  It's not worth
    # checking for EIO.
    while True:
        try:
            chunk = os.read(master_fd, 80)
        except OSError:
            break
        if not chunk:
            break
        sys.stdout.write(chunk.replace('\r\n', '\n'))

    ##line = os.read(master_fd, 80)
    ##lines = line.replace('\r\n', '\n').split('\n')
    ##if False and lines != ['In child, calling os.setsid()',
    ##                       'Good: OSError was raised.', '']:
    ##    raise TestFailed("Unexpected output from child: %r" % line)

    (pid, status) = os.waitpid(pid, 0)
    # The exit code is taken from the high byte of the wait status.
    res = status >> 8
    debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
    child_failures = {
        1: "Child raised an unexpected exception in os.setsid()",
        2: "pty.fork() failed to make child a session leader.",
        3: "Child spawned by pty.fork() did not have a tty as stdout",
    }
    if res != 4:
        raise TestFailed(child_failures.get(
            res, "pty.fork() failed for unknown reasons."))

    ##debug("Reading from master_fd now that the child has exited")
    ##try:
    ##    s1 = os.read(master_fd, 1024)
    ##except os.error:
    ##    pass
    ##else:
    ##    raise TestFailed("Read from master_fd did not raise exception")

    os.close(master_fd)

# pty.fork() passed.