python-2.5.2/win32/Lib/test/test_weakref.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 import gc
       
     2 import sys
       
     3 import unittest
       
     4 import UserList
       
     5 import weakref
       
     6 
       
     7 from test import test_support
       
     8 
       
     9 # Used in ReferencesTestCase.test_ref_created_during_del().
       
    10 ref_from_del = None
       
    11 
       
    12 class C:
       
    13     def method(self):
       
    14         pass
       
    15 
       
    16 
       
    17 class Callable:
       
    18     bar = None
       
    19 
       
    20     def __call__(self, x):
       
    21         self.bar = x
       
    22 
       
    23 
       
    24 def create_function():
       
    25     def f(): pass
       
    26     return f
       
    27 
       
    28 def create_bound_method():
       
    29     return C().method
       
    30 
       
    31 def create_unbound_method():
       
    32     return C.method
       
    33 
       
    34 
       
    35 class TestBase(unittest.TestCase):
       
    36 
       
    37     def setUp(self):
       
    38         self.cbcalled = 0
       
    39 
       
    40     def callback(self, ref):
       
    41         self.cbcalled += 1
       
    42 
       
    43 
       
    44 class ReferencesTestCase(TestBase):
       
    45 
       
    46     def test_basic_ref(self):
       
    47         self.check_basic_ref(C)
       
    48         self.check_basic_ref(create_function)
       
    49         self.check_basic_ref(create_bound_method)
       
    50         self.check_basic_ref(create_unbound_method)
       
    51 
       
    52         # Just make sure the tp_repr handler doesn't raise an exception.
       
    53         # Live reference:
       
    54         o = C()
       
    55         wr = weakref.ref(o)
       
    56         `wr`
       
    57         # Dead reference:
       
    58         del o
       
    59         `wr`
       
    60 
       
    61     def test_basic_callback(self):
       
    62         self.check_basic_callback(C)
       
    63         self.check_basic_callback(create_function)
       
    64         self.check_basic_callback(create_bound_method)
       
    65         self.check_basic_callback(create_unbound_method)
       
    66 
       
    67     def test_multiple_callbacks(self):
       
    68         o = C()
       
    69         ref1 = weakref.ref(o, self.callback)
       
    70         ref2 = weakref.ref(o, self.callback)
       
    71         del o
       
    72         self.assert_(ref1() is None,
       
    73                      "expected reference to be invalidated")
       
    74         self.assert_(ref2() is None,
       
    75                      "expected reference to be invalidated")
       
    76         self.assert_(self.cbcalled == 2,
       
    77                      "callback not called the right number of times")
       
    78 
       
    79     def test_multiple_selfref_callbacks(self):
       
    80         # Make sure all references are invalidated before callbacks are called
       
    81         #
       
    82         # What's important here is that we're using the first
       
    83         # reference in the callback invoked on the second reference
       
    84         # (the most recently created ref is cleaned up first).  This
       
    85         # tests that all references to the object are invalidated
       
    86         # before any of the callbacks are invoked, so that we only
       
    87         # have one invocation of _weakref.c:cleanup_helper() active
       
    88         # for a particular object at a time.
       
    89         #
       
    90         def callback(object, self=self):
       
    91             self.ref()
       
    92         c = C()
       
    93         self.ref = weakref.ref(c, callback)
       
    94         ref1 = weakref.ref(c, callback)
       
    95         del c
       
    96 
       
    97     def test_proxy_ref(self):
       
    98         o = C()
       
    99         o.bar = 1
       
   100         ref1 = weakref.proxy(o, self.callback)
       
   101         ref2 = weakref.proxy(o, self.callback)
       
   102         del o
       
   103 
       
   104         def check(proxy):
       
   105             proxy.bar
       
   106 
       
   107         self.assertRaises(weakref.ReferenceError, check, ref1)
       
   108         self.assertRaises(weakref.ReferenceError, check, ref2)
       
   109         self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
       
   110         self.assert_(self.cbcalled == 2)
       
   111 
       
   112     def check_basic_ref(self, factory):
       
   113         o = factory()
       
   114         ref = weakref.ref(o)
       
   115         self.assert_(ref() is not None,
       
   116                      "weak reference to live object should be live")
       
   117         o2 = ref()
       
   118         self.assert_(o is o2,
       
   119                      "<ref>() should return original object if live")
       
   120 
       
   121     def check_basic_callback(self, factory):
       
   122         self.cbcalled = 0
       
   123         o = factory()
       
   124         ref = weakref.ref(o, self.callback)
       
   125         del o
       
   126         self.assert_(self.cbcalled == 1,
       
   127                      "callback did not properly set 'cbcalled'")
       
   128         self.assert_(ref() is None,
       
   129                      "ref2 should be dead after deleting object reference")
       
   130 
       
   131     def test_ref_reuse(self):
       
   132         o = C()
       
   133         ref1 = weakref.ref(o)
       
   134         # create a proxy to make sure that there's an intervening creation
       
   135         # between these two; it should make no difference
       
   136         proxy = weakref.proxy(o)
       
   137         ref2 = weakref.ref(o)
       
   138         self.assert_(ref1 is ref2,
       
   139                      "reference object w/out callback should be re-used")
       
   140 
       
   141         o = C()
       
   142         proxy = weakref.proxy(o)
       
   143         ref1 = weakref.ref(o)
       
   144         ref2 = weakref.ref(o)
       
   145         self.assert_(ref1 is ref2,
       
   146                      "reference object w/out callback should be re-used")
       
   147         self.assert_(weakref.getweakrefcount(o) == 2,
       
   148                      "wrong weak ref count for object")
       
   149         del proxy
       
   150         self.assert_(weakref.getweakrefcount(o) == 1,
       
   151                      "wrong weak ref count for object after deleting proxy")
       
   152 
       
   153     def test_proxy_reuse(self):
       
   154         o = C()
       
   155         proxy1 = weakref.proxy(o)
       
   156         ref = weakref.ref(o)
       
   157         proxy2 = weakref.proxy(o)
       
   158         self.assert_(proxy1 is proxy2,
       
   159                      "proxy object w/out callback should have been re-used")
       
   160 
       
   161     def test_basic_proxy(self):
       
   162         o = C()
       
   163         self.check_proxy(o, weakref.proxy(o))
       
   164 
       
   165         L = UserList.UserList()
       
   166         p = weakref.proxy(L)
       
   167         self.failIf(p, "proxy for empty UserList should be false")
       
   168         p.append(12)
       
   169         self.assertEqual(len(L), 1)
       
   170         self.failUnless(p, "proxy for non-empty UserList should be true")
       
   171         p[:] = [2, 3]
       
   172         self.assertEqual(len(L), 2)
       
   173         self.assertEqual(len(p), 2)
       
   174         self.failUnless(3 in p,
       
   175                         "proxy didn't support __contains__() properly")
       
   176         p[1] = 5
       
   177         self.assertEqual(L[1], 5)
       
   178         self.assertEqual(p[1], 5)
       
   179         L2 = UserList.UserList(L)
       
   180         p2 = weakref.proxy(L2)
       
   181         self.assertEqual(p, p2)
       
   182         ## self.assertEqual(repr(L2), repr(p2))
       
   183         L3 = UserList.UserList(range(10))
       
   184         p3 = weakref.proxy(L3)
       
   185         self.assertEqual(L3[:], p3[:])
       
   186         self.assertEqual(L3[5:], p3[5:])
       
   187         self.assertEqual(L3[:5], p3[:5])
       
   188         self.assertEqual(L3[2:5], p3[2:5])
       
   189 
       
   190     # The PyWeakref_* C API is documented as allowing either NULL or
       
   191     # None as the value for the callback, where either means "no
       
   192     # callback".  The "no callback" ref and proxy objects are supposed
       
   193     # to be shared so long as they exist by all callers so long as
       
    194     # they are active.  In Python 2.3.3 and earlier, this guarantee
       
   195     # was not honored, and was broken in different ways for
       
   196     # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
       
   197 
       
   198     def test_shared_ref_without_callback(self):
       
   199         self.check_shared_without_callback(weakref.ref)
       
   200 
       
   201     def test_shared_proxy_without_callback(self):
       
   202         self.check_shared_without_callback(weakref.proxy)
       
   203 
       
   204     def check_shared_without_callback(self, makeref):
       
   205         o = Object(1)
       
   206         p1 = makeref(o, None)
       
   207         p2 = makeref(o, None)
       
   208         self.assert_(p1 is p2, "both callbacks were None in the C API")
       
   209         del p1, p2
       
   210         p1 = makeref(o)
       
   211         p2 = makeref(o, None)
       
   212         self.assert_(p1 is p2, "callbacks were NULL, None in the C API")
       
   213         del p1, p2
       
   214         p1 = makeref(o)
       
   215         p2 = makeref(o)
       
   216         self.assert_(p1 is p2, "both callbacks were NULL in the C API")
       
   217         del p1, p2
       
   218         p1 = makeref(o, None)
       
   219         p2 = makeref(o)
       
   220         self.assert_(p1 is p2, "callbacks were None, NULL in the C API")
       
   221 
       
   222     def test_callable_proxy(self):
       
   223         o = Callable()
       
   224         ref1 = weakref.proxy(o)
       
   225 
       
   226         self.check_proxy(o, ref1)
       
   227 
       
   228         self.assert_(type(ref1) is weakref.CallableProxyType,
       
   229                      "proxy is not of callable type")
       
   230         ref1('twinkies!')
       
   231         self.assert_(o.bar == 'twinkies!',
       
   232                      "call through proxy not passed through to original")
       
   233         ref1(x='Splat.')
       
   234         self.assert_(o.bar == 'Splat.',
       
   235                      "call through proxy not passed through to original")
       
   236 
       
   237         # expect due to too few args
       
   238         self.assertRaises(TypeError, ref1)
       
   239 
       
   240         # expect due to too many args
       
   241         self.assertRaises(TypeError, ref1, 1, 2, 3)
       
   242 
       
   243     def check_proxy(self, o, proxy):
       
   244         o.foo = 1
       
   245         self.assert_(proxy.foo == 1,
       
   246                      "proxy does not reflect attribute addition")
       
   247         o.foo = 2
       
   248         self.assert_(proxy.foo == 2,
       
   249                      "proxy does not reflect attribute modification")
       
   250         del o.foo
       
   251         self.assert_(not hasattr(proxy, 'foo'),
       
   252                      "proxy does not reflect attribute removal")
       
   253 
       
   254         proxy.foo = 1
       
   255         self.assert_(o.foo == 1,
       
   256                      "object does not reflect attribute addition via proxy")
       
   257         proxy.foo = 2
       
   258         self.assert_(
       
   259             o.foo == 2,
       
   260             "object does not reflect attribute modification via proxy")
       
   261         del proxy.foo
       
   262         self.assert_(not hasattr(o, 'foo'),
       
   263                      "object does not reflect attribute removal via proxy")
       
   264 
       
   265     def test_proxy_deletion(self):
       
   266         # Test clearing of SF bug #762891
       
   267         class Foo:
       
   268             result = None
       
   269             def __delitem__(self, accessor):
       
   270                 self.result = accessor
       
   271         g = Foo()
       
   272         f = weakref.proxy(g)
       
   273         del f[0]
       
   274         self.assertEqual(f.result, 0)
       
   275 
       
   276     def test_proxy_bool(self):
       
   277         # Test clearing of SF bug #1170766
       
   278         class List(list): pass
       
   279         lyst = List()
       
   280         self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
       
   281 
       
   282     def test_getweakrefcount(self):
       
   283         o = C()
       
   284         ref1 = weakref.ref(o)
       
   285         ref2 = weakref.ref(o, self.callback)
       
   286         self.assert_(weakref.getweakrefcount(o) == 2,
       
   287                      "got wrong number of weak reference objects")
       
   288 
       
   289         proxy1 = weakref.proxy(o)
       
   290         proxy2 = weakref.proxy(o, self.callback)
       
   291         self.assert_(weakref.getweakrefcount(o) == 4,
       
   292                      "got wrong number of weak reference objects")
       
   293 
       
   294         del ref1, ref2, proxy1, proxy2
       
   295         self.assert_(weakref.getweakrefcount(o) == 0,
       
   296                      "weak reference objects not unlinked from"
       
   297                      " referent when discarded.")
       
   298 
       
   299         # assumes ints do not support weakrefs
       
   300         self.assert_(weakref.getweakrefcount(1) == 0,
       
   301                      "got wrong number of weak reference objects for int")
       
   302 
       
   303     def test_getweakrefs(self):
       
   304         o = C()
       
   305         ref1 = weakref.ref(o, self.callback)
       
   306         ref2 = weakref.ref(o, self.callback)
       
   307         del ref1
       
   308         self.assert_(weakref.getweakrefs(o) == [ref2],
       
   309                      "list of refs does not match")
       
   310 
       
   311         o = C()
       
   312         ref1 = weakref.ref(o, self.callback)
       
   313         ref2 = weakref.ref(o, self.callback)
       
   314         del ref2
       
   315         self.assert_(weakref.getweakrefs(o) == [ref1],
       
   316                      "list of refs does not match")
       
   317 
       
   318         del ref1
       
   319         self.assert_(weakref.getweakrefs(o) == [],
       
   320                      "list of refs not cleared")
       
   321 
       
   322         # assumes ints do not support weakrefs
       
   323         self.assert_(weakref.getweakrefs(1) == [],
       
   324                      "list of refs does not match for int")
       
   325 
       
   326     def test_newstyle_number_ops(self):
       
   327         class F(float):
       
   328             pass
       
   329         f = F(2.0)
       
   330         p = weakref.proxy(f)
       
   331         self.assert_(p + 1.0 == 3.0)
       
   332         self.assert_(1.0 + p == 3.0)  # this used to SEGV
       
   333 
       
   334     def test_callbacks_protected(self):
       
   335         # Callbacks protected from already-set exceptions?
       
   336         # Regression test for SF bug #478534.
       
   337         class BogusError(Exception):
       
   338             pass
       
   339         data = {}
       
   340         def remove(k):
       
   341             del data[k]
       
   342         def encapsulate():
       
   343             f = lambda : ()
       
   344             data[weakref.ref(f, remove)] = None
       
   345             raise BogusError
       
   346         try:
       
   347             encapsulate()
       
   348         except BogusError:
       
   349             pass
       
   350         else:
       
   351             self.fail("exception not properly restored")
       
   352         try:
       
   353             encapsulate()
       
   354         except BogusError:
       
   355             pass
       
   356         else:
       
   357             self.fail("exception not properly restored")
       
   358 
       
   359     def test_sf_bug_840829(self):
       
   360         # "weakref callbacks and gc corrupt memory"
       
   361         # subtype_dealloc erroneously exposed a new-style instance
       
   362         # already in the process of getting deallocated to gc,
       
   363         # causing double-deallocation if the instance had a weakref
       
   364         # callback that triggered gc.
       
   365         # If the bug exists, there probably won't be an obvious symptom
       
   366         # in a release build.  In a debug build, a segfault will occur
       
   367         # when the second attempt to remove the instance from the "list
       
   368         # of all objects" occurs.
       
   369 
       
   370         import gc
       
   371 
       
   372         class C(object):
       
   373             pass
       
   374 
       
   375         c = C()
       
   376         wr = weakref.ref(c, lambda ignore: gc.collect())
       
   377         del c
       
   378 
       
   379         # There endeth the first part.  It gets worse.
       
   380         del wr
       
   381 
       
   382         c1 = C()
       
   383         c1.i = C()
       
   384         wr = weakref.ref(c1.i, lambda ignore: gc.collect())
       
   385 
       
   386         c2 = C()
       
   387         c2.c1 = c1
       
   388         del c1  # still alive because c2 points to it
       
   389 
       
   390         # Now when subtype_dealloc gets called on c2, it's not enough just
       
   391         # that c2 is immune from gc while the weakref callbacks associated
       
   392         # with c2 execute (there are none in this 2nd half of the test, btw).
       
   393         # subtype_dealloc goes on to call the base classes' deallocs too,
       
   394         # so any gc triggered by weakref callbacks associated with anything
       
   395         # torn down by a base class dealloc can also trigger double
       
   396         # deallocation of c2.
       
   397         del c2
       
   398 
       
   399     def test_callback_in_cycle_1(self):
       
   400         import gc
       
   401 
       
   402         class J(object):
       
   403             pass
       
   404 
       
   405         class II(object):
       
   406             def acallback(self, ignore):
       
   407                 self.J
       
   408 
       
   409         I = II()
       
   410         I.J = J
       
   411         I.wr = weakref.ref(J, I.acallback)
       
   412 
       
   413         # Now J and II are each in a self-cycle (as all new-style class
       
   414         # objects are, since their __mro__ points back to them).  I holds
       
   415         # both a weak reference (I.wr) and a strong reference (I.J) to class
       
   416         # J.  I is also in a cycle (I.wr points to a weakref that references
       
   417         # I.acallback).  When we del these three, they all become trash, but
       
   418         # the cycles prevent any of them from getting cleaned up immediately.
       
   419         # Instead they have to wait for cyclic gc to deduce that they're
       
   420         # trash.
       
   421         #
       
   422         # gc used to call tp_clear on all of them, and the order in which
       
   423         # it does that is pretty accidental.  The exact order in which we
       
   424         # built up these things manages to provoke gc into running tp_clear
       
   425         # in just the right order (I last).  Calling tp_clear on II leaves
       
   426         # behind an insane class object (its __mro__ becomes NULL).  Calling
       
   427         # tp_clear on J breaks its self-cycle, but J doesn't get deleted
       
   428         # just then because of the strong reference from I.J.  Calling
       
   429         # tp_clear on I starts to clear I's __dict__, and just happens to
       
   430         # clear I.J first -- I.wr is still intact.  That removes the last
       
   431         # reference to J, which triggers the weakref callback.  The callback
       
   432         # tries to do "self.J", and instances of new-style classes look up
       
   433         # attributes ("J") in the class dict first.  The class (II) wants to
       
   434         # search II.__mro__, but that's NULL.   The result was a segfault in
       
   435         # a release build, and an assert failure in a debug build.
       
   436         del I, J, II
       
   437         gc.collect()
       
   438 
       
   439     def test_callback_in_cycle_2(self):
       
   440         import gc
       
   441 
       
   442         # This is just like test_callback_in_cycle_1, except that II is an
       
   443         # old-style class.  The symptom is different then:  an instance of an
       
   444         # old-style class looks in its own __dict__ first.  'J' happens to
       
   445         # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
       
   446         # __dict__, so the attribute isn't found.  The difference is that
       
   447         # the old-style II doesn't have a NULL __mro__ (it doesn't have any
       
   448         # __mro__), so no segfault occurs.  Instead it got:
       
   449         #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
       
   450         #    Exception exceptions.AttributeError:
       
   451         #   "II instance has no attribute 'J'" in <bound method II.acallback
       
   452         #       of <?.II instance at 0x00B9B4B8>> ignored
       
   453 
       
   454         class J(object):
       
   455             pass
       
   456 
       
   457         class II:
       
   458             def acallback(self, ignore):
       
   459                 self.J
       
   460 
       
   461         I = II()
       
   462         I.J = J
       
   463         I.wr = weakref.ref(J, I.acallback)
       
   464 
       
   465         del I, J, II
       
   466         gc.collect()
       
   467 
       
   468     def test_callback_in_cycle_3(self):
       
   469         import gc
       
   470 
       
   471         # This one broke the first patch that fixed the last two.  In this
       
   472         # case, the objects reachable from the callback aren't also reachable
       
   473         # from the object (c1) *triggering* the callback:  you can get to
       
   474         # c1 from c2, but not vice-versa.  The result was that c2's __dict__
       
   475         # got tp_clear'ed by the time the c2.cb callback got invoked.
       
   476 
       
   477         class C:
       
   478             def cb(self, ignore):
       
   479                 self.me
       
   480                 self.c1
       
   481                 self.wr
       
   482 
       
   483         c1, c2 = C(), C()
       
   484 
       
   485         c2.me = c2
       
   486         c2.c1 = c1
       
   487         c2.wr = weakref.ref(c1, c2.cb)
       
   488 
       
   489         del c1, c2
       
   490         gc.collect()
       
   491 
       
   492     def test_callback_in_cycle_4(self):
       
   493         import gc
       
   494 
       
   495         # Like test_callback_in_cycle_3, except c2 and c1 have different
       
   496         # classes.  c2's class (C) isn't reachable from c1 then, so protecting
       
   497         # objects reachable from the dying object (c1) isn't enough to stop
       
   498         # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
       
   499         # The result was a segfault (C.__mro__ was NULL when the callback
       
   500         # tried to look up self.me).
       
   501 
       
   502         class C(object):
       
   503             def cb(self, ignore):
       
   504                 self.me
       
   505                 self.c1
       
   506                 self.wr
       
   507 
       
   508         class D:
       
   509             pass
       
   510 
       
   511         c1, c2 = D(), C()
       
   512 
       
   513         c2.me = c2
       
   514         c2.c1 = c1
       
   515         c2.wr = weakref.ref(c1, c2.cb)
       
   516 
       
   517         del c1, c2, C, D
       
   518         gc.collect()
       
   519 
       
   520     def test_callback_in_cycle_resurrection(self):
       
   521         import gc
       
   522 
       
   523         # Do something nasty in a weakref callback:  resurrect objects
       
   524         # from dead cycles.  For this to be attempted, the weakref and
       
   525         # its callback must also be part of the cyclic trash (else the
       
   526         # objects reachable via the callback couldn't be in cyclic trash
       
   527         # to begin with -- the callback would act like an external root).
       
   528         # But gc clears trash weakrefs with callbacks early now, which
       
   529         # disables the callbacks, so the callbacks shouldn't get called
       
   530         # at all (and so nothing actually gets resurrected).
       
   531 
       
   532         alist = []
       
   533         class C(object):
       
   534             def __init__(self, value):
       
   535                 self.attribute = value
       
   536 
       
   537             def acallback(self, ignore):
       
   538                 alist.append(self.c)
       
   539 
       
   540         c1, c2 = C(1), C(2)
       
   541         c1.c = c2
       
   542         c2.c = c1
       
   543         c1.wr = weakref.ref(c2, c1.acallback)
       
   544         c2.wr = weakref.ref(c1, c2.acallback)
       
   545 
       
   546         def C_went_away(ignore):
       
   547             alist.append("C went away")
       
   548         wr = weakref.ref(C, C_went_away)
       
   549 
       
   550         del c1, c2, C   # make them all trash
       
   551         self.assertEqual(alist, [])  # del isn't enough to reclaim anything
       
   552 
       
   553         gc.collect()
       
   554         # c1.wr and c2.wr were part of the cyclic trash, so should have
       
   555         # been cleared without their callbacks executing.  OTOH, the weakref
       
   556         # to C is bound to a function local (wr), and wasn't trash, so that
       
   557         # callback should have been invoked when C went away.
       
   558         self.assertEqual(alist, ["C went away"])
       
   559         # The remaining weakref should be dead now (its callback ran).
       
   560         self.assertEqual(wr(), None)
       
   561 
       
   562         del alist[:]
       
   563         gc.collect()
       
   564         self.assertEqual(alist, [])
       
   565 
       
   566     def test_callbacks_on_callback(self):
       
   567         import gc
       
   568 
       
   569         # Set up weakref callbacks *on* weakref callbacks.
       
   570         alist = []
       
   571         def safe_callback(ignore):
       
   572             alist.append("safe_callback called")
       
   573 
       
   574         class C(object):
       
   575             def cb(self, ignore):
       
   576                 alist.append("cb called")
       
   577 
       
   578         c, d = C(), C()
       
   579         c.other = d
       
   580         d.other = c
       
   581         callback = c.cb
       
   582         c.wr = weakref.ref(d, callback)     # this won't trigger
       
   583         d.wr = weakref.ref(callback, d.cb)  # ditto
       
   584         external_wr = weakref.ref(callback, safe_callback)  # but this will
       
   585         self.assert_(external_wr() is callback)
       
   586 
       
   587         # The weakrefs attached to c and d should get cleared, so that
       
   588         # C.cb is never called.  But external_wr isn't part of the cyclic
       
   589         # trash, and no cyclic trash is reachable from it, so safe_callback
       
   590         # should get invoked when the bound method object callback (c.cb)
       
   591         # -- which is itself a callback, and also part of the cyclic trash --
       
   592         # gets reclaimed at the end of gc.
       
   593 
       
   594         del callback, c, d, C
       
   595         self.assertEqual(alist, [])  # del isn't enough to clean up cycles
       
   596         gc.collect()
       
   597         self.assertEqual(alist, ["safe_callback called"])
       
   598         self.assertEqual(external_wr(), None)
       
   599 
       
   600         del alist[:]
       
   601         gc.collect()
       
   602         self.assertEqual(alist, [])
       
   603 
       
   604     def test_gc_during_ref_creation(self):
       
   605         self.check_gc_during_creation(weakref.ref)
       
   606 
       
   607     def test_gc_during_proxy_creation(self):
       
   608         self.check_gc_during_creation(weakref.proxy)
       
   609 
       
   610     def check_gc_during_creation(self, makeref):
       
   611         thresholds = gc.get_threshold()
       
   612         gc.set_threshold(1, 1, 1)
       
   613         gc.collect()
       
   614         class A:
       
   615             pass
       
   616 
       
   617         def callback(*args):
       
   618             pass
       
   619 
       
   620         referenced = A()
       
   621 
       
   622         a = A()
       
   623         a.a = a
       
   624         a.wr = makeref(referenced)
       
   625 
       
   626         try:
       
   627             # now make sure the object and the ref get labeled as
       
   628             # cyclic trash:
       
   629             a = A()
       
   630             weakref.ref(referenced, callback)
       
   631 
       
   632         finally:
       
   633             gc.set_threshold(*thresholds)
       
   634 
       
   635     def test_ref_created_during_del(self):
       
   636         # Bug #1377858
       
   637         # A weakref created in an object's __del__() would crash the
       
   638         # interpreter when the weakref was cleaned up since it would refer to
       
   639         # non-existent memory.  This test should not segfault the interpreter.
       
   640         class Target(object):
       
   641             def __del__(self):
       
   642                 global ref_from_del
       
   643                 ref_from_del = weakref.ref(self)
       
   644 
       
   645         w = Target()
       
   646 
       
   647 
       
   648 class SubclassableWeakrefTestCase(unittest.TestCase):
       
   649 
       
   650     def test_subclass_refs(self):
       
   651         class MyRef(weakref.ref):
       
   652             def __init__(self, ob, callback=None, value=42):
       
   653                 self.value = value
       
   654                 super(MyRef, self).__init__(ob, callback)
       
   655             def __call__(self):
       
   656                 self.called = True
       
   657                 return super(MyRef, self).__call__()
       
   658         o = Object("foo")
       
   659         mr = MyRef(o, value=24)
       
   660         self.assert_(mr() is o)
       
   661         self.assert_(mr.called)
       
   662         self.assertEqual(mr.value, 24)
       
   663         del o
       
   664         self.assert_(mr() is None)
       
   665         self.assert_(mr.called)
       
   666 
       
   667     def test_subclass_refs_dont_replace_standard_refs(self):
       
   668         class MyRef(weakref.ref):
       
   669             pass
       
   670         o = Object(42)
       
   671         r1 = MyRef(o)
       
   672         r2 = weakref.ref(o)
       
   673         self.assert_(r1 is not r2)
       
   674         self.assertEqual(weakref.getweakrefs(o), [r2, r1])
       
   675         self.assertEqual(weakref.getweakrefcount(o), 2)
       
   676         r3 = MyRef(o)
       
   677         self.assertEqual(weakref.getweakrefcount(o), 3)
       
   678         refs = weakref.getweakrefs(o)
       
   679         self.assertEqual(len(refs), 3)
       
   680         self.assert_(r2 is refs[0])
       
   681         self.assert_(r1 in refs[1:])
       
   682         self.assert_(r3 in refs[1:])
       
   683 
       
   684     def test_subclass_refs_dont_conflate_callbacks(self):
       
   685         class MyRef(weakref.ref):
       
   686             pass
       
   687         o = Object(42)
       
   688         r1 = MyRef(o, id)
       
   689         r2 = MyRef(o, str)
       
   690         self.assert_(r1 is not r2)
       
   691         refs = weakref.getweakrefs(o)
       
   692         self.assert_(r1 in refs)
       
   693         self.assert_(r2 in refs)
       
   694 
       
   695     def test_subclass_refs_with_slots(self):
       
   696         class MyRef(weakref.ref):
       
   697             __slots__ = "slot1", "slot2"
       
   698             def __new__(type, ob, callback, slot1, slot2):
       
   699                 return weakref.ref.__new__(type, ob, callback)
       
   700             def __init__(self, ob, callback, slot1, slot2):
       
   701                 self.slot1 = slot1
       
   702                 self.slot2 = slot2
       
   703             def meth(self):
       
   704                 return self.slot1 + self.slot2
       
   705         o = Object(42)
       
   706         r = MyRef(o, None, "abc", "def")
       
   707         self.assertEqual(r.slot1, "abc")
       
   708         self.assertEqual(r.slot2, "def")
       
   709         self.assertEqual(r.meth(), "abcdef")
       
   710         self.failIf(hasattr(r, "__dict__"))
       
   711 
       
   712 
       
   713 class Object:
       
   714     def __init__(self, arg):
       
   715         self.arg = arg
       
   716     def __repr__(self):
       
   717         return "<Object %r>" % self.arg
       
   718 
       
   719 
       
   720 class MappingTestCase(TestBase):
       
   721 
       
   722     COUNT = 10
       
   723 
       
   724     def test_weak_values(self):
       
   725         #
       
   726         #  This exercises d.copy(), d.items(), d[], del d[], len(d).
       
   727         #
       
   728         dict, objects = self.make_weak_valued_dict()
       
   729         for o in objects:
       
   730             self.assert_(weakref.getweakrefcount(o) == 1,
       
   731                          "wrong number of weak references to %r!" % o)
       
   732             self.assert_(o is dict[o.arg],
       
   733                          "wrong object returned by weak dict!")
       
   734         items1 = dict.items()
       
   735         items2 = dict.copy().items()
       
   736         items1.sort()
       
   737         items2.sort()
       
   738         self.assert_(items1 == items2,
       
   739                      "cloning of weak-valued dictionary did not work!")
       
   740         del items1, items2
       
   741         self.assert_(len(dict) == self.COUNT)
       
   742         del objects[0]
       
   743         self.assert_(len(dict) == (self.COUNT - 1),
       
   744                      "deleting object did not cause dictionary update")
       
   745         del objects, o
       
   746         self.assert_(len(dict) == 0,
       
   747                      "deleting the values did not clear the dictionary")
       
   748         # regression on SF bug #447152:
       
   749         dict = weakref.WeakValueDictionary()
       
   750         self.assertRaises(KeyError, dict.__getitem__, 1)
       
   751         dict[2] = C()
       
   752         self.assertRaises(KeyError, dict.__getitem__, 2)
       
   753 
       
   754     def test_weak_keys(self):
       
   755         #
       
   756         #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
       
   757         #  len(d), d.has_key().
       
   758         #
       
   759         dict, objects = self.make_weak_keyed_dict()
       
   760         for o in objects:
       
   761             self.assert_(weakref.getweakrefcount(o) == 1,
       
   762                          "wrong number of weak references to %r!" % o)
       
   763             self.assert_(o.arg is dict[o],
       
   764                          "wrong object returned by weak dict!")
       
   765         items1 = dict.items()
       
   766         items2 = dict.copy().items()
       
   767         self.assert_(set(items1) == set(items2),
       
   768                      "cloning of weak-keyed dictionary did not work!")
       
   769         del items1, items2
       
   770         self.assert_(len(dict) == self.COUNT)
       
   771         del objects[0]
       
   772         self.assert_(len(dict) == (self.COUNT - 1),
       
   773                      "deleting object did not cause dictionary update")
       
   774         del objects, o
       
   775         self.assert_(len(dict) == 0,
       
   776                      "deleting the keys did not clear the dictionary")
       
   777         o = Object(42)
       
   778         dict[o] = "What is the meaning of the universe?"
       
   779         self.assert_(dict.has_key(o))
       
   780         self.assert_(not dict.has_key(34))
       
   781 
       
   782     def test_weak_keyed_iters(self):
       
   783         dict, objects = self.make_weak_keyed_dict()
       
   784         self.check_iters(dict)
       
   785 
       
   786         # Test keyrefs()
       
   787         refs = dict.keyrefs()
       
   788         self.assertEqual(len(refs), len(objects))
       
   789         objects2 = list(objects)
       
   790         for wr in refs:
       
   791             ob = wr()
       
   792             self.assert_(dict.has_key(ob))
       
   793             self.assert_(ob in dict)
       
   794             self.assertEqual(ob.arg, dict[ob])
       
   795             objects2.remove(ob)
       
   796         self.assertEqual(len(objects2), 0)
       
   797 
       
   798         # Test iterkeyrefs()
       
   799         objects2 = list(objects)
       
   800         self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
       
   801         for wr in dict.iterkeyrefs():
       
   802             ob = wr()
       
   803             self.assert_(dict.has_key(ob))
       
   804             self.assert_(ob in dict)
       
   805             self.assertEqual(ob.arg, dict[ob])
       
   806             objects2.remove(ob)
       
   807         self.assertEqual(len(objects2), 0)
       
   808 
       
   809     def test_weak_valued_iters(self):
       
   810         dict, objects = self.make_weak_valued_dict()
       
   811         self.check_iters(dict)
       
   812 
       
   813         # Test valuerefs()
       
   814         refs = dict.valuerefs()
       
   815         self.assertEqual(len(refs), len(objects))
       
   816         objects2 = list(objects)
       
   817         for wr in refs:
       
   818             ob = wr()
       
   819             self.assertEqual(ob, dict[ob.arg])
       
   820             self.assertEqual(ob.arg, dict[ob.arg].arg)
       
   821             objects2.remove(ob)
       
   822         self.assertEqual(len(objects2), 0)
       
   823 
       
   824         # Test itervaluerefs()
       
   825         objects2 = list(objects)
       
   826         self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
       
   827         for wr in dict.itervaluerefs():
       
   828             ob = wr()
       
   829             self.assertEqual(ob, dict[ob.arg])
       
   830             self.assertEqual(ob.arg, dict[ob.arg].arg)
       
   831             objects2.remove(ob)
       
   832         self.assertEqual(len(objects2), 0)
       
   833 
       
   834     def check_iters(self, dict):
       
   835         # item iterator:
       
   836         items = dict.items()
       
   837         for item in dict.iteritems():
       
   838             items.remove(item)
       
   839         self.assert_(len(items) == 0, "iteritems() did not touch all items")
       
   840 
       
   841         # key iterator, via __iter__():
       
   842         keys = dict.keys()
       
   843         for k in dict:
       
   844             keys.remove(k)
       
   845         self.assert_(len(keys) == 0, "__iter__() did not touch all keys")
       
   846 
       
   847         # key iterator, via iterkeys():
       
   848         keys = dict.keys()
       
   849         for k in dict.iterkeys():
       
   850             keys.remove(k)
       
   851         self.assert_(len(keys) == 0, "iterkeys() did not touch all keys")
       
   852 
       
   853         # value iterator:
       
   854         values = dict.values()
       
   855         for v in dict.itervalues():
       
   856             values.remove(v)
       
   857         self.assert_(len(values) == 0,
       
   858                      "itervalues() did not touch all values")
       
   859 
       
   860     def test_make_weak_keyed_dict_from_dict(self):
       
   861         o = Object(3)
       
   862         dict = weakref.WeakKeyDictionary({o:364})
       
   863         self.assert_(dict[o] == 364)
       
   864 
       
   865     def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
       
   866         o = Object(3)
       
   867         dict = weakref.WeakKeyDictionary({o:364})
       
   868         dict2 = weakref.WeakKeyDictionary(dict)
       
   869         self.assert_(dict[o] == 364)
       
   870 
       
   871     def make_weak_keyed_dict(self):
       
   872         dict = weakref.WeakKeyDictionary()
       
   873         objects = map(Object, range(self.COUNT))
       
   874         for o in objects:
       
   875             dict[o] = o.arg
       
   876         return dict, objects
       
   877 
       
   878     def make_weak_valued_dict(self):
       
   879         dict = weakref.WeakValueDictionary()
       
   880         objects = map(Object, range(self.COUNT))
       
   881         for o in objects:
       
   882             dict[o.arg] = o
       
   883         return dict, objects
       
   884 
       
   885     def check_popitem(self, klass, key1, value1, key2, value2):
       
   886         weakdict = klass()
       
   887         weakdict[key1] = value1
       
   888         weakdict[key2] = value2
       
   889         self.assert_(len(weakdict) == 2)
       
   890         k, v = weakdict.popitem()
       
   891         self.assert_(len(weakdict) == 1)
       
   892         if k is key1:
       
   893             self.assert_(v is value1)
       
   894         else:
       
   895             self.assert_(v is value2)
       
   896         k, v = weakdict.popitem()
       
   897         self.assert_(len(weakdict) == 0)
       
   898         if k is key1:
       
   899             self.assert_(v is value1)
       
   900         else:
       
   901             self.assert_(v is value2)
       
   902 
       
   903     def test_weak_valued_dict_popitem(self):
       
   904         self.check_popitem(weakref.WeakValueDictionary,
       
   905                            "key1", C(), "key2", C())
       
   906 
       
   907     def test_weak_keyed_dict_popitem(self):
       
   908         self.check_popitem(weakref.WeakKeyDictionary,
       
   909                            C(), "value 1", C(), "value 2")
       
   910 
       
   911     def check_setdefault(self, klass, key, value1, value2):
       
   912         self.assert_(value1 is not value2,
       
   913                      "invalid test"
       
   914                      " -- value parameters must be distinct objects")
       
   915         weakdict = klass()
       
   916         o = weakdict.setdefault(key, value1)
       
   917         self.assert_(o is value1)
       
   918         self.assert_(weakdict.has_key(key))
       
   919         self.assert_(weakdict.get(key) is value1)
       
   920         self.assert_(weakdict[key] is value1)
       
   921 
       
   922         o = weakdict.setdefault(key, value2)
       
   923         self.assert_(o is value1)
       
   924         self.assert_(weakdict.has_key(key))
       
   925         self.assert_(weakdict.get(key) is value1)
       
   926         self.assert_(weakdict[key] is value1)
       
   927 
       
   928     def test_weak_valued_dict_setdefault(self):
       
   929         self.check_setdefault(weakref.WeakValueDictionary,
       
   930                               "key", C(), C())
       
   931 
       
   932     def test_weak_keyed_dict_setdefault(self):
       
   933         self.check_setdefault(weakref.WeakKeyDictionary,
       
   934                               C(), "value 1", "value 2")
       
   935 
       
   936     def check_update(self, klass, dict):
       
   937         #
       
   938         #  This exercises d.update(), len(d), d.keys(), d.has_key(),
       
   939         #  d.get(), d[].
       
   940         #
       
   941         weakdict = klass()
       
   942         weakdict.update(dict)
       
   943         self.assert_(len(weakdict) == len(dict))
       
   944         for k in weakdict.keys():
       
   945             self.assert_(dict.has_key(k),
       
   946                          "mysterious new key appeared in weak dict")
       
   947             v = dict.get(k)
       
   948             self.assert_(v is weakdict[k])
       
   949             self.assert_(v is weakdict.get(k))
       
   950         for k in dict.keys():
       
   951             self.assert_(weakdict.has_key(k),
       
   952                          "original key disappeared in weak dict")
       
   953             v = dict[k]
       
   954             self.assert_(v is weakdict[k])
       
   955             self.assert_(v is weakdict.get(k))
       
   956 
       
   957     def test_weak_valued_dict_update(self):
       
   958         self.check_update(weakref.WeakValueDictionary,
       
   959                           {1: C(), 'a': C(), C(): C()})
       
   960 
       
   961     def test_weak_keyed_dict_update(self):
       
   962         self.check_update(weakref.WeakKeyDictionary,
       
   963                           {C(): 1, C(): 2, C(): 3})
       
   964 
       
   965     def test_weak_keyed_delitem(self):
       
   966         d = weakref.WeakKeyDictionary()
       
   967         o1 = Object('1')
       
   968         o2 = Object('2')
       
   969         d[o1] = 'something'
       
   970         d[o2] = 'something'
       
   971         self.assert_(len(d) == 2)
       
   972         del d[o1]
       
   973         self.assert_(len(d) == 1)
       
   974         self.assert_(d.keys() == [o2])
       
   975 
       
   976     def test_weak_valued_delitem(self):
       
   977         d = weakref.WeakValueDictionary()
       
   978         o1 = Object('1')
       
   979         o2 = Object('2')
       
   980         d['something'] = o1
       
   981         d['something else'] = o2
       
   982         self.assert_(len(d) == 2)
       
   983         del d['something']
       
   984         self.assert_(len(d) == 1)
       
   985         self.assert_(d.items() == [('something else', o2)])
       
   986 
       
   987     def test_weak_keyed_bad_delitem(self):
       
   988         d = weakref.WeakKeyDictionary()
       
   989         o = Object('1')
       
   990         # An attempt to delete an object that isn't there should raise
       
   991         # KeyError.  It didn't before 2.3.
       
   992         self.assertRaises(KeyError, d.__delitem__, o)
       
   993         self.assertRaises(KeyError, d.__getitem__, o)
       
   994 
       
   995         # If a key isn't of a weakly referencable type, __getitem__ and
       
   996         # __setitem__ raise TypeError.  __delitem__ should too.
       
   997         self.assertRaises(TypeError, d.__delitem__,  13)
       
   998         self.assertRaises(TypeError, d.__getitem__,  13)
       
   999         self.assertRaises(TypeError, d.__setitem__,  13, 13)
       
  1000 
       
    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        d = weakref.WeakKeyDictionary()
        # Flag read by C.__eq__ below; the side effect stays switched off
        # while the dictionary is being populated.
        mutate = False

        # Note: this local class shadows the module-level C inside this
        # method only.
        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = d.keys()
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time thru the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in obj" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        # All four keys must be gone after only two passes of the loop:
        # the final assertion expects each pass to have disposed of two
        # entries (the key deleted explicitly, plus presumably the one
        # whose last strong reference C.__eq__ removed from objs).
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)
       
  1045 
       
  1046 from test import mapping_tests
       
  1047 
       
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakValueDictionary conforms to the mapping protocol"""
    # Sample data shared by all tests of this class.  Being an ordinary
    # dict, it holds the Object values strongly for the lifetime of the
    # class.
    __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
    # The mapping type to test -- presumably read by the inherited
    # BasicTestMappingProtocol tests; confirm against mapping_tests.
    type2test = weakref.WeakValueDictionary
    def _reference(self):
        # Hand out a shallow copy so that callers cannot alter the shared
        # sample data.
        return self.__ref.copy()
       
  1054 
       
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakKeyDictionary conforms to the mapping protocol"""
    # Sample data shared by all tests of this class.  Being an ordinary
    # dict, it holds the Object keys strongly for the lifetime of the
    # class.
    __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
    # The mapping type to test -- presumably read by the inherited
    # BasicTestMappingProtocol tests; confirm against mapping_tests.
    type2test = weakref.WeakKeyDictionary
    def _reference(self):
        # Hand out a shallow copy so that callers cannot alter the shared
        # sample data.
        return self.__ref.copy()
       
  1061 
       
# Doctest text mirroring the examples of the library reference.  It is not
# a docstring of any object; it is published to doctest through the
# module-level __test__ mapping defined right after it, and test_main()
# runs the module's doctests via test_support.run_doctest().
libreftest = """ Doctest for examples in the library reference: libweakref.tex

>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print r() is obj
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> print r()
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super(ExtendedRef, self).__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.iteritems():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super(ExtendedRef, self).__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print 'OK'
... else:
...     print 'WeakValueDictionary error'
OK

"""

# doctest looks up the module-level name __test__ and runs the strings it
# maps to, which is how the text above gets executed.
__test__ = {'libreftest' : libreftest}
       
  1146 
       
  1147 def test_main():
       
  1148     test_support.run_unittest(
       
  1149         ReferencesTestCase,
       
  1150         MappingTestCase,
       
  1151         WeakValueDictionaryTestCase,
       
  1152         WeakKeyDictionaryTestCase,
       
  1153         )
       
  1154     test_support.run_doctest(sys.modules[__name__])
       
  1155 
       
  1156 
       
# Allow the module to be executed directly as a script; in that case run
# the full suite through test_main().
if __name__ == "__main__":
    test_main()