|
1 import httplib |
|
2 import StringIO |
|
3 import socket |
|
4 |
|
5 from unittest import TestCase |
|
6 |
|
7 from test import test_support |
|
8 |
|
# Host value supplied by test_support; the timeout tests in this module
# pass it to HTTPConnection / HTTPSConnection.
HOST = test_support.HOST
|
10 |
|
class FakeSocket:
    """In-memory stand-in for a socket.

    ``text`` is the canned data handed back through makefile(); everything
    passed to sendall() is accumulated in ``data`` so that a test can
    inspect what was written.
    """

    def __init__(self, text, fileclass=StringIO.StringIO):
        self.data = ''
        self.fileclass = fileclass
        self.text = text

    def sendall(self, data):
        """Record outgoing data instead of transmitting it."""
        self.data += data

    def makefile(self, mode, bufsize=None):
        """Return a new ``fileclass`` object wrapping the canned text.

        Only the modes 'r' and 'rb' are accepted; any other mode raises
        httplib.UnimplementedFileMode.  ``bufsize`` is accepted but unused.
        """
        if mode not in ('r', 'rb'):
            raise httplib.UnimplementedFileMode()
        return self.fileclass(self.text)
|
24 |
|
class NoEOFStringIO(StringIO.StringIO):
    """StringIO variant that treats reaching EOF as a failure.

    A read() or readline() that comes back empty raises AssertionError.
    This is used below to test that httplib doesn't try to read more from
    the underlying file than it should.
    """

    def _require_data(self, chunk):
        # An empty result means the caller ran off the end of the buffer.
        if not chunk:
            raise AssertionError('caller tried to read past EOF')
        return chunk

    def read(self, n=-1):
        """Like StringIO.read(), but never returns an empty string."""
        return self._require_data(StringIO.StringIO.read(self, n))

    def readline(self, length=None):
        """Like StringIO.readline(), but never returns an empty string."""
        return self._require_data(StringIO.StringIO.readline(self, length))
|
42 |
|
43 |
|
class HeaderTests(TestCase):
    """Checks on the headers written by HTTPConnection.request()."""

    def test_auto_headers(self):
        # Some headers are added automatically, but should not be added by
        # .request() if they are explicitly set.  Either way each of them
        # must end up in the output buffer exactly once.

        import httplib

        class HeaderCountingBuffer(list):
            """List that also tallies appended headers by lower-cased name."""

            def __init__(self):
                self.count = {}

            def append(self, item):
                if ':' in item:
                    # item is a 'Key: Value' header string
                    name = item.split(':', 1)[0].lower()
                    self.count[name] = self.count.get(name, 0) + 1
                list.append(self, item)

        body = 'spamspamspam'
        auto_headers = ('Content-length', 'Host', 'Accept-encoding')

        for explicit_header in (True, False):
            for header in auto_headers:
                conn = httplib.HTTPConnection('example.com')
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                if explicit_header:
                    headers = {header: str(len(body))}
                else:
                    headers = {}
                conn.request('POST', '/', body, headers)
                self.assertEqual(conn._buffer.count[header.lower()], 1)
|
75 |
|
class BasicTest(TestCase):
    """Tests of httplib response parsing and request output via FakeSocket."""

    def test_status_lines(self):
        # Test HTTP status lines

        body = "HTTP/1.1 200 Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = httplib.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(), 'Text')
        self.assertTrue(resp.isclosed())

        # A status code that is not a plain integer must be rejected.
        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
        sock = FakeSocket(body)
        resp = httplib.HTTPResponse(sock)
        self.assertRaises(httplib.BadStatusLine, resp.begin)

    def test_partial_reads(self):
        # If we have a length, the system knows when to close itself;
        # same behaviour as when we read the whole thing with read().
        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
        sock = FakeSocket(body)
        resp = httplib.HTTPResponse(sock)
        resp.begin()
        self.assertEqual(resp.read(2), 'Te')
        self.assertFalse(resp.isclosed())
        self.assertEqual(resp.read(2), 'xt')
        self.assertTrue(resp.isclosed())

    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "www.python.org:"):
            self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)

        # Check that valid host[:port] strings are split as expected
        # (input, expected host, expected port).
        cases = (
            ("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
            ("www.python.org:80", "www.python.org", 80),
            ("www.python.org", "www.python.org", 80),
            ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
        )
        for hp, h, p in cases:
            http = httplib.HTTP(hp)
            c = http._conn
            if h != c.host:
                self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
            if p != c.port:
                # Report the parsed port (the original message printed
                # c.host here by mistake).
                self.fail("Port incorrectly parsed: %s != %s" % (p, c.port))

    def test_response_headers(self):
        # test response with multiple message headers with the same field name.
        text = ('HTTP/1.1 200 OK\r\n'
                'Set-Cookie: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"\r\n'
                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                ' Path="/acme"\r\n'
                '\r\n'
                'No body\r\n')
        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
               ', '
               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
        s = FakeSocket(text)
        r = httplib.HTTPResponse(s)
        r.begin()
        cookies = r.getheader("Set-Cookie")
        self.assertEqual(cookies, hdr,
                         "multiple headers not combined properly")

    def test_read_head(self):
        # Test that the library doesn't attempt to read any data
        # from a HEAD request.  (Tickles SF bug #622042.)
        # NoEOFStringIO raises if anything is read beyond the headers.
        sock = FakeSocket(
            'HTTP/1.1 200 OK\r\n'
            'Content-Length: 14432\r\n'
            '\r\n',
            NoEOFStringIO)
        resp = httplib.HTTPResponse(sock, method="HEAD")
        resp.begin()
        self.assertEqual(resp.read(), "",
                         "Did not expect response from HEAD request")

    def test_send_file(self):
        # Sending a file object as the request body must still produce the
        # usual request line and automatic headers.
        expected = ('GET /foo HTTP/1.1\r\nHost: example.com\r\n'
                    'Accept-Encoding: identity\r\nContent-Length:')

        body = open(__file__, 'rb')
        try:
            conn = httplib.HTTPConnection('example.com')
            sock = FakeSocket(body)
            conn.sock = sock
            conn.request('GET', '/foo', body)
        finally:
            # Don't leak the file handle, even if request() raises.
            body.close()
        self.assertTrue(sock.data.startswith(expected))

    def test_chunked(self):
        chunked_start = (
            'HTTP/1.1 200 OK\r\n'
            'Transfer-Encoding: chunked\r\n\r\n'
            'a\r\n'
            'hello worl\r\n'
            '1\r\n'
            'd\r\n'
        )
        # Properly terminated chunked body.
        sock = FakeSocket(chunked_start + '0\r\n')
        resp = httplib.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), 'hello world')
        resp.close()

        # Missing or malformed terminating chunk: IncompleteRead must be
        # raised and carry the data that was received so far.
        for x in ('', 'foo\r\n'):
            sock = FakeSocket(chunked_start + x)
            resp = httplib.HTTPResponse(sock, method="GET")
            resp.begin()
            try:
                resp.read()
            except httplib.IncompleteRead as i:
                self.assertEqual(i.partial, 'hello world')
            else:
                self.fail('IncompleteRead expected')
            finally:
                resp.close()

    def test_negative_content_length(self):
        # With a negative Content-Length the whole remaining data is
        # expected back from read().
        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
        resp = httplib.HTTPResponse(sock, method="GET")
        resp.begin()
        self.assertEqual(resp.read(), 'Hello\r\n')
        resp.close()
|
195 |
|
196 |
|
class OfflineTest(TestCase):
    """Checks that need no socket at all."""

    def test_responses(self):
        # The status-code -> reason-phrase table must map 404 correctly.
        # (assertEqual rather than the deprecated assertEquals alias, to
        # match the rest of this module.)
        self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
|
200 |
|
class TimeoutTest(TestCase):
    """Checks that HTTPConnection's timeout setting reaches the socket."""

    # Port returned by test_support.bind_port() in the most recent setUp().
    # Stored on the class; HTTPSTimeoutTest reads it as well.
    PORT = None

    def setUp(self):
        # Listening socket for the connections made by the test.
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TimeoutTest.PORT = test_support.bind_port(self.serv)
        self.serv.listen(5)

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def _check_timeout_and_close(self, conn, expected):
        """Assert the connected socket's timeout, always closing ``conn``."""
        try:
            self.assertEqual(conn.sock.gettimeout(), expected)
        finally:
            # Close even when the assertion fails so no socket is leaked.
            conn.close()

    def testTimeoutAttribute(self):
        '''This will prove that the timeout gets through
        HTTPConnection and into the socket.
        '''
        # default -- use global socket timeout
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
            httpConn.connect()
        finally:
            # Always restore the global default for later tests.
            socket.setdefaulttimeout(None)
        self._check_timeout_and_close(httpConn, 30)

        # no timeout -- do not use global socket default
        self.assertTrue(socket.getdefaulttimeout() is None)
        socket.setdefaulttimeout(30)
        try:
            httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
                                              timeout=None)
            httpConn.connect()
        finally:
            socket.setdefaulttimeout(None)
        self._check_timeout_and_close(httpConn, None)

        # a value
        httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self._check_timeout_and_close(httpConn, 30)
|
245 |
|
246 |
|
class HTTPSTimeoutTest(TestCase):
    # XXX Here should be tests for HTTPS, there isn't any right now!

    def test_attributes(self):
        """The timeout given to HTTPSConnection is stored on the instance."""
        if not hasattr(httplib, 'HTTPSConnection'):
            # httplib does not provide HTTPSConnection here; nothing to check.
            return
        conn = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(conn.timeout, 30)
|
255 |
|
def test_main(verbose=None):
    """Run the test case classes listed below through test_support.

    ``verbose`` is accepted but not used.
    """
    test_cases = (
        HeaderTests,
        OfflineTest,
        BasicTest,
        TimeoutTest,
        HTTPSTimeoutTest,
    )
    test_support.run_unittest(*test_cases)
|
259 |
|
# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    test_main()