python-2.5.2/win32/Lib/test/test_subprocess.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 import unittest
       
     2 from test import test_support
       
     3 import subprocess
       
     4 import sys
       
     5 import signal
       
     6 import os
       
     7 import tempfile
       
     8 import time
       
     9 import re
       
    10 
       
    11 mswindows = (sys.platform == "win32")
       
    12 
       
    13 #
       
    14 # Depends on the following external programs: Python
       
    15 #
       
    16 
       
    17 if mswindows:
       
    18     SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
       
    19                                                 'os.O_BINARY);')
       
    20 else:
       
    21     SETBINARY = ''
       
    22 
       
    23 # In a debug build, stuff like "[6580 refs]" is printed to stderr at
       
    24 # shutdown time.  That frustrates tests trying to check stderr produced
       
    25 # from a spawned Python process.
       
    26 def remove_stderr_debug_decorations(stderr):
       
    27     return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
       
    28 
       
    29 class ProcessTestCase(unittest.TestCase):
       
    def setUp(self):
        # Reap stray child processes before each test: keeping the number
        # of children small stops this test crashing on some buildbots
        # (Alphas in particular).  test_support may not provide the helper.
        if not hasattr(test_support, "reap_children"):
            return
        test_support.reap_children()
       
    35 
       
    def tearDown(self):
        # Reap child processes left behind by the test that just ran, so
        # the number of children stays small and this test doesn't crash
        # on some buildbots (Alphas in particular).
        can_reap = hasattr(test_support, "reap_children")
        if can_reap:
            test_support.reap_children()
       
    41 
       
    def mkstemp(self):
        """Create a temporary file and return an ``(fd, path)`` pair.

        Uses tempfile.mkstemp() when it is available and falls back to
        tempfile.mktemp() plus os.open() otherwise.
        """
        if hasattr(tempfile, "mkstemp"):
            return tempfile.mkstemp()
        # Fallback: mktemp() only picks a name, so another process could
        # create that path before we open it.  O_EXCL makes the open fail
        # in that case instead of silently reusing somebody else's file.
        fname = tempfile.mktemp()
        flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
        return os.open(fname, flags), fname
       
    49 
       
    50     #
       
    51     # Generic tests
       
    52     #
       
    53     def test_call_seq(self):
       
    54         # call() function with sequence argument
       
    55         rc = subprocess.call([sys.executable, "-c",
       
    56                               "import sys; sys.exit(47)"])
       
    57         self.assertEqual(rc, 47)
       
    58 
       
    59     def test_check_call_zero(self):
       
    60         # check_call() function with zero return code
       
    61         rc = subprocess.check_call([sys.executable, "-c",
       
    62                                     "import sys; sys.exit(0)"])
       
    63         self.assertEqual(rc, 0)
       
    64 
       
    65     def test_check_call_nonzero(self):
       
    66         # check_call() function with non-zero return code
       
    67         try:
       
    68             subprocess.check_call([sys.executable, "-c",
       
    69                                    "import sys; sys.exit(47)"])
       
    70         except subprocess.CalledProcessError, e:
       
    71             self.assertEqual(e.returncode, 47)
       
    72         else:
       
    73             self.fail("Expected CalledProcessError")
       
    74 
       
    75     def test_call_kwargs(self):
       
    76         # call() function with keyword args
       
    77         newenv = os.environ.copy()
       
    78         newenv["FRUIT"] = "banana"
       
    79         rc = subprocess.call([sys.executable, "-c",
       
    80                           'import sys, os;' \
       
    81                           'sys.exit(os.getenv("FRUIT")=="banana")'],
       
    82                         env=newenv)
       
    83         self.assertEqual(rc, 1)
       
    84 
       
    def test_stdin_none(self):
        # .stdin is None when not redirected
        argv = [sys.executable, "-c", 'print "banana"']
        child = subprocess.Popen(argv,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        child.wait()
        self.assertEqual(child.stdin, None)
       
    91 
       
    def test_stdout_none(self):
        # .stdout is None when not redirected
        script = ('print "    this bit of output is from a '
                  'test of stdout in a different '
                  'process ..."')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdin=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        child.wait()
        self.assertEqual(child.stdout, None)
       
   101 
       
   102     def test_stderr_none(self):
       
   103         # .stderr is None when not redirected
       
   104         p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
       
   105                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
       
   106         p.wait()
       
   107         self.assertEqual(p.stderr, None)
       
   108 
       
   109     def test_executable(self):
       
   110         p = subprocess.Popen(["somethingyoudonthave",
       
   111                               "-c", "import sys; sys.exit(47)"],
       
   112                              executable=sys.executable)
       
   113         p.wait()
       
   114         self.assertEqual(p.returncode, 47)
       
   115 
       
   116     def test_stdin_pipe(self):
       
   117         # stdin redirection
       
   118         p = subprocess.Popen([sys.executable, "-c",
       
   119                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
       
   120                         stdin=subprocess.PIPE)
       
   121         p.stdin.write("pear")
       
   122         p.stdin.close()
       
   123         p.wait()
       
   124         self.assertEqual(p.returncode, 1)
       
   125 
       
   126     def test_stdin_filedes(self):
       
   127         # stdin is set to open file descriptor
       
   128         tf = tempfile.TemporaryFile()
       
   129         d = tf.fileno()
       
   130         os.write(d, "pear")
       
   131         os.lseek(d, 0, 0)
       
   132         p = subprocess.Popen([sys.executable, "-c",
       
   133                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
       
   134                          stdin=d)
       
   135         p.wait()
       
   136         self.assertEqual(p.returncode, 1)
       
   137 
       
   138     def test_stdin_fileobj(self):
       
   139         # stdin is set to open file object
       
   140         tf = tempfile.TemporaryFile()
       
   141         tf.write("pear")
       
   142         tf.seek(0)
       
   143         p = subprocess.Popen([sys.executable, "-c",
       
   144                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
       
   145                          stdin=tf)
       
   146         p.wait()
       
   147         self.assertEqual(p.returncode, 1)
       
   148 
       
   149     def test_stdout_pipe(self):
       
   150         # stdout redirection
       
   151         p = subprocess.Popen([sys.executable, "-c",
       
   152                           'import sys; sys.stdout.write("orange")'],
       
   153                          stdout=subprocess.PIPE)
       
   154         self.assertEqual(p.stdout.read(), "orange")
       
   155 
       
   156     def test_stdout_filedes(self):
       
   157         # stdout is set to open file descriptor
       
   158         tf = tempfile.TemporaryFile()
       
   159         d = tf.fileno()
       
   160         p = subprocess.Popen([sys.executable, "-c",
       
   161                           'import sys; sys.stdout.write("orange")'],
       
   162                          stdout=d)
       
   163         p.wait()
       
   164         os.lseek(d, 0, 0)
       
   165         self.assertEqual(os.read(d, 1024), "orange")
       
   166 
       
   167     def test_stdout_fileobj(self):
       
   168         # stdout is set to open file object
       
   169         tf = tempfile.TemporaryFile()
       
   170         p = subprocess.Popen([sys.executable, "-c",
       
   171                           'import sys; sys.stdout.write("orange")'],
       
   172                          stdout=tf)
       
   173         p.wait()
       
   174         tf.seek(0)
       
   175         self.assertEqual(tf.read(), "orange")
       
   176 
       
   177     def test_stderr_pipe(self):
       
   178         # stderr redirection
       
   179         p = subprocess.Popen([sys.executable, "-c",
       
   180                           'import sys; sys.stderr.write("strawberry")'],
       
   181                          stderr=subprocess.PIPE)
       
   182         self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
       
   183                          "strawberry")
       
   184 
       
   185     def test_stderr_filedes(self):
       
   186         # stderr is set to open file descriptor
       
   187         tf = tempfile.TemporaryFile()
       
   188         d = tf.fileno()
       
   189         p = subprocess.Popen([sys.executable, "-c",
       
   190                           'import sys; sys.stderr.write("strawberry")'],
       
   191                          stderr=d)
       
   192         p.wait()
       
   193         os.lseek(d, 0, 0)
       
   194         self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
       
   195                          "strawberry")
       
   196 
       
   197     def test_stderr_fileobj(self):
       
   198         # stderr is set to open file object
       
   199         tf = tempfile.TemporaryFile()
       
   200         p = subprocess.Popen([sys.executable, "-c",
       
   201                           'import sys; sys.stderr.write("strawberry")'],
       
   202                          stderr=tf)
       
   203         p.wait()
       
   204         tf.seek(0)
       
   205         self.assertEqual(remove_stderr_debug_decorations(tf.read()),
       
   206                          "strawberry")
       
   207 
       
   208     def test_stdout_stderr_pipe(self):
       
   209         # capture stdout and stderr to the same pipe
       
   210         p = subprocess.Popen([sys.executable, "-c",
       
   211                           'import sys;' \
       
   212                           'sys.stdout.write("apple");' \
       
   213                           'sys.stdout.flush();' \
       
   214                           'sys.stderr.write("orange")'],
       
   215                          stdout=subprocess.PIPE,
       
   216                          stderr=subprocess.STDOUT)
       
   217         output = p.stdout.read()
       
   218         stripped = remove_stderr_debug_decorations(output)
       
   219         self.assertEqual(stripped, "appleorange")
       
   220 
       
   221     def test_stdout_stderr_file(self):
       
   222         # capture stdout and stderr to the same open file
       
   223         tf = tempfile.TemporaryFile()
       
   224         p = subprocess.Popen([sys.executable, "-c",
       
   225                           'import sys;' \
       
   226                           'sys.stdout.write("apple");' \
       
   227                           'sys.stdout.flush();' \
       
   228                           'sys.stderr.write("orange")'],
       
   229                          stdout=tf,
       
   230                          stderr=tf)
       
   231         p.wait()
       
   232         tf.seek(0)
       
   233         output = tf.read()
       
   234         stripped = remove_stderr_debug_decorations(output)
       
   235         self.assertEqual(stripped, "appleorange")
       
   236 
       
   237     def test_stdout_filedes_of_stdout(self):
       
   238         # stdout is set to 1 (#1531862).
       
   239         cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
       
   240         rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
       
   241         self.assertEquals(rc, 2)
       
   242 
       
   243     def test_cwd(self):
       
   244         tmpdir = os.getenv("TEMP", "/tmp")
       
   245         # We cannot use os.path.realpath to canonicalize the path,
       
   246         # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
       
   247         cwd = os.getcwd()
       
   248         os.chdir(tmpdir)
       
   249         tmpdir = os.getcwd()
       
   250         os.chdir(cwd)
       
   251         p = subprocess.Popen([sys.executable, "-c",
       
   252                           'import sys,os;' \
       
   253                           'sys.stdout.write(os.getcwd())'],
       
   254                          stdout=subprocess.PIPE,
       
   255                          cwd=tmpdir)
       
   256         normcase = os.path.normcase
       
   257         self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
       
   258 
       
   259     def test_env(self):
       
   260         newenv = os.environ.copy()
       
   261         newenv["FRUIT"] = "orange"
       
   262         p = subprocess.Popen([sys.executable, "-c",
       
   263                           'import sys,os;' \
       
   264                           'sys.stdout.write(os.getenv("FRUIT"))'],
       
   265                          stdout=subprocess.PIPE,
       
   266                          env=newenv)
       
   267         self.assertEqual(p.stdout.read(), "orange")
       
   268 
       
   269     def test_communicate_stdin(self):
       
   270         p = subprocess.Popen([sys.executable, "-c",
       
   271                               'import sys; sys.exit(sys.stdin.read() == "pear")'],
       
   272                              stdin=subprocess.PIPE)
       
   273         p.communicate("pear")
       
   274         self.assertEqual(p.returncode, 1)
       
   275 
       
   276     def test_communicate_stdout(self):
       
   277         p = subprocess.Popen([sys.executable, "-c",
       
   278                               'import sys; sys.stdout.write("pineapple")'],
       
   279                              stdout=subprocess.PIPE)
       
   280         (stdout, stderr) = p.communicate()
       
   281         self.assertEqual(stdout, "pineapple")
       
   282         self.assertEqual(stderr, None)
       
   283 
       
   284     def test_communicate_stderr(self):
       
   285         p = subprocess.Popen([sys.executable, "-c",
       
   286                               'import sys; sys.stderr.write("pineapple")'],
       
   287                              stderr=subprocess.PIPE)
       
   288         (stdout, stderr) = p.communicate()
       
   289         self.assertEqual(stdout, None)
       
   290         # When running with a pydebug build, the # of references is outputted
       
   291         # to stderr, so just check if stderr at least started with "pinapple"
       
   292         self.assert_(stderr.startswith("pineapple"))
       
   293 
       
   294     def test_communicate(self):
       
   295         p = subprocess.Popen([sys.executable, "-c",
       
   296                           'import sys,os;' \
       
   297                           'sys.stderr.write("pineapple");' \
       
   298                           'sys.stdout.write(sys.stdin.read())'],
       
   299                          stdin=subprocess.PIPE,
       
   300                          stdout=subprocess.PIPE,
       
   301                          stderr=subprocess.PIPE)
       
   302         (stdout, stderr) = p.communicate("banana")
       
   303         self.assertEqual(stdout, "banana")
       
   304         self.assertEqual(remove_stderr_debug_decorations(stderr),
       
   305                          "pineapple")
       
   306 
       
   307     def test_communicate_returns(self):
       
   308         # communicate() should return None if no redirection is active
       
   309         p = subprocess.Popen([sys.executable, "-c",
       
   310                               "import sys; sys.exit(47)"])
       
   311         (stdout, stderr) = p.communicate()
       
   312         self.assertEqual(stdout, None)
       
   313         self.assertEqual(stderr, None)
       
   314 
       
   315     def test_communicate_pipe_buf(self):
       
   316         # communicate() with writes larger than pipe_buf
       
   317         # This test will probably deadlock rather than fail, if
       
   318         # communicate() does not work properly.
       
   319         x, y = os.pipe()
       
   320         if mswindows:
       
   321             pipe_buf = 512
       
   322         else:
       
   323             pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
       
   324         os.close(x)
       
   325         os.close(y)
       
   326         p = subprocess.Popen([sys.executable, "-c",
       
   327                           'import sys,os;'
       
   328                           'sys.stdout.write(sys.stdin.read(47));' \
       
   329                           'sys.stderr.write("xyz"*%d);' \
       
   330                           'sys.stdout.write(sys.stdin.read())' % pipe_buf],
       
   331                          stdin=subprocess.PIPE,
       
   332                          stdout=subprocess.PIPE,
       
   333                          stderr=subprocess.PIPE)
       
   334         string_to_write = "abc"*pipe_buf
       
   335         (stdout, stderr) = p.communicate(string_to_write)
       
   336         self.assertEqual(stdout, string_to_write)
       
   337 
       
   338     def test_writes_before_communicate(self):
       
   339         # stdin.write before communicate()
       
   340         p = subprocess.Popen([sys.executable, "-c",
       
   341                           'import sys,os;' \
       
   342                           'sys.stdout.write(sys.stdin.read())'],
       
   343                          stdin=subprocess.PIPE,
       
   344                          stdout=subprocess.PIPE,
       
   345                          stderr=subprocess.PIPE)
       
   346         p.stdin.write("banana")
       
   347         (stdout, stderr) = p.communicate("split")
       
   348         self.assertEqual(stdout, "bananasplit")
       
   349         self.assertEqual(remove_stderr_debug_decorations(stderr), "")
       
   350 
       
   351     def test_universal_newlines(self):
       
   352         p = subprocess.Popen([sys.executable, "-c",
       
   353                           'import sys,os;' + SETBINARY +
       
   354                           'sys.stdout.write("line1\\n");'
       
   355                           'sys.stdout.flush();'
       
   356                           'sys.stdout.write("line2\\r");'
       
   357                           'sys.stdout.flush();'
       
   358                           'sys.stdout.write("line3\\r\\n");'
       
   359                           'sys.stdout.flush();'
       
   360                           'sys.stdout.write("line4\\r");'
       
   361                           'sys.stdout.flush();'
       
   362                           'sys.stdout.write("\\nline5");'
       
   363                           'sys.stdout.flush();'
       
   364                           'sys.stdout.write("\\nline6");'],
       
   365                          stdout=subprocess.PIPE,
       
   366                          universal_newlines=1)
       
   367         stdout = p.stdout.read()
       
   368         if hasattr(file, 'newlines'):
       
   369             # Interpreter with universal newline support
       
   370             self.assertEqual(stdout,
       
   371                              "line1\nline2\nline3\nline4\nline5\nline6")
       
   372         else:
       
   373             # Interpreter without universal newline support
       
   374             self.assertEqual(stdout,
       
   375                              "line1\nline2\rline3\r\nline4\r\nline5\nline6")
       
   376 
       
   377     def test_universal_newlines_communicate(self):
       
   378         # universal newlines through communicate()
       
   379         p = subprocess.Popen([sys.executable, "-c",
       
   380                           'import sys,os;' + SETBINARY +
       
   381                           'sys.stdout.write("line1\\n");'
       
   382                           'sys.stdout.flush();'
       
   383                           'sys.stdout.write("line2\\r");'
       
   384                           'sys.stdout.flush();'
       
   385                           'sys.stdout.write("line3\\r\\n");'
       
   386                           'sys.stdout.flush();'
       
   387                           'sys.stdout.write("line4\\r");'
       
   388                           'sys.stdout.flush();'
       
   389                           'sys.stdout.write("\\nline5");'
       
   390                           'sys.stdout.flush();'
       
   391                           'sys.stdout.write("\\nline6");'],
       
   392                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
       
   393                          universal_newlines=1)
       
   394         (stdout, stderr) = p.communicate()
       
   395         if hasattr(file, 'newlines'):
       
   396             # Interpreter with universal newline support
       
   397             self.assertEqual(stdout,
       
   398                              "line1\nline2\nline3\nline4\nline5\nline6")
       
   399         else:
       
   400             # Interpreter without universal newline support
       
   401             self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
       
   402 
       
   403     def test_no_leaking(self):
       
   404         # Make sure we leak no resources
       
   405         if not hasattr(test_support, "is_resource_enabled") \
       
   406                or test_support.is_resource_enabled("subprocess") and not mswindows:
       
   407             max_handles = 1026 # too much for most UNIX systems
       
   408         else:
       
   409             max_handles = 65
       
   410         for i in range(max_handles):
       
   411             p = subprocess.Popen([sys.executable, "-c",
       
   412                     "import sys;sys.stdout.write(sys.stdin.read())"],
       
   413                     stdin=subprocess.PIPE,
       
   414                     stdout=subprocess.PIPE,
       
   415                     stderr=subprocess.PIPE)
       
   416             data = p.communicate("lime")[0]
       
   417             self.assertEqual(data, "lime")
       
   418 
       
   419 
       
   420     def test_list2cmdline(self):
       
   421         self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
       
   422                          '"a b c" d e')
       
   423         self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
       
   424                          'ab\\"c \\ d')
       
   425         self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
       
   426                          'a\\\\\\b "de fg" h')
       
   427         self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
       
   428                          'a\\\\\\"b c d')
       
   429         self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
       
   430                          '"a\\\\b c" d e')
       
   431         self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
       
   432                          '"a\\\\b\\ c" d e')
       
   433         self.assertEqual(subprocess.list2cmdline(['ab', '']),
       
   434                          'ab ""')
       
   435 
       
   436 
       
   437     def test_poll(self):
       
   438         p = subprocess.Popen([sys.executable,
       
   439                           "-c", "import time; time.sleep(1)"])
       
   440         count = 0
       
   441         while p.poll() is None:
       
   442             time.sleep(0.1)
       
   443             count += 1
       
   444         # We expect that the poll loop probably went around about 10 times,
       
   445         # but, based on system scheduling we can't control, it's possible
       
   446         # poll() never returned None.  It "should be" very rare that it
       
   447         # didn't go around at least twice.
       
   448         self.assert_(count >= 2)
       
   449         # Subsequent invocations should just return the returncode
       
   450         self.assertEqual(p.poll(), 0)
       
   451 
       
   452 
       
   453     def test_wait(self):
       
   454         p = subprocess.Popen([sys.executable,
       
   455                           "-c", "import time; time.sleep(2)"])
       
   456         self.assertEqual(p.wait(), 0)
       
   457         # Subsequent invocations should just return the returncode
       
   458         self.assertEqual(p.wait(), 0)
       
   459 
       
   460 
       
   461     def test_invalid_bufsize(self):
       
   462         # an invalid type of the bufsize argument should raise
       
   463         # TypeError.
       
   464         try:
       
   465             subprocess.Popen([sys.executable, "-c", "pass"], "orange")
       
   466         except TypeError:
       
   467             pass
       
   468         else:
       
   469             self.fail("Expected TypeError")
       
   470 
       
   471     #
       
   472     # POSIX tests
       
   473     #
       
   474     if not mswindows:
       
   475         def test_exceptions(self):
       
   476             # catched & re-raised exceptions
       
   477             try:
       
   478                 p = subprocess.Popen([sys.executable, "-c", ""],
       
   479                                  cwd="/this/path/does/not/exist")
       
   480             except OSError, e:
       
   481                 # The attribute child_traceback should contain "os.chdir"
       
   482                 # somewhere.
       
   483                 self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
       
   484             else:
       
   485                 self.fail("Expected OSError")
       
   486 
       
   487         def _suppress_core_files(self):
       
   488             """Try to prevent core files from being created.
       
   489             Returns previous ulimit if successful, else None.
       
   490             """
       
   491             try:
       
   492                 import resource
       
   493                 old_limit = resource.getrlimit(resource.RLIMIT_CORE)
       
   494                 resource.setrlimit(resource.RLIMIT_CORE, (0,0))
       
   495                 return old_limit
       
   496             except (ImportError, ValueError, resource.error):
       
   497                 return None
       
   498 
       
   499         def _unsuppress_core_files(self, old_limit):
       
   500             """Return core file behavior to default."""
       
   501             if old_limit is None:
       
   502                 return
       
   503             try:
       
   504                 import resource
       
   505                 resource.setrlimit(resource.RLIMIT_CORE, old_limit)
       
   506             except (ImportError, ValueError, resource.error):
       
   507                 return
       
   508 
       
   509         def test_run_abort(self):
       
   510             # returncode handles signal termination
       
   511             old_limit = self._suppress_core_files()
       
   512             try:
       
   513                 p = subprocess.Popen([sys.executable,
       
   514                                       "-c", "import os; os.abort()"])
       
   515             finally:
       
   516                 self._unsuppress_core_files(old_limit)
       
   517             p.wait()
       
   518             self.assertEqual(-p.returncode, signal.SIGABRT)
       
   519 
       
   520         def test_preexec(self):
       
   521             # preexec function
       
   522             p = subprocess.Popen([sys.executable, "-c",
       
   523                               'import sys,os;' \
       
   524                               'sys.stdout.write(os.getenv("FRUIT"))'],
       
   525                              stdout=subprocess.PIPE,
       
   526                              preexec_fn=lambda: os.putenv("FRUIT", "apple"))
       
   527             self.assertEqual(p.stdout.read(), "apple")
       
   528 
       
   529         def test_args_string(self):
       
   530             # args is a string
       
   531             f, fname = self.mkstemp()
       
   532             os.write(f, "#!/bin/sh\n")
       
   533             os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
       
   534                         sys.executable)
       
   535             os.close(f)
       
   536             os.chmod(fname, 0700)
       
   537             p = subprocess.Popen(fname)
       
   538             p.wait()
       
   539             os.remove(fname)
       
   540             self.assertEqual(p.returncode, 47)
       
   541 
       
   542         def test_invalid_args(self):
       
   543             # invalid arguments should raise ValueError
       
   544             self.assertRaises(ValueError, subprocess.call,
       
   545                               [sys.executable,
       
   546                                "-c", "import sys; sys.exit(47)"],
       
   547                               startupinfo=47)
       
   548             self.assertRaises(ValueError, subprocess.call,
       
   549                               [sys.executable,
       
   550                                "-c", "import sys; sys.exit(47)"],
       
   551                               creationflags=47)
       
   552 
       
   553         def test_shell_sequence(self):
       
   554             # Run command through the shell (sequence)
       
   555             newenv = os.environ.copy()
       
   556             newenv["FRUIT"] = "apple"
       
   557             p = subprocess.Popen(["echo $FRUIT"], shell=1,
       
   558                                  stdout=subprocess.PIPE,
       
   559                                  env=newenv)
       
   560             self.assertEqual(p.stdout.read().strip(), "apple")
       
   561 
       
   562         def test_shell_string(self):
       
   563             # Run command through the shell (string)
       
   564             newenv = os.environ.copy()
       
   565             newenv["FRUIT"] = "apple"
       
   566             p = subprocess.Popen("echo $FRUIT", shell=1,
       
   567                                  stdout=subprocess.PIPE,
       
   568                                  env=newenv)
       
   569             self.assertEqual(p.stdout.read().strip(), "apple")
       
   570 
       
   571         def test_call_string(self):
       
   572             # call() function with string argument on UNIX
       
   573             f, fname = self.mkstemp()
       
   574             os.write(f, "#!/bin/sh\n")
       
   575             os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
       
   576                         sys.executable)
       
   577             os.close(f)
       
   578             os.chmod(fname, 0700)
       
   579             rc = subprocess.call(fname)
       
   580             os.remove(fname)
       
   581             self.assertEqual(rc, 47)
       
   582 
       
   583 
       
   584     #
       
   585     # Windows tests
       
   586     #
       
   587     if mswindows:
       
   588         def test_startupinfo(self):
       
   589             # startupinfo argument
       
   590             # We uses hardcoded constants, because we do not want to
       
   591             # depend on win32all.
       
   592             STARTF_USESHOWWINDOW = 1
       
   593             SW_MAXIMIZE = 3
       
   594             startupinfo = subprocess.STARTUPINFO()
       
   595             startupinfo.dwFlags = STARTF_USESHOWWINDOW
       
   596             startupinfo.wShowWindow = SW_MAXIMIZE
       
   597             # Since Python is a console process, it won't be affected
       
   598             # by wShowWindow, but the argument should be silently
       
   599             # ignored
       
   600             subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
       
   601                         startupinfo=startupinfo)
       
   602 
       
   603         def test_creationflags(self):
       
   604             # creationflags argument
       
   605             CREATE_NEW_CONSOLE = 16
       
   606             sys.stderr.write("    a DOS box should flash briefly ...\n")
       
   607             subprocess.call(sys.executable +
       
   608                                 ' -c "import time; time.sleep(0.25)"',
       
   609                             creationflags=CREATE_NEW_CONSOLE)
       
   610 
       
   611         def test_invalid_args(self):
       
   612             # invalid arguments should raise ValueError
       
   613             self.assertRaises(ValueError, subprocess.call,
       
   614                               [sys.executable,
       
   615                                "-c", "import sys; sys.exit(47)"],
       
   616                               preexec_fn=lambda: 1)
       
   617             self.assertRaises(ValueError, subprocess.call,
       
   618                               [sys.executable,
       
   619                                "-c", "import sys; sys.exit(47)"],
       
   620                               close_fds=True)
       
   621 
       
   622         def test_shell_sequence(self):
       
   623             # Run command through the shell (sequence)
       
   624             newenv = os.environ.copy()
       
   625             newenv["FRUIT"] = "physalis"
       
   626             p = subprocess.Popen(["set"], shell=1,
       
   627                                  stdout=subprocess.PIPE,
       
   628                                  env=newenv)
       
   629             self.assertNotEqual(p.stdout.read().find("physalis"), -1)
       
   630 
       
   631         def test_shell_string(self):
       
   632             # Run command through the shell (string)
       
   633             newenv = os.environ.copy()
       
   634             newenv["FRUIT"] = "physalis"
       
   635             p = subprocess.Popen("set", shell=1,
       
   636                                  stdout=subprocess.PIPE,
       
   637                                  env=newenv)
       
   638             self.assertNotEqual(p.stdout.read().find("physalis"), -1)
       
   639 
       
   640         def test_call_string(self):
       
   641             # call() function with string argument on Windows
       
   642             rc = subprocess.call(sys.executable +
       
   643                                  ' -c "import sys; sys.exit(47)"')
       
   644             self.assertEqual(rc, 47)
       
   645 
       
   646 
       
   647 def test_main():
       
   648     test_support.run_unittest(ProcessTestCase)
       
   649     if hasattr(test_support, "reap_children"):
       
   650         test_support.reap_children()
       
   651 
       
   652 if __name__ == "__main__":
       
   653     test_main()