|
1 import httplib |
|
2 import StringIO |
|
3 import sys |
|
4 |
|
5 from unittest import TestCase |
|
6 |
|
7 from test import test_support |
|
8 |
|
9 class FakeSocket: |
|
10 def __init__(self, text, fileclass=StringIO.StringIO): |
|
11 self.text = text |
|
12 self.fileclass = fileclass |
|
13 |
|
14 def sendall(self, data): |
|
15 self.data = data |
|
16 |
|
17 def makefile(self, mode, bufsize=None): |
|
18 if mode != 'r' and mode != 'rb': |
|
19 raise httplib.UnimplementedFileMode() |
|
20 return self.fileclass(self.text) |
|
21 |
|
22 class NoEOFStringIO(StringIO.StringIO): |
|
23 """Like StringIO, but raises AssertionError on EOF. |
|
24 |
|
25 This is used below to test that httplib doesn't try to read |
|
26 more from the underlying file than it should. |
|
27 """ |
|
28 def read(self, n=-1): |
|
29 data = StringIO.StringIO.read(self, n) |
|
30 if data == '': |
|
31 raise AssertionError('caller tried to read past EOF') |
|
32 return data |
|
33 |
|
34 def readline(self, length=None): |
|
35 data = StringIO.StringIO.readline(self, length) |
|
36 if data == '': |
|
37 raise AssertionError('caller tried to read past EOF') |
|
38 return data |
|
39 |
|
40 |
|
41 class HeaderTests(TestCase): |
|
42 def test_auto_headers(self): |
|
43 # Some headers are added automatically, but should not be added by |
|
44 # .request() if they are explicitly set. |
|
45 |
|
46 import httplib |
|
47 |
|
48 class HeaderCountingBuffer(list): |
|
49 def __init__(self): |
|
50 self.count = {} |
|
51 def append(self, item): |
|
52 kv = item.split(':') |
|
53 if len(kv) > 1: |
|
54 # item is a 'Key: Value' header string |
|
55 lcKey = kv[0].lower() |
|
56 self.count.setdefault(lcKey, 0) |
|
57 self.count[lcKey] += 1 |
|
58 list.append(self, item) |
|
59 |
|
60 for explicit_header in True, False: |
|
61 for header in 'Content-length', 'Host', 'Accept-encoding': |
|
62 conn = httplib.HTTPConnection('example.com') |
|
63 conn.sock = FakeSocket('blahblahblah') |
|
64 conn._buffer = HeaderCountingBuffer() |
|
65 |
|
66 body = 'spamspamspam' |
|
67 headers = {} |
|
68 if explicit_header: |
|
69 headers[header] = str(len(body)) |
|
70 conn.request('POST', '/', body, headers) |
|
71 self.assertEqual(conn._buffer.count[header.lower()], 1) |
|
72 |
|
73 # Collect output to a buffer so that we don't have to cope with line-ending |
|
74 # issues across platforms. Specifically, the headers will have \r\n pairs |
|
75 # and some platforms will strip them from the output file. |
|
76 |
|
77 def test(): |
|
78 buf = StringIO.StringIO() |
|
79 _stdout = sys.stdout |
|
80 try: |
|
81 sys.stdout = buf |
|
82 _test() |
|
83 finally: |
|
84 sys.stdout = _stdout |
|
85 |
|
86 # print individual lines with endings stripped |
|
87 s = buf.getvalue() |
|
88 for line in s.split("\n"): |
|
89 print line.strip() |
|
90 |
|
91 def _test(): |
|
92 # Test HTTP status lines |
|
93 |
|
94 body = "HTTP/1.1 200 Ok\r\n\r\nText" |
|
95 sock = FakeSocket(body) |
|
96 resp = httplib.HTTPResponse(sock, 1) |
|
97 resp.begin() |
|
98 print resp.read() |
|
99 resp.close() |
|
100 |
|
101 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" |
|
102 sock = FakeSocket(body) |
|
103 resp = httplib.HTTPResponse(sock, 1) |
|
104 try: |
|
105 resp.begin() |
|
106 except httplib.BadStatusLine: |
|
107 print "BadStatusLine raised as expected" |
|
108 else: |
|
109 print "Expect BadStatusLine" |
|
110 |
|
111 # Check invalid host_port |
|
112 |
|
113 for hp in ("www.python.org:abc", "www.python.org:"): |
|
114 try: |
|
115 h = httplib.HTTP(hp) |
|
116 except httplib.InvalidURL: |
|
117 print "InvalidURL raised as expected" |
|
118 else: |
|
119 print "Expect InvalidURL" |
|
120 |
|
121 for hp,h,p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000), |
|
122 ("www.python.org:80", "www.python.org", 80), |
|
123 ("www.python.org", "www.python.org", 80), |
|
124 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)): |
|
125 try: |
|
126 http = httplib.HTTP(hp) |
|
127 except httplib.InvalidURL: |
|
128 print "InvalidURL raised erroneously" |
|
129 c = http._conn |
|
130 if h != c.host: raise AssertionError, ("Host incorrectly parsed", h, c.host) |
|
131 if p != c.port: raise AssertionError, ("Port incorrectly parsed", p, c.host) |
|
132 |
|
133 # test response with multiple message headers with the same field name. |
|
134 text = ('HTTP/1.1 200 OK\r\n' |
|
135 'Set-Cookie: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"\r\n' |
|
136 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' |
|
137 ' Path="/acme"\r\n' |
|
138 '\r\n' |
|
139 'No body\r\n') |
|
140 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' |
|
141 ', ' |
|
142 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') |
|
143 s = FakeSocket(text) |
|
144 r = httplib.HTTPResponse(s, 1) |
|
145 r.begin() |
|
146 cookies = r.getheader("Set-Cookie") |
|
147 if cookies != hdr: |
|
148 raise AssertionError, "multiple headers not combined properly" |
|
149 |
|
150 # Test that the library doesn't attempt to read any data |
|
151 # from a HEAD request. (Tickles SF bug #622042.) |
|
152 sock = FakeSocket( |
|
153 'HTTP/1.1 200 OK\r\n' |
|
154 'Content-Length: 14432\r\n' |
|
155 '\r\n', |
|
156 NoEOFStringIO) |
|
157 resp = httplib.HTTPResponse(sock, 1, method="HEAD") |
|
158 resp.begin() |
|
159 if resp.read() != "": |
|
160 raise AssertionError, "Did not expect response from HEAD request" |
|
161 resp.close() |
|
162 |
|
163 |
|
164 class OfflineTest(TestCase): |
|
165 def test_responses(self): |
|
166 self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found") |
|
167 |
|
168 def test_main(verbose=None): |
|
169 tests = [HeaderTests,OfflineTest] |
|
170 test_support.run_unittest(*tests) |
|
171 |
|
172 test() |