|
# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
21 """ |
|
22 Tests for epoll wrapper. |
|
23 """ |
|
24 import os |
|
25 import socket |
|
26 import errno |
|
27 import time |
|
28 import select |
|
29 import tempfile |
|
30 import unittest |
|
31 |
|
32 from test import test_support |
|
# Skip the whole module when the select module was built without epoll.
if not hasattr(select, "epoll"):
    raise test_support.TestSkipped("test works only on Linux 2.6")

# select.epoll may exist while the running kernel rejects the call with
# ENOSYS.  Probe once, and close the probe object right away instead of
# leaving its descriptor to the garbage collector.
try:
    select.epoll().close()
except IOError as e:
    if e.errno == errno.ENOSYS:
        raise test_support.TestSkipped("kernel doesn't support epoll()")
    # Any other error is unexpected; propagate it rather than silently
    # carrying on with a module whose tests cannot work.
    raise
|
41 |
|
class TestEPoll(unittest.TestCase):
    """Tests for ``select.epoll`` driven by loopback TCP sockets.

    ``setUp`` opens a listening socket for every test.  Each socket opened
    during a test is recorded in ``self.connections`` and closed again by
    ``tearDown``; epoll objects are closed by the tests that create them.
    """

    def setUp(self):
        """Open a listening socket on an OS-assigned loopback port."""
        self.serverSocket = socket.socket()
        self.serverSocket.bind(('127.0.0.1', 0))
        self.serverSocket.listen(1)
        self.connections = [self.serverSocket]

    def tearDown(self):
        """Close every socket recorded in ``self.connections``."""
        for skt in self.connections:
            skt.close()

    def _connected_pair(self):
        """Return a connected ``(client, server)`` pair of sockets.

        The client is put in non-blocking mode, so its ``connect`` call is
        required to raise ``socket.error`` with ``EINPROGRESS``; the server
        end comes from ``accept`` on the listening socket.  Both sockets are
        recorded in ``self.connections`` for ``tearDown``.
        """
        client = socket.socket()
        # Record the client immediately so it is still closed by tearDown
        # when one of the checks below fails.
        self.connections.append(client)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        else:
            self.fail("Connect should have raised EINPROGRESS")
        server, addr = self.serverSocket.accept()
        self.connections.append(server)
        return client, server

    def test_create(self):
        # A new epoll object has a usable fileno and is open; after close()
        # it reports closed and fileno() raises ValueError.
        try:
            ep = select.epoll(16)
        except (IOError, OSError) as e:
            # The module-level probe in this file expects IOError from
            # epoll creation, so report either type as a test failure.
            self.fail(str(e))
        try:
            self.assertTrue(ep.fileno() > 0, ep.fileno())
            self.assertFalse(ep.closed)
            ep.close()
            self.assertTrue(ep.closed)
            self.assertRaises(ValueError, ep.fileno)
        finally:
            # Only matters when an assertion fired before the explicit
            # close above; the test itself already closes ep on success.
            ep.close()

    def test_badcreate(self):
        # Wrong argument counts and non-integer size hints are rejected.
        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
        self.assertRaises(TypeError, select.epoll, 'foo')
        self.assertRaises(TypeError, select.epoll, None)
        self.assertRaises(TypeError, select.epoll, ())
        self.assertRaises(TypeError, select.epoll, ['foo'])
        self.assertRaises(TypeError, select.epoll, {})

    def test_add(self):
        # Registering both ends of a connection must not raise.
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
        finally:
            ep.close()

    def test_fromfd(self):
        # An object built with fromfd() wraps the same descriptor as the
        # original: registrations made through it are seen by both.
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            ep2 = select.epoll.fromfd(ep.fileno())

            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)

            events = ep.poll(1, 4)
            events2 = ep2.poll(0.9, 4)
            self.assertEqual(len(events), 2)
            self.assertEqual(len(events2), 2)

            # Once the original is closed, polling through the second
            # wrapper is expected to fail with EBADF.
            ep.close()
            try:
                ep2.poll(1, 4)
            except IOError as e:
                self.assertEqual(e.args[0], errno.EBADF, e)
            else:
                self.fail("epoll on closed fd didn't raise EBADF")
        finally:
            ep.close()

    def test_control_and_wait(self):
        # Walk through register / poll / modify / unregister using
        # edge-triggered (EPOLLET) registrations on both sockets.
        client, server = self._connected_pair()

        ep = select.epoll(16)
        try:
            ep.register(server.fileno(),
                        select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
            ep.register(client.fileno(),
                        select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)

            # Both sockets should be reported writable without waiting.
            now = time.time()
            events = ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.1, then - now)

            expected = [(client.fileno(), select.EPOLLOUT),
                        (server.fileno(), select.EPOLLOUT)]
            self.assertEqual(sorted(events), sorted(expected))
            # NOTE(review): this re-checks the same measurement against a
            # tighter bound than the check above; kept as in the original.
            self.assertFalse(then - now > 0.01, then - now)

            # With EPOLLET set, a second poll with no new activity is
            # expected to time out and report nothing.
            events = ep.poll(timeout=2.1, maxevents=4)
            self.assertFalse(events)

            client.send("Hello!")
            server.send("world!!!")

            # After data was sent in both directions, each end should be
            # reported as both readable and writable.
            now = time.time()
            events = ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.01)

            expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                        (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
            self.assertEqual(sorted(events), sorted(expected))

            # Drop the client and narrow the server to EPOLLOUT only; a
            # single writable event for the server should remain.
            ep.unregister(client.fileno())
            ep.modify(server.fileno(), select.EPOLLOUT)
            now = time.time()
            events = ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.01)

            expected = [(server.fileno(), select.EPOLLOUT)]
            self.assertEqual(events, expected)
        finally:
            ep.close()

    def test_errors(self):
        # A negative size hint and a negative descriptor are rejected.
        self.assertRaises(ValueError, select.epoll, -2)
        ep = select.epoll()
        try:
            self.assertRaises(ValueError, ep.register, -1, select.EPOLLIN)
        finally:
            ep.close()

    def test_unregister_closed(self):
        # Unregistering a descriptor whose socket was already closed must
        # not raise.  The socket used here is the first element returned
        # by _connected_pair(), i.e. the connecting (client) end.
        client, server = self._connected_pair()
        fd = client.fileno()

        ep = select.epoll(16)
        try:
            # Passing the socket object itself exercises registration by
            # object rather than by integer descriptor.
            ep.register(client)

            now = time.time()
            ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.01)

            client.close()
            ep.unregister(fd)
        finally:
            ep.close()
|
190 |
|
def test_main():
    """Run the TestEPoll test case through test_support.run_unittest."""
    test_support.run_unittest(TestEPoll)
|
193 |
|
# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    test_main()