|
1 #!/usr/bin/env python |
|
2 |
|
3 # |
|
4 # Unit tests for the multiprocessing package |
|
5 # |
|
6 |
|
7 import unittest |
|
8 import threading |
|
9 import Queue |
|
10 import time |
|
11 import sys |
|
12 import os |
|
13 import gc |
|
14 import signal |
|
15 import array |
|
16 import copy |
|
17 import socket |
|
18 import random |
|
19 import logging |
|
20 |
|
21 |
|
22 # Work around broken sem_open implementations |
|
23 try: |
|
24 import multiprocessing.synchronize |
|
25 except ImportError, e: |
|
26 from test.test_support import TestSkipped |
|
27 raise TestSkipped(e) |
|
28 |
|
29 import multiprocessing.dummy |
|
30 import multiprocessing.connection |
|
31 import multiprocessing.managers |
|
32 import multiprocessing.heap |
|
33 import multiprocessing.pool |
|
34 import _multiprocessing |
|
35 |
|
36 from multiprocessing import util |
|
37 |
|
38 # |
|
39 # |
|
40 # |
|
41 |
|
# NOTE(review): appears to be a py2/py3 compatibility alias -- on Python 2
# plain str is the byte/latin-1 text type needed for the 'c' ctypes code.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Grace period used when waiting for child processes/threads to make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    # Short timeouts keep the default test run fast.
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# Some platforms ship a sem_getvalue() that always fails; the C extension
# exports HAVE_BROKEN_SEM_GETVALUE in that case.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)
|
63 |
|
64 # |
|
65 # Creates a wrapper for a function which records the time it takes to finish |
|
66 # |
|
67 |
|
class TimingWrapper(object):
    """Callable proxy that records how long each invocation of *func* takes.

    After every call (successful or raising) the wall-clock duration is
    available as the ``elapsed`` attribute; it is ``None`` before the first
    call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            # Forward the call unchanged; timing is recorded even on error.
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.time() - started
|
80 |
|
81 # |
|
82 # Base class for test cases |
|
83 # |
|
84 |
|
class BaseTestCase(object):
    """Mixin with helper assertions shared by all the test-case classes."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only meaningful when CHECK_TIMINGS is on;
        # otherwise they are silently skipped.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Silently skip the check when the operation is unimplemented on
        # this platform.
        try:
            res = func(*args)
        except NotImplementedError:
            return
        return self.assertEqual(value, res)
|
100 |
|
101 # |
|
102 # Return the value of a semaphore |
|
103 # |
|
104 |
|
def get_value(self):
    """Return the counter of a semaphore-like object.

    Tries, in order: the object's own ``get_value()`` method, the
    name-mangled ``_Semaphore__value`` attribute (threading.Semaphore on
    old Pythons), then a plain ``_value`` attribute.  Raises
    NotImplementedError when none of them exists.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            pass
    raise NotImplementedError
|
116 |
|
117 # |
|
118 # Testcases |
|
119 # |
|
120 |
|
class _TestProcess(BaseTestCase):
    """Tests for basic Process behaviour: attributes, start/join/terminate,
    active_children() bookkeeping and recursive spawning."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child side: report back the arguments and identity info received.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # NOTE: assertEquals is a deprecated alias -- use assertEqual,
        # matching the rest of this file.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # The child sends back args (minus the queue itself), kwargs, its
        # name and -- for real processes -- its authkey and pid.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Sleep long enough that the child can only exit via terminate().
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            # Threads cannot be terminated.
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        # join() should return almost immediately after terminate().
        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Recursively spawn children up to two levels deep; every
        # process/thread reports its id (a path of 0/1 choices).
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order of the spawned tree.
        expected = [
            [],
            [0],
            [0, 0],
            [0, 1],
            [1],
            [1, 0],
            [1, 1]
        ]
        self.assertEqual(result, expected)
|
266 |
|
267 # |
|
268 # |
|
269 # |
|
270 |
|
class _UpperCaser(multiprocessing.Process):
    """Worker process that upper-cases strings sent to it over a pipe.

    Use submit() to round-trip one string, stop() to shut the worker down.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        # child_conn is read/written inside the child; parent_conn by the
        # caller in the parent process.
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        # Runs in the child: close the parent's end here so the pipe is not
        # kept open by this process, then echo upper-cased strings until the
        # None sentinel arrives.
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        # Round-trip one string through the worker process.
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        # Send the shutdown sentinel and close both ends in the parent.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
|
292 |
|
class _TestSubclassingProcess(BaseTestCase):
    """Check that a user-defined Process subclass works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for word in ('hello', 'world'):
            self.assertEqual(worker.submit(word), word.upper())
        worker.stop()
        worker.join()
|
304 |
|
305 # |
|
306 # |
|
307 # |
|
308 |
|
def queue_empty(q):
    """Return True if *q* holds no items.

    Prefers q.empty(); falls back to q.qsize() == 0 for queue-like objects
    (e.g. some proxies) that do not provide empty().
    """
    empty = getattr(q, 'empty', None)
    if empty is not None:
        return empty()
    return q.qsize() == 0
|
314 |
|
def queue_full(q, maxsize):
    """Return True if *q* holds *maxsize* items.

    Prefers q.full(); falls back to comparing q.qsize() with *maxsize* for
    queue-like objects that do not provide full().
    """
    full = getattr(q, 'full', None)
    if full is not None:
        return full()
    return q.qsize() == maxsize
|
320 |
|
321 |
|
class _TestQueue(BaseTestCase):
    """Tests for Queue/JoinableQueue: blocking, timeouts, fork, task_done."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items the
        # parent put, then release the parent.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue to capacity using every put() calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue must fail immediately ...
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... while blocking puts must wait out their timeout first.
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it reads as empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, feed four items.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Exercise every get() calling convention.
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue must fail immediately ...
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... while blocking gets must wait out their timeout first.
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() may be unimplemented (broken sem_getvalue) -- skip.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker: consume items until the None sentinel, acknowledging each
        # one so queue.join() in the parent can complete.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.start()

        for i in xrange(10):
            queue.put(i)

        # Blocks until every put item has been task_done()'d.
        queue.join()

        # One sentinel per worker shuts them all down.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
|
525 |
|
526 # |
|
527 # |
|
528 # |
|
529 |
|
class _TestLock(BaseTestCase):
    """Exercise Lock and RLock acquire/release semantics."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A non-blocking second acquire of a plain Lock must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # An RLock may be acquired repeatedly by the same owner ...
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        # ... and must be released the same number of times.
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)
|
548 |
|
549 |
|
class _TestSemaphore(BaseTestCase):
    """Exercise Semaphore/BoundedSemaphore counting and acquire timeouts."""

    def _test_semaphore(self, sem):
        # Counter starts at 2; each acquire decrements, each release
        # increments, and a non-blocking acquire at zero must fail.
        self.assertReturnsIfImplemented(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            self.assertReturnsIfImplemented(remaining, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        for remaining in (1, 2):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(remaining, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # A plain Semaphore may be released past its initial value.
        for counter in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(counter, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return at once, whatever the timeout.
        for call_args in ((False,), (False, None), (False, TIMEOUT1)):
            self.assertEqual(acquire(*call_args), False)
            self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        # Blocking acquires wait out their timeout before failing.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
|
602 |
|
603 |
|
class _TestCondition(BaseTestCase):
    """Tests for Condition notify/notify_all/wait across a mix of
    processes and threads."""

    def f(self, cond, sleeping, woken, timeout=None):
        # Sleeper: announce we are about to wait, wait on the condition
        # (possibly timing out), then announce we have woken.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One sleeper in another process, one in another thread.
        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout and no notify returns after ~timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
|
734 |
|
735 |
|
class _TestEvent(BaseTestCase):
    """Tests for Event set/clear/wait, including wait() timeouts."""

    def _test_event(self, event):
        # Child: set the event after a short delay so the parent's wait()
        # has to block for a while before succeeding.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects.  is_set == isSet
        #self.assertEqual(event.is_set(), False)

        # While unset, wait() should block for exactly the given timeout.
        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        # Once set, wait() returns immediately regardless of timeout.
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A child process sets the event; wait() must wake when it does.
        self.Process(target=self._test_event, args=(event,)).start()
        self.assertEqual(wait(), None)
|
771 |
|
772 # |
|
773 # |
|
774 # |
|
775 |
|
class _TestValue(BaseTestCase):
    """Tests for shared Value/RawValue ctypes wrappers."""

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side: overwrite each shared value with its 'after' constant.
        for shared, (code, before, after) in zip(values, self.codes_values):
            shared.value = after

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        for shared, (code, before, after) in zip(values, self.codes_values):
            self.assertEqual(shared.value, before)

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # The child's writes must be visible in the parent.
        for shared, (code, before, after) in zip(values, self.codes_values):
            self.assertEqual(shared.value, after)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # An explicitly supplied lock must be what get_lock() returns.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # RawValue has no lock and exposes neither accessor.
        arr4 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
|
835 |
|
836 |
|
class _TestArray(BaseTestCase):
    """Tests for shared Array/RawArray ctypes wrappers."""

    def f(self, seq):
        """Replace *seq* in place with its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        arr = self.RawArray('i', seq) if raw else self.Array('i', seq)

        # Indexing and slicing must mirror the plain list.
        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array must also work.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Transform the reference list locally and the shared array in a
        # child process; both must end up identical.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        arr1 = self.Array('i', range(10))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', range(10), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # An explicitly supplied lock must be what get_lock() returns.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # RawArray has no lock and exposes neither accessor.
        arr4 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
|
893 |
|
894 # |
|
895 # |
|
896 # |
|
897 |
|
class _TestContainers(BaseTestCase):
    """Tests for manager-backed list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = self.list(range(10))
        self.assertEqual(ten[:], range(10))

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(range(5))
        self.assertEqual(grown[:], range(5))

        # Indexing and slicing behave like a normal list.
        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2,3,4])

        grown *= 2
        self.assertEqual(grown[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(grown + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(ten[:], range(10))

        nested = self.list([ten, grown])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A mutation of the original proxy after insertion is still visible
        # through the containing list.
        holder = self.list([ten])
        ten.append('hello')
        self.assertEqual(holder[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        mapping = self.dict()
        indices = range(65, 70)
        for code in indices:
            mapping[code] = chr(code)
        self.assertEqual(mapping.copy(), dict((c, chr(c)) for c in indices))
        self.assertEqual(sorted(mapping.keys()), indices)
        self.assertEqual(sorted(mapping.values()),
                         [chr(c) for c in indices])
        self.assertEqual(sorted(mapping.items()),
                         [(c, chr(c)) for c in indices])

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Underscore-prefixed attributes do not appear in the repr.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
|
953 |
|
954 # |
|
955 # |
|
956 # |
|
957 |
|
def sqr(x, wait=0.0):
    """Return ``x`` squared, after sleeping *wait* seconds.

    The delay lets pool tests control how long a job appears to take.
    """
    time.sleep(wait)
    return x * x
|
class _TestPool(BaseTestCase):
    """Tests for Pool.apply/map/imap/imap_unordered and pool shutdown.

    NOTE: relies on Python 2 semantics -- map() returns a list and
    iterators expose next().
    """

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_async(self):
        # get() should block roughly as long as the worker's sleep.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # get() with a timeout shorter than the job must raise TimeoutError.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        # Results arrive in order, then StopIteration.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # Order is unspecified; compare the sorted results instead.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # terminate() should abandon the queued jobs almost immediately.
        self.assertTrue(join.elapsed < 0.2)
|
1030 # |
|
1031 # Test that manager has expected number of shared objects left |
|
1032 # |
|
1033 |
|
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        if refs != EXPECTED_NUMBER:
            # Dump the manager's debug info to help diagnose the leak.
            print self.manager._debug_info()

        self.assertEqual(refs, EXPECTED_NUMBER)
|
1050 |
|
1051 # |
|
1052 # Test of creating a customized manager class |
|
1053 # |
|
1054 |
|
1055 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError |
|
1056 |
|
class FooBar(object):
    """Trivial target class served through MyManager proxies."""

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError

    def _h(self):
        # Underscore-prefixed, so hidden unless explicitly exposed.
        return '_h()'
|
1064 |
|
def baz():
    """Generate the squares 0, 1, 4, ..., 81 (registered on MyManager)."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
|
1068 |
|
class IteratorProxy(BaseProxy):
    # Proxy type for iterators/generators: forwards both the Py2 ('next')
    # and Py3 ('__next__') iteration protocols to the referent.  The
    # strings in _exposed_ and _callmethod are the wire protocol — do not
    # rename them.
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')
|
1077 |
|
class MyManager(BaseManager):
    # Customized manager; its types are attached via register() below.
    pass

# 'Foo': default exposure — public methods only (f and g, not _h).
MyManager.register('Foo', callable=FooBar)
# 'Bar': explicit exposure list, deliberately including "private" _h.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz': a generator, served through the custom IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
|
1084 |
|
1085 |
|
class _TestMyManager(BaseTestCase):
    """Exercise the customized manager class defined above (MyManager)."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        probe = ('f', 'g', '_h')
        foo_methods = [name for name in probe if hasattr(foo, name)]
        bar_methods = [name for name in probe if hasattr(bar, name)]

        # default exposure hides _h; the explicit list on Bar keeps it
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [n * n for n in range(10)])

        manager.shutdown()
|
1117 |
|
1118 # |
|
1119 # Test of connecting to a remote server and using xmlrpclib for serialization |
|
1120 # |
|
1121 |
|
# Module-level queue shared out by the manager server process; get_queue
# is the registered callable that always hands back this same instance.
_queue = Queue.Queue()
def get_queue():
    return _queue
|
1125 |
|
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Server side registers the callable so it can create the queue.
QueueManager.register('get_queue', callable=get_queue)
|
1129 |
|
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Client side registers only the method name — no callable — since the
# object is created by the remote server.
QueueManager2.register('get_queue')
|
1133 |
|
1134 |
|
SERIALIZER = 'xmlrpclib'   # exercise the non-pickle serializer
|
1136 |
|
class _TestRemoteManager(BaseTestCase):
    """Connect to a manager server from another process, using xmlrpclib
    (not pickle) for serialization."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Child process: connect with the interface-only manager class and
        # push one item onto the shared queue.
        remote = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        remote.connect()
        queue = remote.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Functions cannot be serialized by xmlrpclib, so putting one must
        # raise a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
|
1176 |
|
1177 # |
|
1178 # |
|
1179 # |
|
1180 |
|
SENTINEL = latin('')   # empty message: tells the _echo child to exit
|
1182 |
|
class _TestConnection(BaseTestCase):
    """Tests for Pipe/Connection objects: send/recv of picklable objects,
    raw byte transfer, recv_bytes_into, poll timing, and read/write-only
    half-duplex pipes.  Timing assertions make the statement order here
    significant."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Child loop: echo every byte-message back until SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            # real OS-level pipes expose an integer file descriptor
            self.assertEqual(type(conn.fileno()), int)

        # picklable-object round trip through the echo child
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # raw byte round trip
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # recv_bytes_into with a buffer large enough: fills from offset 0
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # same, but writing at a byte offset of 3 items into the buffer
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # a too-small buffer must raise BufferTooShort carrying the
            # whole message in args so no data is lost
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort, e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # no data pending: poll() returns immediately with False
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        # no data pending with timeout: blocks for the full timeout
        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # data pending: poll returns True without consuming the timeout
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)        # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # child closed its end, so further reads must hit EOF
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # half-duplex pipe: reader can only read, writer can only write
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # exercise send_bytes' optional (offset, size) arguments
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # offset == len(msg): legal, sends the empty suffix
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # out-of-range offsets/sizes must be rejected
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
|
1331 |
|
class _TestListenerClient(BaseTestCase):
    """Listener/Client round trip over every supported address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect, send one greeting, disconnect.
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            server_conn = listener.accept()
            self.assertEqual(server_conn.recv(), 'hello')
            child.join()
            listener.close()
|
#
# Test of sending connection and socket objects between processes
# (currently disabled: the class below is wrapped in a string literal)
#
|
1354 """ |
|
1355 class _TestPicklingConnections(BaseTestCase): |
|
1356 |
|
1357 ALLOWED_TYPES = ('processes',) |
|
1358 |
|
1359 def _listener(self, conn, families): |
|
1360 for fam in families: |
|
1361 l = self.connection.Listener(family=fam) |
|
1362 conn.send(l.address) |
|
1363 new_conn = l.accept() |
|
1364 conn.send(new_conn) |
|
1365 |
|
1366 if self.TYPE == 'processes': |
|
1367 l = socket.socket() |
|
1368 l.bind(('localhost', 0)) |
|
1369 conn.send(l.getsockname()) |
|
1370 l.listen(1) |
|
1371 new_conn, addr = l.accept() |
|
1372 conn.send(new_conn) |
|
1373 |
|
1374 conn.recv() |
|
1375 |
|
1376 def _remote(self, conn): |
|
1377 for (address, msg) in iter(conn.recv, None): |
|
1378 client = self.connection.Client(address) |
|
1379 client.send(msg.upper()) |
|
1380 client.close() |
|
1381 |
|
1382 if self.TYPE == 'processes': |
|
1383 address, msg = conn.recv() |
|
1384 client = socket.socket() |
|
1385 client.connect(address) |
|
1386 client.sendall(msg.upper()) |
|
1387 client.close() |
|
1388 |
|
1389 conn.close() |
|
1390 |
|
1391 def test_pickling(self): |
|
1392 try: |
|
1393 multiprocessing.allow_connection_pickling() |
|
1394 except ImportError: |
|
1395 return |
|
1396 |
|
1397 families = self.connection.families |
|
1398 |
|
1399 lconn, lconn0 = self.Pipe() |
|
1400 lp = self.Process(target=self._listener, args=(lconn0, families)) |
|
1401 lp.start() |
|
1402 lconn0.close() |
|
1403 |
|
1404 rconn, rconn0 = self.Pipe() |
|
1405 rp = self.Process(target=self._remote, args=(rconn0,)) |
|
1406 rp.start() |
|
1407 rconn0.close() |
|
1408 |
|
1409 for fam in families: |
|
1410 msg = ('This connection uses family %s' % fam).encode('ascii') |
|
1411 address = lconn.recv() |
|
1412 rconn.send((address, msg)) |
|
1413 new_conn = lconn.recv() |
|
1414 self.assertEqual(new_conn.recv(), msg.upper()) |
|
1415 |
|
1416 rconn.send(None) |
|
1417 |
|
1418 if self.TYPE == 'processes': |
|
1419 msg = latin('This connection uses a normal socket') |
|
1420 address = lconn.recv() |
|
1421 rconn.send((address, msg)) |
|
1422 if hasattr(socket, 'fromfd'): |
|
1423 new_conn = lconn.recv() |
|
1424 self.assertEqual(new_conn.recv(100), msg.upper()) |
|
1425 else: |
|
1426 # XXX On Windows with Py2.6 need to backport fromfd() |
|
1427 discard = lconn.recv_bytes() |
|
1428 |
|
1429 lconn.send(None) |
|
1430 |
|
1431 rconn.close() |
|
1432 lconn.close() |
|
1433 |
|
1434 lp.join() |
|
1435 rp.join() |
|
1436 """ |
|
1437 # |
|
1438 # |
|
1439 # |
|
1440 |
|
class _TestHeap(BaseTestCase):
    """Stress-test multiprocessing.heap's arena allocator, then verify that
    the free and occupied blocks exactly tile each arena."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; `entries` was previously named
        # `all`, which shadowed the builtin of the same name
        entries = []
        occupied = 0
        for seq in heap._len_to_seq.values():
            for arena, start, stop in seq:
                entries.append((heap._arenas.index(arena), start, stop,
                                stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            entries.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'occupied'))
            occupied += (stop-start)

        entries.sort()

        for i in range(len(entries)-1):
            (arena, start, stop) = entries[i][:3]
            (narena, nstart, nstop) = entries[i+1][:3]
            # consecutive entries must abut, unless we crossed into a new
            # arena (in which case the next entry starts at offset 0)
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
|
1481 |
|
1482 # |
|
1483 # |
|
1484 # |
|
1485 |
|
# Work out whether shared ctypes objects are usable.  BUG FIX: the original
# code did `from ctypes import Structure, Value, copy, c_int, c_double`,
# but ctypes defines neither Value nor copy, so the ImportError always
# fired and the shared-ctypes tests were silently skipped; Array was never
# imported at all.  Value, Array and copy live in multiprocessing.sharedctypes.
try:
    from ctypes import Structure, c_int, c_double
    from multiprocessing.sharedctypes import Value, Array, copy
except ImportError:
    # no usable ctypes: _TestSharedCTypes methods bail out early
    Structure = object
    c_int = c_double = None
|
1491 |
|
class _Foo(Structure):
    # Simple ctypes structure for the shared-ctypes tests.  If the ctypes
    # import above failed, Structure is plain `object` and this _fields_
    # list is inert (c_int/c_double are then None).
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
|
1497 |
|
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes objects (Value/Array) being mutated in a
    child process and for sharedctypes.copy()."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return              # ctypes/sharedctypes not available

        x = Value('i', 7, lock=lock)
        # BUG FIX: this previously read `Value(ctypes.c_double, ...)`, but
        # the `ctypes` module itself is never imported (only names from
        # it), which raised NameError; use the imported c_double.
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', range(10), lock=lock)
        string = Array('c', 20, lock=lock)
        string.value = 'hello'

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        # every shared value must have been doubled by the child
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # same as test_sharedctypes but with lock-wrapped objects
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return              # ctypes/sharedctypes not available

        foo = _Foo(2, 5.0)
        bar = copy(foo)
        # mutating the original must not affect the copy
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
|
1547 |
|
1548 # |
|
1549 # |
|
1550 # |
|
1551 |
|
class _TestFinalize(BaseTestCase):
    """Tests for util.Finalize: callbacks fire on garbage collection, on
    explicit call (once only), and at process exit in decreasing
    exitpriority order."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in a child; each finalizer sends its tag through conn, so
        # the parent can check the firing order.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # 'c' has no exitpriority, so it is NOT run by _exit_function()
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # equal priorities run in reverse creation order at exit
        # (expected: d03, d02, d01 — see the assertion below)
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # drain the tags the child sent, in order, up to the STOP marker
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
|
1602 |
|
1603 # |
|
1604 # Test that from ... import * works for each module |
|
1605 # |
|
1606 |
|
class _TestImportStar(BaseTestCase):
    """Check that every name listed in each submodule's __all__ really
    exists, so `from ... import *` works for each module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            module = sys.modules[name]
            for symbol in getattr(module, '__all__', ()):
                self.assertTrue(
                    hasattr(module, symbol),
                    '%r does not have attribute %r' % (module, symbol)
                    )
|
1629 |
|
1630 # |
|
1631 # Quick test that logging works -- does not test logging output |
|
1632 # |
|
1633 |
|
class _TestLogging(BaseTestCase):
    """Quick smoke tests for multiprocessing's logger; output content is
    not checked."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Child: report the effective log level it inherited.
        conn.send(multiprocessing.get_logger().getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        mp_logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        saved_root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # a level set directly on the mp logger is seen by children
        mp_logger.setLevel(LEVEL1)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL1, reader.recv())

        # with the mp logger unset, the root logger's level shows through
        mp_logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL2, reader.recv())

        root_logger.setLevel(saved_root_level)
        mp_logger.setLevel(level=LOG_LEVEL)
|
1671 |
|
1672 # |
|
1673 # Functions used to create test cases from the base ones in this module |
|
1674 # |
|
1675 |
|
def get_attributes(Source, names):
    """Return a dict of the named attributes of *Source*, wrapping plain
    functions in staticmethod so they survive being placed in a class body."""
    func_type = type(get_attributes)
    attrs = {}
    for name in names:
        value = getattr(Source, name)
        if isinstance(value, func_type):
            value = staticmethod(value)
        attrs[name] = value
    return attrs
|
1684 |
|
def create_test_cases(Mixin, type):
    """For each _Test* base in this module that allows *type*, build a
    concrete TestCase subclass combining the base with *Mixin*; return a
    dict mapping the generated class names to the classes."""
    result = {}
    glob = globals()
    Type = type[0].upper() + type[1:]

    for name in list(glob.keys()):
        if not name.startswith('_Test'):
            continue
        base = glob[name]
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
|
1701 |
|
1702 # |
|
1703 # Create test cases |
|
1704 # |
|
1705 |
|
class ProcessesMixin(object):
    # Mixin providing the real-process flavour of the API under test.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the module-level factory functions into the class body so test
    # methods can write self.Queue(), self.Lock(), etc. uniformly.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the WithProcesses* TestCase classes and publish them at module
# level so unittest discovery (and test_main below) can find them.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
|
1718 |
|
1719 |
|
class ManagerMixin(object):
    # Mixin providing the manager-backed flavour of the API under test.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Create the manager WITHOUT running __init__ (it is initialized and
    # started later, in test_main), so importing this module is cheap.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate and publish the WithManager* TestCase classes.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
|
1732 |
|
1733 |
|
class ThreadsMixin(object):
    # Mixin providing the thread-backed (multiprocessing.dummy) flavour.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate and publish the WithThreads* TestCase classes.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
|
1746 |
|
class OtherTest(unittest.TestCase):
    """Direct tests of the connection authentication handshake."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class FakeConnection(object):
            # Every read yields garbage that can never match the digest.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class FakeConnection(object):
            # First read looks like a genuine challenge; the follow-up
            # response is garbage, so authentication must fail.
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          FakeConnection(), b'abc')
|
1775 |
|
testcases_other = [OtherTest]   # plain TestCases, run once rather than per-TYPE
|
1777 |
|
1778 # |
|
1779 # |
|
1780 # |
|
1781 |
|
def test_main(run=None):
    """Set up the shared pools/manager, run the full suite with *run*
    (default: test_support.run_unittest), then tear everything down."""
    if sys.platform.startswith("linux"):
        # a broken sem_open makes RLock creation fail at import time
        try:
            lock = multiprocessing.RLock()
        except OSError:
            from test.test_support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # shared fixtures the mixins/tests rely on
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # run each flavour's cases in name order, then the standalone cases
    testcases = []
    for group in (testcases_processes, testcases_threads, testcases_manager):
        testcases.extend(sorted(group.values(), key=lambda tc: tc.__name__))
    testcases.extend(testcases_other)

    loader = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loader(tc) for tc in testcases)
    run(suite)

    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
|
1820 |
|
def main():
    """Script entry point: run the suite with a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
|
1823 |
|
# Usable both as a script and through regrtest, which calls test_main().
if __name__ == '__main__':
    main()