symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_support.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """Supporting definitions for the Python regression tests."""
       
     2 
       
     3 if __name__ != 'test.test_support':
       
     4     raise ImportError('test_support must be imported from the test package')
       
     5 
       
     6 import contextlib
       
     7 import errno
       
     8 import socket
       
     9 import sys
       
    10 import os
       
    11 import shutil
       
    12 import warnings
       
    13 import unittest
       
    14 
       
    15 __all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module",
       
    16            "verbose", "use_resources", "max_memuse", "record_original_stdout",
       
    17            "get_original_stdout", "unload", "unlink", "rmtree", "forget",
       
    18            "is_resource_enabled", "requires", "find_unused_port", "bind_port",
       
    19            "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
       
    20            "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
       
    21            "open_urlresource", "check_warnings", "CleanImport",
       
    22            "EnvironmentVarGuard", "captured_output",
       
    23            "captured_stdout", "TransientResource", "transient_internet",
       
    24            "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
       
    25            "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
       
    26            "threading_cleanup", "reap_children"]
       
    27 
       
    28 class Error(Exception):
       
    29     """Base class for regression test exceptions."""
       
    30 
       
    31 class TestFailed(Error):
       
    32     """Test failed."""
       
    33 
       
    34 class TestSkipped(Error):
       
    35     """Test skipped.
       
    36 
       
    37     This can be raised to indicate that a test was deliberatly
       
    38     skipped, but not because a feature wasn't available.  For
       
    39     example, if some resource can't be used, such as the network
       
    40     appears to be unavailable, this should be raised instead of
       
    41     TestFailed.
       
    42     """
       
    43 
       
    44 class ResourceDenied(TestSkipped):
       
    45     """Test skipped because it requested a disallowed resource.
       
    46 
       
    47     This is raised when a test calls requires() for a resource that
       
    48     has not be enabled.  It is used to distinguish between expected
       
    49     and unexpected skips.
       
    50     """
       
    51 
       
    52 def import_module(name, deprecated=False):
       
    53     """Import the module to be tested, raising TestSkipped if it is not
       
    54     available."""
       
    55     with warnings.catch_warnings():
       
    56         if deprecated:
       
    57             warnings.filterwarnings("ignore", ".+ (module|package)",
       
    58                                     DeprecationWarning)
       
    59         try:
       
    60             module = __import__(name, level=0)
       
    61         except ImportError:
       
    62             raise TestSkipped("No module named " + name)
       
    63         else:
       
    64             return module
       
    65 
       
# Module-level switches; regrtest.py overrides them when it drives the tests.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
# Limit as requested through set_memlimit(), before it is clamped to
# MAX_Py_ssize_t.
real_max_memuse = 0
       
    71 
       
    72 # _original_stdout is meant to hold stdout at the time regrtest began.
       
    73 # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
       
    74 # The point is to have some flavor of stdout the user can actually see.
       
    75 _original_stdout = None
       
    76 def record_original_stdout(stdout):
       
    77     global _original_stdout
       
    78     _original_stdout = stdout
       
    79 
       
    80 def get_original_stdout():
       
    81     return _original_stdout or sys.stdout
       
    82 
       
    83 def unload(name):
       
    84     try:
       
    85         del sys.modules[name]
       
    86     except KeyError:
       
    87         pass
       
    88 
       
    89 def unlink(filename):
       
    90     try:
       
    91         os.unlink(filename)
       
    92     except OSError:
       
    93         pass
       
    94 
       
    95 def rmtree(path):
       
    96     try:
       
    97         shutil.rmtree(path)
       
    98     except OSError, e:
       
    99         # Unix returns ENOENT, Windows returns ESRCH.
       
   100         if e.errno not in (errno.ENOENT, errno.ESRCH):
       
   101             raise
       
   102 
       
   103 def forget(modname):
       
   104     '''"Forget" a module was ever imported by removing it from sys.modules and
       
   105     deleting any .pyc and .pyo files.'''
       
   106     unload(modname)
       
   107     for dirname in sys.path:
       
   108         unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
       
   109         # Deleting the .pyo file cannot be within the 'try' for the .pyc since
       
   110         # the chance exists that there is no .pyc (and thus the 'try' statement
       
   111         # is exited) but there is a .pyo file.
       
   112         unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
       
   113 
       
   114 def is_resource_enabled(resource):
       
   115     """Test whether a resource is enabled.  Known resources are set by
       
   116     regrtest.py."""
       
   117     return use_resources is not None and resource in use_resources
       
   118 
       
   119 def requires(resource, msg=None):
       
   120     """Raise ResourceDenied if the specified resource is not available.
       
   121 
       
   122     If the caller's module is __main__ then automatically return True.  The
       
   123     possibility of False being returned occurs when regrtest.py is executing."""
       
   124     # see if the caller's module is __main__ - if so, treat as if
       
   125     # the resource was set
       
   126     if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
       
   127         return
       
   128     if not is_resource_enabled(resource):
       
   129         if msg is None:
       
   130             msg = "Use of the `%s' resource not enabled" % resource
       
   131         raise ResourceDenied(msg)
       
   132 
       
   133 HOST = 'localhost'
       
   134 
       
   135 def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
       
   136     """Returns an unused port that should be suitable for binding.  This is
       
   137     achieved by creating a temporary socket with the same family and type as
       
   138     the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
       
   139     the specified host address (defaults to 0.0.0.0) with the port set to 0,
       
   140     eliciting an unused ephemeral port from the OS.  The temporary socket is
       
   141     then closed and deleted, and the ephemeral port is returned.
       
   142 
       
   143     Either this method or bind_port() should be used for any tests where a
       
   144     server socket needs to be bound to a particular port for the duration of
       
   145     the test.  Which one to use depends on whether the calling code is creating
       
   146     a python socket, or if an unused port needs to be provided in a constructor
       
   147     or passed to an external program (i.e. the -accept argument to openssl's
       
   148     s_server mode).  Always prefer bind_port() over find_unused_port() where
       
   149     possible.  Hard coded ports should *NEVER* be used.  As soon as a server
       
   150     socket is bound to a hard coded port, the ability to run multiple instances
       
   151     of the test simultaneously on the same host is compromised, which makes the
       
   152     test a ticking time bomb in a buildbot environment. On Unix buildbots, this
       
   153     may simply manifest as a failed test, which can be recovered from without
       
   154     intervention in most cases, but on Windows, the entire python process can
       
   155     completely and utterly wedge, requiring someone to log in to the buildbot
       
   156     and manually kill the affected process.
       
   157 
       
   158     (This is easy to reproduce on Windows, unfortunately, and can be traced to
       
   159     the SO_REUSEADDR socket option having different semantics on Windows versus
       
   160     Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
       
   161     listen and then accept connections on identical host/ports.  An EADDRINUSE
       
   162     socket.error will be raised at some point (depending on the platform and
       
   163     the order bind and listen were called on each socket).
       
   164 
       
   165     However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
       
   166     will ever be raised when attempting to bind two identical host/ports. When
       
   167     accept() is called on each socket, the second caller's process will steal
       
   168     the port from the first caller, leaving them both in an awkwardly wedged
       
   169     state where they'll no longer respond to any signals or graceful kills, and
       
   170     must be forcibly killed via OpenProcess()/TerminateProcess().
       
   171 
       
   172     The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
       
   173     instead of SO_REUSEADDR, which effectively affords the same semantics as
       
   174     SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
       
   175     Source world compared to Windows ones, this is a common mistake.  A quick
       
   176     look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
       
   177     openssl.exe is called with the 's_server' option, for example. See
       
   178     http://bugs.python.org/issue2550 for more info.  The following site also
       
   179     has a very thorough description about the implications of both REUSEADDR
       
   180     and EXCLUSIVEADDRUSE on Windows:
       
   181     http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
       
   182 
       
   183     XXX: although this approach is a vast improvement on previous attempts to
       
   184     elicit unused ports, it rests heavily on the assumption that the ephemeral
       
   185     port returned to us by the OS won't immediately be dished back out to some
       
   186     other process when we close and delete our temporary socket but before our
       
   187     calling code has a chance to bind the returned port.  We can deal with this
       
   188     issue if/when we come across it."""
       
   189     tempsock = socket.socket(family, socktype)
       
   190     port = bind_port(tempsock)
       
   191     tempsock.close()
       
   192     del tempsock
       
   193     return port
       
   194 
       
   195 def bind_port(sock, host=HOST):
       
   196     """Bind the socket to a free port and return the port number.  Relies on
       
   197     ephemeral ports in order to ensure we are using an unbound port.  This is
       
   198     important as many tests may be running simultaneously, especially in a
       
   199     buildbot environment.  This method raises an exception if the sock.family
       
   200     is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
       
   201     or SO_REUSEPORT set on it.  Tests should *never* set these socket options
       
   202     for TCP/IP sockets.  The only case for setting these options is testing
       
   203     multicasting via multiple UDP sockets.
       
   204 
       
   205     Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
       
   206     on Windows), it will be set on the socket.  This will prevent anyone else
       
   207     from bind()'ing to our host/port for the duration of the test.
       
   208     """
       
   209     if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
       
   210         if hasattr(socket, 'SO_REUSEADDR'):
       
   211             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
       
   212                 raise TestFailed("tests should never set the SO_REUSEADDR "   \
       
   213                                  "socket option on TCP/IP sockets!")
       
   214         if hasattr(socket, 'SO_REUSEPORT'):
       
   215             if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
       
   216                 raise TestFailed("tests should never set the SO_REUSEPORT "   \
       
   217                                  "socket option on TCP/IP sockets!")
       
   218         if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
       
   219             sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
       
   220 
       
   221     sock.bind((host, 0))
       
   222     port = sock.getsockname()[1]
       
   223     return port
       
   224 
       
   225 FUZZ = 1e-6
       
   226 
       
   227 def fcmp(x, y): # fuzzy comparison function
       
   228     if isinstance(x, float) or isinstance(y, float):
       
   229         try:
       
   230             fuzz = (abs(x) + abs(y)) * FUZZ
       
   231             if abs(x-y) <= fuzz:
       
   232                 return 0
       
   233         except:
       
   234             pass
       
   235     elif type(x) == type(y) and isinstance(x, (tuple, list)):
       
   236         for i in range(min(len(x), len(y))):
       
   237             outcome = fcmp(x[i], y[i])
       
   238             if outcome != 0:
       
   239                 return outcome
       
   240         return (len(x) > len(y)) - (len(x) < len(y))
       
   241     return (x > y) - (x < y)
       
   242 
       
# Probe for the `unicode` builtin: merely referencing the name raises
# NameError where it does not exist.
try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False
       
   248 
       
   249 is_jython = sys.platform.startswith('java')
       
   250 
       
   251 # Filename used for testing
       
# Choose the scratch file name (TESTFN) for this platform and, where the
# unicode type exists, the Unicode variants used by file name tests.
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # The Unicode names below are only defined when the unicode type exists.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # 'errors=ignore' mode - hence we get '?' characters rather
                # than the exception.  'Latin1' operates as we expect - ie,
                # fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                # Expected: the name really cannot be encoded.
                pass
            else:
                print \
                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
                'Unicode filename tests may not be effective' \
                % TESTFN_UNICODE_UNENCODEABLE
       
   296 
       
   297 # Make sure we can write to TESTFN, try in /tmp if we can't
       
# Try to create TESTFN; if that fails, retry under /tmp and repoint TESTFN
# there.  Whatever file was created is removed again straight away.
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        # The fallback location works: use it from now on.
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        # Neither location is writable; warn and carry on.
        print ('WARNING: tests will fail, unable to write to: %s or %s' %
                (TESTFN, TMP_TESTFN))
if fp is not None:
    fp.close()
    unlink(TESTFN)
del fp
       
   314 
       
   315 def findfile(file, here=__file__):
       
   316     """Try to find a file on sys.path and the working directory.  If it is not
       
   317     found the argument passed to the function is returned (this does not
       
   318     necessarily signal failure; could still be the legitimate path)."""
       
   319     if os.path.isabs(file):
       
   320         return file
       
   321     path = sys.path
       
   322     path = [os.path.dirname(here)] + path
       
   323     for dn in path:
       
   324         fn = os.path.join(dn, file)
       
   325         if os.path.exists(fn): return fn
       
   326     return file
       
   327 
       
   328 def verify(condition, reason='test failed'):
       
   329     """Verify that condition is true. If not, raise TestFailed.
       
   330 
       
   331        The optional argument reason can be given to provide
       
   332        a better error text.
       
   333     """
       
   334 
       
   335     if not condition:
       
   336         raise TestFailed(reason)
       
   337 
       
   338 def vereq(a, b):
       
   339     """Raise TestFailed if a == b is false.
       
   340 
       
   341     This is better than verify(a == b) because, in case of failure, the
       
   342     error message incorporates repr(a) and repr(b) so you can see the
       
   343     inputs.
       
   344 
       
   345     Note that "not (a == b)" isn't necessarily the same as "a != b"; the
       
   346     former is tested.
       
   347     """
       
   348 
       
   349     if not (a == b):
       
   350         raise TestFailed("%r == %r" % (a, b))
       
   351 
       
   352 def sortdict(dict):
       
   353     "Like repr(dict), but in sorted order."
       
   354     items = dict.items()
       
   355     items.sort()
       
   356     reprpairs = ["%r: %r" % pair for pair in items]
       
   357     withcommas = ", ".join(reprpairs)
       
   358     return "{%s}" % withcommas
       
   359 
       
   360 def check_syntax_error(testcase, statement):
       
   361     try:
       
   362         compile(statement, '<test string>', 'exec')
       
   363     except SyntaxError:
       
   364         pass
       
   365     else:
       
   366         testcase.fail('Missing SyntaxError: "%s"' % statement)
       
   367 
       
def open_urlresource(url):
    """Return an open file for the resource at *url*, fetching it if needed.

    The 'urlfetch' resource must be enabled (see requires()).  The file is
    looked up by the last component of the URL path in the current and the
    parent directory; if it is not found there it is downloaded with
    urllib.urlretrieve() under that name and then opened.
    """
    import urllib, urlparse

    requires('urlfetch')
    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    # Reuse a copy that is already present locally.
    for path in [os.path.curdir, os.path.pardir]:
        fn = os.path.join(path, filename)
        if os.path.exists(fn):
            return open(fn)

    # Not available locally: report on the original stdout and download.
    print >> get_original_stdout(), '\tfetching %s ...' % url
    fn, _ = urllib.urlretrieve(url, filename)
    return open(fn)
       
   382 
       
   383 
       
   384 class WarningsRecorder(object):
       
   385     """Convenience wrapper for the warnings list returned on
       
   386        entry to the warnings.catch_warnings() context manager.
       
   387     """
       
   388     def __init__(self, warnings_list):
       
   389         self.warnings = warnings_list
       
   390 
       
   391     def __getattr__(self, attr):
       
   392         if self.warnings:
       
   393             return getattr(self.warnings[-1], attr)
       
   394         elif attr in warnings.WarningMessage._WARNING_DETAILS:
       
   395             return None
       
   396         raise AttributeError("%r has no attribute %r" % (self, attr))
       
   397 
       
   398     def reset(self):
       
   399         del self.warnings[:]
       
   400 
       
   401 @contextlib.contextmanager
       
   402 def check_warnings():
       
   403     with warnings.catch_warnings(record=True) as w:
       
   404         yield WarningsRecorder(w)
       
   405 
       
   406 
       
   407 class CleanImport(object):
       
   408     """Context manager to force import to return a new module reference.
       
   409 
       
   410     This is useful for testing module-level behaviours, such as
       
   411     the emission of a DeprecationWarning on import.
       
   412 
       
   413     Use like this:
       
   414 
       
   415         with CleanImport("foo"):
       
   416             __import__("foo") # new reference
       
   417     """
       
   418 
       
   419     def __init__(self, *module_names):
       
   420         self.original_modules = sys.modules.copy()
       
   421         for module_name in module_names:
       
   422             if module_name in sys.modules:
       
   423                 module = sys.modules[module_name]
       
   424                 # It is possible that module_name is just an alias for
       
   425                 # another module (e.g. stub for modules renamed in 3.x).
       
   426                 # In that case, we also need delete the real module to clear
       
   427                 # the import cache.
       
   428                 if module.__name__ != module_name:
       
   429                     del sys.modules[module.__name__]
       
   430                 del sys.modules[module_name]
       
   431 
       
   432     def __enter__(self):
       
   433         return self
       
   434 
       
   435     def __exit__(self, *ignore_exc):
       
   436         sys.modules.update(self.original_modules)
       
   437 
       
   438 
       
   439 class EnvironmentVarGuard(object):
       
   440 
       
   441     """Class to help protect the environment variable properly.  Can be used as
       
   442     a context manager."""
       
   443 
       
   444     def __init__(self):
       
   445         self._environ = os.environ
       
   446         self._unset = set()
       
   447         self._reset = dict()
       
   448 
       
   449     def set(self, envvar, value):
       
   450         if envvar not in self._environ:
       
   451             self._unset.add(envvar)
       
   452         else:
       
   453             self._reset[envvar] = self._environ[envvar]
       
   454         self._environ[envvar] = value
       
   455 
       
   456     def unset(self, envvar):
       
   457         if envvar in self._environ:
       
   458             self._reset[envvar] = self._environ[envvar]
       
   459             del self._environ[envvar]
       
   460 
       
   461     def __enter__(self):
       
   462         return self
       
   463 
       
   464     def __exit__(self, *ignore_exc):
       
   465         for envvar, value in self._reset.iteritems():
       
   466             self._environ[envvar] = value
       
   467         for unset in self._unset:
       
   468             del self._environ[unset]
       
   469 
       
   470 class TransientResource(object):
       
   471 
       
   472     """Raise ResourceDenied if an exception is raised while the context manager
       
   473     is in effect that matches the specified exception and attributes."""
       
   474 
       
   475     def __init__(self, exc, **kwargs):
       
   476         self.exc = exc
       
   477         self.attrs = kwargs
       
   478 
       
   479     def __enter__(self):
       
   480         return self
       
   481 
       
   482     def __exit__(self, type_=None, value=None, traceback=None):
       
   483         """If type_ is a subclass of self.exc and value has attributes matching
       
   484         self.attrs, raise ResourceDenied.  Otherwise let the exception
       
   485         propagate (if any)."""
       
   486         if type_ is not None and issubclass(self.exc, type_):
       
   487             for attr, attr_value in self.attrs.iteritems():
       
   488                 if not hasattr(value, attr):
       
   489                     break
       
   490                 if getattr(value, attr) != attr_value:
       
   491                     break
       
   492             else:
       
   493                 raise ResourceDenied("an optional resource is not available")
       
   494 
       
   495 
       
   496 def transient_internet():
       
   497     """Return a context manager that raises ResourceDenied when various issues
       
   498     with the Internet connection manifest themselves as exceptions."""
       
   499     time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
       
   500     socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
       
   501     ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
       
   502     return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
       
   503 
       
   504 
       
   505 @contextlib.contextmanager
       
   506 def captured_output(stream_name):
       
   507     """Run the 'with' statement body using a StringIO object in place of a
       
   508     specific attribute on the sys module.
       
   509     Example use (with 'stream_name=stdout')::
       
   510 
       
   511        with captured_stdout() as s:
       
   512            print "hello"
       
   513        assert s.getvalue() == "hello"
       
   514     """
       
   515     import StringIO
       
   516     orig_stdout = getattr(sys, stream_name)
       
   517     setattr(sys, stream_name, StringIO.StringIO())
       
   518     try:
       
   519         yield getattr(sys, stream_name)
       
   520     finally:
       
   521         setattr(sys, stream_name, orig_stdout)
       
   522 
       
   523 def captured_stdout():
       
   524     return captured_output("stdout")
       
   525 
       
   526 
       
   527 #=======================================================================
       
   528 # Decorator for running a function in a different locale, correctly resetting
       
   529 # it afterwards.
       
   530 
       
   531 def run_with_locale(catstr, *locales):
       
   532     def decorator(func):
       
   533         def inner(*args, **kwds):
       
   534             try:
       
   535                 import locale
       
   536                 category = getattr(locale, catstr)
       
   537                 orig_locale = locale.setlocale(category)
       
   538             except AttributeError:
       
   539                 # if the test author gives us an invalid category string
       
   540                 raise
       
   541             except:
       
   542                 # cannot retrieve original locale, so do nothing
       
   543                 locale = orig_locale = None
       
   544             else:
       
   545                 for loc in locales:
       
   546                     try:
       
   547                         locale.setlocale(category, loc)
       
   548                         break
       
   549                     except:
       
   550                         pass
       
   551 
       
   552             # now run the function, resetting the locale on exceptions
       
   553             try:
       
   554                 return func(*args, **kwds)
       
   555             finally:
       
   556                 if locale and orig_locale:
       
   557                     locale.setlocale(category, orig_locale)
       
   558         inner.func_name = func.func_name
       
   559         inner.__doc__ = func.__doc__
       
   560         return inner
       
   561     return decorator
       
   562 
       
   563 #=======================================================================
       
   564 # Big-memory-test support. Separate from 'resources' because memory use should be configurable.
       
   565 
       
   566 # Some handy shorthands. Note that these are used for byte-limits as well
       
   567 # as size-limits, in the various bigmem tests
       
_1M = 1024*1024    # one megabyte
_1G = 1024 * _1M   # one gigabyte
_2G = 2 * _1G
_4G = 4 * _1G

# Upper bound that set_memlimit() clamps the usable memory limit to.
MAX_Py_ssize_t = sys.maxsize
       
   574 
       
   575 def set_memlimit(limit):
       
   576     import re
       
   577     global max_memuse
       
   578     global real_max_memuse
       
   579     sizes = {
       
   580         'k': 1024,
       
   581         'm': _1M,
       
   582         'g': _1G,
       
   583         't': 1024*_1G,
       
   584     }
       
   585     m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
       
   586                  re.IGNORECASE | re.VERBOSE)
       
   587     if m is None:
       
   588         raise ValueError('Invalid memory limit %r' % (limit,))
       
   589     memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
       
   590     real_max_memuse = memlimit
       
   591     if memlimit > MAX_Py_ssize_t:
       
   592         memlimit = MAX_Py_ssize_t
       
   593     if memlimit < _2G - 1:
       
   594         raise ValueError('Memory limit %r too low to be useful' % (limit,))
       
   595     max_memuse = memlimit
       
   596 
       
   597 def bigmemtest(minsize, memuse, overhead=5*_1M):
       
   598     """Decorator for bigmem tests.
       
   599 
       
   600     'minsize' is the minimum useful size for the test (in arbitrary,
       
   601     test-interpreted units.) 'memuse' is the number of 'bytes per size' for
       
   602     the test, or a good estimate of it. 'overhead' specifies fixed overhead,
       
   603     independent of the testsize, and defaults to 5Mb.
       
   604 
       
   605     The decorator tries to guess a good value for 'size' and passes it to
       
   606     the decorated test function. If minsize * memuse is more than the
       
   607     allowed memory use (as defined by max_memuse), the test is skipped.
       
   608     Otherwise, minsize is adjusted upward to use up to max_memuse.
       
   609     """
       
   610     def decorator(f):
       
   611         def wrapper(self):
       
   612             if not max_memuse:
       
   613                 # If max_memuse is 0 (the default),
       
   614                 # we still want to run the tests with size set to a few kb,
       
   615                 # to make sure they work. We still want to avoid using
       
   616                 # too much memory, though, but we do that noisily.
       
   617                 maxsize = 5147
       
   618                 self.failIf(maxsize * memuse + overhead > 20 * _1M)
       
   619             else:
       
   620                 maxsize = int((max_memuse - overhead) / memuse)
       
   621                 if maxsize < minsize:
       
   622                     # Really ought to print 'test skipped' or something
       
   623                     if verbose:
       
   624                         sys.stderr.write("Skipping %s because of memory "
       
   625                                          "constraint\n" % (f.__name__,))
       
   626                     return
       
   627                 # Try to keep some breathing room in memory use
       
   628                 maxsize = max(maxsize - 50 * _1M, minsize)
       
   629             return f(self, maxsize)
       
   630         wrapper.minsize = minsize
       
   631         wrapper.memuse = memuse
       
   632         wrapper.overhead = overhead
       
   633         return wrapper
       
   634     return decorator
       
   635 
       
   636 def precisionbigmemtest(size, memuse, overhead=5*_1M):
       
   637     def decorator(f):
       
   638         def wrapper(self):
       
   639             if not real_max_memuse:
       
   640                 maxsize = 5147
       
   641             else:
       
   642                 maxsize = size
       
   643 
       
   644                 if real_max_memuse and real_max_memuse < maxsize * memuse:
       
   645                     if verbose:
       
   646                         sys.stderr.write("Skipping %s because of memory "
       
   647                                          "constraint\n" % (f.__name__,))
       
   648                     return
       
   649 
       
   650             return f(self, maxsize)
       
   651         wrapper.size = size
       
   652         wrapper.memuse = memuse
       
   653         wrapper.overhead = overhead
       
   654         return wrapper
       
   655     return decorator
       
   656 
       
   657 def bigaddrspacetest(f):
       
   658     """Decorator for tests that fill the address space."""
       
   659     def wrapper(self):
       
   660         if max_memuse < MAX_Py_ssize_t:
       
   661             if verbose:
       
   662                 sys.stderr.write("Skipping %s because of memory "
       
   663                                  "constraint\n" % (f.__name__,))
       
   664         else:
       
   665             return f(self)
       
   666     return wrapper
       
   667 
       
   668 #=======================================================================
       
   669 # unittest integration.
       
   670 
       
   671 class BasicTestRunner:
       
   672     def run(self, test):
       
   673         result = unittest.TestResult()
       
   674         test(result)
       
   675         return result
       
   676 
       
   677 
       
   678 def _run_suite(suite):
       
   679     """Run tests from a unittest.TestSuite-derived class."""
       
   680     if verbose:
       
   681         runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
       
   682     else:
       
   683         runner = BasicTestRunner()
       
   684 
       
   685     result = runner.run(suite)
       
   686     if not result.wasSuccessful():
       
   687         if len(result.errors) == 1 and not result.failures:
       
   688             err = result.errors[0][1]
       
   689         elif len(result.failures) == 1 and not result.errors:
       
   690             err = result.failures[0][1]
       
   691         else:
       
   692             err = "errors occurred; run in verbose mode for details"
       
   693         raise TestFailed(err)
       
   694 
       
   695 
       
   696 def run_unittest(*classes):
       
   697     """Run tests from unittest.TestCase-derived classes."""
       
   698     valid_types = (unittest.TestSuite, unittest.TestCase)
       
   699     suite = unittest.TestSuite()
       
   700     for cls in classes:
       
   701         if isinstance(cls, str):
       
   702             if cls in sys.modules:
       
   703                 suite.addTest(unittest.findTestCases(sys.modules[cls]))
       
   704             else:
       
   705                 raise ValueError("str arguments must be keys in sys.modules")
       
   706         elif isinstance(cls, valid_types):
       
   707             suite.addTest(cls)
       
   708         else:
       
   709             suite.addTest(unittest.makeSuite(cls))
       
   710     _run_suite(suite)
       
   711 
       
   712 
       
   713 #=======================================================================
       
   714 # doctest driver.
       
   715 
       
   716 def run_doctest(module, verbosity=None):
       
   717     """Run doctest on the given module.  Return (#failures, #tests).
       
   718 
       
   719     If optional argument verbosity is not specified (or is None), pass
       
   720     test_support's belief about verbosity on to doctest.  Else doctest's
       
   721     usual behavior is used (it searches sys.argv for -v).
       
   722     """
       
   723 
       
   724     import doctest
       
   725 
       
   726     if verbosity is None:
       
   727         verbosity = verbose
       
   728     else:
       
   729         verbosity = None
       
   730 
       
   731     # Direct doctest output (normally just errors) to real stdout; doctest
       
   732     # output shouldn't be compared by regrtest.
       
   733     save_stdout = sys.stdout
       
   734     sys.stdout = get_original_stdout()
       
   735     try:
       
   736         f, t = doctest.testmod(module, verbose=verbosity)
       
   737         if f:
       
   738             raise TestFailed("%d of %d doctests failed" % (f, t))
       
   739     finally:
       
   740         sys.stdout = save_stdout
       
   741     if verbose:
       
   742         print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
       
   743     return f, t
       
   744 
       
   745 #=======================================================================
       
   746 # Threading support to prevent reporting refleaks when running regrtest.py -R
       
   747 
       
   748 def threading_setup():
       
   749     import threading
       
   750     return len(threading._active), len(threading._limbo)
       
   751 
       
   752 def threading_cleanup(num_active, num_limbo):
       
   753     import threading
       
   754     import time
       
   755 
       
   756     _MAX_COUNT = 10
       
   757     count = 0
       
   758     while len(threading._active) != num_active and count < _MAX_COUNT:
       
   759         count += 1
       
   760         time.sleep(0.1)
       
   761 
       
   762     count = 0
       
   763     while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
       
   764         count += 1
       
   765         time.sleep(0.1)
       
   766 
       
   767 def reap_children():
       
   768     """Use this function at the end of test_main() whenever sub-processes
       
   769     are started.  This will help ensure that no extra children (zombies)
       
   770     stick around to hog resources and create problems when looking
       
   771     for refleaks.
       
   772     """
       
   773 
       
   774     # Reap all our dead child processes so we don't leave zombies around.
       
   775     # These hog resources and might be causing some of the buildbots to die.
       
   776     if hasattr(os, 'waitpid'):
       
   777         any_process = -1
       
   778         while True:
       
   779             try:
       
   780                 # This will raise an exception on Windows.  That's ok.
       
   781                 pid, status = os.waitpid(any_process, os.WNOHANG)
       
   782                 if pid == 0:
       
   783                     break
       
   784             except:
       
   785                 break