symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_kqueue.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """
       
     2 Tests for kqueue wrapper.
       
     3 """
       
     4 import socket
       
     5 import errno
       
     6 import time
       
     7 import select
       
     8 import sys
       
     9 import unittest
       
    10 
       
    11 from test import test_support
       
    12 if not hasattr(select, "kqueue"):
       
    13     raise test_support.TestSkipped("test works only on BSD")
       
    14 
       
    15 class TestKQueue(unittest.TestCase):
       
    16     def test_create_queue(self):
       
    17         kq = select.kqueue()
       
    18         self.assert_(kq.fileno() > 0, kq.fileno())
       
    19         self.assert_(not kq.closed)
       
    20         kq.close()
       
    21         self.assert_(kq.closed)
       
    22         self.assertRaises(ValueError, kq.fileno)
       
    23 
       
    24     def test_create_event(self):
       
    25         fd = sys.stderr.fileno()
       
    26         ev = select.kevent(fd)
       
    27         other = select.kevent(1000)
       
    28         self.assertEqual(ev.ident, fd)
       
    29         self.assertEqual(ev.filter, select.KQ_FILTER_READ)
       
    30         self.assertEqual(ev.flags, select.KQ_EV_ADD)
       
    31         self.assertEqual(ev.fflags, 0)
       
    32         self.assertEqual(ev.data, 0)
       
    33         self.assertEqual(ev.udata, 0)
       
    34         self.assertEqual(ev, ev)
       
    35         self.assertNotEqual(ev, other)
       
    36         self.assertEqual(cmp(ev, other), -1)
       
    37         self.assert_(ev < other)
       
    38         self.assert_(other >= ev)
       
    39         self.assertRaises(TypeError, cmp, ev, None)
       
    40         self.assertRaises(TypeError, cmp, ev, 1)
       
    41         self.assertRaises(TypeError, cmp, ev, "ev")
       
    42 
       
    43         ev = select.kevent(fd, select.KQ_FILTER_WRITE)
       
    44         self.assertEqual(ev.ident, fd)
       
    45         self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
       
    46         self.assertEqual(ev.flags, select.KQ_EV_ADD)
       
    47         self.assertEqual(ev.fflags, 0)
       
    48         self.assertEqual(ev.data, 0)
       
    49         self.assertEqual(ev.udata, 0)
       
    50         self.assertEqual(ev, ev)
       
    51         self.assertNotEqual(ev, other)
       
    52 
       
    53         ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
       
    54         self.assertEqual(ev.ident, fd)
       
    55         self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
       
    56         self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
       
    57         self.assertEqual(ev.fflags, 0)
       
    58         self.assertEqual(ev.data, 0)
       
    59         self.assertEqual(ev.udata, 0)
       
    60         self.assertEqual(ev, ev)
       
    61         self.assertNotEqual(ev, other)
       
    62 
       
    63         ev = select.kevent(1, 2, 3, 4, 5, 6)
       
    64         self.assertEqual(ev.ident, 1)
       
    65         self.assertEqual(ev.filter, 2)
       
    66         self.assertEqual(ev.flags, 3)
       
    67         self.assertEqual(ev.fflags, 4)
       
    68         self.assertEqual(ev.data, 5)
       
    69         self.assertEqual(ev.udata, 6)
       
    70         self.assertEqual(ev, ev)
       
    71         self.assertNotEqual(ev, other)
       
    72 
       
    73     def test_queue_event(self):
       
    74         serverSocket = socket.socket()
       
    75         serverSocket.bind(('127.0.0.1', 0))
       
    76         serverSocket.listen(1)
       
    77         client = socket.socket()
       
    78         client.setblocking(False)
       
    79         try:
       
    80             client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
       
    81         except socket.error, e:
       
    82             self.assertEquals(e.args[0], errno.EINPROGRESS)
       
    83         else:
       
    84             #raise AssertionError("Connect should have raised EINPROGRESS")
       
    85             pass # FreeBSD doesn't raise an exception here
       
    86         server, addr = serverSocket.accept()
       
    87 
       
    88         if sys.platform.startswith("darwin"):
       
    89             flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
       
    90         else:
       
    91             flags = 0
       
    92 
       
    93         kq = select.kqueue()
       
    94         kq2 = select.kqueue.fromfd(kq.fileno())
       
    95 
       
    96         ev = select.kevent(server.fileno(),
       
    97                            select.KQ_FILTER_WRITE,
       
    98                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
       
    99         kq.control([ev], 0)
       
   100         ev = select.kevent(server.fileno(),
       
   101                            select.KQ_FILTER_READ,
       
   102                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
       
   103         kq.control([ev], 0)
       
   104         ev = select.kevent(client.fileno(),
       
   105                            select.KQ_FILTER_WRITE,
       
   106                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
       
   107         kq2.control([ev], 0)
       
   108         ev = select.kevent(client.fileno(),
       
   109                            select.KQ_FILTER_READ,
       
   110                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
       
   111         kq2.control([ev], 0)
       
   112 
       
   113         events = kq.control(None, 4, 1)
       
   114         events = [(e.ident, e.filter, e.flags) for e in events]
       
   115         events.sort()
       
   116         self.assertEquals(events, [
       
   117             (client.fileno(), select.KQ_FILTER_WRITE, flags),
       
   118             (server.fileno(), select.KQ_FILTER_WRITE, flags)])
       
   119 
       
   120         client.send("Hello!")
       
   121         server.send("world!!!")
       
   122 
       
   123         events = kq.control(None, 4, 1)
       
   124         # We may need to call it several times
       
   125         for i in range(5):
       
   126             if len(events) == 4:
       
   127                 break
       
   128             events = kq.control(None, 4, 1)
       
   129         events = [(e.ident, e.filter, e.flags) for e in events]
       
   130         events.sort()
       
   131 
       
   132         self.assertEquals(events, [
       
   133             (client.fileno(), select.KQ_FILTER_WRITE, flags),
       
   134             (client.fileno(), select.KQ_FILTER_READ, flags),
       
   135             (server.fileno(), select.KQ_FILTER_WRITE, flags),
       
   136             (server.fileno(), select.KQ_FILTER_READ, flags)])
       
   137 
       
   138         # Remove completely client, and server read part
       
   139         ev = select.kevent(client.fileno(),
       
   140                            select.KQ_FILTER_WRITE,
       
   141                            select.KQ_EV_DELETE)
       
   142         kq.control([ev], 0)
       
   143         ev = select.kevent(client.fileno(),
       
   144                            select.KQ_FILTER_READ,
       
   145                            select.KQ_EV_DELETE)
       
   146         kq.control([ev], 0)
       
   147         ev = select.kevent(server.fileno(),
       
   148                            select.KQ_FILTER_READ,
       
   149                            select.KQ_EV_DELETE)
       
   150         kq.control([ev], 0, 0)
       
   151 
       
   152         events = kq.control([], 4, 0.99)
       
   153         events = [(e.ident, e.filter, e.flags) for e in events]
       
   154         events.sort()
       
   155         self.assertEquals(events, [
       
   156             (server.fileno(), select.KQ_FILTER_WRITE, flags)])
       
   157 
       
   158         client.close()
       
   159         server.close()
       
   160         serverSocket.close()
       
   161 
       
   162 def test_main():
       
   163     test_support.run_unittest(TestKQueue)
       
   164 
       
   165 if __name__ == "__main__":
       
   166     test_main()