changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
     1 """Supporting definitions for the Python regression tests."""
     3 if __name__ != 'test.test_support':
     4     raise ImportError('test_support must be imported from the test package')
     6 import contextlib
     7 import errno
     8 import socket
     9 import sys
    10 import os
    11 import shutil
    12 import warnings
    13 import unittest
    15 __all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module",
    16            "verbose", "use_resources", "max_memuse", "record_original_stdout",
    17            "get_original_stdout", "unload", "unlink", "rmtree", "forget",
    18            "is_resource_enabled", "requires", "find_unused_port", "bind_port",
    19            "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
    20            "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
    21            "open_urlresource", "check_warnings", "CleanImport",
    22            "EnvironmentVarGuard", "captured_output",
    23            "captured_stdout", "TransientResource", "transient_internet",
    24            "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
    25            "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
    26            "threading_cleanup", "reap_children"]
    28 class Error(Exception):
    29     """Base class for regression test exceptions."""
    31 class TestFailed(Error):
    32     """Test failed."""
    34 class TestSkipped(Error):
    35     """Test skipped.
    37     This can be raised to indicate that a test was deliberatly
    38     skipped, but not because a feature wasn't available.  For
    39     example, if some resource can't be used, such as the network
    40     appears to be unavailable, this should be raised instead of
    41     TestFailed.
    42     """
    44 class ResourceDenied(TestSkipped):
    45     """Test skipped because it requested a disallowed resource.
    47     This is raised when a test calls requires() for a resource that
    48     has not be enabled.  It is used to distinguish between expected
    49     and unexpected skips.
    50     """
    52 def import_module(name, deprecated=False):
    53     """Import the module to be tested, raising TestSkipped if it is not
    54     available."""
    55     with warnings.catch_warnings():
    56         if deprecated:
    57             warnings.filterwarnings("ignore", ".+ (module|package)",
    58                                     DeprecationWarning)
    59         try:
    60             module = __import__(name, level=0)
    61         except ImportError:
    62             raise TestSkipped("No module named " + name)
    63         else:
    64             return module
    66 verbose = 1              # Flag set to 0 by
    67 use_resources = None     # Flag set to [] by
    68 max_memuse = 0           # Disable bigmem tests (they will still be run with
    69                          # small sizes, to make sure they work.)
    70 real_max_memuse = 0
    72 # _original_stdout is meant to hold stdout at the time regrtest began.
    73 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
    74 # The point is to have some flavor of stdout the user can actually see.
    75 _original_stdout = None
    76 def record_original_stdout(stdout):
    77     global _original_stdout
    78     _original_stdout = stdout
    80 def get_original_stdout():
    81     return _original_stdout or sys.stdout
    83 def unload(name):
    84     try:
    85         del sys.modules[name]
    86     except KeyError:
    87         pass
    89 def unlink(filename):
    90     try:
    91         os.unlink(filename)
    92     except OSError:
    93         pass
    95 def rmtree(path):
    96     try:
    97         shutil.rmtree(path)
    98     except OSError, e:
    99         # Unix returns ENOENT, Windows returns ESRCH.
   100         if e.errno not in (errno.ENOENT, errno.ESRCH):
   101             raise
   103 def forget(modname):
   104     '''"Forget" a module was ever imported by removing it from sys.modules and
   105     deleting any .pyc and .pyo files.'''
   106     unload(modname)
   107     for dirname in sys.path:
   108         unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
   109         # Deleting the .pyo file cannot be within the 'try' for the .pyc since
   110         # the chance exists that there is no .pyc (and thus the 'try' statement
   111         # is exited) but there is a .pyo file.
   112         unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
   114 def is_resource_enabled(resource):
   115     """Test whether a resource is enabled.  Known resources are set by
   117     return use_resources is not None and resource in use_resources
   119 def requires(resource, msg=None):
   120     """Raise ResourceDenied if the specified resource is not available.
   122     If the caller's module is __main__ then automatically return True.  The
   123     possibility of False being returned occurs when is executing."""
   124     # see if the caller's module is __main__ - if so, treat as if
   125     # the resource was set
   126     if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
   127         return
   128     if not is_resource_enabled(resource):
   129         if msg is None:
   130             msg = "Use of the `%s' resource not enabled" % resource
   131         raise ResourceDenied(msg)
   133 HOST = 'localhost'
   135 def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
   136     """Returns an unused port that should be suitable for binding.  This is
   137     achieved by creating a temporary socket with the same family and type as
   138     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
   139     the specified host address (defaults to with the port set to 0,
   140     eliciting an unused ephemeral port from the OS.  The temporary socket is
   141     then closed and deleted, and the ephemeral port is returned.
   143     Either this method or bind_port() should be used for any tests where a
   144     server socket needs to be bound to a particular port for the duration of
   145     the test.  Which one to use depends on whether the calling code is creating
   146     a python socket, or if an unused port needs to be provided in a constructor
   147     or passed to an external program (i.e. the -accept argument to openssl's
   148     s_server mode).  Always prefer bind_port() over find_unused_port() where
   149     possible.  Hard coded ports should *NEVER* be used.  As soon as a server
   150     socket is bound to a hard coded port, the ability to run multiple instances
   151     of the test simultaneously on the same host is compromised, which makes the
   152     test a ticking time bomb in a buildbot environment. On Unix buildbots, this
   153     may simply manifest as a failed test, which can be recovered from without
   154     intervention in most cases, but on Windows, the entire python process can
   155     completely and utterly wedge, requiring someone to log in to the buildbot
   156     and manually kill the affected process.
   158     (This is easy to reproduce on Windows, unfortunately, and can be traced to
   159     the SO_REUSEADDR socket option having different semantics on Windows versus
   160     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
   161     listen and then accept connections on identical host/ports.  An EADDRINUSE
   162     socket.error will be raised at some point (depending on the platform and
   163     the order bind and listen were called on each socket).
   165     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
   166     will ever be raised when attempting to bind two identical host/ports. When
   167     accept() is called on each socket, the second caller's process will steal
   168     the port from the first caller, leaving them both in an awkwardly wedged
   169     state where they'll no longer respond to any signals or graceful kills, and
   170     must be forcibly killed via OpenProcess()/TerminateProcess().
   172     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
   173     instead of SO_REUSEADDR, which effectively affords the same semantics as
   174     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
   175     Source world compared to Windows ones, this is a common mistake.  A quick
   176     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
   177     openssl.exe is called with the 's_server' option, for example. See
   178 for more info.  The following site also
   179     has a very thorough description about the implications of both REUSEADDR
   180     and EXCLUSIVEADDRUSE on Windows:
   183     XXX: although this approach is a vast improvement on previous attempts to
   184     elicit unused ports, it rests heavily on the assumption that the ephemeral
   185     port returned to us by the OS won't immediately be dished back out to some
   186     other process when we close and delete our temporary socket but before our
   187     calling code has a chance to bind the returned port.  We can deal with this
   188     issue if/when we come across it."""
   189     tempsock = socket.socket(family, socktype)
   190     port = bind_port(tempsock)
   191     tempsock.close()
   192     del tempsock
   193     return port
   195 def bind_port(sock, host=HOST):
   196     """Bind the socket to a free port and return the port number.  Relies on
   197     ephemeral ports in order to ensure we are using an unbound port.  This is
   198     important as many tests may be running simultaneously, especially in a
   199     buildbot environment.  This method raises an exception if the
   200     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
   201     or SO_REUSEPORT set on it.  Tests should *never* set these socket options
   202     for TCP/IP sockets.  The only case for setting these options is testing
   203     multicasting via multiple UDP sockets.
   205     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
   206     on Windows), it will be set on the socket.  This will prevent anyone else
   207     from bind()'ing to our host/port for the duration of the test.
   208     """
   209     if == socket.AF_INET and sock.type == socket.SOCK_STREAM:
   210         if hasattr(socket, 'SO_REUSEADDR'):
   211             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
   212                 raise TestFailed("tests should never set the SO_REUSEADDR "   \
   213                                  "socket option on TCP/IP sockets!")
   214         if hasattr(socket, 'SO_REUSEPORT'):
   215             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
   216                 raise TestFailed("tests should never set the SO_REUSEPORT "   \
   217                                  "socket option on TCP/IP sockets!")
   218         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
   219             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
   221     sock.bind((host, 0))
   222     port = sock.getsockname()[1]
   223     return port
   225 FUZZ = 1e-6
   227 def fcmp(x, y): # fuzzy comparison function
   228     if isinstance(x, float) or isinstance(y, float):
   229         try:
   230             fuzz = (abs(x) + abs(y)) * FUZZ
   231             if abs(x-y) <= fuzz:
   232                 return 0
   233         except:
   234             pass
   235     elif type(x) == type(y) and isinstance(x, (tuple, list)):
   236         for i in range(min(len(x), len(y))):
   237             outcome = fcmp(x[i], y[i])
   238             if outcome != 0:
   239                 return outcome
   240         return (len(x) > len(y)) - (len(x) < len(y))
   241     return (x > y) - (x < y)
   243 try:
   244     unicode
   245     have_unicode = True
   246 except NameError:
   247     have_unicode = False
   249 is_jython = sys.platform.startswith('java')
   251 # Filename used for testing
   252 if == 'java':
   253     # Jython disallows @ in module names
   254     TESTFN = '$test'
   255 elif == 'riscos':
   256     TESTFN = 'testfile'
   257 else:
   258     TESTFN = '@test'
   259     # Unicode name only used if TEST_FN_ENCODING exists for the platform.
   260     if have_unicode:
   261         # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
   262         # TESTFN_UNICODE is a filename that can be encoded using the
   263         # file system encoding, but *not* with the default (ascii) encoding
   264         if isinstance('', unicode):
   265             # python -U
   266             # XXX perhaps unicode() should accept Unicode strings?
   267             TESTFN_UNICODE = "@test-\xe0\xf2"
   268         else:
   269             # 2 latin characters.
   270             TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
   271         TESTFN_ENCODING = sys.getfilesystemencoding()
   272         # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
   273         # able to be encoded by *either* the default or filesystem encoding.
   274         # This test really only makes sense on Windows NT platforms
   275         # which have special Unicode support in posixmodule.
   276         if (not hasattr(sys, "getwindowsversion") or
   277                 sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
   278             TESTFN_UNICODE_UNENCODEABLE = None
   279         else:
   280             # Japanese characters (I think - from bug 846133)
   281             TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
   282             try:
   283                 # XXX - Note - should be using TESTFN_ENCODING here - but for
   284                 # Windows, "mbcs" currently always operates as if in
   285                 # errors=ignore' mode - hence we get '?' characters rather than
   286                 # the exception.  'Latin1' operates as we expect - ie, fails.
   287                 # See [ 850997 ] mbcs encoding ignores errors
   288                 TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
   289             except UnicodeEncodeError:
   290                 pass
   291             else:
   292                 print \
   293                 'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
   294                 'Unicode filename tests may not be effective' \
   295                 % TESTFN_UNICODE_UNENCODEABLE
   297 # Make sure we can write to TESTFN, try in /tmp if we can't
   298 fp = None
   299 try:
   300     fp = open(TESTFN, 'w+')
   301 except IOError:
   302     TMP_TESTFN = os.path.join('/tmp', TESTFN)
   303     try:
   304         fp = open(TMP_TESTFN, 'w+')
   305         TESTFN = TMP_TESTFN
   306         del TMP_TESTFN
   307     except IOError:
   308         print ('WARNING: tests will fail, unable to write to: %s or %s' %
   309                 (TESTFN, TMP_TESTFN))
   310 if fp is not None:
   311     fp.close()
   312     unlink(TESTFN)
   313 del fp
   315 def findfile(file, here=__file__):
   316     """Try to find a file on sys.path and the working directory.  If it is not
   317     found the argument passed to the function is returned (this does not
   318     necessarily signal failure; could still be the legitimate path)."""
   319     if os.path.isabs(file):
   320         return file
   321     path = sys.path
   322     path = [os.path.dirname(here)] + path
   323     for dn in path:
   324         fn = os.path.join(dn, file)
   325         if os.path.exists(fn): return fn
   326     return file
   328 def verify(condition, reason='test failed'):
   329     """Verify that condition is true. If not, raise TestFailed.
   331        The optional argument reason can be given to provide
   332        a better error text.
   333     """
   335     if not condition:
   336         raise TestFailed(reason)
   338 def vereq(a, b):
   339     """Raise TestFailed if a == b is false.
   341     This is better than verify(a == b) because, in case of failure, the
   342     error message incorporates repr(a) and repr(b) so you can see the
   343     inputs.
   345     Note that "not (a == b)" isn't necessarily the same as "a != b"; the
   346     former is tested.
   347     """
   349     if not (a == b):
   350         raise TestFailed("%r == %r" % (a, b))
   352 def sortdict(dict):
   353     "Like repr(dict), but in sorted order."
   354     items = dict.items()
   355     items.sort()
   356     reprpairs = ["%r: %r" % pair for pair in items]
   357     withcommas = ", ".join(reprpairs)
   358     return "{%s}" % withcommas
   360 def check_syntax_error(testcase, statement):
   361     try:
   362         compile(statement, '<test string>', 'exec')
   363     except SyntaxError:
   364         pass
   365     else:
   366'Missing SyntaxError: "%s"' % statement)
   368 def open_urlresource(url):
   369     import urllib, urlparse
   371     requires('urlfetch')
   372     filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
   374     for path in [os.path.curdir, os.path.pardir]:
   375         fn = os.path.join(path, filename)
   376         if os.path.exists(fn):
   377             return open(fn)
   379     print >> get_original_stdout(), '\tfetching %s ...' % url
   380     fn, _ = urllib.urlretrieve(url, filename)
   381     return open(fn)
   384 class WarningsRecorder(object):
   385     """Convenience wrapper for the warnings list returned on
   386        entry to the warnings.catch_warnings() context manager.
   387     """
   388     def __init__(self, warnings_list):
   389         self.warnings = warnings_list
   391     def __getattr__(self, attr):
   392         if self.warnings:
   393             return getattr(self.warnings[-1], attr)
   394         elif attr in warnings.WarningMessage._WARNING_DETAILS:
   395             return None
   396         raise AttributeError("%r has no attribute %r" % (self, attr))
   398     def reset(self):
   399         del self.warnings[:]
   401 @contextlib.contextmanager
   402 def check_warnings():
   403     with warnings.catch_warnings(record=True) as w:
   404         yield WarningsRecorder(w)
   407 class CleanImport(object):
   408     """Context manager to force import to return a new module reference.
   410     This is useful for testing module-level behaviours, such as
   411     the emission of a DeprecationWarning on import.
   413     Use like this:
   415         with CleanImport("foo"):
   416             __import__("foo") # new reference
   417     """
   419     def __init__(self, *module_names):
   420         self.original_modules = sys.modules.copy()
   421         for module_name in module_names:
   422             if module_name in sys.modules:
   423                 module = sys.modules[module_name]
   424                 # It is possible that module_name is just an alias for
   425                 # another module (e.g. stub for modules renamed in 3.x).
   426                 # In that case, we also need delete the real module to clear
   427                 # the import cache.
   428                 if module.__name__ != module_name:
   429                     del sys.modules[module.__name__]
   430                 del sys.modules[module_name]
   432     def __enter__(self):
   433         return self
   435     def __exit__(self, *ignore_exc):
   436         sys.modules.update(self.original_modules)
   439 class EnvironmentVarGuard(object):
   441     """Class to help protect the environment variable properly.  Can be used as
   442     a context manager."""
   444     def __init__(self):
   445         self._environ = os.environ
   446         self._unset = set()
   447         self._reset = dict()
   449     def set(self, envvar, value):
   450         if envvar not in self._environ:
   451             self._unset.add(envvar)
   452         else:
   453             self._reset[envvar] = self._environ[envvar]
   454         self._environ[envvar] = value
   456     def unset(self, envvar):
   457         if envvar in self._environ:
   458             self._reset[envvar] = self._environ[envvar]
   459             del self._environ[envvar]
   461     def __enter__(self):
   462         return self
   464     def __exit__(self, *ignore_exc):
   465         for envvar, value in self._reset.iteritems():
   466             self._environ[envvar] = value
   467         for unset in self._unset:
   468             del self._environ[unset]
   470 class TransientResource(object):
   472     """Raise ResourceDenied if an exception is raised while the context manager
   473     is in effect that matches the specified exception and attributes."""
   475     def __init__(self, exc, **kwargs):
   476         self.exc = exc
   477         self.attrs = kwargs
   479     def __enter__(self):
   480         return self
   482     def __exit__(self, type_=None, value=None, traceback=None):
   483         """If type_ is a subclass of self.exc and value has attributes matching
   484         self.attrs, raise ResourceDenied.  Otherwise let the exception
   485         propagate (if any)."""
   486         if type_ is not None and issubclass(self.exc, type_):
   487             for attr, attr_value in self.attrs.iteritems():
   488                 if not hasattr(value, attr):
   489                     break
   490                 if getattr(value, attr) != attr_value:
   491                     break
   492             else:
   493                 raise ResourceDenied("an optional resource is not available")
   496 def transient_internet():
   497     """Return a context manager that raises ResourceDenied when various issues
   498     with the Internet connection manifest themselves as exceptions."""
   499     time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
   500     socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
   501     ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
   502     return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
   505 @contextlib.contextmanager
   506 def captured_output(stream_name):
   507     """Run the 'with' statement body using a StringIO object in place of a
   508     specific attribute on the sys module.
   509     Example use (with 'stream_name=stdout')::
   511        with captured_stdout() as s:
   512            print "hello"
   513        assert s.getvalue() == "hello"
   514     """
   515     import StringIO
   516     orig_stdout = getattr(sys, stream_name)
   517     setattr(sys, stream_name, StringIO.StringIO())
   518     try:
   519         yield getattr(sys, stream_name)
   520     finally:
   521         setattr(sys, stream_name, orig_stdout)
   523 def captured_stdout():
   524     return captured_output("stdout")
   527 #=======================================================================
   528 # Decorator for running a function in a different locale, correctly resetting
   529 # it afterwards.
   531 def run_with_locale(catstr, *locales):
   532     def decorator(func):
   533         def inner(*args, **kwds):
   534             try:
   535                 import locale
   536                 category = getattr(locale, catstr)
   537                 orig_locale = locale.setlocale(category)
   538             except AttributeError:
   539                 # if the test author gives us an invalid category string
   540                 raise
   541             except:
   542                 # cannot retrieve original locale, so do nothing
   543                 locale = orig_locale = None
   544             else:
   545                 for loc in locales:
   546                     try:
   547                         locale.setlocale(category, loc)
   548                         break
   549                     except:
   550                         pass
   552             # now run the function, resetting the locale on exceptions
   553             try:
   554                 return func(*args, **kwds)
   555             finally:
   556                 if locale and orig_locale:
   557                     locale.setlocale(category, orig_locale)
   558         inner.func_name = func.func_name
   559         inner.__doc__ = func.__doc__
   560         return inner
   561     return decorator
   563 #=======================================================================
   564 # Big-memory-test support. Separate from 'resources' because memory use should be configurable.
   566 # Some handy shorthands. Note that these are used for byte-limits as well
   567 # as size-limits, in the various bigmem tests
   568 _1M = 1024*1024
   569 _1G = 1024 * _1M
   570 _2G = 2 * _1G
   571 _4G = 4 * _1G
   573 MAX_Py_ssize_t = sys.maxsize
   575 def set_memlimit(limit):
   576     import re
   577     global max_memuse
   578     global real_max_memuse
   579     sizes = {
   580         'k': 1024,
   581         'm': _1M,
   582         'g': _1G,
   583         't': 1024*_1G,
   584     }
   585     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
   586                  re.IGNORECASE | re.VERBOSE)
   587     if m is None:
   588         raise ValueError('Invalid memory limit %r' % (limit,))
   589     memlimit = int(float( * sizes[])
   590     real_max_memuse = memlimit
   591     if memlimit > MAX_Py_ssize_t:
   592         memlimit = MAX_Py_ssize_t
   593     if memlimit < _2G - 1:
   594         raise ValueError('Memory limit %r too low to be useful' % (limit,))
   595     max_memuse = memlimit
   597 def bigmemtest(minsize, memuse, overhead=5*_1M):
   598     """Decorator for bigmem tests.
   600     'minsize' is the minimum useful size for the test (in arbitrary,
   601     test-interpreted units.) 'memuse' is the number of 'bytes per size' for
   602     the test, or a good estimate of it. 'overhead' specifies fixed overhead,
   603     independent of the testsize, and defaults to 5Mb.
   605     The decorator tries to guess a good value for 'size' and passes it to
   606     the decorated test function. If minsize * memuse is more than the
   607     allowed memory use (as defined by max_memuse), the test is skipped.
   608     Otherwise, minsize is adjusted upward to use up to max_memuse.
   609     """
   610     def decorator(f):
   611         def wrapper(self):
   612             if not max_memuse:
   613                 # If max_memuse is 0 (the default),
   614                 # we still want to run the tests with size set to a few kb,
   615                 # to make sure they work. We still want to avoid using
   616                 # too much memory, though, but we do that noisily.
   617                 maxsize = 5147
   618                 self.failIf(maxsize * memuse + overhead > 20 * _1M)
   619             else:
   620                 maxsize = int((max_memuse - overhead) / memuse)
   621                 if maxsize < minsize:
   622                     # Really ought to print 'test skipped' or something
   623                     if verbose:
   624                         sys.stderr.write("Skipping %s because of memory "
   625                                          "constraint\n" % (f.__name__,))
   626                     return
   627                 # Try to keep some breathing room in memory use
   628                 maxsize = max(maxsize - 50 * _1M, minsize)
   629             return f(self, maxsize)
   630         wrapper.minsize = minsize
   631         wrapper.memuse = memuse
   632         wrapper.overhead = overhead
   633         return wrapper
   634     return decorator
   636 def precisionbigmemtest(size, memuse, overhead=5*_1M):
   637     def decorator(f):
   638         def wrapper(self):
   639             if not real_max_memuse:
   640                 maxsize = 5147
   641             else:
   642                 maxsize = size
   644                 if real_max_memuse and real_max_memuse < maxsize * memuse:
   645                     if verbose:
   646                         sys.stderr.write("Skipping %s because of memory "
   647                                          "constraint\n" % (f.__name__,))
   648                     return
   650             return f(self, maxsize)
   651         wrapper.size = size
   652         wrapper.memuse = memuse
   653         wrapper.overhead = overhead
   654         return wrapper
   655     return decorator
   657 def bigaddrspacetest(f):
   658     """Decorator for tests that fill the address space."""
   659     def wrapper(self):
   660         if max_memuse < MAX_Py_ssize_t:
   661             if verbose:
   662                 sys.stderr.write("Skipping %s because of memory "
   663                                  "constraint\n" % (f.__name__,))
   664         else:
   665             return f(self)
   666     return wrapper
   668 #=======================================================================
   669 # unittest integration.
   671 class BasicTestRunner:
   672     def run(self, test):
   673         result = unittest.TestResult()
   674         test(result)
   675         return result
   678 def _run_suite(suite):
   679     """Run tests from a unittest.TestSuite-derived class."""
   680     if verbose:
   681         runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
   682     else:
   683         runner = BasicTestRunner()
   685     result =
   686     if not result.wasSuccessful():
   687         if len(result.errors) == 1 and not result.failures:
   688             err = result.errors[0][1]
   689         elif len(result.failures) == 1 and not result.errors:
   690             err = result.failures[0][1]
   691         else:
   692             err = "errors occurred; run in verbose mode for details"
   693         raise TestFailed(err)
   696 def run_unittest(*classes):
   697     """Run tests from unittest.TestCase-derived classes."""
   698     valid_types = (unittest.TestSuite, unittest.TestCase)
   699     suite = unittest.TestSuite()
   700     for cls in classes:
   701         if isinstance(cls, str):
   702             if cls in sys.modules:
   703                 suite.addTest(unittest.findTestCases(sys.modules[cls]))
   704             else:
   705                 raise ValueError("str arguments must be keys in sys.modules")
   706         elif isinstance(cls, valid_types):
   707             suite.addTest(cls)
   708         else:
   709             suite.addTest(unittest.makeSuite(cls))
   710     _run_suite(suite)
   713 #=======================================================================
   714 # doctest driver.
   716 def run_doctest(module, verbosity=None):
   717     """Run doctest on the given module.  Return (#failures, #tests).
   719     If optional argument verbosity is not specified (or is None), pass
   720     test_support's belief about verbosity on to doctest.  Else doctest's
   721     usual behavior is used (it searches sys.argv for -v).
   722     """
   724     import doctest
   726     if verbosity is None:
   727         verbosity = verbose
   728     else:
   729         verbosity = None
   731     # Direct doctest output (normally just errors) to real stdout; doctest
   732     # output shouldn't be compared by regrtest.
   733     save_stdout = sys.stdout
   734     sys.stdout = get_original_stdout()
   735     try:
   736         f, t = doctest.testmod(module, verbose=verbosity)
   737         if f:
   738             raise TestFailed("%d of %d doctests failed" % (f, t))
   739     finally:
   740         sys.stdout = save_stdout
   741     if verbose:
   742         print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
   743     return f, t
   745 #=======================================================================
   746 # Threading support to prevent reporting refleaks when running -R
   748 def threading_setup():
   749     import threading
   750     return len(threading._active), len(threading._limbo)
   752 def threading_cleanup(num_active, num_limbo):
   753     import threading
   754     import time
   756     _MAX_COUNT = 10
   757     count = 0
   758     while len(threading._active) != num_active and count < _MAX_COUNT:
   759         count += 1
   760         time.sleep(0.1)
   762     count = 0
   763     while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
   764         count += 1
   765         time.sleep(0.1)
   767 def reap_children():
   768     """Use this function at the end of test_main() whenever sub-processes
   769     are started.  This will help ensure that no extra children (zombies)
   770     stick around to hog resources and create problems when looking
   771     for refleaks.
   772     """
   774     # Reap all our dead child processes so we don't leave zombies around.
   775     # These hog resources and might be causing some of the buildbots to die.
   776     if hasattr(os, 'waitpid'):
   777         any_process = -1
   778         while True:
   779             try:
   780                 # This will raise an exception on Windows.  That's ok.
   781                 pid, status = os.waitpid(any_process, os.WNOHANG)
   782                 if pid == 0:
   783                     break
   784             except:
   785                 break