|
1 #!/usr/bin/env python |
|
2 |
|
3 import unittest |
|
4 from test import test_support |
|
5 from test.test_urllib2 import sanepathname2url |
|
6 |
|
7 import socket |
|
8 import urllib2 |
|
9 import sys |
|
10 import os |
|
11 import mimetools |
|
12 |
|
13 |
|
14 def _retry_thrice(func, exc, *args, **kwargs): |
|
15 for i in range(3): |
|
16 try: |
|
17 return func(*args, **kwargs) |
|
18 except exc, last_exc: |
|
19 continue |
|
20 except: |
|
21 raise |
|
22 raise last_exc |
|
23 |
|
24 def _wrap_with_retry_thrice(func, exc): |
|
25 def wrapped(*args, **kwargs): |
|
26 return _retry_thrice(func, exc, *args, **kwargs) |
|
27 return wrapped |
|
28 |
|
29 # Connecting to remote hosts is flaky. Make it more robust by retrying |
|
30 # the connection several times. |
|
31 _urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError) |
|
32 |
|
33 |
|
34 class AuthTests(unittest.TestCase): |
|
35 """Tests urllib2 authentication features.""" |
|
36 |
|
37 ## Disabled at the moment since there is no page under python.org which |
|
38 ## could be used to HTTP authentication. |
|
39 # |
|
40 # def test_basic_auth(self): |
|
41 # import httplib |
|
42 # |
|
43 # test_url = "http://www.python.org/test/test_urllib2/basic_auth" |
|
44 # test_hostport = "www.python.org" |
|
45 # test_realm = 'Test Realm' |
|
46 # test_user = 'test.test_urllib2net' |
|
47 # test_password = 'blah' |
|
48 # |
|
49 # # failure |
|
50 # try: |
|
51 # _urlopen_with_retry(test_url) |
|
52 # except urllib2.HTTPError, exc: |
|
53 # self.assertEqual(exc.code, 401) |
|
54 # else: |
|
55 # self.fail("urlopen() should have failed with 401") |
|
56 # |
|
57 # # success |
|
58 # auth_handler = urllib2.HTTPBasicAuthHandler() |
|
59 # auth_handler.add_password(test_realm, test_hostport, |
|
60 # test_user, test_password) |
|
61 # opener = urllib2.build_opener(auth_handler) |
|
62 # f = opener.open('http://localhost/') |
|
63 # response = _urlopen_with_retry("http://www.python.org/") |
|
64 # |
|
65 # # The 'userinfo' URL component is deprecated by RFC 3986 for security |
|
66 # # reasons, let's not implement it! (it's already implemented for proxy |
|
67 # # specification strings (that is, URLs or authorities specifying a |
|
68 # # proxy), so we must keep that) |
|
69 # self.assertRaises(httplib.InvalidURL, |
|
70 # urllib2.urlopen, "http://evil:thing@example.com") |
|
71 |
|
72 |
|
73 class CloseSocketTest(unittest.TestCase): |
|
74 |
|
75 def test_close(self): |
|
76 import socket, httplib, gc |
|
77 |
|
78 # calling .close() on urllib2's response objects should close the |
|
79 # underlying socket |
|
80 |
|
81 # delve deep into response to fetch socket._socketobject |
|
82 response = _urlopen_with_retry("http://www.python.org/") |
|
83 abused_fileobject = response.fp |
|
84 self.assert_(abused_fileobject.__class__ is socket._fileobject) |
|
85 httpresponse = abused_fileobject._sock |
|
86 self.assert_(httpresponse.__class__ is httplib.HTTPResponse) |
|
87 fileobject = httpresponse.fp |
|
88 self.assert_(fileobject.__class__ is socket._fileobject) |
|
89 |
|
90 self.assert_(not fileobject.closed) |
|
91 response.close() |
|
92 self.assert_(fileobject.closed) |
|
93 |
|
94 class OtherNetworkTests(unittest.TestCase): |
|
95 def setUp(self): |
|
96 if 0: # for debugging |
|
97 import logging |
|
98 logger = logging.getLogger("test_urllib2net") |
|
99 logger.addHandler(logging.StreamHandler()) |
|
100 |
|
101 # XXX The rest of these tests aren't very good -- they don't check much. |
|
102 # They do sometimes catch some major disasters, though. |
|
103 |
|
104 def test_ftp(self): |
|
105 urls = [ |
|
106 'ftp://ftp.kernel.org/pub/linux/kernel/README', |
|
107 'ftp://ftp.kernel.org/pub/linux/kernel/non-existant-file', |
|
108 #'ftp://ftp.kernel.org/pub/leenox/kernel/test', |
|
109 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC' |
|
110 '/research-reports/00README-Legal-Rules-Regs', |
|
111 ] |
|
112 self._test_urls(urls, self._extra_handlers()) |
|
113 |
|
114 def test_file(self): |
|
115 TESTFN = test_support.TESTFN |
|
116 f = open(TESTFN, 'w') |
|
117 try: |
|
118 f.write('hi there\n') |
|
119 f.close() |
|
120 urls = [ |
|
121 'file:'+sanepathname2url(os.path.abspath(TESTFN)), |
|
122 ('file:///nonsensename/etc/passwd', None, urllib2.URLError), |
|
123 ] |
|
124 self._test_urls(urls, self._extra_handlers(), retry=True) |
|
125 finally: |
|
126 os.remove(TESTFN) |
|
127 |
|
128 # XXX Following test depends on machine configurations that are internal |
|
129 # to CNRI. Need to set up a public server with the right authentication |
|
130 # configuration for test purposes. |
|
131 |
|
132 ## def test_cnri(self): |
|
133 ## if socket.gethostname() == 'bitdiddle': |
|
134 ## localhost = 'bitdiddle.cnri.reston.va.us' |
|
135 ## elif socket.gethostname() == 'bitdiddle.concentric.net': |
|
136 ## localhost = 'localhost' |
|
137 ## else: |
|
138 ## localhost = None |
|
139 ## if localhost is not None: |
|
140 ## urls = [ |
|
141 ## 'file://%s/etc/passwd' % localhost, |
|
142 ## 'http://%s/simple/' % localhost, |
|
143 ## 'http://%s/digest/' % localhost, |
|
144 ## 'http://%s/not/found.h' % localhost, |
|
145 ## ] |
|
146 |
|
147 ## bauth = HTTPBasicAuthHandler() |
|
148 ## bauth.add_password('basic_test_realm', localhost, 'jhylton', |
|
149 ## 'password') |
|
150 ## dauth = HTTPDigestAuthHandler() |
|
151 ## dauth.add_password('digest_test_realm', localhost, 'jhylton', |
|
152 ## 'password') |
|
153 |
|
154 ## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) |
|
155 |
|
156 def _test_urls(self, urls, handlers, retry=True): |
|
157 import socket |
|
158 import time |
|
159 import logging |
|
160 debug = logging.getLogger("test_urllib2").debug |
|
161 |
|
162 urlopen = urllib2.build_opener(*handlers).open |
|
163 if retry: |
|
164 urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError) |
|
165 |
|
166 for url in urls: |
|
167 if isinstance(url, tuple): |
|
168 url, req, expected_err = url |
|
169 else: |
|
170 req = expected_err = None |
|
171 debug(url) |
|
172 try: |
|
173 f = urlopen(url, req) |
|
174 except EnvironmentError, err: |
|
175 debug(err) |
|
176 if expected_err: |
|
177 msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" % |
|
178 (expected_err, url, req, type(err), err)) |
|
179 self.assert_(isinstance(err, expected_err), msg) |
|
180 else: |
|
181 with test_support.transient_internet(): |
|
182 buf = f.read() |
|
183 f.close() |
|
184 debug("read %d bytes" % len(buf)) |
|
185 debug("******** next url coming up...") |
|
186 time.sleep(0.1) |
|
187 |
|
188 def _extra_handlers(self): |
|
189 handlers = [] |
|
190 |
|
191 cfh = urllib2.CacheFTPHandler() |
|
192 cfh.setTimeout(1) |
|
193 handlers.append(cfh) |
|
194 |
|
195 return handlers |
|
196 |
|
197 |
|
198 class TimeoutTest(unittest.TestCase): |
|
199 def test_http_basic(self): |
|
200 self.assertTrue(socket.getdefaulttimeout() is None) |
|
201 u = _urlopen_with_retry("http://www.python.org") |
|
202 self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None) |
|
203 |
|
204 def test_http_default_timeout(self): |
|
205 self.assertTrue(socket.getdefaulttimeout() is None) |
|
206 socket.setdefaulttimeout(60) |
|
207 try: |
|
208 u = _urlopen_with_retry("http://www.python.org") |
|
209 finally: |
|
210 socket.setdefaulttimeout(None) |
|
211 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60) |
|
212 |
|
213 def test_http_no_timeout(self): |
|
214 self.assertTrue(socket.getdefaulttimeout() is None) |
|
215 socket.setdefaulttimeout(60) |
|
216 try: |
|
217 u = _urlopen_with_retry("http://www.python.org", timeout=None) |
|
218 finally: |
|
219 socket.setdefaulttimeout(None) |
|
220 self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None) |
|
221 |
|
222 def test_http_timeout(self): |
|
223 u = _urlopen_with_retry("http://www.python.org", timeout=120) |
|
224 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120) |
|
225 |
|
226 FTP_HOST = "ftp://ftp.mirror.nl/pub/mirror/gnu/" |
|
227 |
|
228 def test_ftp_basic(self): |
|
229 self.assertTrue(socket.getdefaulttimeout() is None) |
|
230 u = _urlopen_with_retry(self.FTP_HOST) |
|
231 self.assertTrue(u.fp.fp._sock.gettimeout() is None) |
|
232 |
|
233 def test_ftp_default_timeout(self): |
|
234 self.assertTrue(socket.getdefaulttimeout() is None) |
|
235 socket.setdefaulttimeout(60) |
|
236 try: |
|
237 u = _urlopen_with_retry(self.FTP_HOST) |
|
238 finally: |
|
239 socket.setdefaulttimeout(None) |
|
240 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) |
|
241 |
|
242 def test_ftp_no_timeout(self): |
|
243 self.assertTrue(socket.getdefaulttimeout() is None) |
|
244 socket.setdefaulttimeout(60) |
|
245 try: |
|
246 u = _urlopen_with_retry(self.FTP_HOST, timeout=None) |
|
247 finally: |
|
248 socket.setdefaulttimeout(None) |
|
249 self.assertTrue(u.fp.fp._sock.gettimeout() is None) |
|
250 |
|
251 def test_ftp_timeout(self): |
|
252 u = _urlopen_with_retry(self.FTP_HOST, timeout=60) |
|
253 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) |
|
254 |
|
255 |
|
256 def test_main(): |
|
257 test_support.requires("network") |
|
258 test_support.run_unittest(AuthTests, |
|
259 OtherNetworkTests, |
|
260 CloseSocketTest, |
|
261 TimeoutTest, |
|
262 ) |
|
263 |
|
264 if __name__ == "__main__": |
|
265 test_main() |