python-2.5.2/win32/Lib/test/test_bsddb.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 #! /usr/bin/env python
       
     2 """Test script for the bsddb C module by Roger E. Masse
       
     3    Adapted to unittest format and expanded scope by Raymond Hettinger
       
     4 """
       
     5 import os, sys
       
     6 import copy
       
     7 import bsddb
       
     8 import dbhash # Just so we know it's imported
       
     9 import unittest
       
    10 from test import test_support
       
    11 
       
    12 class TestBSDDB(unittest.TestCase):
       
    13     openflag = 'c'
       
    14 
       
    15     def setUp(self):
       
    16         self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
       
    17         self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
       
    18         for k, v in self.d.iteritems():
       
    19             self.f[k] = v
       
    20 
       
    21     def tearDown(self):
       
    22         self.f.sync()
       
    23         self.f.close()
       
    24         if self.fname is None:
       
    25             return
       
    26         try:
       
    27             os.remove(self.fname)
       
    28         except os.error:
       
    29             pass
       
    30 
       
    31     def test_getitem(self):
       
    32         for k, v in self.d.iteritems():
       
    33             self.assertEqual(self.f[k], v)
       
    34 
       
    35     def test_len(self):
       
    36         self.assertEqual(len(self.f), len(self.d))
       
    37 
       
    38     def test_change(self):
       
    39         self.f['r'] = 'discovered'
       
    40         self.assertEqual(self.f['r'], 'discovered')
       
    41         self.assert_('r' in self.f.keys())
       
    42         self.assert_('discovered' in self.f.values())
       
    43 
       
    44     def test_close_and_reopen(self):
       
    45         if self.fname is None:
       
    46             # if we're using an in-memory only db, we can't reopen it
       
    47             # so finish here.
       
    48             return
       
    49         self.f.close()
       
    50         self.f = self.openmethod[0](self.fname, 'w')
       
    51         for k, v in self.d.iteritems():
       
    52             self.assertEqual(self.f[k], v)
       
    53 
       
    54     def assertSetEquals(self, seqn1, seqn2):
       
    55         self.assertEqual(set(seqn1), set(seqn2))
       
    56 
       
    57     def test_mapping_iteration_methods(self):
       
    58         f = self.f
       
    59         d = self.d
       
    60         self.assertSetEquals(d, f)
       
    61         self.assertSetEquals(d.keys(), f.keys())
       
    62         self.assertSetEquals(d.values(), f.values())
       
    63         self.assertSetEquals(d.items(), f.items())
       
    64         self.assertSetEquals(d.iterkeys(), f.iterkeys())
       
    65         self.assertSetEquals(d.itervalues(), f.itervalues())
       
    66         self.assertSetEquals(d.iteritems(), f.iteritems())
       
    67 
       
    68     def test_iter_while_modifying_values(self):
       
    69         if not hasattr(self.f, '__iter__'):
       
    70             return
       
    71 
       
    72         di = iter(self.d)
       
    73         while 1:
       
    74             try:
       
    75                 key = di.next()
       
    76                 self.d[key] = 'modified '+key
       
    77             except StopIteration:
       
    78                 break
       
    79 
       
    80         # it should behave the same as a dict.  modifying values
       
    81         # of existing keys should not break iteration.  (adding
       
    82         # or removing keys should)
       
    83         fi = iter(self.f)
       
    84         while 1:
       
    85             try:
       
    86                 key = fi.next()
       
    87                 self.f[key] = 'modified '+key
       
    88             except StopIteration:
       
    89                 break
       
    90 
       
    91         self.test_mapping_iteration_methods()
       
    92 
       
    93     def test_iteritems_while_modifying_values(self):
       
    94         if not hasattr(self.f, 'iteritems'):
       
    95             return
       
    96 
       
    97         di = self.d.iteritems()
       
    98         while 1:
       
    99             try:
       
   100                 k, v = di.next()
       
   101                 self.d[k] = 'modified '+v
       
   102             except StopIteration:
       
   103                 break
       
   104 
       
   105         # it should behave the same as a dict.  modifying values
       
   106         # of existing keys should not break iteration.  (adding
       
   107         # or removing keys should)
       
   108         fi = self.f.iteritems()
       
   109         while 1:
       
   110             try:
       
   111                 k, v = fi.next()
       
   112                 self.f[k] = 'modified '+v
       
   113             except StopIteration:
       
   114                 break
       
   115 
       
   116         self.test_mapping_iteration_methods()
       
   117 
       
   118     def test_first_next_looping(self):
       
   119         items = [self.f.first()]
       
   120         for i in xrange(1, len(self.f)):
       
   121             items.append(self.f.next())
       
   122         self.assertSetEquals(items, self.d.items())
       
   123 
       
   124     def test_previous_last_looping(self):
       
   125         items = [self.f.last()]
       
   126         for i in xrange(1, len(self.f)):
       
   127             items.append(self.f.previous())
       
   128         self.assertSetEquals(items, self.d.items())
       
   129 
       
   130     def test_first_while_deleting(self):
       
   131         # Test for bug 1725856
       
   132         self.assert_(len(self.d) >= 2, "test requires >=2 items")
       
   133         for _ in self.d:
       
   134             key = self.f.first()[0]
       
   135             del self.f[key]
       
   136         self.assertEqual([], self.f.items(), "expected empty db after test")
       
   137 
       
   138     def test_last_while_deleting(self):
       
   139         # Test for bug 1725856's evil twin
       
   140         self.assert_(len(self.d) >= 2, "test requires >=2 items")
       
   141         for _ in self.d:
       
   142             key = self.f.last()[0]
       
   143             del self.f[key]
       
   144         self.assertEqual([], self.f.items(), "expected empty db after test")
       
   145 
       
   146     def test_set_location(self):
       
   147         self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
       
   148 
       
   149     def test_contains(self):
       
   150         for k in self.d:
       
   151             self.assert_(k in self.f)
       
   152         self.assert_('not here' not in self.f)
       
   153 
       
   154     def test_has_key(self):
       
   155         for k in self.d:
       
   156             self.assert_(self.f.has_key(k))
       
   157         self.assert_(not self.f.has_key('not here'))
       
   158 
       
   159     def test_clear(self):
       
   160         self.f.clear()
       
   161         self.assertEqual(len(self.f), 0)
       
   162 
       
   163     def test__no_deadlock_first(self, debug=0):
       
   164         # do this so that testers can see what function we're in in
       
   165         # verbose mode when we deadlock.
       
   166         sys.stdout.flush()
       
   167 
       
   168         # in pybsddb's _DBWithCursor this causes an internal DBCursor
       
   169         # object is created.  Other test_ methods in this class could
       
   170         # inadvertently cause the deadlock but an explicit test is needed.
       
   171         if debug: print "A"
       
   172         k,v = self.f.first()
       
   173         if debug: print "B", k
       
   174         self.f[k] = "deadlock.  do not pass go.  do not collect $200."
       
   175         if debug: print "C"
       
   176         # if the bsddb implementation leaves the DBCursor open during
       
   177         # the database write and locking+threading support is enabled
       
   178         # the cursor's read lock will deadlock the write lock request..
       
   179 
       
   180         # test the iterator interface (if present)
       
   181         if hasattr(self.f, 'iteritems'):
       
   182             if debug: print "D"
       
   183             i = self.f.iteritems()
       
   184             k,v = i.next()
       
   185             if debug: print "E"
       
   186             self.f[k] = "please don't deadlock"
       
   187             if debug: print "F"
       
   188             while 1:
       
   189                 try:
       
   190                     k,v = i.next()
       
   191                 except StopIteration:
       
   192                     break
       
   193             if debug: print "F2"
       
   194 
       
   195             i = iter(self.f)
       
   196             if debug: print "G"
       
   197             while i:
       
   198                 try:
       
   199                     if debug: print "H"
       
   200                     k = i.next()
       
   201                     if debug: print "I"
       
   202                     self.f[k] = "deadlocks-r-us"
       
   203                     if debug: print "J"
       
   204                 except StopIteration:
       
   205                     i = None
       
   206             if debug: print "K"
       
   207 
       
   208         # test the legacy cursor interface mixed with writes
       
   209         self.assert_(self.f.first()[0] in self.d)
       
   210         k = self.f.next()[0]
       
   211         self.assert_(k in self.d)
       
   212         self.f[k] = "be gone with ye deadlocks"
       
   213         self.assert_(self.f[k], "be gone with ye deadlocks")
       
   214 
       
   215     def test_for_cursor_memleak(self):
       
   216         if not hasattr(self.f, 'iteritems'):
       
   217             return
       
   218 
       
   219         # do the bsddb._DBWithCursor _iter_mixin internals leak cursors?
       
   220         nc1 = len(self.f._cursor_refs)
       
   221         # create iterator
       
   222         i = self.f.iteritems()
       
   223         nc2 = len(self.f._cursor_refs)
       
   224         # use the iterator (should run to the first yeild, creating the cursor)
       
   225         k, v = i.next()
       
   226         nc3 = len(self.f._cursor_refs)
       
   227         # destroy the iterator; this should cause the weakref callback
       
   228         # to remove the cursor object from self.f._cursor_refs
       
   229         del i
       
   230         nc4 = len(self.f._cursor_refs)
       
   231 
       
   232         self.assertEqual(nc1, nc2)
       
   233         self.assertEqual(nc1, nc4)
       
   234         self.assert_(nc3 == nc1+1)
       
   235 
       
   236     def test_popitem(self):
       
   237         k, v = self.f.popitem()
       
   238         self.assert_(k in self.d)
       
   239         self.assert_(v in self.d.values())
       
   240         self.assert_(k not in self.f)
       
   241         self.assertEqual(len(self.d)-1, len(self.f))
       
   242 
       
   243     def test_pop(self):
       
   244         k = 'w'
       
   245         v = self.f.pop(k)
       
   246         self.assertEqual(v, self.d[k])
       
   247         self.assert_(k not in self.f)
       
   248         self.assert_(v not in self.f.values())
       
   249         self.assertEqual(len(self.d)-1, len(self.f))
       
   250 
       
   251     def test_get(self):
       
   252         self.assertEqual(self.f.get('NotHere'), None)
       
   253         self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
       
   254         self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
       
   255 
       
   256     def test_setdefault(self):
       
   257         self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
       
   258         self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
       
   259 
       
   260     def test_update(self):
       
   261         new = dict(y='life', u='of', i='brian')
       
   262         self.f.update(new)
       
   263         self.d.update(new)
       
   264         for k, v in self.d.iteritems():
       
   265             self.assertEqual(self.f[k], v)
       
   266 
       
   267     def test_keyordering(self):
       
   268         if self.openmethod[0] is not bsddb.btopen:
       
   269             return
       
   270         keys = self.d.keys()
       
   271         keys.sort()
       
   272         self.assertEqual(self.f.first()[0], keys[0])
       
   273         self.assertEqual(self.f.next()[0], keys[1])
       
   274         self.assertEqual(self.f.last()[0], keys[-1])
       
   275         self.assertEqual(self.f.previous()[0], keys[-2])
       
   276         self.assertEqual(list(self.f), keys)
       
   277 
       
   278 class TestBTree(TestBSDDB):
       
   279     fname = test_support.TESTFN
       
   280     openmethod = [bsddb.btopen]
       
   281 
       
   282 class TestBTree_InMemory(TestBSDDB):
       
   283     fname = None
       
   284     openmethod = [bsddb.btopen]
       
   285 
       
   286 class TestBTree_InMemory_Truncate(TestBSDDB):
       
   287     fname = None
       
   288     openflag = 'n'
       
   289     openmethod = [bsddb.btopen]
       
   290 
       
   291 class TestHashTable(TestBSDDB):
       
   292     fname = test_support.TESTFN
       
   293     openmethod = [bsddb.hashopen]
       
   294 
       
   295 class TestHashTable_InMemory(TestBSDDB):
       
   296     fname = None
       
   297     openmethod = [bsddb.hashopen]
       
   298 
       
   299 ##         # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
       
   300 ##         #                                   appears broken... at least on
       
   301 ##         #                                   Solaris Intel - rmasse 1/97
       
   302 
       
   303 def test_main(verbose=None):
       
   304     test_support.run_unittest(
       
   305         TestBTree,
       
   306         TestHashTable,
       
   307         TestBTree_InMemory,
       
   308         TestHashTable_InMemory,
       
   309         TestBTree_InMemory_Truncate,
       
   310     )
       
   311 
       
   312 if __name__ == "__main__":
       
   313     test_main(verbose=True)