|
1 # Test case for the os.poll() function |
|
2 |
|
3 import os, select, random, unittest |
|
4 from test.test_support import TestSkipped, TESTFN, run_unittest |
|
5 |
|
6 try: |
|
7 select.poll |
|
8 except AttributeError: |
|
9 raise TestSkipped, "select.poll not defined -- skipping test_poll" |
|
10 |
|
11 |
|
12 def find_ready_matching(ready, flag): |
|
13 match = [] |
|
14 for fd, mode in ready: |
|
15 if mode & flag: |
|
16 match.append(fd) |
|
17 return match |
|
18 |
|
19 class PollTests(unittest.TestCase): |
|
20 |
|
21 def test_poll1(self): |
|
22 # Basic functional test of poll object |
|
23 # Create a bunch of pipe and test that poll works with them. |
|
24 |
|
25 p = select.poll() |
|
26 |
|
27 NUM_PIPES = 12 |
|
28 MSG = " This is a test." |
|
29 MSG_LEN = len(MSG) |
|
30 readers = [] |
|
31 writers = [] |
|
32 r2w = {} |
|
33 w2r = {} |
|
34 |
|
35 for i in range(NUM_PIPES): |
|
36 rd, wr = os.pipe() |
|
37 p.register(rd) |
|
38 p.modify(rd, select.POLLIN) |
|
39 p.register(wr, select.POLLOUT) |
|
40 readers.append(rd) |
|
41 writers.append(wr) |
|
42 r2w[rd] = wr |
|
43 w2r[wr] = rd |
|
44 |
|
45 bufs = [] |
|
46 |
|
47 while writers: |
|
48 ready = p.poll() |
|
49 ready_writers = find_ready_matching(ready, select.POLLOUT) |
|
50 if not ready_writers: |
|
51 raise RuntimeError, "no pipes ready for writing" |
|
52 wr = random.choice(ready_writers) |
|
53 os.write(wr, MSG) |
|
54 |
|
55 ready = p.poll() |
|
56 ready_readers = find_ready_matching(ready, select.POLLIN) |
|
57 if not ready_readers: |
|
58 raise RuntimeError, "no pipes ready for reading" |
|
59 rd = random.choice(ready_readers) |
|
60 buf = os.read(rd, MSG_LEN) |
|
61 self.assertEqual(len(buf), MSG_LEN) |
|
62 bufs.append(buf) |
|
63 os.close(r2w[rd]) ; os.close( rd ) |
|
64 p.unregister( r2w[rd] ) |
|
65 p.unregister( rd ) |
|
66 writers.remove(r2w[rd]) |
|
67 |
|
68 self.assertEqual(bufs, [MSG] * NUM_PIPES) |
|
69 |
|
70 def poll_unit_tests(self): |
|
71 # returns NVAL for invalid file descriptor |
|
72 FD = 42 |
|
73 try: |
|
74 os.close(FD) |
|
75 except OSError: |
|
76 pass |
|
77 p = select.poll() |
|
78 p.register(FD) |
|
79 r = p.poll() |
|
80 self.assertEqual(r[0], (FD, select.POLLNVAL)) |
|
81 |
|
82 f = open(TESTFN, 'w') |
|
83 fd = f.fileno() |
|
84 p = select.poll() |
|
85 p.register(f) |
|
86 r = p.poll() |
|
87 self.assertEqual(r[0][0], fd) |
|
88 f.close() |
|
89 r = p.poll() |
|
90 self.assertEqual(r[0], (fd, select.POLLNVAL)) |
|
91 os.unlink(TESTFN) |
|
92 |
|
93 # type error for invalid arguments |
|
94 p = select.poll() |
|
95 self.assertRaises(TypeError, p.register, p) |
|
96 self.assertRaises(TypeError, p.unregister, p) |
|
97 |
|
98 # can't unregister non-existent object |
|
99 p = select.poll() |
|
100 self.assertRaises(KeyError, p.unregister, 3) |
|
101 |
|
102 # Test error cases |
|
103 pollster = select.poll() |
|
104 class Nope: |
|
105 pass |
|
106 |
|
107 class Almost: |
|
108 def fileno(self): |
|
109 return 'fileno' |
|
110 |
|
111 self.assertRaises(TypeError, pollster.register, Nope(), 0) |
|
112 self.assertRaises(TypeError, pollster.register, Almost(), 0) |
|
113 |
|
114 # Another test case for poll(). This is copied from the test case for |
|
115 # select(), modified to use poll() instead. |
|
116 |
|
117 def test_poll2(self): |
|
118 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' |
|
119 p = os.popen(cmd, 'r') |
|
120 pollster = select.poll() |
|
121 pollster.register( p, select.POLLIN ) |
|
122 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: |
|
123 fdlist = pollster.poll(tout) |
|
124 if (fdlist == []): |
|
125 continue |
|
126 fd, flags = fdlist[0] |
|
127 if flags & select.POLLHUP: |
|
128 line = p.readline() |
|
129 if line != "": |
|
130 self.fail('error: pipe seems to be closed, but still returns data') |
|
131 continue |
|
132 |
|
133 elif flags & select.POLLIN: |
|
134 line = p.readline() |
|
135 if not line: |
|
136 break |
|
137 continue |
|
138 else: |
|
139 self.fail('Unexpected return value from select.poll: %s' % fdlist) |
|
140 p.close() |
|
141 |
|
142 def test_poll3(self): |
|
143 # test int overflow |
|
144 pollster = select.poll() |
|
145 pollster.register(1) |
|
146 |
|
147 self.assertRaises(OverflowError, pollster.poll, 1L << 64) |
|
148 |
|
149 x = 2 + 3 |
|
150 if x != 5: |
|
151 self.fail('Overflow must have occurred') |
|
152 |
|
153 def test_main(): |
|
154 run_unittest(PollTests) |
|
155 |
|
156 if __name__ == '__main__': |
|
157 test_main() |