python-2.5.2/win32/Lib/test/test_threading.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 # Very rudimentary test of threading module
       
     2 
       
     3 import test.test_support
       
     4 from test.test_support import verbose
       
     5 import random
       
     6 import sys
       
     7 import threading
       
     8 import thread
       
     9 import time
       
    10 import unittest
       
    11 
       
    12 # A trivial mutable counter.
       
    13 class Counter(object):
       
    14     def __init__(self):
       
    15         self.value = 0
       
    16     def inc(self):
       
    17         self.value += 1
       
    18     def dec(self):
       
    19         self.value -= 1
       
    20     def get(self):
       
    21         return self.value
       
    22 
       
    23 class TestThread(threading.Thread):
       
    24     def __init__(self, name, testcase, sema, mutex, nrunning):
       
    25         threading.Thread.__init__(self, name=name)
       
    26         self.testcase = testcase
       
    27         self.sema = sema
       
    28         self.mutex = mutex
       
    29         self.nrunning = nrunning
       
    30 
       
    31     def run(self):
       
    32         delay = random.random() * 2
       
    33         if verbose:
       
    34             print 'task', self.getName(), 'will run for', delay, 'sec'
       
    35 
       
    36         self.sema.acquire()
       
    37 
       
    38         self.mutex.acquire()
       
    39         self.nrunning.inc()
       
    40         if verbose:
       
    41             print self.nrunning.get(), 'tasks are running'
       
    42         self.testcase.assert_(self.nrunning.get() <= 3)
       
    43         self.mutex.release()
       
    44 
       
    45         time.sleep(delay)
       
    46         if verbose:
       
    47             print 'task', self.getName(), 'done'
       
    48 
       
    49         self.mutex.acquire()
       
    50         self.nrunning.dec()
       
    51         self.testcase.assert_(self.nrunning.get() >= 0)
       
    52         if verbose:
       
    53             print self.getName(), 'is finished.', self.nrunning.get(), \
       
    54                   'tasks are running'
       
    55         self.mutex.release()
       
    56 
       
    57         self.sema.release()
       
    58 
       
    59 class ThreadTests(unittest.TestCase):
       
    60 
       
    61     # Create a bunch of threads, let each do some work, wait until all are
       
    62     # done.
       
    63     def test_various_ops(self):
       
    64         # This takes about n/3 seconds to run (about n/3 clumps of tasks,
       
    65         # times about 1 second per clump).
       
    66         NUMTASKS = 10
       
    67 
       
    68         # no more than 3 of the 10 can run at once
       
    69         sema = threading.BoundedSemaphore(value=3)
       
    70         mutex = threading.RLock()
       
    71         numrunning = Counter()
       
    72 
       
    73         threads = []
       
    74 
       
    75         for i in range(NUMTASKS):
       
    76             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
       
    77             threads.append(t)
       
    78             t.start()
       
    79 
       
    80         if verbose:
       
    81             print 'waiting for all tasks to complete'
       
    82         for t in threads:
       
    83             t.join(NUMTASKS)
       
    84             self.assert_(not t.isAlive())
       
    85         if verbose:
       
    86             print 'all tasks done'
       
    87         self.assertEqual(numrunning.get(), 0)
       
    88 
       
    89     # run with a small(ish) thread stack size (256kB)
       
    90     def test_various_ops_small_stack(self):
       
    91         if verbose:
       
    92             print 'with 256kB thread stack size...'
       
    93         try:
       
    94             threading.stack_size(262144)
       
    95         except thread.error:
       
    96             if verbose:
       
    97                 print 'platform does not support changing thread stack size'
       
    98             return
       
    99         self.test_various_ops()
       
   100         threading.stack_size(0)
       
   101 
       
   102     # run with a large thread stack size (1MB)
       
   103     def test_various_ops_large_stack(self):
       
   104         if verbose:
       
   105             print 'with 1MB thread stack size...'
       
   106         try:
       
   107             threading.stack_size(0x100000)
       
   108         except thread.error:
       
   109             if verbose:
       
   110                 print 'platform does not support changing thread stack size'
       
   111             return
       
   112         self.test_various_ops()
       
   113         threading.stack_size(0)
       
   114 
       
   115     def test_foreign_thread(self):
       
   116         # Check that a "foreign" thread can use the threading module.
       
   117         def f(mutex):
       
   118             # Acquiring an RLock forces an entry for the foreign
       
   119             # thread to get made in the threading._active map.
       
   120             r = threading.RLock()
       
   121             r.acquire()
       
   122             r.release()
       
   123             mutex.release()
       
   124 
       
   125         mutex = threading.Lock()
       
   126         mutex.acquire()
       
   127         tid = thread.start_new_thread(f, (mutex,))
       
   128         # Wait for the thread to finish.
       
   129         mutex.acquire()
       
   130         self.assert_(tid in threading._active)
       
   131         self.assert_(isinstance(threading._active[tid],
       
   132                                 threading._DummyThread))
       
   133         del threading._active[tid]
       
   134 
       
   135     # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
       
   136     # exposed at the Python level.  This test relies on ctypes to get at it.
       
   137     def test_PyThreadState_SetAsyncExc(self):
       
   138         try:
       
   139             import ctypes
       
   140         except ImportError:
       
   141             if verbose:
       
   142                 print "test_PyThreadState_SetAsyncExc can't import ctypes"
       
   143             return  # can't do anything
       
   144 
       
   145         set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
       
   146 
       
   147         class AsyncExc(Exception):
       
   148             pass
       
   149 
       
   150         exception = ctypes.py_object(AsyncExc)
       
   151 
       
   152         # `worker_started` is set by the thread when it's inside a try/except
       
   153         # block waiting to catch the asynchronously set AsyncExc exception.
       
   154         # `worker_saw_exception` is set by the thread upon catching that
       
   155         # exception.
       
   156         worker_started = threading.Event()
       
   157         worker_saw_exception = threading.Event()
       
   158 
       
   159         class Worker(threading.Thread):
       
   160             def run(self):
       
   161                 self.id = thread.get_ident()
       
   162                 self.finished = False
       
   163 
       
   164                 try:
       
   165                     while True:
       
   166                         worker_started.set()
       
   167                         time.sleep(0.1)
       
   168                 except AsyncExc:
       
   169                     self.finished = True
       
   170                     worker_saw_exception.set()
       
   171 
       
   172         t = Worker()
       
   173         t.setDaemon(True) # so if this fails, we don't hang Python at shutdown
       
   174         t.start()
       
   175         if verbose:
       
   176             print "    started worker thread"
       
   177 
       
   178         # Try a thread id that doesn't make sense.
       
   179         if verbose:
       
   180             print "    trying nonsensical thread id"
       
   181         result = set_async_exc(ctypes.c_long(-1), exception)
       
   182         self.assertEqual(result, 0)  # no thread states modified
       
   183 
       
   184         # Now raise an exception in the worker thread.
       
   185         if verbose:
       
   186             print "    waiting for worker thread to get started"
       
   187         worker_started.wait()
       
   188         if verbose:
       
   189             print "    verifying worker hasn't exited"
       
   190         self.assert_(not t.finished)
       
   191         if verbose:
       
   192             print "    attempting to raise asynch exception in worker"
       
   193         result = set_async_exc(ctypes.c_long(t.id), exception)
       
   194         self.assertEqual(result, 1) # one thread state modified
       
   195         if verbose:
       
   196             print "    waiting for worker to say it caught the exception"
       
   197         worker_saw_exception.wait(timeout=10)
       
   198         self.assert_(t.finished)
       
   199         if verbose:
       
   200             print "    all OK -- joining worker"
       
   201         if t.finished:
       
   202             t.join()
       
   203         # else the thread is still running, and we have no way to kill it
       
   204 
       
   205     def test_enumerate_after_join(self):
       
   206         # Try hard to trigger #1703448: a thread is still returned in
       
   207         # threading.enumerate() after it has been join()ed.
       
   208         enum = threading.enumerate
       
   209         old_interval = sys.getcheckinterval()
       
   210         sys.setcheckinterval(1)
       
   211         try:
       
   212             for i in xrange(1, 1000):
       
   213                 t = threading.Thread(target=lambda: None)
       
   214                 t.start()
       
   215                 t.join()
       
   216                 l = enum()
       
   217                 self.assertFalse(t in l,
       
   218                     "#1703448 triggered after %d trials: %s" % (i, l))
       
   219         finally:
       
   220             sys.setcheckinterval(old_interval)
       
   221 
       
   222 
       
   223 def test_main():
       
   224     test.test_support.run_unittest(ThreadTests)
       
   225 
       
   226 if __name__ == "__main__":
       
   227     test_main()