|
1 import pprint |
|
2 import sys |
|
3 import unittest |
|
4 |
|
5 from test import test_support |
|
6 |
|
7 class TestGetProfile(unittest.TestCase): |
|
8 def setUp(self): |
|
9 sys.setprofile(None) |
|
10 |
|
11 def tearDown(self): |
|
12 sys.setprofile(None) |
|
13 |
|
14 def test_empty(self): |
|
15 assert sys.getprofile() == None |
|
16 |
|
17 def test_setget(self): |
|
18 def fn(*args): |
|
19 pass |
|
20 |
|
21 sys.setprofile(fn) |
|
22 assert sys.getprofile() == fn |
|
23 |
|
24 class HookWatcher: |
|
25 def __init__(self): |
|
26 self.frames = [] |
|
27 self.events = [] |
|
28 |
|
29 def callback(self, frame, event, arg): |
|
30 if (event == "call" |
|
31 or event == "return" |
|
32 or event == "exception"): |
|
33 self.add_event(event, frame) |
|
34 |
|
35 def add_event(self, event, frame=None): |
|
36 """Add an event to the log.""" |
|
37 if frame is None: |
|
38 frame = sys._getframe(1) |
|
39 |
|
40 try: |
|
41 frameno = self.frames.index(frame) |
|
42 except ValueError: |
|
43 frameno = len(self.frames) |
|
44 self.frames.append(frame) |
|
45 |
|
46 self.events.append((frameno, event, ident(frame))) |
|
47 |
|
48 def get_events(self): |
|
49 """Remove calls to add_event().""" |
|
50 disallowed = [ident(self.add_event.im_func), ident(ident)] |
|
51 self.frames = None |
|
52 |
|
53 return [item for item in self.events if item[2] not in disallowed] |
|
54 |
|
55 |
|
56 class ProfileSimulator(HookWatcher): |
|
57 def __init__(self, testcase): |
|
58 self.testcase = testcase |
|
59 self.stack = [] |
|
60 HookWatcher.__init__(self) |
|
61 |
|
62 def callback(self, frame, event, arg): |
|
63 # Callback registered with sys.setprofile()/sys.settrace() |
|
64 self.dispatch[event](self, frame) |
|
65 |
|
66 def trace_call(self, frame): |
|
67 self.add_event('call', frame) |
|
68 self.stack.append(frame) |
|
69 |
|
70 def trace_return(self, frame): |
|
71 self.add_event('return', frame) |
|
72 self.stack.pop() |
|
73 |
|
74 def trace_exception(self, frame): |
|
75 self.testcase.fail( |
|
76 "the profiler should never receive exception events") |
|
77 |
|
78 def trace_pass(self, frame): |
|
79 pass |
|
80 |
|
81 dispatch = { |
|
82 'call': trace_call, |
|
83 'exception': trace_exception, |
|
84 'return': trace_return, |
|
85 'c_call': trace_pass, |
|
86 'c_return': trace_pass, |
|
87 'c_exception': trace_pass, |
|
88 } |
|
89 |
|
90 |
|
91 class TestCaseBase(unittest.TestCase): |
|
92 def check_events(self, callable, expected): |
|
93 events = capture_events(callable, self.new_watcher()) |
|
94 if events != expected: |
|
95 self.fail("Expected events:\n%s\nReceived events:\n%s" |
|
96 % (pprint.pformat(expected), pprint.pformat(events))) |
|
97 |
|
98 |
|
99 class ProfileHookTestCase(TestCaseBase): |
|
100 def new_watcher(self): |
|
101 return HookWatcher() |
|
102 |
|
103 def test_simple(self): |
|
104 def f(p): |
|
105 pass |
|
106 f_ident = ident(f) |
|
107 self.check_events(f, [(1, 'call', f_ident), |
|
108 (1, 'return', f_ident), |
|
109 ]) |
|
110 |
|
111 def test_exception(self): |
|
112 def f(p): |
|
113 1/0 |
|
114 f_ident = ident(f) |
|
115 self.check_events(f, [(1, 'call', f_ident), |
|
116 (1, 'return', f_ident), |
|
117 ]) |
|
118 |
|
119 def test_caught_exception(self): |
|
120 def f(p): |
|
121 try: 1/0 |
|
122 except: pass |
|
123 f_ident = ident(f) |
|
124 self.check_events(f, [(1, 'call', f_ident), |
|
125 (1, 'return', f_ident), |
|
126 ]) |
|
127 |
|
128 def test_caught_nested_exception(self): |
|
129 def f(p): |
|
130 try: 1/0 |
|
131 except: pass |
|
132 f_ident = ident(f) |
|
133 self.check_events(f, [(1, 'call', f_ident), |
|
134 (1, 'return', f_ident), |
|
135 ]) |
|
136 |
|
137 def test_nested_exception(self): |
|
138 def f(p): |
|
139 1/0 |
|
140 f_ident = ident(f) |
|
141 self.check_events(f, [(1, 'call', f_ident), |
|
142 # This isn't what I expected: |
|
143 # (0, 'exception', protect_ident), |
|
144 # I expected this again: |
|
145 (1, 'return', f_ident), |
|
146 ]) |
|
147 |
|
148 def test_exception_in_except_clause(self): |
|
149 def f(p): |
|
150 1/0 |
|
151 def g(p): |
|
152 try: |
|
153 f(p) |
|
154 except: |
|
155 try: f(p) |
|
156 except: pass |
|
157 f_ident = ident(f) |
|
158 g_ident = ident(g) |
|
159 self.check_events(g, [(1, 'call', g_ident), |
|
160 (2, 'call', f_ident), |
|
161 (2, 'return', f_ident), |
|
162 (3, 'call', f_ident), |
|
163 (3, 'return', f_ident), |
|
164 (1, 'return', g_ident), |
|
165 ]) |
|
166 |
|
167 def test_exception_propogation(self): |
|
168 def f(p): |
|
169 1/0 |
|
170 def g(p): |
|
171 try: f(p) |
|
172 finally: p.add_event("falling through") |
|
173 f_ident = ident(f) |
|
174 g_ident = ident(g) |
|
175 self.check_events(g, [(1, 'call', g_ident), |
|
176 (2, 'call', f_ident), |
|
177 (2, 'return', f_ident), |
|
178 (1, 'falling through', g_ident), |
|
179 (1, 'return', g_ident), |
|
180 ]) |
|
181 |
|
182 def test_raise_twice(self): |
|
183 def f(p): |
|
184 try: 1/0 |
|
185 except: 1/0 |
|
186 f_ident = ident(f) |
|
187 self.check_events(f, [(1, 'call', f_ident), |
|
188 (1, 'return', f_ident), |
|
189 ]) |
|
190 |
|
191 def test_raise_reraise(self): |
|
192 def f(p): |
|
193 try: 1/0 |
|
194 except: raise |
|
195 f_ident = ident(f) |
|
196 self.check_events(f, [(1, 'call', f_ident), |
|
197 (1, 'return', f_ident), |
|
198 ]) |
|
199 |
|
200 def test_raise(self): |
|
201 def f(p): |
|
202 raise Exception() |
|
203 f_ident = ident(f) |
|
204 self.check_events(f, [(1, 'call', f_ident), |
|
205 (1, 'return', f_ident), |
|
206 ]) |
|
207 |
|
208 def test_distant_exception(self): |
|
209 def f(): |
|
210 1/0 |
|
211 def g(): |
|
212 f() |
|
213 def h(): |
|
214 g() |
|
215 def i(): |
|
216 h() |
|
217 def j(p): |
|
218 i() |
|
219 f_ident = ident(f) |
|
220 g_ident = ident(g) |
|
221 h_ident = ident(h) |
|
222 i_ident = ident(i) |
|
223 j_ident = ident(j) |
|
224 self.check_events(j, [(1, 'call', j_ident), |
|
225 (2, 'call', i_ident), |
|
226 (3, 'call', h_ident), |
|
227 (4, 'call', g_ident), |
|
228 (5, 'call', f_ident), |
|
229 (5, 'return', f_ident), |
|
230 (4, 'return', g_ident), |
|
231 (3, 'return', h_ident), |
|
232 (2, 'return', i_ident), |
|
233 (1, 'return', j_ident), |
|
234 ]) |
|
235 |
|
236 def test_generator(self): |
|
237 def f(): |
|
238 for i in range(2): |
|
239 yield i |
|
240 def g(p): |
|
241 for i in f(): |
|
242 pass |
|
243 f_ident = ident(f) |
|
244 g_ident = ident(g) |
|
245 self.check_events(g, [(1, 'call', g_ident), |
|
246 # call the iterator twice to generate values |
|
247 (2, 'call', f_ident), |
|
248 (2, 'return', f_ident), |
|
249 (2, 'call', f_ident), |
|
250 (2, 'return', f_ident), |
|
251 # once more; returns end-of-iteration with |
|
252 # actually raising an exception |
|
253 (2, 'call', f_ident), |
|
254 (2, 'return', f_ident), |
|
255 (1, 'return', g_ident), |
|
256 ]) |
|
257 |
|
258 def test_stop_iteration(self): |
|
259 def f(): |
|
260 for i in range(2): |
|
261 yield i |
|
262 raise StopIteration |
|
263 def g(p): |
|
264 for i in f(): |
|
265 pass |
|
266 f_ident = ident(f) |
|
267 g_ident = ident(g) |
|
268 self.check_events(g, [(1, 'call', g_ident), |
|
269 # call the iterator twice to generate values |
|
270 (2, 'call', f_ident), |
|
271 (2, 'return', f_ident), |
|
272 (2, 'call', f_ident), |
|
273 (2, 'return', f_ident), |
|
274 # once more to hit the raise: |
|
275 (2, 'call', f_ident), |
|
276 (2, 'return', f_ident), |
|
277 (1, 'return', g_ident), |
|
278 ]) |
|
279 |
|
280 |
|
281 class ProfileSimulatorTestCase(TestCaseBase): |
|
282 def new_watcher(self): |
|
283 return ProfileSimulator(self) |
|
284 |
|
285 def test_simple(self): |
|
286 def f(p): |
|
287 pass |
|
288 f_ident = ident(f) |
|
289 self.check_events(f, [(1, 'call', f_ident), |
|
290 (1, 'return', f_ident), |
|
291 ]) |
|
292 |
|
293 def test_basic_exception(self): |
|
294 def f(p): |
|
295 1/0 |
|
296 f_ident = ident(f) |
|
297 self.check_events(f, [(1, 'call', f_ident), |
|
298 (1, 'return', f_ident), |
|
299 ]) |
|
300 |
|
301 def test_caught_exception(self): |
|
302 def f(p): |
|
303 try: 1/0 |
|
304 except: pass |
|
305 f_ident = ident(f) |
|
306 self.check_events(f, [(1, 'call', f_ident), |
|
307 (1, 'return', f_ident), |
|
308 ]) |
|
309 |
|
310 def test_distant_exception(self): |
|
311 def f(): |
|
312 1/0 |
|
313 def g(): |
|
314 f() |
|
315 def h(): |
|
316 g() |
|
317 def i(): |
|
318 h() |
|
319 def j(p): |
|
320 i() |
|
321 f_ident = ident(f) |
|
322 g_ident = ident(g) |
|
323 h_ident = ident(h) |
|
324 i_ident = ident(i) |
|
325 j_ident = ident(j) |
|
326 self.check_events(j, [(1, 'call', j_ident), |
|
327 (2, 'call', i_ident), |
|
328 (3, 'call', h_ident), |
|
329 (4, 'call', g_ident), |
|
330 (5, 'call', f_ident), |
|
331 (5, 'return', f_ident), |
|
332 (4, 'return', g_ident), |
|
333 (3, 'return', h_ident), |
|
334 (2, 'return', i_ident), |
|
335 (1, 'return', j_ident), |
|
336 ]) |
|
337 |
|
338 |
|
339 def ident(function): |
|
340 if hasattr(function, "f_code"): |
|
341 code = function.f_code |
|
342 else: |
|
343 code = function.func_code |
|
344 return code.co_firstlineno, code.co_name |
|
345 |
|
346 |
|
347 def protect(f, p): |
|
348 try: f(p) |
|
349 except: pass |
|
350 |
|
351 protect_ident = ident(protect) |
|
352 |
|
353 |
|
354 def capture_events(callable, p=None): |
|
355 try: |
|
356 sys.setprofile() |
|
357 except TypeError: |
|
358 pass |
|
359 else: |
|
360 raise test_support.TestFailed( |
|
361 'sys.setprofile() did not raise TypeError') |
|
362 |
|
363 if p is None: |
|
364 p = HookWatcher() |
|
365 sys.setprofile(p.callback) |
|
366 protect(callable, p) |
|
367 sys.setprofile(None) |
|
368 return p.get_events()[1:-1] |
|
369 |
|
370 |
|
371 def show_events(callable): |
|
372 import pprint |
|
373 pprint.pprint(capture_events(callable)) |
|
374 |
|
375 |
|
376 def test_main(): |
|
377 test_support.run_unittest( |
|
378 TestGetProfile, |
|
379 ProfileHookTestCase, |
|
380 ProfileSimulatorTestCase |
|
381 ) |
|
382 |
|
383 |
|
384 if __name__ == "__main__": |
|
385 test_main() |