python-2.5.2/win32/Lib/test/test_queue.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 # Some simple Queue module tests, plus some failure conditions
       
     2 # to ensure the Queue locks remain stable.
       
     3 import Queue
       
     4 import sys
       
     5 import threading
       
     6 import time
       
     7 
       
     8 from test.test_support import verify, TestFailed, verbose
       
     9 
       
# Capacity of the bounded queues that test() hands to SimpleQueueTest and
# FailingQueueTest; both tests fill the queue up to exactly this many items.
QUEUE_SIZE = 5
       
    11 
       
    12 # A thread to run a function that unclogs a blocked Queue.
       
    13 class _TriggerThread(threading.Thread):
       
    14     def __init__(self, fn, args):
       
    15         self.fn = fn
       
    16         self.args = args
       
    17         self.startedEvent = threading.Event()
       
    18         threading.Thread.__init__(self)
       
    19 
       
    20     def run(self):
       
    21         # The sleep isn't necessary, but is intended to give the blocking
       
    22         # function in the main thread a chance at actually blocking before
       
    23         # we unclog it.  But if the sleep is longer than the timeout-based
       
    24         # tests wait in their blocking functions, those tests will fail.
       
    25         # So we give them much longer timeout values compared to the
       
    26         # sleep here (I aimed at 10 seconds for blocking functions --
       
    27         # they should never actually wait that long - they should make
       
    28         # progress as soon as we call self.fn()).
       
    29         time.sleep(0.1)
       
    30         self.startedEvent.set()
       
    31         self.fn(*self.args)
       
    32 
       
    33 # Execute a function that blocks, and in a separate thread, a function that
       
    34 # triggers the release.  Returns the result of the blocking function.
       
    35 # Caution:  block_func must guarantee to block until trigger_func is
       
    36 # called, and trigger_func must guarantee to change queue state so that
       
    37 # block_func can make enough progress to return.  In particular, a
       
    38 # block_func that just raises an exception regardless of whether trigger_func
       
    39 # is called will lead to timing-dependent sporadic failures, and one of
       
    40 # those went rarely seen but undiagnosed for years.  Now block_func
       
    41 # must be unexceptional.  If block_func is supposed to raise an exception,
       
    42 # call _doExceptionalBlockingTest() instead.
       
    43 def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
       
    44     t = _TriggerThread(trigger_func, trigger_args)
       
    45     t.start()
       
    46     result = block_func(*block_args)
       
    47     # If block_func returned before our thread made the call, we failed!
       
    48     if not t.startedEvent.isSet():
       
    49         raise TestFailed("blocking function '%r' appeared not to block" %
       
    50                          block_func)
       
    51     t.join(10) # make sure the thread terminates
       
    52     if t.isAlive():
       
    53         raise TestFailed("trigger function '%r' appeared to not return" %
       
    54                          trigger_func)
       
    55     return result
       
    56 
       
    57 # Call this instead if block_func is supposed to raise an exception.
       
    58 def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
       
    59                                trigger_args, expected_exception_class):
       
    60     t = _TriggerThread(trigger_func, trigger_args)
       
    61     t.start()
       
    62     try:
       
    63         try:
       
    64             block_func(*block_args)
       
    65         except expected_exception_class:
       
    66             raise
       
    67         else:
       
    68             raise TestFailed("expected exception of kind %r" %
       
    69                              expected_exception_class)
       
    70     finally:
       
    71         t.join(10) # make sure the thread terminates
       
    72         if t.isAlive():
       
    73             raise TestFailed("trigger function '%r' appeared to not return" %
       
    74                              trigger_func)
       
    75         if not t.startedEvent.isSet():
       
    76             raise TestFailed("trigger thread ended but event never set")
       
    77 
       
    78 # A Queue subclass that can provoke failure at a moment's notice :)
       
    79 class FailingQueueException(Exception):
       
    80     pass
       
    81 
       
    82 class FailingQueue(Queue.Queue):
       
    83     def __init__(self, *args):
       
    84         self.fail_next_put = False
       
    85         self.fail_next_get = False
       
    86         Queue.Queue.__init__(self, *args)
       
    87     def _put(self, item):
       
    88         if self.fail_next_put:
       
    89             self.fail_next_put = False
       
    90             raise FailingQueueException, "You Lose"
       
    91         return Queue.Queue._put(self, item)
       
    92     def _get(self):
       
    93         if self.fail_next_get:
       
    94             self.fail_next_get = False
       
    95             raise FailingQueueException, "You Lose"
       
    96         return Queue.Queue._get(self)
       
    97 
       
    98 def FailingQueueTest(q):
       
    99     if not q.empty():
       
   100         raise RuntimeError, "Call this function with an empty queue"
       
   101     for i in range(QUEUE_SIZE-1):
       
   102         q.put(i)
       
   103     # Test a failing non-blocking put.
       
   104     q.fail_next_put = True
       
   105     try:
       
   106         q.put("oops", block=0)
       
   107         raise TestFailed("The queue didn't fail when it should have")
       
   108     except FailingQueueException:
       
   109         pass
       
   110     q.fail_next_put = True
       
   111     try:
       
   112         q.put("oops", timeout=0.1)
       
   113         raise TestFailed("The queue didn't fail when it should have")
       
   114     except FailingQueueException:
       
   115         pass
       
   116     q.put("last")
       
   117     verify(q.full(), "Queue should be full")
       
   118     # Test a failing blocking put
       
   119     q.fail_next_put = True
       
   120     try:
       
   121         _doBlockingTest(q.put, ("full",), q.get, ())
       
   122         raise TestFailed("The queue didn't fail when it should have")
       
   123     except FailingQueueException:
       
   124         pass
       
   125     # Check the Queue isn't damaged.
       
   126     # put failed, but get succeeded - re-add
       
   127     q.put("last")
       
   128     # Test a failing timeout put
       
   129     q.fail_next_put = True
       
   130     try:
       
   131         _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
       
   132                                    FailingQueueException)
       
   133         raise TestFailed("The queue didn't fail when it should have")
       
   134     except FailingQueueException:
       
   135         pass
       
   136     # Check the Queue isn't damaged.
       
   137     # put failed, but get succeeded - re-add
       
   138     q.put("last")
       
   139     verify(q.full(), "Queue should be full")
       
   140     q.get()
       
   141     verify(not q.full(), "Queue should not be full")
       
   142     q.put("last")
       
   143     verify(q.full(), "Queue should be full")
       
   144     # Test a blocking put
       
   145     _doBlockingTest( q.put, ("full",), q.get, ())
       
   146     # Empty it
       
   147     for i in range(QUEUE_SIZE):
       
   148         q.get()
       
   149     verify(q.empty(), "Queue should be empty")
       
   150     q.put("first")
       
   151     q.fail_next_get = True
       
   152     try:
       
   153         q.get()
       
   154         raise TestFailed("The queue didn't fail when it should have")
       
   155     except FailingQueueException:
       
   156         pass
       
   157     verify(not q.empty(), "Queue should not be empty")
       
   158     q.fail_next_get = True
       
   159     try:
       
   160         q.get(timeout=0.1)
       
   161         raise TestFailed("The queue didn't fail when it should have")
       
   162     except FailingQueueException:
       
   163         pass
       
   164     verify(not q.empty(), "Queue should not be empty")
       
   165     q.get()
       
   166     verify(q.empty(), "Queue should be empty")
       
   167     q.fail_next_get = True
       
   168     try:
       
   169         _doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
       
   170                                    FailingQueueException)
       
   171         raise TestFailed("The queue didn't fail when it should have")
       
   172     except FailingQueueException:
       
   173         pass
       
   174     # put succeeded, but get failed.
       
   175     verify(not q.empty(), "Queue should not be empty")
       
   176     q.get()
       
   177     verify(q.empty(), "Queue should be empty")
       
   178 
       
   179 def SimpleQueueTest(q):
       
   180     if not q.empty():
       
   181         raise RuntimeError, "Call this function with an empty queue"
       
   182     # I guess we better check things actually queue correctly a little :)
       
   183     q.put(111)
       
   184     q.put(222)
       
   185     verify(q.get() == 111 and q.get() == 222,
       
   186            "Didn't seem to queue the correct data!")
       
   187     for i in range(QUEUE_SIZE-1):
       
   188         q.put(i)
       
   189         verify(not q.empty(), "Queue should not be empty")
       
   190     verify(not q.full(), "Queue should not be full")
       
   191     q.put("last")
       
   192     verify(q.full(), "Queue should be full")
       
   193     try:
       
   194         q.put("full", block=0)
       
   195         raise TestFailed("Didn't appear to block with a full queue")
       
   196     except Queue.Full:
       
   197         pass
       
   198     try:
       
   199         q.put("full", timeout=0.01)
       
   200         raise TestFailed("Didn't appear to time-out with a full queue")
       
   201     except Queue.Full:
       
   202         pass
       
   203     # Test a blocking put
       
   204     _doBlockingTest(q.put, ("full",), q.get, ())
       
   205     _doBlockingTest(q.put, ("full", True, 10), q.get, ())
       
   206     # Empty it
       
   207     for i in range(QUEUE_SIZE):
       
   208         q.get()
       
   209     verify(q.empty(), "Queue should be empty")
       
   210     try:
       
   211         q.get(block=0)
       
   212         raise TestFailed("Didn't appear to block with an empty queue")
       
   213     except Queue.Empty:
       
   214         pass
       
   215     try:
       
   216         q.get(timeout=0.01)
       
   217         raise TestFailed("Didn't appear to time-out with an empty queue")
       
   218     except Queue.Empty:
       
   219         pass
       
   220     # Test a blocking get
       
   221     _doBlockingTest(q.get, (), q.put, ('empty',))
       
   222     _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
       
   223 
       
   224 cum = 0
       
   225 cumlock = threading.Lock()
       
   226 
       
   227 def worker(q):
       
   228     global cum
       
   229     while True:
       
   230         x = q.get()
       
   231         if x is None:
       
   232             q.task_done()
       
   233             return
       
   234         cumlock.acquire()
       
   235         try:
       
   236             cum += x
       
   237         finally:
       
   238             cumlock.release()
       
   239         q.task_done()
       
   240 
       
   241 def QueueJoinTest(q):
       
   242     global cum
       
   243     cum = 0
       
   244     for i in (0,1):
       
   245         threading.Thread(target=worker, args=(q,)).start()
       
   246     for i in xrange(100):
       
   247         q.put(i)
       
   248     q.join()
       
   249     verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
       
   250     for i in (0,1):
       
   251         q.put(None)         # instruct the threads to close
       
   252     q.join()                # verify that you can join twice
       
   253 
       
   254 def QueueTaskDoneTest(q):
       
   255     try:
       
   256         q.task_done()
       
   257     except ValueError:
       
   258         pass
       
   259     else:
       
   260         raise TestFailed("Did not detect task count going negative")
       
   261 
       
   262 def test():
       
   263     q = Queue.Queue()
       
   264     QueueTaskDoneTest(q)
       
   265     QueueJoinTest(q)
       
   266     QueueJoinTest(q)
       
   267     QueueTaskDoneTest(q)
       
   268 
       
   269     q = Queue.Queue(QUEUE_SIZE)
       
   270     # Do it a couple of times on the same queue
       
   271     SimpleQueueTest(q)
       
   272     SimpleQueueTest(q)
       
   273     if verbose:
       
   274         print "Simple Queue tests seemed to work"
       
   275     q = FailingQueue(QUEUE_SIZE)
       
   276     FailingQueueTest(q)
       
   277     FailingQueueTest(q)
       
   278     if verbose:
       
   279         print "Failing Queue tests seemed to work"
       
   280 
       
   281 test()