diff -r ffa851df0825 -r 2fb8b9db1c86 symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_dummy_threading.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_dummy_threading.py Fri Jul 31 15:01:17 2009 +0100 @@ -0,0 +1,63 @@ +from test import test_support +import unittest +import dummy_threading as _threading +import time + +class DummyThreadingTestCase(unittest.TestCase): + + class TestThread(_threading.Thread): + + def run(self): + global running + global sema + global mutex + # Uncomment if testing another module, such as the real 'threading' + # module. + #delay = random.random() * 2 + delay = 0 + if test_support.verbose: + print 'task', self.name, 'will run for', delay, 'sec' + sema.acquire() + mutex.acquire() + running += 1 + if test_support.verbose: + print running, 'tasks are running' + mutex.release() + time.sleep(delay) + if test_support.verbose: + print 'task', self.name, 'done' + mutex.acquire() + running -= 1 + if test_support.verbose: + print self.name, 'is finished.', running, 'tasks are running' + mutex.release() + sema.release() + + def setUp(self): + self.numtasks = 10 + global sema + sema = _threading.BoundedSemaphore(value=3) + global mutex + mutex = _threading.RLock() + global running + running = 0 + self.threads = [] + + def test_tasks(self): + for i in range(self.numtasks): + t = self.TestThread(name=""%i) + self.threads.append(t) + t.start() + + if test_support.verbose: + print 'waiting for all tasks to complete' + for t in self.threads: + t.join() + if test_support.verbose: + print 'all tasks done' + +def test_main(): + test_support.run_unittest(DummyThreadingTestCase) + +if __name__ == '__main__': + test_main()