|
1 import unittest |
|
2 from test import test_support |
|
3 import subprocess |
|
4 import sys |
|
5 import signal |
|
6 import os |
|
7 import tempfile |
|
8 import time |
|
9 import re |
|
10 |
|
# Platform switch consulted throughout this file to select the POSIX or the
# Windows variant of a test.
mswindows = sys.platform == "win32"

#
# Depends on the following external programs: Python
#

# Source fragment spliced into child programs by the universal-newlines
# tests.  It is empty by default; on Windows it calls msvcrt.setmode() with
# os.O_BINARY on the child's stdout.
SETBINARY = ''
if mswindows:
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
|
22 |
|
# In a debug build, stuff like "[6580 refs]" is printed to stderr at
# shutdown time.  That frustrates tests trying to check stderr produced
# from a spawned Python process.
def remove_stderr_debug_decorations(stderr):
    """Return *stderr* with a trailing "[NNNN refs]" marker removed.

    Only a marker at the very end of the text is stripped, together with an
    optional CR and/or LF that follows it.  Text without such a trailer is
    returned unchanged.
    """
    refs_trailer = re.compile(r"\[\d+ refs\]\r?\n?$")
    return refs_trailer.sub("", stderr)
|
28 |
|
class ProcessTestCase(unittest.TestCase):
    """Functional tests for the subprocess module.

    Almost every test spawns a child Python interpreter (sys.executable)
    running a tiny "-c" program and checks the child's exit status and/or
    what it wrote to its standard streams.

    The class body defines a different set of tests depending on the
    platform: the "POSIX tests" section exists only when ``mswindows`` is
    false, the "Windows tests" section only when it is true.  Test methods
    are described with comments rather than docstrings, following the
    existing style of this file.
    """

    # Child program shared by the two universal-newlines tests.  It writes
    # six "lines" using every newline convention (\n, \r, \r\n, and a \r\n
    # pair split across two writes), flushing between writes.
    _NEWLINES_CHILD = ('import sys,os;' + SETBINARY +
                       'sys.stdout.write("line1\\n");'
                       'sys.stdout.flush();'
                       'sys.stdout.write("line2\\r");'
                       'sys.stdout.flush();'
                       'sys.stdout.write("line3\\r\\n");'
                       'sys.stdout.flush();'
                       'sys.stdout.write("line4\\r");'
                       'sys.stdout.flush();'
                       'sys.stdout.write("\\nline5");'
                       'sys.stdout.flush();'
                       'sys.stdout.write("\\nline6");')

    def _reap_children(self):
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        # reap_children() is looked up dynamically because not every
        # test_support module provides it.
        if hasattr(test_support, "reap_children"):
            test_support.reap_children()

    def setUp(self):
        self._reap_children()

    def tearDown(self):
        self._reap_children()

    def mkstemp(self):
        """wrapper for mkstemp, calling mktemp if mkstemp is not available

        Returns an (open file descriptor, file name) pair in both cases.
        """
        if hasattr(tempfile, "mkstemp"):
            return tempfile.mkstemp()
        else:
            fname = tempfile.mktemp()
            return os.open(fname, os.O_RDWR|os.O_CREAT), fname

    def _check_universal_newlines_output(self, stdout):
        # Verify what the parent read from _NEWLINES_CHILD when the pipe
        # was opened with universal_newlines=1.
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # Interpreter without universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    #
    # Generic tests
    #
    def test_call_seq(self):
        # call() function with sequence argument
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        self.assertEqual(rc, 47)

    def test_check_call_zero(self):
        # check_call() function with zero return code
        rc = subprocess.check_call([sys.executable, "-c",
                                    "import sys; sys.exit(0)"])
        self.assertEqual(rc, 0)

    def test_check_call_nonzero(self):
        # check_call() function with non-zero return code
        try:
            subprocess.check_call([sys.executable, "-c",
                                   "import sys; sys.exit(47)"])
        except subprocess.CalledProcessError, e:
            self.assertEqual(e.returncode, 47)
        else:
            self.fail("Expected CalledProcessError")

    def test_call_kwargs(self):
        # call() function with keyword args
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        # The child exits with the truth value of the comparison, so a
        # return code of 1 means it saw FRUIT=banana.
        rc = subprocess.call([sys.executable, "-c",
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
        self.assertEqual(rc, 1)

    def test_stdin_none(self):
        # .stdin is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stdin, None)

    def test_stdout_none(self):
        # .stdout is None when not redirected
        p = subprocess.Popen([sys.executable, "-c",
                              'print " this bit of output is from a '
                              'test of stdout in a different '
                              'process ..."'],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stdout, None)

    def test_stderr_none(self):
        # .stderr is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stderr, None)

    def test_executable(self):
        # The executable argument replaces the program to run; args[0] is
        # then just a (bogus) name.
        p = subprocess.Popen(["somethingyoudonthave",
                              "-c", "import sys; sys.exit(47)"],
                             executable=sys.executable)
        p.wait()
        self.assertEqual(p.returncode, 47)

    def test_stdin_pipe(self):
        # stdin redirection
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                        stdin=subprocess.PIPE)
        p.stdin.write("pear")
        p.stdin.close()
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_filedes(self):
        # stdin is set to open file descriptor
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        os.write(d, "pear")
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=d)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_fileobj(self):
        # stdin is set to open file object
        tf = tempfile.TemporaryFile()
        tf.write("pear")
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=tf)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdout_pipe(self):
        # stdout redirection
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=subprocess.PIPE)
        self.assertEqual(p.stdout.read(), "orange")

    def test_stdout_filedes(self):
        # stdout is set to open file descriptor
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")

    def test_stdout_fileobj(self):
        # stdout is set to open file object
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")

    def test_stderr_pipe(self):
        # stderr redirection
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=subprocess.PIPE)
        self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
                         "strawberry")

    def test_stderr_filedes(self):
        # stderr is set to open file descriptor
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
                         "strawberry")

    def test_stderr_fileobj(self):
        # stderr is set to open file object
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(remove_stderr_debug_decorations(tf.read()),
                         "strawberry")

    def test_stdout_stderr_pipe(self):
        # capture stdout and stderr to the same pipe
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
        output = p.stdout.read()
        stripped = remove_stderr_debug_decorations(output)
        self.assertEqual(stripped, "appleorange")

    def test_stdout_stderr_file(self):
        # capture stdout and stderr to the same open file
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                         stdout=tf,
                         stderr=tf)
        p.wait()
        tf.seek(0)
        output = tf.read()
        stripped = remove_stderr_debug_decorations(output)
        self.assertEqual(stripped, "appleorange")

    def test_stdout_filedes_of_stdout(self):
        # stdout is set to 1 (#1531862).
        # The child exits with the number of bytes os.write() reported,
        # which is 2 for '.\n'.
        cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
        rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
        self.assertEqual(rc, 2)

    def test_cwd(self):
        tmpdir = tempfile.gettempdir()
        # We cannot use os.path.realpath to canonicalize the path,
        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
        cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            tmpdir = os.getcwd()
        finally:
            # Always restore the working directory so a failure here
            # cannot affect the tests that run afterwards.
            os.chdir(cwd)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(os.getcwd())'],
                         stdout=subprocess.PIPE,
                         cwd=tmpdir)
        normcase = os.path.normcase
        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))

    def test_env(self):
        # The env argument is passed on to the child.
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange"
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(os.getenv("FRUIT"))'],
                         stdout=subprocess.PIPE,
                         env=newenv)
        self.assertEqual(p.stdout.read(), "orange")

    def test_communicate_stdin(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.communicate("pear")
        self.assertEqual(p.returncode, 1)

    def test_communicate_stdout(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("pineapple")'],
                             stdout=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, "pineapple")
        self.assertEqual(stderr, None)

    def test_communicate_stderr(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("pineapple")'],
                             stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertEqual(remove_stderr_debug_decorations(stderr), "pineapple")

    def test_communicate(self):
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stderr.write("pineapple");'
                          'sys.stdout.write(sys.stdin.read())'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate("banana")
        self.assertEqual(stdout, "banana")
        self.assertEqual(remove_stderr_debug_decorations(stderr),
                         "pineapple")

    # This test is Linux specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
    if os.path.isdir('/proc/%d/fd' % os.getpid()):
        # Test for the fd leak reported in http://bugs.python.org/issue2791.
        def test_communicate_pipe_fd_leak(self):
            fd_directory = '/proc/%d/fd' % os.getpid()
            num_fds_before_popen = len(os.listdir(fd_directory))
            p = subprocess.Popen([sys.executable, '-c', 'print()'],
                                 stdout=subprocess.PIPE)
            p.communicate()
            num_fds_after_communicate = len(os.listdir(fd_directory))
            del p
            num_fds_after_destruction = len(os.listdir(fd_directory))
            self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
            self.assertEqual(num_fds_before_popen, num_fds_after_communicate)

    def test_communicate_returns(self):
        # communicate() should return None if no redirection is active
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertEqual(stderr, None)

    def test_communicate_pipe_buf(self):
        # communicate() with writes larger than pipe_buf
        # This test will probably deadlock rather than fail, if
        # communicate() does not work properly.
        x, y = os.pipe()
        try:
            if mswindows:
                pipe_buf = 512
            else:
                pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        finally:
            # The pipe is only needed to query PC_PIPE_BUF; close both ends
            # even if the query fails.
            os.close(x)
            os.close(y)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(sys.stdin.read(47));'
                          'sys.stderr.write("xyz"*%d);'
                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        string_to_write = "abc"*pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)

    def test_writes_before_communicate(self):
        # stdin.write before communicate()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(sys.stdin.read())'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        p.stdin.write("banana")
        (stdout, stderr) = p.communicate("split")
        self.assertEqual(stdout, "bananasplit")
        self.assertEqual(remove_stderr_debug_decorations(stderr), "")

    def test_universal_newlines(self):
        # universal newlines when reading the pipe directly
        p = subprocess.Popen([sys.executable, "-c", self._NEWLINES_CHILD],
                             stdout=subprocess.PIPE,
                             universal_newlines=1)
        stdout = p.stdout.read()
        self._check_universal_newlines_output(stdout)

    def test_universal_newlines_communicate(self):
        # universal newlines through communicate()
        p = subprocess.Popen([sys.executable, "-c", self._NEWLINES_CHILD],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=1)
        (stdout, stderr) = p.communicate()
        self._check_universal_newlines_output(stdout)

    def test_no_leaking(self):
        # Make sure we leak no resources
        # The parentheses only spell out the operator precedence the
        # condition has always had ("and" binds tighter than "or").
        if not hasattr(test_support, "is_resource_enabled") \
               or (test_support.is_resource_enabled("subprocess")
                   and not mswindows):
            max_handles = 1026 # too much for most UNIX systems
        else:
            max_handles = 65
        for i in range(max_handles):
            p = subprocess.Popen([sys.executable, "-c",
                    "import sys;sys.stdout.write(sys.stdin.read())"],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)
            data = p.communicate("lime")[0]
            self.assertEqual(data, "lime")


    def test_list2cmdline(self):
        # list2cmdline() quoting and backslash-escaping rules
        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
                         '"a b c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
                         'ab\\"c \\ d')
        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
                         'ab\\"c " \\\\" d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
                         'a\\\\\\b "de fg" h')
        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
                         'a\\\\\\"b c d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
                         '"a\\\\b c" d e')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
                         '"a\\\\b\\ c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab', '']),
                         'ab ""')
        self.assertEqual(subprocess.list2cmdline(['echo', 'foo|bar']),
                         'echo "foo|bar"')


    def test_poll(self):
        p = subprocess.Popen([sys.executable,
                          "-c", "import time; time.sleep(1)"])
        count = 0
        while p.poll() is None:
            time.sleep(0.1)
            count += 1
        # We expect that the poll loop probably went around about 10 times,
        # but, based on system scheduling we can't control, it's possible
        # poll() never returned None.  It "should be" very rare that it
        # didn't go around at least twice.
        self.assert_(count >= 2)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.poll(), 0)


    def test_wait(self):
        p = subprocess.Popen([sys.executable,
                          "-c", "import time; time.sleep(2)"])
        self.assertEqual(p.wait(), 0)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.wait(), 0)


    def test_invalid_bufsize(self):
        # an invalid type of the bufsize argument should raise
        # TypeError.  ("orange" is passed positionally as bufsize.)
        self.assertRaises(TypeError, subprocess.Popen,
                          [sys.executable, "-c", "pass"], "orange")

    #
    # POSIX tests
    #
    if not mswindows:
        def test_exceptions(self):
            # caught & re-raised exceptions
            try:
                subprocess.Popen([sys.executable, "-c", ""],
                                 cwd="/this/path/does/not/exist")
            except OSError, e:
                # The attribute child_traceback should contain "os.chdir"
                # somewhere.
                self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
            else:
                self.fail("Expected OSError")

        def _suppress_core_files(self):
            """Try to prevent core files from being created.
            Returns previous ulimit if successful, else None.
            """
            # Import separately: naming resource.error in an except clause
            # is itself an error when the import is what failed.
            try:
                import resource
            except ImportError:
                return None
            try:
                old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0,0))
            except (ValueError, resource.error):
                return None
            return old_limit

        def _unsuppress_core_files(self, old_limit):
            """Return core file behavior to default.

            old_limit is the value returned by _suppress_core_files();
            None means there is nothing to restore.
            """
            if old_limit is None:
                return
            try:
                import resource
            except ImportError:
                return
            try:
                resource.setrlimit(resource.RLIMIT_CORE, old_limit)
            except (ValueError, resource.error):
                return

        def test_run_abort(self):
            # returncode handles signal termination
            old_limit = self._suppress_core_files()
            try:
                p = subprocess.Popen([sys.executable,
                                      "-c", "import os; os.abort()"])
            finally:
                self._unsuppress_core_files(old_limit)
            p.wait()
            self.assertEqual(-p.returncode, signal.SIGABRT)

        def test_preexec(self):
            # preexec function
            p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getenv("FRUIT"))'],
                             stdout=subprocess.PIPE,
                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
            self.assertEqual(p.stdout.read(), "apple")

        def _make_exit47_script(self):
            """Create an executable /bin/sh script that exits with status 47.

            The script exec's sys.executable with a program calling
            sys.exit(47).  Returns the script's file name; the caller is
            responsible for removing the file.
            """
            f, fname = self.mkstemp()
            try:
                os.write(f, "#!/bin/sh\n")
                os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                            sys.executable)
            finally:
                os.close(f)
            os.chmod(fname, 0700)
            return fname

        def test_args_string(self):
            # args is a string
            fname = self._make_exit47_script()
            try:
                p = subprocess.Popen(fname)
                p.wait()
            finally:
                # Remove the script even if spawning it failed.
                os.remove(fname)
            self.assertEqual(p.returncode, 47)

        def test_invalid_args(self):
            # invalid arguments should raise ValueError
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              startupinfo=47)
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              creationflags=47)

        def test_shell_sequence(self):
            # Run command through the shell (sequence)
            newenv = os.environ.copy()
            newenv["FRUIT"] = "apple"
            p = subprocess.Popen(["echo $FRUIT"], shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertEqual(p.stdout.read().strip(), "apple")

        def test_shell_string(self):
            # Run command through the shell (string)
            newenv = os.environ.copy()
            newenv["FRUIT"] = "apple"
            p = subprocess.Popen("echo $FRUIT", shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertEqual(p.stdout.read().strip(), "apple")

        def test_call_string(self):
            # call() function with string argument on UNIX
            fname = self._make_exit47_script()
            try:
                rc = subprocess.call(fname)
            finally:
                # Remove the script even if running it failed.
                os.remove(fname)
            self.assertEqual(rc, 47)

        # The DISABLED_ prefix keeps unittest from collecting the next
        # three methods, because their names do not start with "test".
        def DISABLED_test_send_signal(self):
            p = subprocess.Popen([sys.executable,
                              "-c", "input()"])

            self.assert_(p.poll() is None, p.poll())
            p.send_signal(signal.SIGINT)
            self.assertNotEqual(p.wait(), 0)

        def DISABLED_test_kill(self):
            p = subprocess.Popen([sys.executable,
                            "-c", "input()"])

            self.assert_(p.poll() is None, p.poll())
            p.kill()
            self.assertEqual(p.wait(), -signal.SIGKILL)

        def DISABLED_test_terminate(self):
            p = subprocess.Popen([sys.executable,
                            "-c", "input()"])

            self.assert_(p.poll() is None, p.poll())
            p.terminate()
            self.assertEqual(p.wait(), -signal.SIGTERM)

    #
    # Windows tests
    #
    if mswindows:
        def test_startupinfo(self):
            # startupinfo argument
            # We uses hardcoded constants, because we do not want to
            # depend on win32all.
            STARTF_USESHOWWINDOW = 1
            SW_MAXIMIZE = 3
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags = STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = SW_MAXIMIZE
            # Since Python is a console process, it won't be affected
            # by wShowWindow, but the argument should be silently
            # ignored
            subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
                        startupinfo=startupinfo)

        def test_creationflags(self):
            # creationflags argument
            CREATE_NEW_CONSOLE = 16
            sys.stderr.write(" a DOS box should flash briefly ...\n")
            subprocess.call(sys.executable +
                            ' -c "import time; time.sleep(0.25)"',
                            creationflags=CREATE_NEW_CONSOLE)

        def test_invalid_args(self):
            # invalid arguments should raise ValueError
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              preexec_fn=lambda: 1)
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              stdout=subprocess.PIPE,
                              close_fds=True)

        def test_close_fds(self):
            # close file descriptors
            rc = subprocess.call([sys.executable, "-c",
                                  "import sys; sys.exit(47)"],
                                  close_fds=True)
            self.assertEqual(rc, 47)

        def test_shell_sequence(self):
            # Run command through the shell (sequence)
            newenv = os.environ.copy()
            newenv["FRUIT"] = "physalis"
            p = subprocess.Popen(["set"], shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertNotEqual(p.stdout.read().find("physalis"), -1)

        def test_shell_string(self):
            # Run command through the shell (string)
            newenv = os.environ.copy()
            newenv["FRUIT"] = "physalis"
            p = subprocess.Popen("set", shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            self.assertNotEqual(p.stdout.read().find("physalis"), -1)

        def test_call_string(self):
            # call() function with string argument on Windows
            rc = subprocess.call(sys.executable +
                                 ' -c "import sys; sys.exit(47)"')
            self.assertEqual(rc, 47)

        # The DISABLED_ prefix keeps unittest from collecting the next
        # three methods, because their names do not start with "test".
        def DISABLED_test_send_signal(self):
            p = subprocess.Popen([sys.executable,
                              "-c", "input()"])

            self.assert_(p.poll() is None, p.poll())
            p.send_signal(signal.SIGTERM)
            self.assertNotEqual(p.wait(), 0)

        def DISABLED_test_kill(self):
            p = subprocess.Popen([sys.executable,
                            "-c", "input()"])

            self.assert_(p.poll() is None, p.poll())
            p.kill()
            self.assertNotEqual(p.wait(), 0)

        def DISABLED_test_terminate(self):
            p = subprocess.Popen([sys.executable,
                            "-c", "input()"])

            self.assert_(p.poll() is None, p.poll())
            p.terminate()
            self.assertNotEqual(p.wait(), 0)
|
718 |
|
def test_main():
    """Run ProcessTestCase, then reap any child processes left behind.

    Reaping is skipped when test_support does not provide reap_children().
    """
    test_support.run_unittest(ProcessTestCase)
    reap_children = getattr(test_support, "reap_children", None)
    if reap_children is not None:
        reap_children()

# Allow the file to be run directly as a script.
if __name__ == "__main__":
    test_main()