|
1 import unittest |
|
2 from test import test_support |
|
3 import subprocess |
|
4 import sys |
|
5 import signal |
|
6 import os |
|
7 import tempfile |
|
8 import time |
|
9 import re |
|
10 |
|
11 mswindows = (sys.platform == "win32") |
|
12 |
|
13 # |
|
14 # Depends on the following external programs: Python |
|
15 # |
|
16 |
|
17 if mswindows: |
|
18 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' |
|
19 'os.O_BINARY);') |
|
20 else: |
|
21 SETBINARY = '' |
|
22 |
|
23 # In a debug build, stuff like "[6580 refs]" is printed to stderr at |
|
24 # shutdown time. That frustrates tests trying to check stderr produced |
|
25 # from a spawned Python process. |
|
26 def remove_stderr_debug_decorations(stderr): |
|
27 return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) |
|
28 |
|
29 class ProcessTestCase(unittest.TestCase): |
|
30 def setUp(self): |
|
31 # Try to minimize the number of children we have so this test |
|
32 # doesn't crash on some buildbots (Alphas in particular). |
|
33 if hasattr(test_support, "reap_children"): |
|
34 test_support.reap_children() |
|
35 |
|
36 def tearDown(self): |
|
37 # Try to minimize the number of children we have so this test |
|
38 # doesn't crash on some buildbots (Alphas in particular). |
|
39 if hasattr(test_support, "reap_children"): |
|
40 test_support.reap_children() |
|
41 |
|
42 def mkstemp(self): |
|
43 """wrapper for mkstemp, calling mktemp if mkstemp is not available""" |
|
44 if hasattr(tempfile, "mkstemp"): |
|
45 return tempfile.mkstemp() |
|
46 else: |
|
47 fname = tempfile.mktemp() |
|
48 return os.open(fname, os.O_RDWR|os.O_CREAT), fname |
|
49 |
|
50 # |
|
51 # Generic tests |
|
52 # |
|
53 def test_call_seq(self): |
|
54 # call() function with sequence argument |
|
55 rc = subprocess.call([sys.executable, "-c", |
|
56 "import sys; sys.exit(47)"]) |
|
57 self.assertEqual(rc, 47) |
|
58 |
|
59 def test_check_call_zero(self): |
|
60 # check_call() function with zero return code |
|
61 rc = subprocess.check_call([sys.executable, "-c", |
|
62 "import sys; sys.exit(0)"]) |
|
63 self.assertEqual(rc, 0) |
|
64 |
|
65 def test_check_call_nonzero(self): |
|
66 # check_call() function with non-zero return code |
|
67 try: |
|
68 subprocess.check_call([sys.executable, "-c", |
|
69 "import sys; sys.exit(47)"]) |
|
70 except subprocess.CalledProcessError, e: |
|
71 self.assertEqual(e.returncode, 47) |
|
72 else: |
|
73 self.fail("Expected CalledProcessError") |
|
74 |
|
75 def test_call_kwargs(self): |
|
76 # call() function with keyword args |
|
77 newenv = os.environ.copy() |
|
78 newenv["FRUIT"] = "banana" |
|
79 rc = subprocess.call([sys.executable, "-c", |
|
80 'import sys, os;' \ |
|
81 'sys.exit(os.getenv("FRUIT")=="banana")'], |
|
82 env=newenv) |
|
83 self.assertEqual(rc, 1) |
|
84 |
|
85 def test_stdin_none(self): |
|
86 # .stdin is None when not redirected |
|
87 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], |
|
88 stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|
89 p.wait() |
|
90 self.assertEqual(p.stdin, None) |
|
91 |
|
92 def test_stdout_none(self): |
|
93 # .stdout is None when not redirected |
|
94 p = subprocess.Popen([sys.executable, "-c", |
|
95 'print " this bit of output is from a ' |
|
96 'test of stdout in a different ' |
|
97 'process ..."'], |
|
98 stdin=subprocess.PIPE, stderr=subprocess.PIPE) |
|
99 p.wait() |
|
100 self.assertEqual(p.stdout, None) |
|
101 |
|
102 def test_stderr_none(self): |
|
103 # .stderr is None when not redirected |
|
104 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], |
|
105 stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
|
106 p.wait() |
|
107 self.assertEqual(p.stderr, None) |
|
108 |
|
109 def test_executable(self): |
|
110 p = subprocess.Popen(["somethingyoudonthave", |
|
111 "-c", "import sys; sys.exit(47)"], |
|
112 executable=sys.executable) |
|
113 p.wait() |
|
114 self.assertEqual(p.returncode, 47) |
|
115 |
|
116 def test_stdin_pipe(self): |
|
117 # stdin redirection |
|
118 p = subprocess.Popen([sys.executable, "-c", |
|
119 'import sys; sys.exit(sys.stdin.read() == "pear")'], |
|
120 stdin=subprocess.PIPE) |
|
121 p.stdin.write("pear") |
|
122 p.stdin.close() |
|
123 p.wait() |
|
124 self.assertEqual(p.returncode, 1) |
|
125 |
|
126 def test_stdin_filedes(self): |
|
127 # stdin is set to open file descriptor |
|
128 tf = tempfile.TemporaryFile() |
|
129 d = tf.fileno() |
|
130 os.write(d, "pear") |
|
131 os.lseek(d, 0, 0) |
|
132 p = subprocess.Popen([sys.executable, "-c", |
|
133 'import sys; sys.exit(sys.stdin.read() == "pear")'], |
|
134 stdin=d) |
|
135 p.wait() |
|
136 self.assertEqual(p.returncode, 1) |
|
137 |
|
138 def test_stdin_fileobj(self): |
|
139 # stdin is set to open file object |
|
140 tf = tempfile.TemporaryFile() |
|
141 tf.write("pear") |
|
142 tf.seek(0) |
|
143 p = subprocess.Popen([sys.executable, "-c", |
|
144 'import sys; sys.exit(sys.stdin.read() == "pear")'], |
|
145 stdin=tf) |
|
146 p.wait() |
|
147 self.assertEqual(p.returncode, 1) |
|
148 |
|
149 def test_stdout_pipe(self): |
|
150 # stdout redirection |
|
151 p = subprocess.Popen([sys.executable, "-c", |
|
152 'import sys; sys.stdout.write("orange")'], |
|
153 stdout=subprocess.PIPE) |
|
154 self.assertEqual(p.stdout.read(), "orange") |
|
155 |
|
156 def test_stdout_filedes(self): |
|
157 # stdout is set to open file descriptor |
|
158 tf = tempfile.TemporaryFile() |
|
159 d = tf.fileno() |
|
160 p = subprocess.Popen([sys.executable, "-c", |
|
161 'import sys; sys.stdout.write("orange")'], |
|
162 stdout=d) |
|
163 p.wait() |
|
164 os.lseek(d, 0, 0) |
|
165 self.assertEqual(os.read(d, 1024), "orange") |
|
166 |
|
167 def test_stdout_fileobj(self): |
|
168 # stdout is set to open file object |
|
169 tf = tempfile.TemporaryFile() |
|
170 p = subprocess.Popen([sys.executable, "-c", |
|
171 'import sys; sys.stdout.write("orange")'], |
|
172 stdout=tf) |
|
173 p.wait() |
|
174 tf.seek(0) |
|
175 self.assertEqual(tf.read(), "orange") |
|
176 |
|
177 def test_stderr_pipe(self): |
|
178 # stderr redirection |
|
179 p = subprocess.Popen([sys.executable, "-c", |
|
180 'import sys; sys.stderr.write("strawberry")'], |
|
181 stderr=subprocess.PIPE) |
|
182 self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()), |
|
183 "strawberry") |
|
184 |
|
185 def test_stderr_filedes(self): |
|
186 # stderr is set to open file descriptor |
|
187 tf = tempfile.TemporaryFile() |
|
188 d = tf.fileno() |
|
189 p = subprocess.Popen([sys.executable, "-c", |
|
190 'import sys; sys.stderr.write("strawberry")'], |
|
191 stderr=d) |
|
192 p.wait() |
|
193 os.lseek(d, 0, 0) |
|
194 self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)), |
|
195 "strawberry") |
|
196 |
|
197 def test_stderr_fileobj(self): |
|
198 # stderr is set to open file object |
|
199 tf = tempfile.TemporaryFile() |
|
200 p = subprocess.Popen([sys.executable, "-c", |
|
201 'import sys; sys.stderr.write("strawberry")'], |
|
202 stderr=tf) |
|
203 p.wait() |
|
204 tf.seek(0) |
|
205 self.assertEqual(remove_stderr_debug_decorations(tf.read()), |
|
206 "strawberry") |
|
207 |
|
208 def test_stdout_stderr_pipe(self): |
|
209 # capture stdout and stderr to the same pipe |
|
210 p = subprocess.Popen([sys.executable, "-c", |
|
211 'import sys;' \ |
|
212 'sys.stdout.write("apple");' \ |
|
213 'sys.stdout.flush();' \ |
|
214 'sys.stderr.write("orange")'], |
|
215 stdout=subprocess.PIPE, |
|
216 stderr=subprocess.STDOUT) |
|
217 output = p.stdout.read() |
|
218 stripped = remove_stderr_debug_decorations(output) |
|
219 self.assertEqual(stripped, "appleorange") |
|
220 |
|
221 def test_stdout_stderr_file(self): |
|
222 # capture stdout and stderr to the same open file |
|
223 tf = tempfile.TemporaryFile() |
|
224 p = subprocess.Popen([sys.executable, "-c", |
|
225 'import sys;' \ |
|
226 'sys.stdout.write("apple");' \ |
|
227 'sys.stdout.flush();' \ |
|
228 'sys.stderr.write("orange")'], |
|
229 stdout=tf, |
|
230 stderr=tf) |
|
231 p.wait() |
|
232 tf.seek(0) |
|
233 output = tf.read() |
|
234 stripped = remove_stderr_debug_decorations(output) |
|
235 self.assertEqual(stripped, "appleorange") |
|
236 |
|
237 def test_stdout_filedes_of_stdout(self): |
|
238 # stdout is set to 1 (#1531862). |
|
239 cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))" |
|
240 rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) |
|
241 self.assertEquals(rc, 2) |
|
242 |
|
243 def test_cwd(self): |
|
244 tmpdir = os.getenv("TEMP", "/tmp") |
|
245 # We cannot use os.path.realpath to canonicalize the path, |
|
246 # since it doesn't expand Tru64 {memb} strings. See bug 1063571. |
|
247 cwd = os.getcwd() |
|
248 os.chdir(tmpdir) |
|
249 tmpdir = os.getcwd() |
|
250 os.chdir(cwd) |
|
251 p = subprocess.Popen([sys.executable, "-c", |
|
252 'import sys,os;' \ |
|
253 'sys.stdout.write(os.getcwd())'], |
|
254 stdout=subprocess.PIPE, |
|
255 cwd=tmpdir) |
|
256 normcase = os.path.normcase |
|
257 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir)) |
|
258 |
|
259 def test_env(self): |
|
260 newenv = os.environ.copy() |
|
261 newenv["FRUIT"] = "orange" |
|
262 p = subprocess.Popen([sys.executable, "-c", |
|
263 'import sys,os;' \ |
|
264 'sys.stdout.write(os.getenv("FRUIT"))'], |
|
265 stdout=subprocess.PIPE, |
|
266 env=newenv) |
|
267 self.assertEqual(p.stdout.read(), "orange") |
|
268 |
|
269 def test_communicate_stdin(self): |
|
270 p = subprocess.Popen([sys.executable, "-c", |
|
271 'import sys; sys.exit(sys.stdin.read() == "pear")'], |
|
272 stdin=subprocess.PIPE) |
|
273 p.communicate("pear") |
|
274 self.assertEqual(p.returncode, 1) |
|
275 |
|
276 def test_communicate_stdout(self): |
|
277 p = subprocess.Popen([sys.executable, "-c", |
|
278 'import sys; sys.stdout.write("pineapple")'], |
|
279 stdout=subprocess.PIPE) |
|
280 (stdout, stderr) = p.communicate() |
|
281 self.assertEqual(stdout, "pineapple") |
|
282 self.assertEqual(stderr, None) |
|
283 |
|
284 def test_communicate_stderr(self): |
|
285 p = subprocess.Popen([sys.executable, "-c", |
|
286 'import sys; sys.stderr.write("pineapple")'], |
|
287 stderr=subprocess.PIPE) |
|
288 (stdout, stderr) = p.communicate() |
|
289 self.assertEqual(stdout, None) |
|
290 # When running with a pydebug build, the # of references is outputted |
|
291 # to stderr, so just check if stderr at least started with "pinapple" |
|
292 self.assert_(stderr.startswith("pineapple")) |
|
293 |
|
294 def test_communicate(self): |
|
295 p = subprocess.Popen([sys.executable, "-c", |
|
296 'import sys,os;' \ |
|
297 'sys.stderr.write("pineapple");' \ |
|
298 'sys.stdout.write(sys.stdin.read())'], |
|
299 stdin=subprocess.PIPE, |
|
300 stdout=subprocess.PIPE, |
|
301 stderr=subprocess.PIPE) |
|
302 (stdout, stderr) = p.communicate("banana") |
|
303 self.assertEqual(stdout, "banana") |
|
304 self.assertEqual(remove_stderr_debug_decorations(stderr), |
|
305 "pineapple") |
|
306 |
|
307 def test_communicate_returns(self): |
|
308 # communicate() should return None if no redirection is active |
|
309 p = subprocess.Popen([sys.executable, "-c", |
|
310 "import sys; sys.exit(47)"]) |
|
311 (stdout, stderr) = p.communicate() |
|
312 self.assertEqual(stdout, None) |
|
313 self.assertEqual(stderr, None) |
|
314 |
|
315 def test_communicate_pipe_buf(self): |
|
316 # communicate() with writes larger than pipe_buf |
|
317 # This test will probably deadlock rather than fail, if |
|
318 # communicate() does not work properly. |
|
319 x, y = os.pipe() |
|
320 if mswindows: |
|
321 pipe_buf = 512 |
|
322 else: |
|
323 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") |
|
324 os.close(x) |
|
325 os.close(y) |
|
326 p = subprocess.Popen([sys.executable, "-c", |
|
327 'import sys,os;' |
|
328 'sys.stdout.write(sys.stdin.read(47));' \ |
|
329 'sys.stderr.write("xyz"*%d);' \ |
|
330 'sys.stdout.write(sys.stdin.read())' % pipe_buf], |
|
331 stdin=subprocess.PIPE, |
|
332 stdout=subprocess.PIPE, |
|
333 stderr=subprocess.PIPE) |
|
334 string_to_write = "abc"*pipe_buf |
|
335 (stdout, stderr) = p.communicate(string_to_write) |
|
336 self.assertEqual(stdout, string_to_write) |
|
337 |
|
338 def test_writes_before_communicate(self): |
|
339 # stdin.write before communicate() |
|
340 p = subprocess.Popen([sys.executable, "-c", |
|
341 'import sys,os;' \ |
|
342 'sys.stdout.write(sys.stdin.read())'], |
|
343 stdin=subprocess.PIPE, |
|
344 stdout=subprocess.PIPE, |
|
345 stderr=subprocess.PIPE) |
|
346 p.stdin.write("banana") |
|
347 (stdout, stderr) = p.communicate("split") |
|
348 self.assertEqual(stdout, "bananasplit") |
|
349 self.assertEqual(remove_stderr_debug_decorations(stderr), "") |
|
350 |
|
351 def test_universal_newlines(self): |
|
352 p = subprocess.Popen([sys.executable, "-c", |
|
353 'import sys,os;' + SETBINARY + |
|
354 'sys.stdout.write("line1\\n");' |
|
355 'sys.stdout.flush();' |
|
356 'sys.stdout.write("line2\\r");' |
|
357 'sys.stdout.flush();' |
|
358 'sys.stdout.write("line3\\r\\n");' |
|
359 'sys.stdout.flush();' |
|
360 'sys.stdout.write("line4\\r");' |
|
361 'sys.stdout.flush();' |
|
362 'sys.stdout.write("\\nline5");' |
|
363 'sys.stdout.flush();' |
|
364 'sys.stdout.write("\\nline6");'], |
|
365 stdout=subprocess.PIPE, |
|
366 universal_newlines=1) |
|
367 stdout = p.stdout.read() |
|
368 if hasattr(file, 'newlines'): |
|
369 # Interpreter with universal newline support |
|
370 self.assertEqual(stdout, |
|
371 "line1\nline2\nline3\nline4\nline5\nline6") |
|
372 else: |
|
373 # Interpreter without universal newline support |
|
374 self.assertEqual(stdout, |
|
375 "line1\nline2\rline3\r\nline4\r\nline5\nline6") |
|
376 |
|
377 def test_universal_newlines_communicate(self): |
|
378 # universal newlines through communicate() |
|
379 p = subprocess.Popen([sys.executable, "-c", |
|
380 'import sys,os;' + SETBINARY + |
|
381 'sys.stdout.write("line1\\n");' |
|
382 'sys.stdout.flush();' |
|
383 'sys.stdout.write("line2\\r");' |
|
384 'sys.stdout.flush();' |
|
385 'sys.stdout.write("line3\\r\\n");' |
|
386 'sys.stdout.flush();' |
|
387 'sys.stdout.write("line4\\r");' |
|
388 'sys.stdout.flush();' |
|
389 'sys.stdout.write("\\nline5");' |
|
390 'sys.stdout.flush();' |
|
391 'sys.stdout.write("\\nline6");'], |
|
392 stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
|
393 universal_newlines=1) |
|
394 (stdout, stderr) = p.communicate() |
|
395 if hasattr(file, 'newlines'): |
|
396 # Interpreter with universal newline support |
|
397 self.assertEqual(stdout, |
|
398 "line1\nline2\nline3\nline4\nline5\nline6") |
|
399 else: |
|
400 # Interpreter without universal newline support |
|
401 self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6") |
|
402 |
|
403 def test_no_leaking(self): |
|
404 # Make sure we leak no resources |
|
405 if not hasattr(test_support, "is_resource_enabled") \ |
|
406 or test_support.is_resource_enabled("subprocess") and not mswindows: |
|
407 max_handles = 1026 # too much for most UNIX systems |
|
408 else: |
|
409 max_handles = 65 |
|
410 for i in range(max_handles): |
|
411 p = subprocess.Popen([sys.executable, "-c", |
|
412 "import sys;sys.stdout.write(sys.stdin.read())"], |
|
413 stdin=subprocess.PIPE, |
|
414 stdout=subprocess.PIPE, |
|
415 stderr=subprocess.PIPE) |
|
416 data = p.communicate("lime")[0] |
|
417 self.assertEqual(data, "lime") |
|
418 |
|
419 |
|
420 def test_list2cmdline(self): |
|
421 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), |
|
422 '"a b c" d e') |
|
423 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), |
|
424 'ab\\"c \\ d') |
|
425 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), |
|
426 'a\\\\\\b "de fg" h') |
|
427 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), |
|
428 'a\\\\\\"b c d') |
|
429 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), |
|
430 '"a\\\\b c" d e') |
|
431 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), |
|
432 '"a\\\\b\\ c" d e') |
|
433 self.assertEqual(subprocess.list2cmdline(['ab', '']), |
|
434 'ab ""') |
|
435 |
|
436 |
|
437 def test_poll(self): |
|
438 p = subprocess.Popen([sys.executable, |
|
439 "-c", "import time; time.sleep(1)"]) |
|
440 count = 0 |
|
441 while p.poll() is None: |
|
442 time.sleep(0.1) |
|
443 count += 1 |
|
444 # We expect that the poll loop probably went around about 10 times, |
|
445 # but, based on system scheduling we can't control, it's possible |
|
446 # poll() never returned None. It "should be" very rare that it |
|
447 # didn't go around at least twice. |
|
448 self.assert_(count >= 2) |
|
449 # Subsequent invocations should just return the returncode |
|
450 self.assertEqual(p.poll(), 0) |
|
451 |
|
452 |
|
453 def test_wait(self): |
|
454 p = subprocess.Popen([sys.executable, |
|
455 "-c", "import time; time.sleep(2)"]) |
|
456 self.assertEqual(p.wait(), 0) |
|
457 # Subsequent invocations should just return the returncode |
|
458 self.assertEqual(p.wait(), 0) |
|
459 |
|
460 |
|
461 def test_invalid_bufsize(self): |
|
462 # an invalid type of the bufsize argument should raise |
|
463 # TypeError. |
|
464 try: |
|
465 subprocess.Popen([sys.executable, "-c", "pass"], "orange") |
|
466 except TypeError: |
|
467 pass |
|
468 else: |
|
469 self.fail("Expected TypeError") |
|
470 |
|
471 # |
|
472 # POSIX tests |
|
473 # |
|
474 if not mswindows: |
|
475 def test_exceptions(self): |
|
476 # catched & re-raised exceptions |
|
477 try: |
|
478 p = subprocess.Popen([sys.executable, "-c", ""], |
|
479 cwd="/this/path/does/not/exist") |
|
480 except OSError, e: |
|
481 # The attribute child_traceback should contain "os.chdir" |
|
482 # somewhere. |
|
483 self.assertNotEqual(e.child_traceback.find("os.chdir"), -1) |
|
484 else: |
|
485 self.fail("Expected OSError") |
|
486 |
|
487 def _suppress_core_files(self): |
|
488 """Try to prevent core files from being created. |
|
489 Returns previous ulimit if successful, else None. |
|
490 """ |
|
491 try: |
|
492 import resource |
|
493 old_limit = resource.getrlimit(resource.RLIMIT_CORE) |
|
494 resource.setrlimit(resource.RLIMIT_CORE, (0,0)) |
|
495 return old_limit |
|
496 except (ImportError, ValueError, resource.error): |
|
497 return None |
|
498 |
|
499 def _unsuppress_core_files(self, old_limit): |
|
500 """Return core file behavior to default.""" |
|
501 if old_limit is None: |
|
502 return |
|
503 try: |
|
504 import resource |
|
505 resource.setrlimit(resource.RLIMIT_CORE, old_limit) |
|
506 except (ImportError, ValueError, resource.error): |
|
507 return |
|
508 |
|
509 def test_run_abort(self): |
|
510 # returncode handles signal termination |
|
511 old_limit = self._suppress_core_files() |
|
512 try: |
|
513 p = subprocess.Popen([sys.executable, |
|
514 "-c", "import os; os.abort()"]) |
|
515 finally: |
|
516 self._unsuppress_core_files(old_limit) |
|
517 p.wait() |
|
518 self.assertEqual(-p.returncode, signal.SIGABRT) |
|
519 |
|
520 def test_preexec(self): |
|
521 # preexec function |
|
522 p = subprocess.Popen([sys.executable, "-c", |
|
523 'import sys,os;' \ |
|
524 'sys.stdout.write(os.getenv("FRUIT"))'], |
|
525 stdout=subprocess.PIPE, |
|
526 preexec_fn=lambda: os.putenv("FRUIT", "apple")) |
|
527 self.assertEqual(p.stdout.read(), "apple") |
|
528 |
|
529 def test_args_string(self): |
|
530 # args is a string |
|
531 f, fname = self.mkstemp() |
|
532 os.write(f, "#!/bin/sh\n") |
|
533 os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % |
|
534 sys.executable) |
|
535 os.close(f) |
|
536 os.chmod(fname, 0700) |
|
537 p = subprocess.Popen(fname) |
|
538 p.wait() |
|
539 os.remove(fname) |
|
540 self.assertEqual(p.returncode, 47) |
|
541 |
|
542 def test_invalid_args(self): |
|
543 # invalid arguments should raise ValueError |
|
544 self.assertRaises(ValueError, subprocess.call, |
|
545 [sys.executable, |
|
546 "-c", "import sys; sys.exit(47)"], |
|
547 startupinfo=47) |
|
548 self.assertRaises(ValueError, subprocess.call, |
|
549 [sys.executable, |
|
550 "-c", "import sys; sys.exit(47)"], |
|
551 creationflags=47) |
|
552 |
|
553 def test_shell_sequence(self): |
|
554 # Run command through the shell (sequence) |
|
555 newenv = os.environ.copy() |
|
556 newenv["FRUIT"] = "apple" |
|
557 p = subprocess.Popen(["echo $FRUIT"], shell=1, |
|
558 stdout=subprocess.PIPE, |
|
559 env=newenv) |
|
560 self.assertEqual(p.stdout.read().strip(), "apple") |
|
561 |
|
562 def test_shell_string(self): |
|
563 # Run command through the shell (string) |
|
564 newenv = os.environ.copy() |
|
565 newenv["FRUIT"] = "apple" |
|
566 p = subprocess.Popen("echo $FRUIT", shell=1, |
|
567 stdout=subprocess.PIPE, |
|
568 env=newenv) |
|
569 self.assertEqual(p.stdout.read().strip(), "apple") |
|
570 |
|
571 def test_call_string(self): |
|
572 # call() function with string argument on UNIX |
|
573 f, fname = self.mkstemp() |
|
574 os.write(f, "#!/bin/sh\n") |
|
575 os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % |
|
576 sys.executable) |
|
577 os.close(f) |
|
578 os.chmod(fname, 0700) |
|
579 rc = subprocess.call(fname) |
|
580 os.remove(fname) |
|
581 self.assertEqual(rc, 47) |
|
582 |
|
583 |
|
584 # |
|
585 # Windows tests |
|
586 # |
|
587 if mswindows: |
|
588 def test_startupinfo(self): |
|
589 # startupinfo argument |
|
590 # We uses hardcoded constants, because we do not want to |
|
591 # depend on win32all. |
|
592 STARTF_USESHOWWINDOW = 1 |
|
593 SW_MAXIMIZE = 3 |
|
594 startupinfo = subprocess.STARTUPINFO() |
|
595 startupinfo.dwFlags = STARTF_USESHOWWINDOW |
|
596 startupinfo.wShowWindow = SW_MAXIMIZE |
|
597 # Since Python is a console process, it won't be affected |
|
598 # by wShowWindow, but the argument should be silently |
|
599 # ignored |
|
600 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], |
|
601 startupinfo=startupinfo) |
|
602 |
|
603 def test_creationflags(self): |
|
604 # creationflags argument |
|
605 CREATE_NEW_CONSOLE = 16 |
|
606 sys.stderr.write(" a DOS box should flash briefly ...\n") |
|
607 subprocess.call(sys.executable + |
|
608 ' -c "import time; time.sleep(0.25)"', |
|
609 creationflags=CREATE_NEW_CONSOLE) |
|
610 |
|
611 def test_invalid_args(self): |
|
612 # invalid arguments should raise ValueError |
|
613 self.assertRaises(ValueError, subprocess.call, |
|
614 [sys.executable, |
|
615 "-c", "import sys; sys.exit(47)"], |
|
616 preexec_fn=lambda: 1) |
|
617 self.assertRaises(ValueError, subprocess.call, |
|
618 [sys.executable, |
|
619 "-c", "import sys; sys.exit(47)"], |
|
620 close_fds=True) |
|
621 |
|
622 def test_shell_sequence(self): |
|
623 # Run command through the shell (sequence) |
|
624 newenv = os.environ.copy() |
|
625 newenv["FRUIT"] = "physalis" |
|
626 p = subprocess.Popen(["set"], shell=1, |
|
627 stdout=subprocess.PIPE, |
|
628 env=newenv) |
|
629 self.assertNotEqual(p.stdout.read().find("physalis"), -1) |
|
630 |
|
631 def test_shell_string(self): |
|
632 # Run command through the shell (string) |
|
633 newenv = os.environ.copy() |
|
634 newenv["FRUIT"] = "physalis" |
|
635 p = subprocess.Popen("set", shell=1, |
|
636 stdout=subprocess.PIPE, |
|
637 env=newenv) |
|
638 self.assertNotEqual(p.stdout.read().find("physalis"), -1) |
|
639 |
|
640 def test_call_string(self): |
|
641 # call() function with string argument on Windows |
|
642 rc = subprocess.call(sys.executable + |
|
643 ' -c "import sys; sys.exit(47)"') |
|
644 self.assertEqual(rc, 47) |
|
645 |
|
646 |
|
647 def test_main(): |
|
648 test_support.run_unittest(ProcessTestCase) |
|
649 if hasattr(test_support, "reap_children"): |
|
650 test_support.reap_children() |
|
651 |
|
652 if __name__ == "__main__": |
|
653 test_main() |