python-2.5.2/win32/Lib/test/test_urllib2.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 import unittest
       
     2 from test import test_support
       
     3 
       
     4 import os, socket
       
     5 import StringIO
       
     6 
       
     7 import urllib2
       
     8 from urllib2 import Request, OpenerDirector
       
     9 
       
    10 # XXX
       
    11 # Request
       
    12 # CacheFTPHandler (hard to write)
       
    13 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
       
    14 
       
    15 class TrivialTests(unittest.TestCase):
       
    16     def test_trivial(self):
       
    17         # A couple trivial tests
       
    18 
       
    19         self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
       
    20 
       
    21         # XXX Name hacking to get this to work on Windows.
       
    22         fname = os.path.abspath(urllib2.__file__).replace('\\', '/')
       
    23         if fname[1:2] == ":":
       
    24             fname = fname[2:]
       
    25         # And more hacking to get it to work on MacOS. This assumes
       
    26         # urllib.pathname2url works, unfortunately...
       
    27         if os.name == 'mac':
       
    28             fname = '/' + fname.replace(':', '/')
       
    29         elif os.name == 'riscos':
       
    30             import string
       
    31             fname = os.expand(fname)
       
    32             fname = fname.translate(string.maketrans("/.", "./"))
       
    33 
       
    34         file_url = "file://%s" % fname
       
    35         f = urllib2.urlopen(file_url)
       
    36 
       
    37         buf = f.read()
       
    38         f.close()
       
    39 
       
    40     def test_parse_http_list(self):
       
    41         tests = [('a,b,c', ['a', 'b', 'c']),
       
    42                  ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
       
    43                  ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
       
    44                  ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
       
    45         for string, list in tests:
       
    46             self.assertEquals(urllib2.parse_http_list(string), list)
       
    47 
       
    48 
       
    49 def test_request_headers_dict():
       
    50     """
       
    51     The Request.headers dictionary is not a documented interface.  It should
       
    52     stay that way, because the complete set of headers are only accessible
       
    53     through the .get_header(), .has_header(), .header_items() interface.
       
    54     However, .headers pre-dates those methods, and so real code will be using
       
    55     the dictionary.
       
    56 
       
    57     The introduction in 2.4 of those methods was a mistake for the same reason:
       
    58     code that previously saw all (urllib2 user)-provided headers in .headers
       
    59     now sees only a subset (and the function interface is ugly and incomplete).
       
    60     A better change would have been to replace .headers dict with a dict
       
    61     subclass (or UserDict.DictMixin instance?)  that preserved the .headers
       
    62     interface and also provided access to the "unredirected" headers.  It's
       
    63     probably too late to fix that, though.
       
    64 
       
    65 
       
    66     Check .capitalize() case normalization:
       
    67 
       
    68     >>> url = "http://example.com"
       
    69     >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
       
    70     'blah'
       
    71     >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
       
    72     'blah'
       
    73 
       
    74     Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
       
    75     but that could be changed in future.
       
    76 
       
    77     """
       
    78 
       
    79 def test_request_headers_methods():
       
    80     """
       
    81     Note the case normalization of header names here, to .capitalize()-case.
       
    82     This should be preserved for backwards-compatibility.  (In the HTTP case,
       
    83     normalization to .title()-case is done by urllib2 before sending headers to
       
    84     httplib).
       
    85 
       
    86     >>> url = "http://example.com"
       
    87     >>> r = Request(url, headers={"Spam-eggs": "blah"})
       
    88     >>> r.has_header("Spam-eggs")
       
    89     True
       
    90     >>> r.header_items()
       
    91     [('Spam-eggs', 'blah')]
       
    92     >>> r.add_header("Foo-Bar", "baz")
       
    93     >>> items = r.header_items()
       
    94     >>> items.sort()
       
    95     >>> items
       
    96     [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]
       
    97 
       
    98     Note that e.g. r.has_header("spam-EggS") is currently False, and
       
    99     r.get_header("spam-EggS") returns None, but that could be changed in
       
   100     future.
       
   101 
       
   102     >>> r.has_header("Not-there")
       
   103     False
       
   104     >>> print r.get_header("Not-there")
       
   105     None
       
   106     >>> r.get_header("Not-there", "default")
       
   107     'default'
       
   108 
       
   109     """
       
   110 
       
   111 
       
   112 def test_password_manager(self):
       
   113     """
       
   114     >>> mgr = urllib2.HTTPPasswordMgr()
       
   115     >>> add = mgr.add_password
       
   116     >>> add("Some Realm", "http://example.com/", "joe", "password")
       
   117     >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
       
   118     >>> add("c", "http://example.com/foo", "foo", "ni")
       
   119     >>> add("c", "http://example.com/bar", "bar", "nini")
       
   120     >>> add("b", "http://example.com/", "first", "blah")
       
   121     >>> add("b", "http://example.com/", "second", "spam")
       
   122     >>> add("a", "http://example.com", "1", "a")
       
   123     >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
       
   124     >>> add("Some Realm", "d.example.com", "4", "d")
       
   125     >>> add("Some Realm", "e.example.com:3128", "5", "e")
       
   126 
       
   127     >>> mgr.find_user_password("Some Realm", "example.com")
       
   128     ('joe', 'password')
       
   129     >>> mgr.find_user_password("Some Realm", "http://example.com")
       
   130     ('joe', 'password')
       
   131     >>> mgr.find_user_password("Some Realm", "http://example.com/")
       
   132     ('joe', 'password')
       
   133     >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
       
   134     ('joe', 'password')
       
   135     >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
       
   136     ('joe', 'password')
       
   137     >>> mgr.find_user_password("c", "http://example.com/foo")
       
   138     ('foo', 'ni')
       
   139     >>> mgr.find_user_password("c", "http://example.com/bar")
       
   140     ('bar', 'nini')
       
   141 
       
   142     Actually, this is really undefined ATM
       
   143 ##     Currently, we use the highest-level path where more than one match:
       
   144 
       
   145 ##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
       
   146 ##     ('joe', 'password')
       
   147 
       
   148     Use latest add_password() in case of conflict:
       
   149 
       
   150     >>> mgr.find_user_password("b", "http://example.com/")
       
   151     ('second', 'spam')
       
   152 
       
   153     No special relationship between a.example.com and example.com:
       
   154 
       
   155     >>> mgr.find_user_password("a", "http://example.com/")
       
   156     ('1', 'a')
       
   157     >>> mgr.find_user_password("a", "http://a.example.com/")
       
   158     (None, None)
       
   159 
       
   160     Ports:
       
   161 
       
   162     >>> mgr.find_user_password("Some Realm", "c.example.com")
       
   163     (None, None)
       
   164     >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
       
   165     ('3', 'c')
       
   166     >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
       
   167     ('3', 'c')
       
   168     >>> mgr.find_user_password("Some Realm", "d.example.com")
       
   169     ('4', 'd')
       
   170     >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
       
   171     ('5', 'e')
       
   172 
       
   173     """
       
   174     pass
       
   175 
       
   176 
       
   177 def test_password_manager_default_port(self):
       
   178     """
       
   179     >>> mgr = urllib2.HTTPPasswordMgr()
       
   180     >>> add = mgr.add_password
       
   181 
       
   182     The point to note here is that we can't guess the default port if there's
       
   183     no scheme.  This applies to both add_password and find_user_password.
       
   184 
       
   185     >>> add("f", "http://g.example.com:80", "10", "j")
       
   186     >>> add("g", "http://h.example.com", "11", "k")
       
   187     >>> add("h", "i.example.com:80", "12", "l")
       
   188     >>> add("i", "j.example.com", "13", "m")
       
   189     >>> mgr.find_user_password("f", "g.example.com:100")
       
   190     (None, None)
       
   191     >>> mgr.find_user_password("f", "g.example.com:80")
       
   192     ('10', 'j')
       
   193     >>> mgr.find_user_password("f", "g.example.com")
       
   194     (None, None)
       
   195     >>> mgr.find_user_password("f", "http://g.example.com:100")
       
   196     (None, None)
       
   197     >>> mgr.find_user_password("f", "http://g.example.com:80")
       
   198     ('10', 'j')
       
   199     >>> mgr.find_user_password("f", "http://g.example.com")
       
   200     ('10', 'j')
       
   201     >>> mgr.find_user_password("g", "h.example.com")
       
   202     ('11', 'k')
       
   203     >>> mgr.find_user_password("g", "h.example.com:80")
       
   204     ('11', 'k')
       
   205     >>> mgr.find_user_password("g", "http://h.example.com:80")
       
   206     ('11', 'k')
       
   207     >>> mgr.find_user_password("h", "i.example.com")
       
   208     (None, None)
       
   209     >>> mgr.find_user_password("h", "i.example.com:80")
       
   210     ('12', 'l')
       
   211     >>> mgr.find_user_password("h", "http://i.example.com:80")
       
   212     ('12', 'l')
       
   213     >>> mgr.find_user_password("i", "j.example.com")
       
   214     ('13', 'm')
       
   215     >>> mgr.find_user_password("i", "j.example.com:80")
       
   216     (None, None)
       
   217     >>> mgr.find_user_password("i", "http://j.example.com")
       
   218     ('13', 'm')
       
   219     >>> mgr.find_user_password("i", "http://j.example.com:80")
       
   220     (None, None)
       
   221 
       
   222     """
       
   223 
       
   224 class MockOpener:
       
   225     addheaders = []
       
   226     def open(self, req, data=None):
       
   227         self.req, self.data = req, data
       
   228     def error(self, proto, *args):
       
   229         self.proto, self.args = proto, args
       
   230 
       
   231 class MockFile:
       
   232     def read(self, count=None): pass
       
   233     def readline(self, count=None): pass
       
   234     def close(self): pass
       
   235 
       
   236 class MockHeaders(dict):
       
   237     def getheaders(self, name):
       
   238         return self.values()
       
   239 
       
   240 class MockResponse(StringIO.StringIO):
       
   241     def __init__(self, code, msg, headers, data, url=None):
       
   242         StringIO.StringIO.__init__(self, data)
       
   243         self.code, self.msg, self.headers, self.url = code, msg, headers, url
       
   244     def info(self):
       
   245         return self.headers
       
   246     def geturl(self):
       
   247         return self.url
       
   248 
       
   249 class MockCookieJar:
       
   250     def add_cookie_header(self, request):
       
   251         self.ach_req = request
       
   252     def extract_cookies(self, response, request):
       
   253         self.ec_req, self.ec_r = request, response
       
   254 
       
   255 class FakeMethod:
       
   256     def __init__(self, meth_name, action, handle):
       
   257         self.meth_name = meth_name
       
   258         self.handle = handle
       
   259         self.action = action
       
   260     def __call__(self, *args):
       
   261         return self.handle(self.meth_name, self.action, *args)
       
   262 
       
   263 class MockHandler:
       
   264     # useful for testing handler machinery
       
   265     # see add_ordered_mock_handlers() docstring
       
   266     handler_order = 500
       
   267     def __init__(self, methods):
       
   268         self._define_methods(methods)
       
   269     def _define_methods(self, methods):
       
   270         for spec in methods:
       
   271             if len(spec) == 2: name, action = spec
       
   272             else: name, action = spec, None
       
   273             meth = FakeMethod(name, action, self.handle)
       
   274             setattr(self.__class__, name, meth)
       
   275     def handle(self, fn_name, action, *args, **kwds):
       
   276         self.parent.calls.append((self, fn_name, args, kwds))
       
   277         if action is None:
       
   278             return None
       
   279         elif action == "return self":
       
   280             return self
       
   281         elif action == "return response":
       
   282             res = MockResponse(200, "OK", {}, "")
       
   283             return res
       
   284         elif action == "return request":
       
   285             return Request("http://blah/")
       
   286         elif action.startswith("error"):
       
   287             code = action[action.rfind(" ")+1:]
       
   288             try:
       
   289                 code = int(code)
       
   290             except ValueError:
       
   291                 pass
       
   292             res = MockResponse(200, "OK", {}, "")
       
   293             return self.parent.error("http", args[0], res, code, "", {})
       
   294         elif action == "raise":
       
   295             raise urllib2.URLError("blah")
       
   296         assert False
       
   297     def close(self): pass
       
   298     def add_parent(self, parent):
       
   299         self.parent = parent
       
   300         self.parent.calls = []
       
   301     def __lt__(self, other):
       
   302         if not hasattr(other, "handler_order"):
       
   303             # No handler_order, leave in original order.  Yuck.
       
   304             return True
       
   305         return self.handler_order < other.handler_order
       
   306 
       
   307 def add_ordered_mock_handlers(opener, meth_spec):
       
   308     """Create MockHandlers and add them to an OpenerDirector.
       
   309 
       
   310     meth_spec: list of lists of tuples and strings defining methods to define
       
   311     on handlers.  eg:
       
   312 
       
   313     [["http_error", "ftp_open"], ["http_open"]]
       
   314 
       
   315     defines methods .http_error() and .ftp_open() on one handler, and
       
   316     .http_open() on another.  These methods just record their arguments and
       
   317     return None.  Using a tuple instead of a string causes the method to
       
   318     perform some action (see MockHandler.handle()), eg:
       
   319 
       
   320     [["http_error"], [("http_open", "return request")]]
       
   321 
       
   322     defines .http_error() on one handler (which simply returns None), and
       
   323     .http_open() on another handler, which returns a Request object.
       
   324 
       
   325     """
       
   326     handlers = []
       
   327     count = 0
       
   328     for meths in meth_spec:
       
   329         class MockHandlerSubclass(MockHandler): pass
       
   330         h = MockHandlerSubclass(meths)
       
   331         h.handler_order += count
       
   332         h.add_parent(opener)
       
   333         count = count + 1
       
   334         handlers.append(h)
       
   335         opener.add_handler(h)
       
   336     return handlers
       
   337 
       
   338 def build_test_opener(*handler_instances):
       
   339     opener = OpenerDirector()
       
   340     for h in handler_instances:
       
   341         opener.add_handler(h)
       
   342     return opener
       
   343 
       
   344 class MockHTTPHandler(urllib2.BaseHandler):
       
   345     # useful for testing redirections and auth
       
   346     # sends supplied headers and code as first response
       
   347     # sends 200 OK as second response
       
   348     def __init__(self, code, headers):
       
   349         self.code = code
       
   350         self.headers = headers
       
   351         self.reset()
       
   352     def reset(self):
       
   353         self._count = 0
       
   354         self.requests = []
       
   355     def http_open(self, req):
       
   356         import mimetools, httplib, copy
       
   357         from StringIO import StringIO
       
   358         self.requests.append(copy.deepcopy(req))
       
   359         if self._count == 0:
       
   360             self._count = self._count + 1
       
   361             name = httplib.responses[self.code]
       
   362             msg = mimetools.Message(StringIO(self.headers))
       
   363             return self.parent.error(
       
   364                 "http", req, MockFile(), self.code, name, msg)
       
   365         else:
       
   366             self.req = req
       
   367             msg = mimetools.Message(StringIO("\r\n\r\n"))
       
   368             return MockResponse(200, "OK", msg, "", req.get_full_url())
       
   369 
       
   370 class MockPasswordManager:
       
   371     def add_password(self, realm, uri, user, password):
       
   372         self.realm = realm
       
   373         self.url = uri
       
   374         self.user = user
       
   375         self.password = password
       
   376     def find_user_password(self, realm, authuri):
       
   377         self.target_realm = realm
       
   378         self.target_url = authuri
       
   379         return self.user, self.password
       
   380 
       
   381 
       
   382 class OpenerDirectorTests(unittest.TestCase):
       
   383 
       
   384     def test_add_non_handler(self):
       
   385         class NonHandler(object):
       
   386             pass
       
   387         self.assertRaises(TypeError,
       
   388                           OpenerDirector().add_handler, NonHandler())
       
   389 
       
   390     def test_badly_named_methods(self):
       
   391         # test work-around for three methods that accidentally follow the
       
   392         # naming conventions for handler methods
       
   393         # (*_open() / *_request() / *_response())
       
   394 
       
   395         # These used to call the accidentally-named methods, causing a
       
   396         # TypeError in real code; here, returning self from these mock
       
   397         # methods would either cause no exception, or AttributeError.
       
   398 
       
   399         from urllib2 import URLError
       
   400 
       
   401         o = OpenerDirector()
       
   402         meth_spec = [
       
   403             [("do_open", "return self"), ("proxy_open", "return self")],
       
   404             [("redirect_request", "return self")],
       
   405             ]
       
   406         handlers = add_ordered_mock_handlers(o, meth_spec)
       
   407         o.add_handler(urllib2.UnknownHandler())
       
   408         for scheme in "do", "proxy", "redirect":
       
   409             self.assertRaises(URLError, o.open, scheme+"://example.com/")
       
   410 
       
   411     def test_handled(self):
       
   412         # handler returning non-None means no more handlers will be called
       
   413         o = OpenerDirector()
       
   414         meth_spec = [
       
   415             ["http_open", "ftp_open", "http_error_302"],
       
   416             ["ftp_open"],
       
   417             [("http_open", "return self")],
       
   418             [("http_open", "return self")],
       
   419             ]
       
   420         handlers = add_ordered_mock_handlers(o, meth_spec)
       
   421 
       
   422         req = Request("http://example.com/")
       
   423         r = o.open(req)
       
   424         # Second .http_open() gets called, third doesn't, since second returned
       
   425         # non-None.  Handlers without .http_open() never get any methods called
       
   426         # on them.
       
   427         # In fact, second mock handler defining .http_open() returns self
       
   428         # (instead of response), which becomes the OpenerDirector's return
       
   429         # value.
       
   430         self.assertEqual(r, handlers[2])
       
   431         calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
       
   432         for expected, got in zip(calls, o.calls):
       
   433             handler, name, args, kwds = got
       
   434             self.assertEqual((handler, name), expected)
       
   435             self.assertEqual(args, (req,))
       
   436 
       
   437     def test_handler_order(self):
       
   438         o = OpenerDirector()
       
   439         handlers = []
       
   440         for meths, handler_order in [
       
   441             ([("http_open", "return self")], 500),
       
   442             (["http_open"], 0),
       
   443             ]:
       
   444             class MockHandlerSubclass(MockHandler): pass
       
   445             h = MockHandlerSubclass(meths)
       
   446             h.handler_order = handler_order
       
   447             handlers.append(h)
       
   448             o.add_handler(h)
       
   449 
       
   450         r = o.open("http://example.com/")
       
   451         # handlers called in reverse order, thanks to their sort order
       
   452         self.assertEqual(o.calls[0][0], handlers[1])
       
   453         self.assertEqual(o.calls[1][0], handlers[0])
       
   454 
       
   455     def test_raise(self):
       
   456         # raising URLError stops processing of request
       
   457         o = OpenerDirector()
       
   458         meth_spec = [
       
   459             [("http_open", "raise")],
       
   460             [("http_open", "return self")],
       
   461             ]
       
   462         handlers = add_ordered_mock_handlers(o, meth_spec)
       
   463 
       
   464         req = Request("http://example.com/")
       
   465         self.assertRaises(urllib2.URLError, o.open, req)
       
   466         self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
       
   467 
       
   468 ##     def test_error(self):
       
   469 ##         # XXX this doesn't actually seem to be used in standard library,
       
   470 ##         #  but should really be tested anyway...
       
   471 
       
   472     def test_http_error(self):
       
   473         # XXX http_error_default
       
   474         # http errors are a special case
       
   475         o = OpenerDirector()
       
   476         meth_spec = [
       
   477             [("http_open", "error 302")],
       
   478             [("http_error_400", "raise"), "http_open"],
       
   479             [("http_error_302", "return response"), "http_error_303",
       
   480              "http_error"],
       
   481             [("http_error_302")],
       
   482             ]
       
   483         handlers = add_ordered_mock_handlers(o, meth_spec)
       
   484 
       
   485         class Unknown:
       
   486             def __eq__(self, other): return True
       
   487 
       
   488         req = Request("http://example.com/")
       
   489         r = o.open(req)
       
   490         assert len(o.calls) == 2
       
   491         calls = [(handlers[0], "http_open", (req,)),
       
   492                  (handlers[2], "http_error_302",
       
   493                   (req, Unknown(), 302, "", {}))]
       
   494         for expected, got in zip(calls, o.calls):
       
   495             handler, method_name, args = expected
       
   496             self.assertEqual((handler, method_name), got[:2])
       
   497             self.assertEqual(args, got[2])
       
   498 
       
   499     def test_processors(self):
       
   500         # *_request / *_response methods get called appropriately
       
   501         o = OpenerDirector()
       
   502         meth_spec = [
       
   503             [("http_request", "return request"),
       
   504              ("http_response", "return response")],
       
   505             [("http_request", "return request"),
       
   506              ("http_response", "return response")],
       
   507             ]
       
   508         handlers = add_ordered_mock_handlers(o, meth_spec)
       
   509 
       
   510         req = Request("http://example.com/")
       
   511         r = o.open(req)
       
   512         # processor methods are called on *all* handlers that define them,
       
   513         # not just the first handler that handles the request
       
   514         calls = [
       
   515             (handlers[0], "http_request"), (handlers[1], "http_request"),
       
   516             (handlers[0], "http_response"), (handlers[1], "http_response")]
       
   517 
       
   518         for i, (handler, name, args, kwds) in enumerate(o.calls):
       
   519             if i < 2:
       
   520                 # *_request
       
   521                 self.assertEqual((handler, name), calls[i])
       
   522                 self.assertEqual(len(args), 1)
       
   523                 self.assert_(isinstance(args[0], Request))
       
   524             else:
       
   525                 # *_response
       
   526                 self.assertEqual((handler, name), calls[i])
       
   527                 self.assertEqual(len(args), 2)
       
   528                 self.assert_(isinstance(args[0], Request))
       
   529                 # response from opener.open is None, because there's no
       
   530                 # handler that defines http_open to handle it
       
   531                 self.assert_(args[1] is None or
       
   532                              isinstance(args[1], MockResponse))
       
   533 
       
   534 
       
   535 def sanepathname2url(path):
       
   536     import urllib
       
   537     urlpath = urllib.pathname2url(path)
       
   538     if os.name == "nt" and urlpath.startswith("///"):
       
   539         urlpath = urlpath[2:]
       
   540     # XXX don't ask me about the mac...
       
   541     return urlpath
       
   542 
       
   543 class HandlerTests(unittest.TestCase):
       
   544 
       
   545     def test_ftp(self):
       
   546         class MockFTPWrapper:
       
   547             def __init__(self, data): self.data = data
       
   548             def retrfile(self, filename, filetype):
       
   549                 self.filename, self.filetype = filename, filetype
       
   550                 return StringIO.StringIO(self.data), len(self.data)
       
   551 
       
   552         class NullFTPHandler(urllib2.FTPHandler):
       
   553             def __init__(self, data): self.data = data
       
   554             def connect_ftp(self, user, passwd, host, port, dirs):
       
   555                 self.user, self.passwd = user, passwd
       
   556                 self.host, self.port = host, port
       
   557                 self.dirs = dirs
       
   558                 self.ftpwrapper = MockFTPWrapper(self.data)
       
   559                 return self.ftpwrapper
       
   560 
       
   561         import ftplib, socket
       
   562         data = "rheum rhaponicum"
       
   563         h = NullFTPHandler(data)
       
   564         o = h.parent = MockOpener()
       
   565 
       
   566         for url, host, port, type_, dirs, filename, mimetype in [
       
   567             ("ftp://localhost/foo/bar/baz.html",
       
   568              "localhost", ftplib.FTP_PORT, "I",
       
   569              ["foo", "bar"], "baz.html", "text/html"),
       
   570             ("ftp://localhost:80/foo/bar/",
       
   571              "localhost", 80, "D",
       
   572              ["foo", "bar"], "", None),
       
   573             ("ftp://localhost/baz.gif;type=a",
       
   574              "localhost", ftplib.FTP_PORT, "A",
       
   575              [], "baz.gif", None),  # XXX really this should guess image/gif
       
   576             ]:
       
   577             r = h.ftp_open(Request(url))
       
   578             # ftp authentication not yet implemented by FTPHandler
       
   579             self.assert_(h.user == h.passwd == "")
       
   580             self.assertEqual(h.host, socket.gethostbyname(host))
       
   581             self.assertEqual(h.port, port)
       
   582             self.assertEqual(h.dirs, dirs)
       
   583             self.assertEqual(h.ftpwrapper.filename, filename)
       
   584             self.assertEqual(h.ftpwrapper.filetype, type_)
       
   585             headers = r.info()
       
   586             self.assertEqual(headers.get("Content-type"), mimetype)
       
   587             self.assertEqual(int(headers["Content-length"]), len(data))
       
   588 
       
   589     def test_file(self):
       
   590         import time, rfc822, socket
       
   591         h = urllib2.FileHandler()
       
   592         o = h.parent = MockOpener()
       
   593 
       
   594         TESTFN = test_support.TESTFN
       
   595         urlpath = sanepathname2url(os.path.abspath(TESTFN))
       
   596         towrite = "hello, world\n"
       
   597         urls = [
       
   598             "file://localhost%s" % urlpath,
       
   599             "file://%s" % urlpath,
       
   600             "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
       
   601             ]
       
   602         try:
       
   603             localaddr = socket.gethostbyname(socket.gethostname())
       
   604         except socket.gaierror:
       
   605             localaddr = ''
       
   606         if localaddr:
       
   607             urls.append("file://%s%s" % (localaddr, urlpath))
       
   608 
       
   609         for url in urls:
       
   610             f = open(TESTFN, "wb")
       
   611             try:
       
   612                 try:
       
   613                     f.write(towrite)
       
   614                 finally:
       
   615                     f.close()
       
   616 
       
   617                 r = h.file_open(Request(url))
       
   618                 try:
       
   619                     data = r.read()
       
   620                     headers = r.info()
       
   621                     newurl = r.geturl()
       
   622                 finally:
       
   623                     r.close()
       
   624                 stats = os.stat(TESTFN)
       
   625                 modified = rfc822.formatdate(stats.st_mtime)
       
   626             finally:
       
   627                 os.remove(TESTFN)
       
   628             self.assertEqual(data, towrite)
       
   629             self.assertEqual(headers["Content-type"], "text/plain")
       
   630             self.assertEqual(headers["Content-length"], "13")
       
   631             self.assertEqual(headers["Last-modified"], modified)
       
   632 
       
   633         for url in [
       
   634             "file://localhost:80%s" % urlpath,
       
   635 # XXXX bug: these fail with socket.gaierror, should be URLError
       
   636 ##             "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
       
   637 ##                                    os.getcwd(), TESTFN),
       
   638 ##             "file://somerandomhost.ontheinternet.com%s/%s" %
       
   639 ##             (os.getcwd(), TESTFN),
       
   640             ]:
       
   641             try:
       
   642                 f = open(TESTFN, "wb")
       
   643                 try:
       
   644                     f.write(towrite)
       
   645                 finally:
       
   646                     f.close()
       
   647 
       
   648                 self.assertRaises(urllib2.URLError,
       
   649                                   h.file_open, Request(url))
       
   650             finally:
       
   651                 os.remove(TESTFN)
       
   652 
       
   653         h = urllib2.FileHandler()
       
   654         o = h.parent = MockOpener()
       
   655         # XXXX why does // mean ftp (and /// mean not ftp!), and where
       
   656         #  is file: scheme specified?  I think this is really a bug, and
       
   657         #  what was intended was to distinguish between URLs like:
       
   658         # file:/blah.txt (a file)
       
   659         # file://localhost/blah.txt (a file)
       
   660         # file:///blah.txt (a file)
       
   661         # file://ftp.example.com/blah.txt (an ftp URL)
       
   662         for url, ftp in [
       
   663             ("file://ftp.example.com//foo.txt", True),
       
   664             ("file://ftp.example.com///foo.txt", False),
       
   665 # XXXX bug: fails with OSError, should be URLError
       
   666             ("file://ftp.example.com/foo.txt", False),
       
   667             ]:
       
   668             req = Request(url)
       
   669             try:
       
   670                 h.file_open(req)
       
   671             # XXXX remove OSError when bug fixed
       
   672             except (urllib2.URLError, OSError):
       
   673                 self.assert_(not ftp)
       
   674             else:
       
   675                 self.assert_(o.req is req)
       
   676                 self.assertEqual(req.type, "ftp")
       
   677 
       
   678     def test_http(self):
       
   679         class MockHTTPResponse:
       
   680             def __init__(self, fp, msg, status, reason):
       
   681                 self.fp = fp
       
   682                 self.msg = msg
       
   683                 self.status = status
       
   684                 self.reason = reason
       
   685             def read(self):
       
   686                 return ''
       
   687         class MockHTTPClass:
       
   688             def __init__(self):
       
   689                 self.req_headers = []
       
   690                 self.data = None
       
   691                 self.raise_on_endheaders = False
       
   692             def __call__(self, host):
       
   693                 self.host = host
       
   694                 return self
       
   695             def set_debuglevel(self, level):
       
   696                 self.level = level
       
   697             def request(self, method, url, body=None, headers={}):
       
   698                 self.method = method
       
   699                 self.selector = url
       
   700                 self.req_headers += headers.items()
       
   701                 self.req_headers.sort()
       
   702                 if body:
       
   703                     self.data = body
       
   704                 if self.raise_on_endheaders:
       
   705                     import socket
       
   706                     raise socket.error()
       
   707             def getresponse(self):
       
   708                 return MockHTTPResponse(MockFile(), {}, 200, "OK")
       
   709 
       
   710         h = urllib2.AbstractHTTPHandler()
       
   711         o = h.parent = MockOpener()
       
   712 
       
   713         url = "http://example.com/"
       
   714         for method, data in [("GET", None), ("POST", "blah")]:
       
   715             req = Request(url, data, {"Foo": "bar"})
       
   716             req.add_unredirected_header("Spam", "eggs")
       
   717             http = MockHTTPClass()
       
   718             r = h.do_open(http, req)
       
   719 
       
   720             # result attributes
       
   721             r.read; r.readline  # wrapped MockFile methods
       
   722             r.info; r.geturl  # addinfourl methods
       
   723             r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
       
   724             hdrs = r.info()
       
   725             hdrs.get; hdrs.has_key  # r.info() gives dict from .getreply()
       
   726             self.assertEqual(r.geturl(), url)
       
   727 
       
   728             self.assertEqual(http.host, "example.com")
       
   729             self.assertEqual(http.level, 0)
       
   730             self.assertEqual(http.method, method)
       
   731             self.assertEqual(http.selector, "/")
       
   732             self.assertEqual(http.req_headers,
       
   733                              [("Connection", "close"),
       
   734                               ("Foo", "bar"), ("Spam", "eggs")])
       
   735             self.assertEqual(http.data, data)
       
   736 
       
   737         # check socket.error converted to URLError
       
   738         http.raise_on_endheaders = True
       
   739         self.assertRaises(urllib2.URLError, h.do_open, http, req)
       
   740 
       
   741         # check adding of standard headers
       
   742         o.addheaders = [("Spam", "eggs")]
       
   743         for data in "", None:  # POST, GET
       
   744             req = Request("http://example.com/", data)
       
   745             r = MockResponse(200, "OK", {}, "")
       
   746             newreq = h.do_request_(req)
       
   747             if data is None:  # GET
       
   748                 self.assert_("Content-length" not in req.unredirected_hdrs)
       
   749                 self.assert_("Content-type" not in req.unredirected_hdrs)
       
   750             else:  # POST
       
   751                 self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
       
   752                 self.assertEqual(req.unredirected_hdrs["Content-type"],
       
   753                              "application/x-www-form-urlencoded")
       
   754             # XXX the details of Host could be better tested
       
   755             self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
       
   756             self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
       
   757 
       
   758             # don't clobber existing headers
       
   759             req.add_unredirected_header("Content-length", "foo")
       
   760             req.add_unredirected_header("Content-type", "bar")
       
   761             req.add_unredirected_header("Host", "baz")
       
   762             req.add_unredirected_header("Spam", "foo")
       
   763             newreq = h.do_request_(req)
       
   764             self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
       
   765             self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
       
   766             self.assertEqual(req.unredirected_hdrs["Host"], "baz")
       
   767             self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
       
   768 
       
   769     def test_errors(self):
       
   770         h = urllib2.HTTPErrorProcessor()
       
   771         o = h.parent = MockOpener()
       
   772 
       
   773         url = "http://example.com/"
       
   774         req = Request(url)
       
   775         # 200 OK is passed through
       
   776         r = MockResponse(200, "OK", {}, "", url)
       
   777         newr = h.http_response(req, r)
       
   778         self.assert_(r is newr)
       
   779         self.assert_(not hasattr(o, "proto"))  # o.error not called
       
   780         # anything else calls o.error (and MockOpener returns None, here)
       
   781         r = MockResponse(201, "Created", {}, "", url)
       
   782         self.assert_(h.http_response(req, r) is None)
       
   783         self.assertEqual(o.proto, "http")  # o.error called
       
   784         self.assertEqual(o.args, (req, r, 201, "Created", {}))
       
   785 
       
   786     def test_cookies(self):
       
   787         cj = MockCookieJar()
       
   788         h = urllib2.HTTPCookieProcessor(cj)
       
   789         o = h.parent = MockOpener()
       
   790 
       
   791         req = Request("http://example.com/")
       
   792         r = MockResponse(200, "OK", {}, "")
       
   793         newreq = h.http_request(req)
       
   794         self.assert_(cj.ach_req is req is newreq)
       
   795         self.assertEquals(req.get_origin_req_host(), "example.com")
       
   796         self.assert_(not req.is_unverifiable())
       
   797         newr = h.http_response(req, r)
       
   798         self.assert_(cj.ec_req is req)
       
   799         self.assert_(cj.ec_r is r is newr)
       
   800 
       
   801     def test_redirect(self):
       
   802         from_url = "http://example.com/a.html"
       
   803         to_url = "http://example.com/b.html"
       
   804         h = urllib2.HTTPRedirectHandler()
       
   805         o = h.parent = MockOpener()
       
   806 
       
   807         # ordinary redirect behaviour
       
   808         for code in 301, 302, 303, 307:
       
   809             for data in None, "blah\nblah\n":
       
   810                 method = getattr(h, "http_error_%s" % code)
       
   811                 req = Request(from_url, data)
       
   812                 req.add_header("Nonsense", "viking=withhold")
       
   813                 req.add_unredirected_header("Spam", "spam")
       
   814                 try:
       
   815                     method(req, MockFile(), code, "Blah",
       
   816                            MockHeaders({"location": to_url}))
       
   817                 except urllib2.HTTPError:
       
   818                     # 307 in response to POST requires user OK
       
   819                     self.assert_(code == 307 and data is not None)
       
   820                 self.assertEqual(o.req.get_full_url(), to_url)
       
   821                 try:
       
   822                     self.assertEqual(o.req.get_method(), "GET")
       
   823                 except AttributeError:
       
   824                     self.assert_(not o.req.has_data())
       
   825                 self.assertEqual(o.req.headers["Nonsense"],
       
   826                                  "viking=withhold")
       
   827                 self.assert_("Spam" not in o.req.headers)
       
   828                 self.assert_("Spam" not in o.req.unredirected_hdrs)
       
   829 
       
   830         # loop detection
       
   831         req = Request(from_url)
       
   832         def redirect(h, req, url=to_url):
       
   833             h.http_error_302(req, MockFile(), 302, "Blah",
       
   834                              MockHeaders({"location": url}))
       
   835         # Note that the *original* request shares the same record of
       
   836         # redirections with the sub-requests caused by the redirections.
       
   837 
       
   838         # detect infinite loop redirect of a URL to itself
       
   839         req = Request(from_url, origin_req_host="example.com")
       
   840         count = 0
       
   841         try:
       
   842             while 1:
       
   843                 redirect(h, req, "http://example.com/")
       
   844                 count = count + 1
       
   845         except urllib2.HTTPError:
       
   846             # don't stop until max_repeats, because cookies may introduce state
       
   847             self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)
       
   848 
       
   849         # detect endless non-repeating chain of redirects
       
   850         req = Request(from_url, origin_req_host="example.com")
       
   851         count = 0
       
   852         try:
       
   853             while 1:
       
   854                 redirect(h, req, "http://example.com/%d" % count)
       
   855                 count = count + 1
       
   856         except urllib2.HTTPError:
       
   857             self.assertEqual(count,
       
   858                              urllib2.HTTPRedirectHandler.max_redirections)
       
   859 
       
   860     def test_cookie_redirect(self):
       
   861         # cookies shouldn't leak into redirected requests
       
   862         from cookielib import CookieJar
       
   863 
       
   864         from test.test_cookielib import interact_netscape
       
   865 
       
   866         cj = CookieJar()
       
   867         interact_netscape(cj, "http://www.example.com/", "spam=eggs")
       
   868         hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
       
   869         hdeh = urllib2.HTTPDefaultErrorHandler()
       
   870         hrh = urllib2.HTTPRedirectHandler()
       
   871         cp = urllib2.HTTPCookieProcessor(cj)
       
   872         o = build_test_opener(hh, hdeh, hrh, cp)
       
   873         o.open("http://www.example.com/")
       
   874         self.assert_(not hh.req.has_header("Cookie"))
       
   875 
       
   876     def test_proxy(self):
       
   877         o = OpenerDirector()
       
   878         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
       
   879         o.add_handler(ph)
       
   880         meth_spec = [
       
   881             [("http_open", "return response")]
       
   882             ]
       
   883         handlers = add_ordered_mock_handlers(o, meth_spec)
       
   884 
       
   885         req = Request("http://acme.example.com/")
       
   886         self.assertEqual(req.get_host(), "acme.example.com")
       
   887         r = o.open(req)
       
   888         self.assertEqual(req.get_host(), "proxy.example.com:3128")
       
   889 
       
   890         self.assertEqual([(handlers[0], "http_open")],
       
   891                          [tup[0:2] for tup in o.calls])
       
   892 
       
   893     def test_basic_auth(self):
       
   894         opener = OpenerDirector()
       
   895         password_manager = MockPasswordManager()
       
   896         auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
       
   897         realm = "ACME Widget Store"
       
   898         http_handler = MockHTTPHandler(
       
   899             401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
       
   900         opener.add_handler(auth_handler)
       
   901         opener.add_handler(http_handler)
       
   902         self._test_basic_auth(opener, auth_handler, "Authorization",
       
   903                               realm, http_handler, password_manager,
       
   904                               "http://acme.example.com/protected",
       
   905                               "http://acme.example.com/protected",
       
   906                               )
       
   907 
       
   908     def test_proxy_basic_auth(self):
       
   909         opener = OpenerDirector()
       
   910         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
       
   911         opener.add_handler(ph)
       
   912         password_manager = MockPasswordManager()
       
   913         auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
       
   914         realm = "ACME Networks"
       
   915         http_handler = MockHTTPHandler(
       
   916             407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
       
   917         opener.add_handler(auth_handler)
       
   918         opener.add_handler(http_handler)
       
   919         self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
       
   920                               realm, http_handler, password_manager,
       
   921                               "http://acme.example.com:3128/protected",
       
   922                               "proxy.example.com:3128",
       
   923                               )
       
   924 
       
   925     def test_basic_and_digest_auth_handlers(self):
       
   926         # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
       
   927         # response (http://python.org/sf/1479302), where it should instead
       
   928         # return None to allow another handler (especially
       
   929         # HTTPBasicAuthHandler) to handle the response.
       
   930 
       
   931         # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
       
   932         # try digest first (since it's the strongest auth scheme), so we record
       
   933         # order of calls here to check digest comes first:
       
   934         class RecordingOpenerDirector(OpenerDirector):
       
   935             def __init__(self):
       
   936                 OpenerDirector.__init__(self)
       
   937                 self.recorded = []
       
   938             def record(self, info):
       
   939                 self.recorded.append(info)
       
   940         class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
       
   941             def http_error_401(self, *args, **kwds):
       
   942                 self.parent.record("digest")
       
   943                 urllib2.HTTPDigestAuthHandler.http_error_401(self,
       
   944                                                              *args, **kwds)
       
   945         class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
       
   946             def http_error_401(self, *args, **kwds):
       
   947                 self.parent.record("basic")
       
   948                 urllib2.HTTPBasicAuthHandler.http_error_401(self,
       
   949                                                             *args, **kwds)
       
   950 
       
   951         opener = RecordingOpenerDirector()
       
   952         password_manager = MockPasswordManager()
       
   953         digest_handler = TestDigestAuthHandler(password_manager)
       
   954         basic_handler = TestBasicAuthHandler(password_manager)
       
   955         realm = "ACME Networks"
       
   956         http_handler = MockHTTPHandler(
       
   957             401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
       
   958         opener.add_handler(basic_handler)
       
   959         opener.add_handler(digest_handler)
       
   960         opener.add_handler(http_handler)
       
   961 
       
   962         # check basic auth isn't blocked by digest handler failing
       
   963         self._test_basic_auth(opener, basic_handler, "Authorization",
       
   964                               realm, http_handler, password_manager,
       
   965                               "http://acme.example.com/protected",
       
   966                               "http://acme.example.com/protected",
       
   967                               )
       
   968         # check digest was tried before basic (twice, because
       
   969         # _test_basic_auth called .open() twice)
       
   970         self.assertEqual(opener.recorded, ["digest", "basic"]*2)
       
   971 
       
   972     def _test_basic_auth(self, opener, auth_handler, auth_header,
       
   973                          realm, http_handler, password_manager,
       
   974                          request_url, protected_url):
       
   975         import base64, httplib
       
   976         user, password = "wile", "coyote"
       
   977 
       
   978         # .add_password() fed through to password manager
       
   979         auth_handler.add_password(realm, request_url, user, password)
       
   980         self.assertEqual(realm, password_manager.realm)
       
   981         self.assertEqual(request_url, password_manager.url)
       
   982         self.assertEqual(user, password_manager.user)
       
   983         self.assertEqual(password, password_manager.password)
       
   984 
       
   985         r = opener.open(request_url)
       
   986 
       
   987         # should have asked the password manager for the username/password
       
   988         self.assertEqual(password_manager.target_realm, realm)
       
   989         self.assertEqual(password_manager.target_url, protected_url)
       
   990 
       
   991         # expect one request without authorization, then one with
       
   992         self.assertEqual(len(http_handler.requests), 2)
       
   993         self.assertFalse(http_handler.requests[0].has_header(auth_header))
       
   994         userpass = '%s:%s' % (user, password)
       
   995         auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
       
   996         self.assertEqual(http_handler.requests[1].get_header(auth_header),
       
   997                          auth_hdr_value)
       
   998 
       
   999         # if the password manager can't find a password, the handler won't
       
  1000         # handle the HTTP auth error
       
  1001         password_manager.user = password_manager.password = None
       
  1002         http_handler.reset()
       
  1003         r = opener.open(request_url)
       
  1004         self.assertEqual(len(http_handler.requests), 1)
       
  1005         self.assertFalse(http_handler.requests[0].has_header(auth_header))
       
  1006 
       
  1007 
       
  1008 class MiscTests(unittest.TestCase):
       
  1009 
       
  1010     def test_build_opener(self):
       
  1011         class MyHTTPHandler(urllib2.HTTPHandler): pass
       
  1012         class FooHandler(urllib2.BaseHandler):
       
  1013             def foo_open(self): pass
       
  1014         class BarHandler(urllib2.BaseHandler):
       
  1015             def bar_open(self): pass
       
  1016 
       
  1017         build_opener = urllib2.build_opener
       
  1018 
       
  1019         o = build_opener(FooHandler, BarHandler)
       
  1020         self.opener_has_handler(o, FooHandler)
       
  1021         self.opener_has_handler(o, BarHandler)
       
  1022 
       
  1023         # can take a mix of classes and instances
       
  1024         o = build_opener(FooHandler, BarHandler())
       
  1025         self.opener_has_handler(o, FooHandler)
       
  1026         self.opener_has_handler(o, BarHandler)
       
  1027 
       
  1028         # subclasses of default handlers override default handlers
       
  1029         o = build_opener(MyHTTPHandler)
       
  1030         self.opener_has_handler(o, MyHTTPHandler)
       
  1031 
       
  1032         # a particular case of overriding: default handlers can be passed
       
  1033         # in explicitly
       
  1034         o = build_opener()
       
  1035         self.opener_has_handler(o, urllib2.HTTPHandler)
       
  1036         o = build_opener(urllib2.HTTPHandler)
       
  1037         self.opener_has_handler(o, urllib2.HTTPHandler)
       
  1038         o = build_opener(urllib2.HTTPHandler())
       
  1039         self.opener_has_handler(o, urllib2.HTTPHandler)
       
  1040 
       
  1041     def opener_has_handler(self, opener, handler_class):
       
  1042         for h in opener.handlers:
       
  1043             if h.__class__ == handler_class:
       
  1044                 break
       
  1045         else:
       
  1046             self.assert_(False)
       
  1047 
       
  1048 
       
  1049 def test_main(verbose=None):
       
  1050     from test import test_urllib2
       
  1051     test_support.run_doctest(test_urllib2, verbose)
       
  1052     test_support.run_doctest(urllib2, verbose)
       
  1053     tests = (TrivialTests,
       
  1054              OpenerDirectorTests,
       
  1055              HandlerTests,
       
  1056              MiscTests)
       
  1057     test_support.run_unittest(*tests)
       
  1058 
       
  1059 if __name__ == "__main__":
       
  1060     test_main(verbose=True)