|
1 #!/usr/bin/env python |
|
2 |
|
3 import mimetools |
|
4 import threading |
|
5 import urlparse |
|
6 import urllib2 |
|
7 import BaseHTTPServer |
|
8 import unittest |
|
9 import hashlib |
|
10 from test import test_support |
|
11 |
|
12 # Loopback http server infrastructure |
|
13 |
|
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self, server_address,
                                           RequestHandlerClass)
        # A short timeout on the listening socket means accept() wakes
        # up regularly, so the serving thread can be stopped easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden."""
        conn, addr = self.socket.accept()
        # Loopback connections are fast, so a modest per-connection
        # timeout shouldn't affect anything but makes deadlocks less
        # likely to occur.
        conn.settimeout(10.0)
        return conn, addr
|
39 |
|
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server.

    The server is created in __init__, so ``self.port`` is valid as
    soon as the thread object exists; run() then serves one request at
    a time until stop() is called.
    """

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        self._stop = False              # set to True to make run() exit
        self.ready = threading.Event()  # signalled once run() starts
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 asks the OS for an ephemeral port on the loopback
        # interface; the chosen port is published via self.port.
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop = True

        self.join()
        # Close the listening socket so its file descriptor and port
        # are released; previously it leaked until interpreter exit.
        self.httpd.server_close()

    def run(self):
        self.ready.set()
        while not self._stop:
            # handle_request() returns after at most the listening
            # socket's 1s timeout, so the stop flag is polled regularly.
            self.httpd.handle_request()
|
66 |
|
67 # Authentication infrastructure |
|
68 |
|
class DigestAuthHandler:
    """Handler for performing digest authentication (RFC 2617).

    Holds a username -> password map and validates the
    Proxy-Authorization header of incoming requests.  If no users have
    been registered, authentication is effectively disabled.
    """

    def __init__(self):
        self._request_num = 0       # counter used to derive fresh nonces
        self._nonces = []           # nonces issued but not yet consumed
        self._users = {}            # maps username -> password
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the quality-of-protection value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Register the username -> password mapping (must be a dict)."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm used in challenges and in HA1 computation."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a new single-use nonce."""
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a Digest authorization header value into a dict.

        The leading scheme token (e.g. "Digest") is discarded; the
        remaining comma-separated name=value pairs are returned with
        surrounding double quotes stripped from quoted values.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            # Split on the first '=' only: values such as URIs with
            # query strings or base64 data may themselves contain '='
            # (plain split("=") raised ValueError on those).
            name, value = part.split("=", 1)
            name = name.strip()
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if the client's response digest in auth_dict is
        correct for the given password, HTTP method and request URI,
        following the RFC 2617 qop="auth" computation."""
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 challenge with a fresh nonce; always returns False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if len(self._users) == 0:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization']
                )
            if auth_dict["username"] in self._users:
                password = self._users[ auth_dict["username"] ]
            else:
                return self._return_auth_challenge(request_handler)
            # Each nonce is single-use: unknown or replayed nonces get
            # a fresh challenge.
            if auth_dict.get("nonce") not in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

            auth_validated = False

            # MSIE uses short_path in its validation, but Python's
            # urllib2 uses the full path, so we're going to see if
            # either of them works here.

            for path in [request_handler.path, request_handler.short_path]:
                if self._validate_auth(auth_dict,
                                       password,
                                       request_handler.command,
                                       path):
                    auth_validated = True

            if not auth_validated:
                return self._return_auth_challenge(request_handler)
            return True
|
188 |
|
189 # Proxy test infrastructure |
|
190 |
|
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    # Shared authenticator; the test cases configure users/realm/qop
    # on this class attribute before starting the server.
    digest_auth_handler = DigestAuthHandler()

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        # Proxied requests carry an absolute URI in self.path; keep
        # just the path component for MSIE-style digest validation
        # (DigestAuthHandler tries both full and short paths).
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")
|
216 |
|
217 # Test cases |
|
218 |
|
class ProxyAuthTests(unittest.TestCase):
    """Tests urllib2's ProxyDigestAuthHandler against the loopback
    FakeProxyHandler proxy."""

    # The URL requested through the opener; traffic actually goes to
    # the fake proxy on 127.0.0.1.
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        """Start the fake proxy server and build an opener routed
        through it with a ProxyDigestAuthHandler attached."""
        FakeProxyHandler.digest_auth_handler.set_users({
            self.USER : self.PASSWD
        })
        FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)

        self.server = LoopbackHttpServerThread(FakeProxyHandler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self._digest_auth_handler)

    def tearDown(self):
        """Shut down the proxy server thread."""
        self.server.stop()

    def test_proxy_with_bad_password_raises_httperror(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        # Drain the response so the server side completes cleanly.
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()
|
281 |
|
282 |
|
def GetRequestHandler(responses):
    """Return a BaseHTTPRequestHandler subclass that replays *responses*.

    *responses* is a list of (status_code, headers, body) tuples,
    popped one per incoming request.  Header values may contain a %s
    placeholder that is filled with the handler's 'port' class
    attribute.  Requested paths (and POST bodies) are recorded in the
    'requests' class-level list for the tests to inspect.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []           # paths (and POST bodies) seen so far
        headers_received = []   # headers of the most recent request
        port = 80               # overwritten by the test with the real port

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            # Consume the request body, then behave like a GET; the
            # posted data is recorded right after the request path.
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            """Send the next canned response; return its body (or None
            when the canned body is empty/falsy)."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            # Silence per-request logging during the test run.
            pass


    return FakeHTTPRequestHandler
|
323 |
|
324 |
|
class TestUrlopen(unittest.TestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def start_server(self, responses):
        """Start a loopback server replaying *responses*; return the
        handler class (whose 'requests' list records what was asked)."""
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler


    def test_redirection(self):
        # A 302 with a Location header must be followed transparently.
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError, f:
                # The HTTPError is itself file-like; the body is read
                # from it below.
                pass
            else:
                self.fail('404 should raise URLError')

            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        # Passing 'data' turns the request into a POST; the handler
        # records the posted body after the path.
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()


    def test_sending_headers(self):
        # Headers given to Request must reach the server verbatim.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        # The object returned by urlopen must expose the file-like API.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assert_(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            try:
                self.assert_(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        # info() must return the parsed response headers.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assert_(isinstance(info_obj, mimetools.Message),
                         "object returned by 'info' is not an instance of "
                         "mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        self.assertRaises(IOError,
                          # SF patch 809915:  In Sep 2003, VeriSign started
                          # highjacking invalid .com and .net addresses to
                          # boost traffic to their own site.  This test
                          # started failing then.  One hopes the .invalid
                          # domain will be spared to serve its defined
                          # purpose.
                          # urllib2.urlopen, "http://www.sadflkjsasadf.com/")
                          urllib2.urlopen, "http://www.python.invalid./")
|
478 |
|
479 |
|
def test_main():
    """Run the proxy-auth and urlopen test suites.

    We will NOT depend on the network resource flag
    (Lib/test/regrtest.py -u network) since all tests here use only
    localhost.  However, if this is a bad rationale, then uncomment
    the next line.
    """
    #test_support.requires("network")

    for test_class in (ProxyAuthTests, TestUrlopen):
        test_support.run_unittest(test_class)
|
489 |
|
# Allow running this test module directly from the command line.
if __name__ == "__main__":
    test_main()