|
1 """ |
|
2 |
|
3 Test case for causing an uncaught exception in a thread |
|
4 |
|
5 Mikko Ohtamaa <mikko@redinnovation.com> |
|
6 |
|
7 |
|
8 """ |
|
9 import e32 |
|
10 import thread |
|
11 import time |
|
12 import sys |
|
13 import traceback |
|
14 |
|
15 import positioning |
|
16 |
|
# Completion flag; nothing in this script reads or updates it after this point.
done = False

# Queue of debug messages: appended to by debug() and drained and printed by
# run().
debug_buf = list()
|
21 |
|
def debug(msg):
    """Queue a debug message instead of printing it.

    The message is appended to the module-level ``debug_buf`` list; ``run()``
    later removes the queued entries and prints them.

    msg -- the object to queue (stored as-is, not converted).
    """
    debug_buf.append(msg)
|
25 |
|
def cb(data):
    """Position-update callback: queue the string form of ``data`` via debug().

    data -- whatever the caller delivers; it is only passed through ``str()``.
    """
    text = str(data)
    debug(text)
|
28 |
|
def go_thread():
    """Thread body: request GPS position updates and loop for about 10 seconds.

    Registers a requestor with the ``positioning`` module, asks for position
    updates to be delivered to ``cb``, then loops for roughly ten seconds,
    sleeping one second per pass with ``e32.ao_sleep``.  Progress messages and
    any exception raised along the way are queued through ``debug()`` rather
    than printed directly.
    """
    try:
        debug("GPS thread launching")
        positioning.set_requestors(
            [{"type": "service", "format": "application", "data": "test_app"}])

        # interval: position update period while the GPS is on (microseconds)
        # callback: callable to call when a position update is available
        positioning.position(callback=cb,
                             interval=5 * 1000 * 1000,
                             satellites=1,
                             course=1,
                             partial=1)

        death_point = time.time() + 10
        while time.time() < death_point:
            debug("GPS thread looping")
            e32.ao_sleep(1)
    except:
        # Deliberately catch-all: this is the outermost frame of the thread,
        # and the point of the test is to record whatever gets raised here.
        info = sys.exc_info()
        # format_exception() returns a list of lines; join them so a single
        # readable string is queued instead of a list that prints as its repr.
        debug("".join(traceback.format_exception(*info)))
|
49 |
|
def run():
    """Main-thread driver: start the GPS thread once and print its messages.

    Loops for about ten seconds.  Each pass prints "Pending", sleeps two
    seconds with ``e32.ao_sleep``, prints every message queued in
    ``debug_buf``, and -- once two seconds have elapsed -- starts
    ``go_thread`` in a new thread (a single time).  Prints "Done" after the
    loop ends.
    """
    death_point = time.time() + 10

    # Moment after which the worker thread is started.
    test_thread = time.time() + 2
    thread_started = False

    while time.time() < death_point:
        print("Pending")
        e32.ao_sleep(2)

        # Drain the queue in place.  pop(0) mutates the same list object the
        # worker thread appends to, so nothing is lost by rebinding the global
        # to a copy, and every queued message gets printed.
        while debug_buf:
            msg = debug_buf.pop(0)
            print("Buffer %d Message: %s" % (len(debug_buf), msg))

        # Start the worker only once; without the flag a new thread would be
        # spawned on every pass after the two-second mark.
        if not thread_started and time.time() > test_thread:
            thread.start_new_thread(go_thread, ())
            thread_started = True

    # Original author's observation on the affected device:
    # this row is never reached.  The Python interpreter shell is totally
    # frozen and refuses to be terminated via the Python task manager
    # -> the phone must be restarted.
    print("Done")
|
75 |
|
76 |
|
# Script entry point: run the test only when executed directly, not on import.
if __name__ == "__main__":
    run()