|
1 """Supporting definitions for the Python regression tests.""" |
|
2 |
|
3 if __name__ != 'test.test_support': |
|
4 raise ImportError('test_support must be imported from the test package') |
|
5 |
|
6 import contextlib |
|
7 import errno |
|
8 import socket |
|
9 import sys |
|
10 import os |
|
11 import shutil |
|
12 import warnings |
|
13 import unittest |
|
14 |
|
15 __all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module", |
|
16 "verbose", "use_resources", "max_memuse", "record_original_stdout", |
|
17 "get_original_stdout", "unload", "unlink", "rmtree", "forget", |
|
18 "is_resource_enabled", "requires", "find_unused_port", "bind_port", |
|
19 "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ", |
|
20 "findfile", "verify", "vereq", "sortdict", "check_syntax_error", |
|
21 "open_urlresource", "check_warnings", "CleanImport", |
|
22 "EnvironmentVarGuard", "captured_output", |
|
23 "captured_stdout", "TransientResource", "transient_internet", |
|
24 "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest", |
|
25 "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", |
|
26 "threading_cleanup", "reap_children"] |
|
27 |
|
class Error(Exception):
    """Root of the exception hierarchy used by the regression tests."""
|
30 |
|
class TestFailed(Error):
    """Raised when a regression test fails."""
|
33 |
|
class TestSkipped(Error):
    """Raised when a test is deliberately skipped.

    Use this (rather than TestFailed) when a test cannot run for
    environmental reasons -- for example when some resource such as
    the network appears to be unavailable -- as opposed to an actual
    test failure.
    """
|
43 |
|
class ResourceDenied(TestSkipped):
    """Raised when a test requests a resource that is not enabled.

    requires() raises this for resources that have not been enabled,
    which lets callers tell expected skips apart from unexpected ones.
    """
|
51 |
|
def import_module(name, deprecated=False):
    """Import and return the module to be tested, named *name*.

    Raises TestSkipped when the module is not available.  When
    *deprecated* is true, DeprecationWarnings emitted by the module
    (or package) on import are suppressed.
    """
    with warnings.catch_warnings():
        if deprecated:
            warnings.filterwarnings("ignore", ".+ (module|package)",
                                    DeprecationWarning)
        try:
            return __import__(name, level=0)
        except ImportError:
            raise TestSkipped("No module named " + name)
|
65 |
|
# Runtime configuration knobs; regrtest.py overwrites these at startup.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0      # Uncapped limit recorded by set_memlimit() (max_memuse
                         # itself is capped at MAX_Py_ssize_t).
|
71 |
|
# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream regrtest started with."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none was set."""
    return _original_stdout or sys.stdout
|
82 |
|
def unload(name):
    """Drop module *name* from sys.modules; a no-op when it is absent."""
    sys.modules.pop(name, None)
|
88 |
|
def unlink(filename):
    """Remove *filename*, silently ignoring any OSError (e.g. missing file)."""
    try:
        os.remove(filename)
    except OSError:
        pass
|
94 |
|
def rmtree(path):
    """Recursively delete the directory tree at *path*.

    A missing tree is silently ignored; any other OSError propagates.
    """
    try:
        shutil.rmtree(path)
    except OSError as e:
        # `as` form instead of the legacy `except OSError, e` -- valid since
        # Python 2.6 (which this file already requires for `with`) and the
        # only form accepted by Python 3.
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
|
102 |
|
def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    base = modname + os.extsep
    for dirname in sys.path:
        # Each compiled form gets its own unlink call: either may exist
        # independently of the other, so one failure must not stop the other.
        unlink(os.path.join(dirname, base + 'pyc'))
        unlink(os.path.join(dirname, base + 'pyo'))
|
113 |
|
def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    if use_resources is None:
        return False
    return resource in use_resources
|
118 |
|
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ the check is skipped entirely, so
    running a test file directly never skips here; the False-ish path only
    matters when regrtest.py is executing.
    """
    # Direct execution ("python test_foo.py") is treated as if the
    # resource had been enabled.
    caller_globals = sys._getframe().f_back.f_globals
    if caller_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
|
132 |
|
# Default host for bind_port()/find_unused_port(); loopback keeps test
# server sockets local to the machine running the suite.
HOST = 'localhost'
|
134 |
|
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an ephemeral port number that is currently unused.

    A temporary socket of the given *family* and *socktype* is bound with
    port 0, which makes the OS pick an unused ephemeral port; the socket
    is then closed and the port number returned.

    Either this function or bind_port() should be used for any test where
    a server socket needs to be bound to a particular port for the
    duration of the test.  Prefer bind_port() whenever the calling code
    creates the Python socket itself; use find_unused_port() only when the
    port number must be passed to a constructor or an external program
    (i.e. the -accept argument to openssl's s_server mode).  Hard coded
    ports should *NEVER* be used: they break running multiple instances
    of a test simultaneously on one host, which on Windows can wedge the
    whole python process -- SO_REUSEADDR has different semantics there
    than on Unix, and the cure is SO_EXCLUSIVEADDRUSE (see
    http://bugs.python.org/issue2550 and
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx for the
    full discussion).

    XXX: this approach rests on the assumption that the ephemeral port
    won't immediately be dished back out to some other process between our
    closing the temporary socket and the calling code binding the returned
    port.  We can deal with this issue if/when we come across it.
    """
    tempsock = socket.socket(family, socktype)
    try:
        # bind_port() also enforces that no SO_REUSE* options are set.
        return bind_port(tempsock)
    finally:
        # Close even when bind_port() raises, so the probe socket is never
        # leaked (the original closed it only on the success path).
        tempsock.close()
|
194 |
|
def bind_port(sock, host=HOST):
    """Bind *sock* to (*host*, 0) and return the port the OS picked.

    Relying on an ephemeral port keeps simultaneous test runs (e.g. on a
    buildbot) from colliding.  For AF_INET/SOCK_STREAM sockets this raises
    TestFailed when SO_REUSEADDR or SO_REUSEPORT is already set: tests must
    never set those options on TCP/IP sockets -- the only case for them is
    testing multicasting via multiple UDP sockets.

    Additionally, where the SO_EXCLUSIVEADDRUSE option is available (i.e.
    on Windows) it is set on the socket, preventing anyone else from
    bind()'ing to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        reuse_addr = getattr(socket, 'SO_REUSEADDR', None)
        if reuse_addr is not None and \
           sock.getsockopt(socket.SOL_SOCKET, reuse_addr) == 1:
            raise TestFailed("tests should never set the SO_REUSEADDR " \
                             "socket option on TCP/IP sockets!")
        reuse_port = getattr(socket, 'SO_REUSEPORT', None)
        if reuse_port is not None and \
           sock.getsockopt(socket.SOL_SOCKET, reuse_port) == 1:
            raise TestFailed("tests should never set the SO_REUSEPORT " \
                             "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
|
224 |
|
# Relative tolerance used by fcmp() for float comparisons.
FUZZ = 1e-6

def fcmp(x, y):  # fuzzy comparison function
    """Compare x and y cmp()-style (-1, 0, 1), fuzzily for floats.

    Floats whose difference is within a FUZZ-relative tolerance compare
    equal; same-type tuples/lists are compared element by element and then
    by length; everything else is compared exactly.
    """
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # Operands that don't support abs()/arithmetic fall through to
            # the exact comparison below.  (This was a bare `except:`, which
            # also swallowed KeyboardInterrupt and SystemExit.)
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
|
242 |
|
# Detect whether this interpreter build supports the `unicode` type; the
# name simply doesn't exist on builds without unicode support.
try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

# True when running on Jython (its sys.platform starts with 'java').
is_jython = sys.platform.startswith('java')
|
250 |
|
# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2):  # 0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # errors=ignore' mode - hence we get '?' characters rather than
                # the exception.  'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                # If the name *could* be encoded the unencodeable-filename
                # tests lose their point; warn the user rather than failing.
                print \
                'WARNING: The filename %r CAN be encoded by the filesystem. ' \
                'Unicode filename tests may not be effective' \
                % TESTFN_UNICODE_UNENCODEABLE

# Make sure we can write to TESTFN, try in /tmp if we can't
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        print ('WARNING: tests will fail, unable to write to: %s or %s' %
               (TESTFN, TMP_TESTFN))
# Clean up the probe file; leave no droppings behind at import time.
if fp is not None:
    fp.close()
    unlink(TESTFN)
del fp
|
314 |
|
def findfile(file, here=__file__):
    """Locate *file* relative to this module's directory or on sys.path.

    Absolute paths are returned unchanged.  If the file is not found, the
    argument itself is returned -- which does not necessarily signal
    failure, as it may still be a legitimate path for the caller.
    """
    if os.path.isabs(file):
        return file
    search_dirs = [os.path.dirname(here)] + sys.path
    for dirname in search_dirs:
        candidate = os.path.join(dirname, file)
        if os.path.exists(candidate):
            return candidate
    return file
|
327 |
|
def verify(condition, reason='test failed'):
    """Raise TestFailed(reason) unless *condition* is true.

    The optional *reason* argument provides a better error text.
    """
    if condition:
        return
    raise TestFailed(reason)
|
337 |
|
def vereq(a, b):
    """Raise TestFailed if a == b is false.

    This is better than verify(a == b) because, in case of failure, the
    error message incorporates repr(a) and repr(b) so you can see the
    inputs.  Note that "not (a == b)" is what gets tested, which isn't
    necessarily the same as "a != b".
    """
    if a == b:
        return
    raise TestFailed("%r == %r" % (a, b))
|
351 |
|
def sortdict(dict):
    "Like repr(dict), but in sorted order."
    # sorted() replaces the old items-list + .sort() two-step: equivalent on
    # Python 2 and also correct on Python 3, where items() returns a view
    # that has no .sort() method.
    reprpairs = ["%r: %r" % pair for pair in sorted(dict.items())]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas
|
359 |
|
def check_syntax_error(testcase, statement):
    """Fail *testcase* unless compiling *statement* raises SyntaxError."""
    try:
        compile(statement, '<test string>', 'exec')
    except SyntaxError:
        return
    testcase.fail('Missing SyntaxError: "%s"' % statement)
|
367 |
|
def open_urlresource(url):
    """Return an open file for the resource at *url*, fetching it if needed.

    Requires the 'urlfetch' resource; the download is cached under the last
    path component of the URL in the current (or parent) directory.
    """
    import urllib, urlparse

    requires('urlfetch')
    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    # Reuse a previously fetched copy when one sits next to (or above) cwd.
    for path in [os.path.curdir, os.path.pardir]:
        fn = os.path.join(path, filename)
        if os.path.exists(fn):
            return open(fn)

    print >> get_original_stdout(), '\tfetching %s ...' % url
    fn, _ = urllib.urlretrieve(url, filename)
    return open(fn)
|
382 |
|
383 |
|
class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
    entry to the warnings.catch_warnings() context manager.
    """

    def __init__(self, warnings_list):
        self.warnings = warnings_list

    def __getattr__(self, attr):
        # Delegate attribute access to the most recent warning; with no
        # warnings recorded, the known warning attributes read as None.
        if not self.warnings:
            if attr in warnings.WarningMessage._WARNING_DETAILS:
                return None
            raise AttributeError("%r has no attribute %r" % (self, attr))
        return getattr(self.warnings[-1], attr)

    def reset(self):
        """Forget all recorded warnings, clearing the shared list in place."""
        del self.warnings[:]
|
400 |
|
@contextlib.contextmanager
def check_warnings():
    """Context manager yielding a WarningsRecorder over recorded warnings."""
    with warnings.catch_warnings(record=True) as caught:
        yield WarningsRecorder(caught)
|
405 |
|
406 |
|
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            __import__("foo")  # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name not in sys.modules:
                continue
            module = sys.modules[module_name]
            # module_name may merely alias another module (e.g. a stub for
            # a module renamed in 3.x); the real module must then be
            # dropped as well to clear the import cache.
            if module.__name__ != module_name:
                del sys.modules[module.__name__]
            del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put back everything that was importable when we started.
        sys.modules.update(self.original_modules)
|
437 |
|
438 |
|
class EnvironmentVarGuard(object):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        self._unset = set()     # names that did not exist before we set them
        self._reset = dict()    # original values of names we overwrote/removed

    def set(self, envvar, value):
        """Set *envvar* to *value*, remembering how to undo the change."""
        if envvar not in self._environ:
            self._unset.add(envvar)
        else:
            self._reset[envvar] = self._environ[envvar]
        self._environ[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* from the environment, remembering its old value."""
        if envvar in self._environ:
            self._reset[envvar] = self._environ[envvar]
            del self._environ[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore overwritten values first, then drop variables we added.
        # .items() replaces the Python-2-only .iteritems() (equivalent on
        # Python 2, and the only spelling that works on Python 3).
        for envvar, value in self._reset.items():
            self._environ[envvar] = value
        for unset in self._unset:
            del self._environ[unset]
|
469 |
|
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The subclass check used to be issubclass(self.exc, type_), which is
        # backwards relative to the contract documented above whenever type_
        # is a strict subclass of self.exc (exact matches behaved the same
        # either way).  .items() replaces the Python-2-only .iteritems().
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                raise ResourceDenied("an optional resource is not available")
|
494 |
|
495 |
|
def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    # Timeouts and connection resets (whether surfaced as socket.error or
    # IOError) are treated as "resource unavailable", not as test failures.
    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    # contextlib.nested (Python-2-only) enters all three managers at once.
    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
|
503 |
|
504 |
|
@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
    specific attribute on the sys module.
    Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print "hello"
       assert s.getvalue() == "hello"
    """
    import StringIO
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO.StringIO())
    try:
        # Hand the replacement buffer to the caller.
        yield getattr(sys, stream_name)
    finally:
        # Always restore the real stream, even if the body raised.
        setattr(sys, stream_name, orig_stdout)
|
def captured_stdout():
    """Shorthand for captured_output("stdout")."""
    return captured_output("stdout")
|
525 |
|
526 |
|
527 #======================================================================= |
|
528 # Decorator for running a function in a different locale, correctly resetting |
|
529 # it afterwards. |
|
530 |
|
def run_with_locale(catstr, *locales):
    """Decorator: run the function under the first settable locale in
    *locales* for category *catstr* (e.g. 'LC_ALL'), restoring the
    original locale afterwards."""
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                # Try each candidate until one can actually be installed;
                # failures are ignored and the next one is attempted.
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except:
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        inner.func_name = func.func_name    # Python 2 spelling of __name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator
|
562 |
|
563 #======================================================================= |
|
564 # Big-memory-test support. Separate from 'resources' because memory use should be configurable. |
|
565 |
|
# Some handy shorthands.  Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

# Largest value a Py_ssize_t can hold; caps how much memory a test may use.
MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse a human-readable memory limit such as '2.5Gb' or '4g'.

    Sets the module globals real_max_memuse (uncapped) and max_memuse
    (capped at MAX_Py_ssize_t).  Raises ValueError when *limit* cannot be
    parsed or is too small (below 2Gb) to be useful.
    """
    import re
    global max_memuse
    global real_max_memuse
    unit_sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    match = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                     re.IGNORECASE | re.VERBOSE)
    if match is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(match.group(1)) * unit_sizes[match.group(3).lower()])
    real_max_memuse = memlimit
    memlimit = min(memlimit, MAX_Py_ssize_t)
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit
|
596 |
|
def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.)  'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it.  'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function.  If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # With no limit configured (the default), still run the test
                # with a token size of a few kb so it gets exercised -- after
                # checking that even that keeps the estimated footprint small.
                size = 5147
                self.failIf(size * memuse + overhead > 20 * _1M)
                return f(self, size)
            size = int((max_memuse - overhead) / memuse)
            if size < minsize:
                # Really ought to report 'test skipped' properly.
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return
            # Leave some breathing room below the configured maximum.
            return f(self, max(size - 50 * _1M, minsize))
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
|
635 |
|
def precisionbigmemtest(size, memuse, overhead=5*_1M):
    """Like bigmemtest, but pass exactly *size* to the decorated test (or a
    token size when no memory limit is configured), skipping it when
    real_max_memuse cannot accommodate size * memuse."""
    def decorator(f):
        def wrapper(self):
            maxsize = size if real_max_memuse else 5147
            if real_max_memuse and real_max_memuse < maxsize * memuse:
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return
            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
|
656 |
|
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        # Only run when the configured limit allows using the whole
        # address space; otherwise report the skip in verbose mode.
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if verbose:
            sys.stderr.write("Skipping %s because of memory "
                             "constraint\n" % (f.__name__,))
    return wrapper
|
667 |
|
668 #======================================================================= |
|
669 # unittest integration. |
|
670 |
|
671 class BasicTestRunner: |
|
672 def run(self, test): |
|
673 result = unittest.TestResult() |
|
674 test(result) |
|
675 return result |
|
676 |
|
677 |
|
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return
    # Surface a single error/failure traceback directly; otherwise point
    # the user at verbose mode for the details.
    if len(result.errors) == 1 and not result.failures:
        err = result.errors[0][1]
    elif len(result.failures) == 1 and not result.errors:
        err = result.failures[0][1]
    else:
        err = "errors occurred; run in verbose mode for details"
    raise TestFailed(err)
|
694 |
|
695 |
|
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            # A string must name a module that has already been imported.
            if cls not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(unittest.findTestCases(sys.modules[cls]))
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)
|
711 |
|
712 |
|
713 #======================================================================= |
|
714 # doctest driver. |
|
715 |
|
def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        # Any explicit verbosity means "let doctest decide" (-v in sys.argv).
        verbosity = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
    return f, t
|
744 |
|
745 #======================================================================= |
|
746 # Threading support to prevent reporting refleaks when running regrtest.py -R |
|
747 |
|
def threading_setup():
    """Snapshot (#active, #limbo) thread counts for threading_cleanup()."""
    import threading
    return len(threading._active), len(threading._limbo)
|
751 |
|
def threading_cleanup(num_active, num_limbo):
    """Wait briefly for thread counts to return to the recorded baseline.

    Polls (up to ~1 second each) for the active and then the limbo thread
    counts to match the values captured by threading_setup(); gives up
    silently after that, so refleak runs aren't confused by stragglers.
    """
    import threading
    import time

    _MAX_COUNT = 10

    def _wait_for(container, expected):
        # Best-effort: poll up to _MAX_COUNT times, 0.1s apart.
        attempts = 0
        while len(container) != expected and attempts < _MAX_COUNT:
            attempts += 1
            time.sleep(0.1)

    _wait_for(threading._active, num_active)
    _wait_for(threading._limbo, num_limbo)
|
766 |
|
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        # Nothing to do on platforms without waitpid/WNOHANG (e.g. Windows,
        # where the old code relied on a bare `except:` catching the
        # AttributeError from os.WNOHANG).
        return
    any_process = -1
    while True:
        try:
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except OSError:
            # ECHILD: no child processes remain.  Catching only OSError
            # (instead of a bare `except:`) no longer swallows
            # KeyboardInterrupt/SystemExit.
            break
        if pid == 0:
            # Children exist but none have exited yet.
            break