symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_urllib2_localnet.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #!/usr/bin/env python
       
     2 
       
     3 import mimetools
       
     4 import threading
       
     5 import urlparse
       
     6 import urllib2
       
     7 import BaseHTTPServer
       
     8 import unittest
       
     9 import hashlib
       
    10 from test import test_support
       
    11 
       
    12 # Loopback http server infrastructure
       
    13 
       
    14 class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
       
    15     """HTTP server w/ a few modifications that make it useful for
       
    16     loopback testing purposes.
       
    17     """
       
    18 
       
    19     def __init__(self, server_address, RequestHandlerClass):
       
    20         BaseHTTPServer.HTTPServer.__init__(self,
       
    21                                            server_address,
       
    22                                            RequestHandlerClass)
       
    23 
       
    24         # Set the timeout of our listening socket really low so
       
    25         # that we can stop the server easily.
       
    26         self.socket.settimeout(1.0)
       
    27 
       
    28     def get_request(self):
       
    29         """BaseHTTPServer method, overridden."""
       
    30 
       
    31         request, client_address = self.socket.accept()
       
    32 
       
    33         # It's a loopback connection, so setting the timeout
       
    34         # really low shouldn't affect anything, but should make
       
    35         # deadlocks less likely to occur.
       
    36         request.settimeout(10.0)
       
    37 
       
    38         return (request, client_address)
       
    39 
       
    40 class LoopbackHttpServerThread(threading.Thread):
       
    41     """Stoppable thread that runs a loopback http server."""
       
    42 
       
    43     def __init__(self, request_handler):
       
    44         threading.Thread.__init__(self)
       
    45         self._stop = False
       
    46         self.ready = threading.Event()
       
    47         request_handler.protocol_version = "HTTP/1.0"
       
    48         self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
       
    49                                         request_handler)
       
    50         #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
       
    51         #                                      self.httpd.server_port)
       
    52         self.port = self.httpd.server_port
       
    53 
       
    54     def stop(self):
       
    55         """Stops the webserver if it's currently running."""
       
    56 
       
    57         # Set the stop flag.
       
    58         self._stop = True
       
    59 
       
    60         self.join()
       
    61 
       
    62     def run(self):
       
    63         self.ready.set()
       
    64         while not self._stop:
       
    65             self.httpd.handle_request()
       
    66 
       
    67 # Authentication infrastructure
       
    68 
       
    69 class DigestAuthHandler:
       
    70     """Handler for performing digest authentication."""
       
    71 
       
    72     def __init__(self):
       
    73         self._request_num = 0
       
    74         self._nonces = []
       
    75         self._users = {}
       
    76         self._realm_name = "Test Realm"
       
    77         self._qop = "auth"
       
    78 
       
    79     def set_qop(self, qop):
       
    80         self._qop = qop
       
    81 
       
    82     def set_users(self, users):
       
    83         assert isinstance(users, dict)
       
    84         self._users = users
       
    85 
       
    86     def set_realm(self, realm):
       
    87         self._realm_name = realm
       
    88 
       
    89     def _generate_nonce(self):
       
    90         self._request_num += 1
       
    91         nonce = hashlib.md5(str(self._request_num)).hexdigest()
       
    92         self._nonces.append(nonce)
       
    93         return nonce
       
    94 
       
    95     def _create_auth_dict(self, auth_str):
       
    96         first_space_index = auth_str.find(" ")
       
    97         auth_str = auth_str[first_space_index+1:]
       
    98 
       
    99         parts = auth_str.split(",")
       
   100 
       
   101         auth_dict = {}
       
   102         for part in parts:
       
   103             name, value = part.split("=")
       
   104             name = name.strip()
       
   105             if value[0] == '"' and value[-1] == '"':
       
   106                 value = value[1:-1]
       
   107             else:
       
   108                 value = value.strip()
       
   109             auth_dict[name] = value
       
   110         return auth_dict
       
   111 
       
   112     def _validate_auth(self, auth_dict, password, method, uri):
       
   113         final_dict = {}
       
   114         final_dict.update(auth_dict)
       
   115         final_dict["password"] = password
       
   116         final_dict["method"] = method
       
   117         final_dict["uri"] = uri
       
   118         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
       
   119         HA1 = hashlib.md5(HA1_str).hexdigest()
       
   120         HA2_str = "%(method)s:%(uri)s" % final_dict
       
   121         HA2 = hashlib.md5(HA2_str).hexdigest()
       
   122         final_dict["HA1"] = HA1
       
   123         final_dict["HA2"] = HA2
       
   124         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
       
   125                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
       
   126         response = hashlib.md5(response_str).hexdigest()
       
   127 
       
   128         return response == auth_dict["response"]
       
   129 
       
   130     def _return_auth_challenge(self, request_handler):
       
   131         request_handler.send_response(407, "Proxy Authentication Required")
       
   132         request_handler.send_header("Content-Type", "text/html")
       
   133         request_handler.send_header(
       
   134             'Proxy-Authenticate', 'Digest realm="%s", '
       
   135             'qop="%s",'
       
   136             'nonce="%s", ' % \
       
   137             (self._realm_name, self._qop, self._generate_nonce()))
       
   138         # XXX: Not sure if we're supposed to add this next header or
       
   139         # not.
       
   140         #request_handler.send_header('Connection', 'close')
       
   141         request_handler.end_headers()
       
   142         request_handler.wfile.write("Proxy Authentication Required.")
       
   143         return False
       
   144 
       
   145     def handle_request(self, request_handler):
       
   146         """Performs digest authentication on the given HTTP request
       
   147         handler.  Returns True if authentication was successful, False
       
   148         otherwise.
       
   149 
       
   150         If no users have been set, then digest auth is effectively
       
   151         disabled and this method will always return True.
       
   152         """
       
   153 
       
   154         if len(self._users) == 0:
       
   155             return True
       
   156 
       
   157         if not request_handler.headers.has_key('Proxy-Authorization'):
       
   158             return self._return_auth_challenge(request_handler)
       
   159         else:
       
   160             auth_dict = self._create_auth_dict(
       
   161                 request_handler.headers['Proxy-Authorization']
       
   162                 )
       
   163             if self._users.has_key(auth_dict["username"]):
       
   164                 password = self._users[ auth_dict["username"] ]
       
   165             else:
       
   166                 return self._return_auth_challenge(request_handler)
       
   167             if not auth_dict.get("nonce") in self._nonces:
       
   168                 return self._return_auth_challenge(request_handler)
       
   169             else:
       
   170                 self._nonces.remove(auth_dict["nonce"])
       
   171 
       
   172             auth_validated = False
       
   173 
       
   174             # MSIE uses short_path in its validation, but Python's
       
   175             # urllib2 uses the full path, so we're going to see if
       
   176             # either of them works here.
       
   177 
       
   178             for path in [request_handler.path, request_handler.short_path]:
       
   179                 if self._validate_auth(auth_dict,
       
   180                                        password,
       
   181                                        request_handler.command,
       
   182                                        path):
       
   183                     auth_validated = True
       
   184 
       
   185             if not auth_validated:
       
   186                 return self._return_auth_challenge(request_handler)
       
   187             return True
       
   188 
       
   189 # Proxy test infrastructure
       
   190 
       
   191 class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
       
   192     """This is a 'fake proxy' that makes it look like the entire
       
   193     internet has gone down due to a sudden zombie invasion.  It main
       
   194     utility is in providing us with authentication support for
       
   195     testing.
       
   196     """
       
   197 
       
   198     digest_auth_handler = DigestAuthHandler()
       
   199 
       
   200     def log_message(self, format, *args):
       
   201         # Uncomment the next line for debugging.
       
   202         #sys.stderr.write(format % args)
       
   203         pass
       
   204 
       
   205     def do_GET(self):
       
   206         (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
       
   207             self.path, 'http')
       
   208         self.short_path = path
       
   209         if self.digest_auth_handler.handle_request(self):
       
   210             self.send_response(200, "OK")
       
   211             self.send_header("Content-Type", "text/html")
       
   212             self.end_headers()
       
   213             self.wfile.write("You've reached %s!<BR>" % self.path)
       
   214             self.wfile.write("Our apologies, but our server is down due to "
       
   215                               "a sudden zombie invasion.")
       
   216 
       
   217 # Test cases
       
   218 
       
   219 class ProxyAuthTests(unittest.TestCase):
       
   220     URL = "http://localhost"
       
   221 
       
   222     USER = "tester"
       
   223     PASSWD = "test123"
       
   224     REALM = "TestRealm"
       
   225 
       
   226     def setUp(self):
       
   227         FakeProxyHandler.digest_auth_handler.set_users({
       
   228             self.USER : self.PASSWD
       
   229             })
       
   230         FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
       
   231 
       
   232         self.server = LoopbackHttpServerThread(FakeProxyHandler)
       
   233         self.server.start()
       
   234         self.server.ready.wait()
       
   235         proxy_url = "http://127.0.0.1:%d" % self.server.port
       
   236         handler = urllib2.ProxyHandler({"http" : proxy_url})
       
   237         self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
       
   238         self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
       
   239 
       
   240     def tearDown(self):
       
   241         self.server.stop()
       
   242 
       
   243     def test_proxy_with_bad_password_raises_httperror(self):
       
   244         self._digest_auth_handler.add_password(self.REALM, self.URL,
       
   245                                                self.USER, self.PASSWD+"bad")
       
   246         FakeProxyHandler.digest_auth_handler.set_qop("auth")
       
   247         self.assertRaises(urllib2.HTTPError,
       
   248                           self.opener.open,
       
   249                           self.URL)
       
   250 
       
   251     def test_proxy_with_no_password_raises_httperror(self):
       
   252         FakeProxyHandler.digest_auth_handler.set_qop("auth")
       
   253         self.assertRaises(urllib2.HTTPError,
       
   254                           self.opener.open,
       
   255                           self.URL)
       
   256 
       
   257     def test_proxy_qop_auth_works(self):
       
   258         self._digest_auth_handler.add_password(self.REALM, self.URL,
       
   259                                                self.USER, self.PASSWD)
       
   260         FakeProxyHandler.digest_auth_handler.set_qop("auth")
       
   261         result = self.opener.open(self.URL)
       
   262         while result.read():
       
   263             pass
       
   264         result.close()
       
   265 
       
   266     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
       
   267         self._digest_auth_handler.add_password(self.REALM, self.URL,
       
   268                                                self.USER, self.PASSWD)
       
   269         FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
       
   270         try:
       
   271             result = self.opener.open(self.URL)
       
   272         except urllib2.URLError:
       
   273             # It's okay if we don't support auth-int, but we certainly
       
   274             # shouldn't receive any kind of exception here other than
       
   275             # a URLError.
       
   276             result = None
       
   277         if result:
       
   278             while result.read():
       
   279                 pass
       
   280             result.close()
       
   281 
       
   282 
       
   283 def GetRequestHandler(responses):
       
   284 
       
   285     class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
       
   286 
       
   287         server_version = "TestHTTP/"
       
   288         requests = []
       
   289         headers_received = []
       
   290         port = 80
       
   291 
       
   292         def do_GET(self):
       
   293             body = self.send_head()
       
   294             if body:
       
   295                 self.wfile.write(body)
       
   296 
       
   297         def do_POST(self):
       
   298             content_length = self.headers['Content-Length']
       
   299             post_data = self.rfile.read(int(content_length))
       
   300             self.do_GET()
       
   301             self.requests.append(post_data)
       
   302 
       
   303         def send_head(self):
       
   304             FakeHTTPRequestHandler.headers_received = self.headers
       
   305             self.requests.append(self.path)
       
   306             response_code, headers, body = responses.pop(0)
       
   307 
       
   308             self.send_response(response_code)
       
   309 
       
   310             for (header, value) in headers:
       
   311                 self.send_header(header, value % self.port)
       
   312             if body:
       
   313                 self.send_header('Content-type', 'text/plain')
       
   314                 self.end_headers()
       
   315                 return body
       
   316             self.end_headers()
       
   317 
       
   318         def log_message(self, *args):
       
   319             pass
       
   320 
       
   321 
       
   322     return FakeHTTPRequestHandler
       
   323 
       
   324 
       
   325 class TestUrlopen(unittest.TestCase):
       
   326     """Tests urllib2.urlopen using the network.
       
   327 
       
   328     These tests are not exhaustive.  Assuming that testing using files does a
       
   329     good job overall of some of the basic interface features.  There are no
       
   330     tests exercising the optional 'data' and 'proxies' arguments.  No tests
       
   331     for transparent redirection have been written.
       
   332     """
       
   333 
       
   334     def start_server(self, responses):
       
   335         handler = GetRequestHandler(responses)
       
   336 
       
   337         self.server = LoopbackHttpServerThread(handler)
       
   338         self.server.start()
       
   339         self.server.ready.wait()
       
   340         port = self.server.port
       
   341         handler.port = port
       
   342         return handler
       
   343 
       
   344 
       
   345     def test_redirection(self):
       
   346         expected_response = 'We got here...'
       
   347         responses = [
       
   348             (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
       
   349             (200, [], expected_response)
       
   350         ]
       
   351 
       
   352         handler = self.start_server(responses)
       
   353 
       
   354         try:
       
   355             f = urllib2.urlopen('http://localhost:%s/' % handler.port)
       
   356             data = f.read()
       
   357             f.close()
       
   358 
       
   359             self.assertEquals(data, expected_response)
       
   360             self.assertEquals(handler.requests, ['/', '/somewhere_else'])
       
   361         finally:
       
   362             self.server.stop()
       
   363 
       
   364 
       
   365     def test_404(self):
       
   366         expected_response = 'Bad bad bad...'
       
   367         handler = self.start_server([(404, [], expected_response)])
       
   368 
       
   369         try:
       
   370             try:
       
   371                 urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
       
   372             except urllib2.URLError, f:
       
   373                 pass
       
   374             else:
       
   375                 self.fail('404 should raise URLError')
       
   376 
       
   377             data = f.read()
       
   378             f.close()
       
   379 
       
   380             self.assertEquals(data, expected_response)
       
   381             self.assertEquals(handler.requests, ['/weeble'])
       
   382         finally:
       
   383             self.server.stop()
       
   384 
       
   385 
       
   386     def test_200(self):
       
   387         expected_response = 'pycon 2008...'
       
   388         handler = self.start_server([(200, [], expected_response)])
       
   389 
       
   390         try:
       
   391             f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
       
   392             data = f.read()
       
   393             f.close()
       
   394 
       
   395             self.assertEquals(data, expected_response)
       
   396             self.assertEquals(handler.requests, ['/bizarre'])
       
   397         finally:
       
   398             self.server.stop()
       
   399 
       
   400     def test_200_with_parameters(self):
       
   401         expected_response = 'pycon 2008...'
       
   402         handler = self.start_server([(200, [], expected_response)])
       
   403 
       
   404         try:
       
   405             f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
       
   406             data = f.read()
       
   407             f.close()
       
   408 
       
   409             self.assertEquals(data, expected_response)
       
   410             self.assertEquals(handler.requests, ['/bizarre', 'get=with_feeling'])
       
   411         finally:
       
   412             self.server.stop()
       
   413 
       
   414 
       
   415     def test_sending_headers(self):
       
   416         handler = self.start_server([(200, [], "we don't care")])
       
   417 
       
   418         try:
       
   419             req = urllib2.Request("http://localhost:%s/" % handler.port,
       
   420                                   headers={'Range': 'bytes=20-39'})
       
   421             urllib2.urlopen(req)
       
   422             self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
       
   423         finally:
       
   424             self.server.stop()
       
   425 
       
   426     def test_basic(self):
       
   427         handler = self.start_server([(200, [], "we don't care")])
       
   428 
       
   429         try:
       
   430             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
       
   431             for attr in ("read", "close", "info", "geturl"):
       
   432                 self.assert_(hasattr(open_url, attr), "object returned from "
       
   433                              "urlopen lacks the %s attribute" % attr)
       
   434             try:
       
   435                 self.assert_(open_url.read(), "calling 'read' failed")
       
   436             finally:
       
   437                 open_url.close()
       
   438         finally:
       
   439             self.server.stop()
       
   440 
       
   441     def test_info(self):
       
   442         handler = self.start_server([(200, [], "we don't care")])
       
   443 
       
   444         try:
       
   445             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
       
   446             info_obj = open_url.info()
       
   447             self.assert_(isinstance(info_obj, mimetools.Message),
       
   448                          "object returned by 'info' is not an instance of "
       
   449                          "mimetools.Message")
       
   450             self.assertEqual(info_obj.getsubtype(), "plain")
       
   451         finally:
       
   452             self.server.stop()
       
   453 
       
   454     def test_geturl(self):
       
   455         # Make sure same URL as opened is returned by geturl.
       
   456         handler = self.start_server([(200, [], "we don't care")])
       
   457 
       
   458         try:
       
   459             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
       
   460             url = open_url.geturl()
       
   461             self.assertEqual(url, "http://localhost:%s" % handler.port)
       
   462         finally:
       
   463             self.server.stop()
       
   464 
       
   465 
       
   466     def test_bad_address(self):
       
   467         # Make sure proper exception is raised when connecting to a bogus
       
   468         # address.
       
   469         self.assertRaises(IOError,
       
   470                           # SF patch 809915:  In Sep 2003, VeriSign started
       
   471                           # highjacking invalid .com and .net addresses to
       
   472                           # boost traffic to their own site.  This test
       
   473                           # started failing then.  One hopes the .invalid
       
   474                           # domain will be spared to serve its defined
       
   475                           # purpose.
       
   476                           # urllib2.urlopen, "http://www.sadflkjsasadf.com/")
       
   477                           urllib2.urlopen, "http://www.python.invalid./")
       
   478 
       
   479 
       
   480 def test_main():
       
   481     # We will NOT depend on the network resource flag
       
   482     # (Lib/test/regrtest.py -u network) since all tests here are only
       
   483     # localhost.  However, if this is a bad rationale, then uncomment
       
   484     # the next line.
       
   485     #test_support.requires("network")
       
   486 
       
   487     test_support.run_unittest(ProxyAuthTests)
       
   488     test_support.run_unittest(TestUrlopen)
       
   489 
       
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()