|
1 import unittest |
|
2 from test import test_support |
|
3 from contextlib import closing, nested |
|
4 import gc |
|
5 import pickle |
|
6 import select |
|
7 import signal |
|
8 import subprocess |
|
9 import traceback |
|
10 import sys, os, time, errno |
|
11 |
|
12 if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos': |
|
13 raise test_support.TestSkipped("Can't test signal on %s" % \ |
|
14 sys.platform) |
|
15 |
|
16 |
|
17 class HandlerBCalled(Exception): |
|
18 pass |
|
19 |
|
20 |
|
21 def exit_subprocess(): |
|
22 """Use os._exit(0) to exit the current subprocess. |
|
23 |
|
24 Otherwise, the test catches the SystemExit and continues executing |
|
25 in parallel with the original test, so you wind up with an |
|
26 exponential number of tests running concurrently. |
|
27 """ |
|
28 os._exit(0) |
|
29 |
|
30 |
|
31 def ignoring_eintr(__func, *args, **kwargs): |
|
32 try: |
|
33 return __func(*args, **kwargs) |
|
34 except EnvironmentError as e: |
|
35 if e.errno != errno.EINTR: |
|
36 raise |
|
37 return None |
|
38 |
|
39 |
|
40 class InterProcessSignalTests(unittest.TestCase): |
|
41 MAX_DURATION = 20 # Entire test should last at most 20 sec. |
|
42 |
|
43 def setUp(self): |
|
44 self.using_gc = gc.isenabled() |
|
45 gc.disable() |
|
46 |
|
47 def tearDown(self): |
|
48 if self.using_gc: |
|
49 gc.enable() |
|
50 |
|
51 def format_frame(self, frame, limit=None): |
|
52 return ''.join(traceback.format_stack(frame, limit=limit)) |
|
53 |
|
54 def handlerA(self, signum, frame): |
|
55 self.a_called = True |
|
56 if test_support.verbose: |
|
57 print "handlerA invoked from signal %s at:\n%s" % ( |
|
58 signum, self.format_frame(frame, limit=1)) |
|
59 |
|
60 def handlerB(self, signum, frame): |
|
61 self.b_called = True |
|
62 if test_support.verbose: |
|
63 print "handlerB invoked from signal %s at:\n%s" % ( |
|
64 signum, self.format_frame(frame, limit=1)) |
|
65 raise HandlerBCalled(signum, self.format_frame(frame)) |
|
66 |
|
67 def wait(self, child): |
|
68 """Wait for child to finish, ignoring EINTR.""" |
|
69 while True: |
|
70 try: |
|
71 child.wait() |
|
72 return |
|
73 except OSError as e: |
|
74 if e.errno != errno.EINTR: |
|
75 raise |
|
76 |
|
77 def run_test(self): |
|
78 # Install handlers. This function runs in a sub-process, so we |
|
79 # don't worry about re-setting the default handlers. |
|
80 signal.signal(signal.SIGHUP, self.handlerA) |
|
81 signal.signal(signal.SIGUSR1, self.handlerB) |
|
82 signal.signal(signal.SIGUSR2, signal.SIG_IGN) |
|
83 signal.signal(signal.SIGALRM, signal.default_int_handler) |
|
84 |
|
85 # Variables the signals will modify: |
|
86 self.a_called = False |
|
87 self.b_called = False |
|
88 |
|
89 # Let the sub-processes know who to send signals to. |
|
90 pid = os.getpid() |
|
91 if test_support.verbose: |
|
92 print "test runner's pid is", pid |
|
93 |
|
94 child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) |
|
95 if child: |
|
96 self.wait(child) |
|
97 if not self.a_called: |
|
98 time.sleep(1) # Give the signal time to be delivered. |
|
99 self.assertTrue(self.a_called) |
|
100 self.assertFalse(self.b_called) |
|
101 self.a_called = False |
|
102 |
|
103 # Make sure the signal isn't delivered while the previous |
|
104 # Popen object is being destroyed, because __del__ swallows |
|
105 # exceptions. |
|
106 del child |
|
107 try: |
|
108 child = subprocess.Popen(['kill', '-USR1', str(pid)]) |
|
109 # This wait should be interrupted by the signal's exception. |
|
110 self.wait(child) |
|
111 time.sleep(1) # Give the signal time to be delivered. |
|
112 self.fail('HandlerBCalled exception not thrown') |
|
113 except HandlerBCalled: |
|
114 self.assertTrue(self.b_called) |
|
115 self.assertFalse(self.a_called) |
|
116 if test_support.verbose: |
|
117 print "HandlerBCalled exception caught" |
|
118 |
|
119 child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) |
|
120 if child: |
|
121 self.wait(child) # Nothing should happen. |
|
122 |
|
123 try: |
|
124 signal.alarm(1) |
|
125 # The race condition in pause doesn't matter in this case, |
|
126 # since alarm is going to raise a KeyboardException, which |
|
127 # will skip the call. |
|
128 signal.pause() |
|
129 # But if another signal arrives before the alarm, pause |
|
130 # may return early. |
|
131 time.sleep(1) |
|
132 except KeyboardInterrupt: |
|
133 if test_support.verbose: |
|
134 print "KeyboardInterrupt (the alarm() went off)" |
|
135 except: |
|
136 self.fail("Some other exception woke us from pause: %s" % |
|
137 traceback.format_exc()) |
|
138 else: |
|
139 self.fail("pause returned of its own accord, and the signal" |
|
140 " didn't arrive after another second.") |
|
141 |
|
142 def test_main(self): |
|
143 # This function spawns a child process to insulate the main |
|
144 # test-running process from all the signals. It then |
|
145 # communicates with that child process over a pipe and |
|
146 # re-raises information about any exceptions the child |
|
147 # throws. The real work happens in self.run_test(). |
|
148 os_done_r, os_done_w = os.pipe() |
|
149 with nested(closing(os.fdopen(os_done_r)), |
|
150 closing(os.fdopen(os_done_w, 'w'))) as (done_r, done_w): |
|
151 child = os.fork() |
|
152 if child == 0: |
|
153 # In the child process; run the test and report results |
|
154 # through the pipe. |
|
155 try: |
|
156 done_r.close() |
|
157 # Have to close done_w again here because |
|
158 # exit_subprocess() will skip the enclosing with block. |
|
159 with closing(done_w): |
|
160 try: |
|
161 self.run_test() |
|
162 except: |
|
163 pickle.dump(traceback.format_exc(), done_w) |
|
164 else: |
|
165 pickle.dump(None, done_w) |
|
166 except: |
|
167 print 'Uh oh, raised from pickle.' |
|
168 traceback.print_exc() |
|
169 finally: |
|
170 exit_subprocess() |
|
171 |
|
172 done_w.close() |
|
173 # Block for up to MAX_DURATION seconds for the test to finish. |
|
174 r, w, x = select.select([done_r], [], [], self.MAX_DURATION) |
|
175 if done_r in r: |
|
176 tb = pickle.load(done_r) |
|
177 if tb: |
|
178 self.fail(tb) |
|
179 else: |
|
180 os.kill(child, signal.SIGKILL) |
|
181 self.fail('Test deadlocked after %d seconds.' % |
|
182 self.MAX_DURATION) |
|
183 |
|
184 |
|
185 class BasicSignalTests(unittest.TestCase): |
|
186 def trivial_signal_handler(self, *args): |
|
187 pass |
|
188 |
|
189 def test_out_of_range_signal_number_raises_error(self): |
|
190 self.assertRaises(ValueError, signal.getsignal, 4242) |
|
191 |
|
192 self.assertRaises(ValueError, signal.signal, 4242, |
|
193 self.trivial_signal_handler) |
|
194 |
|
195 def test_setting_signal_handler_to_none_raises_error(self): |
|
196 self.assertRaises(TypeError, signal.signal, |
|
197 signal.SIGUSR1, None) |
|
198 |
|
199 def test_getsignal(self): |
|
200 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) |
|
201 self.assertEquals(signal.getsignal(signal.SIGHUP), |
|
202 self.trivial_signal_handler) |
|
203 signal.signal(signal.SIGHUP, hup) |
|
204 self.assertEquals(signal.getsignal(signal.SIGHUP), hup) |
|
205 |
|
206 |
|
207 class WakeupSignalTests(unittest.TestCase): |
|
208 TIMEOUT_FULL = 10 |
|
209 TIMEOUT_HALF = 5 |
|
210 |
|
211 def test_wakeup_fd_early(self): |
|
212 import select |
|
213 |
|
214 signal.alarm(1) |
|
215 before_time = time.time() |
|
216 # We attempt to get a signal during the sleep, |
|
217 # before select is called |
|
218 time.sleep(self.TIMEOUT_FULL) |
|
219 mid_time = time.time() |
|
220 self.assert_(mid_time - before_time < self.TIMEOUT_HALF) |
|
221 select.select([self.read], [], [], self.TIMEOUT_FULL) |
|
222 after_time = time.time() |
|
223 self.assert_(after_time - mid_time < self.TIMEOUT_HALF) |
|
224 |
|
225 def test_wakeup_fd_during(self): |
|
226 import select |
|
227 |
|
228 signal.alarm(1) |
|
229 before_time = time.time() |
|
230 # We attempt to get a signal during the select call |
|
231 self.assertRaises(select.error, select.select, |
|
232 [self.read], [], [], self.TIMEOUT_FULL) |
|
233 after_time = time.time() |
|
234 self.assert_(after_time - before_time < self.TIMEOUT_HALF) |
|
235 |
|
236 def setUp(self): |
|
237 import fcntl |
|
238 |
|
239 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) |
|
240 self.read, self.write = os.pipe() |
|
241 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0) |
|
242 flags = flags | os.O_NONBLOCK |
|
243 fcntl.fcntl(self.write, fcntl.F_SETFL, flags) |
|
244 self.old_wakeup = signal.set_wakeup_fd(self.write) |
|
245 |
|
246 def tearDown(self): |
|
247 signal.set_wakeup_fd(self.old_wakeup) |
|
248 os.close(self.read) |
|
249 os.close(self.write) |
|
250 signal.signal(signal.SIGALRM, self.alrm) |
|
251 |
|
252 class SiginterruptTest(unittest.TestCase): |
|
253 signum = signal.SIGUSR1 |
|
254 def readpipe_interrupted(self, cb): |
|
255 r, w = os.pipe() |
|
256 ppid = os.getpid() |
|
257 pid = os.fork() |
|
258 |
|
259 oldhandler = signal.signal(self.signum, lambda x,y: None) |
|
260 cb() |
|
261 if pid==0: |
|
262 # child code: sleep, kill, sleep. and then exit, |
|
263 # which closes the pipe from which the parent process reads |
|
264 try: |
|
265 time.sleep(0.2) |
|
266 os.kill(ppid, self.signum) |
|
267 time.sleep(0.2) |
|
268 finally: |
|
269 exit_subprocess() |
|
270 |
|
271 try: |
|
272 os.close(w) |
|
273 |
|
274 try: |
|
275 d=os.read(r, 1) |
|
276 return False |
|
277 except OSError, err: |
|
278 if err.errno != errno.EINTR: |
|
279 raise |
|
280 return True |
|
281 finally: |
|
282 signal.signal(self.signum, oldhandler) |
|
283 os.waitpid(pid, 0) |
|
284 |
|
285 def test_without_siginterrupt(self): |
|
286 i=self.readpipe_interrupted(lambda: None) |
|
287 self.assertEquals(i, True) |
|
288 |
|
289 def test_siginterrupt_on(self): |
|
290 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1)) |
|
291 self.assertEquals(i, True) |
|
292 |
|
293 def test_siginterrupt_off(self): |
|
294 i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0)) |
|
295 self.assertEquals(i, False) |
|
296 |
|
297 class ItimerTest(unittest.TestCase): |
|
298 def setUp(self): |
|
299 self.hndl_called = False |
|
300 self.hndl_count = 0 |
|
301 self.itimer = None |
|
302 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) |
|
303 |
|
304 def tearDown(self): |
|
305 signal.signal(signal.SIGALRM, self.old_alarm) |
|
306 if self.itimer is not None: # test_itimer_exc doesn't change this attr |
|
307 # just ensure that itimer is stopped |
|
308 signal.setitimer(self.itimer, 0) |
|
309 |
|
310 def sig_alrm(self, *args): |
|
311 self.hndl_called = True |
|
312 if test_support.verbose: |
|
313 print("SIGALRM handler invoked", args) |
|
314 |
|
315 def sig_vtalrm(self, *args): |
|
316 self.hndl_called = True |
|
317 |
|
318 if self.hndl_count > 3: |
|
319 # it shouldn't be here, because it should have been disabled. |
|
320 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " |
|
321 "timer.") |
|
322 elif self.hndl_count == 3: |
|
323 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore |
|
324 signal.setitimer(signal.ITIMER_VIRTUAL, 0) |
|
325 if test_support.verbose: |
|
326 print("last SIGVTALRM handler call") |
|
327 |
|
328 self.hndl_count += 1 |
|
329 |
|
330 if test_support.verbose: |
|
331 print("SIGVTALRM handler invoked", args) |
|
332 |
|
333 def sig_prof(self, *args): |
|
334 self.hndl_called = True |
|
335 signal.setitimer(signal.ITIMER_PROF, 0) |
|
336 |
|
337 if test_support.verbose: |
|
338 print("SIGPROF handler invoked", args) |
|
339 |
|
340 def test_itimer_exc(self): |
|
341 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform |
|
342 # defines it ? |
|
343 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) |
|
344 # Negative times are treated as zero on some platforms. |
|
345 if 0: |
|
346 self.assertRaises(signal.ItimerError, |
|
347 signal.setitimer, signal.ITIMER_REAL, -1) |
|
348 |
|
349 def test_itimer_real(self): |
|
350 self.itimer = signal.ITIMER_REAL |
|
351 signal.setitimer(self.itimer, 1.0) |
|
352 if test_support.verbose: |
|
353 print("\ncall pause()...") |
|
354 signal.pause() |
|
355 |
|
356 self.assertEqual(self.hndl_called, True) |
|
357 |
|
358 def test_itimer_virtual(self): |
|
359 self.itimer = signal.ITIMER_VIRTUAL |
|
360 signal.signal(signal.SIGVTALRM, self.sig_vtalrm) |
|
361 signal.setitimer(self.itimer, 0.3, 0.2) |
|
362 |
|
363 for i in xrange(100000000): |
|
364 if signal.getitimer(self.itimer) == (0.0, 0.0): |
|
365 break # sig_vtalrm handler stopped this itimer |
|
366 |
|
367 # virtual itimer should be (0.0, 0.0) now |
|
368 self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0)) |
|
369 # and the handler should have been called |
|
370 self.assertEquals(self.hndl_called, True) |
|
371 |
|
372 def test_itimer_prof(self): |
|
373 self.itimer = signal.ITIMER_PROF |
|
374 signal.signal(signal.SIGPROF, self.sig_prof) |
|
375 signal.setitimer(self.itimer, 0.2, 0.2) |
|
376 |
|
377 for i in xrange(100000000): |
|
378 if signal.getitimer(self.itimer) == (0.0, 0.0): |
|
379 break # sig_prof handler stopped this itimer |
|
380 |
|
381 # profiling itimer should be (0.0, 0.0) now |
|
382 self.assertEquals(signal.getitimer(self.itimer), (0.0, 0.0)) |
|
383 # and the handler should have been called |
|
384 self.assertEqual(self.hndl_called, True) |
|
385 |
|
386 def test_main(): |
|
387 test_support.run_unittest(BasicSignalTests, InterProcessSignalTests, |
|
388 WakeupSignalTests, SiginterruptTest, ItimerTest) |
|
389 |
|
390 |
|
391 if __name__ == "__main__": |
|
392 test_main() |