symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_bsddb.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 #! /usr/bin/env python
       
     2 """Test script for the bsddb C module by Roger E. Masse
       
     3    Adapted to unittest format and expanded scope by Raymond Hettinger
       
     4 """
       
     5 import os, sys
       
     6 import copy
       
     7 import bsddb
       
     8 import dbhash # Just so we know it's imported
       
     9 import unittest
       
    10 from test import test_support
       
    11 
       
    12 class TestBSDDB(unittest.TestCase):
       
    13     openflag = 'c'
       
    14 
       
    15     def setUp(self):
       
    16         self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
       
    17         self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
       
    18         for k, v in self.d.iteritems():
       
    19             self.f[k] = v
       
    20 
       
    21     def tearDown(self):
       
    22         self.f.sync()
       
    23         self.f.close()
       
    24         if self.fname is None:
       
    25             return
       
    26         try:
       
    27             os.remove(self.fname)
       
    28         except os.error:
       
    29             pass
       
    30 
       
    31     def test_getitem(self):
       
    32         for k, v in self.d.iteritems():
       
    33             self.assertEqual(self.f[k], v)
       
    34 
       
    35     def test_len(self):
       
    36         self.assertEqual(len(self.f), len(self.d))
       
    37 
       
    38     def test_change(self):
       
    39         self.f['r'] = 'discovered'
       
    40         self.assertEqual(self.f['r'], 'discovered')
       
    41         self.assert_('r' in self.f.keys())
       
    42         self.assert_('discovered' in self.f.values())
       
    43 
       
    44     def test_close_and_reopen(self):
       
    45         if self.fname is None:
       
    46             # if we're using an in-memory only db, we can't reopen it
       
    47             # so finish here.
       
    48             return
       
    49         self.f.close()
       
    50         self.f = self.openmethod[0](self.fname, 'w')
       
    51         for k, v in self.d.iteritems():
       
    52             self.assertEqual(self.f[k], v)
       
    53 
       
    54     def assertSetEquals(self, seqn1, seqn2):
       
    55         self.assertEqual(set(seqn1), set(seqn2))
       
    56 
       
    57     def test_mapping_iteration_methods(self):
       
    58         f = self.f
       
    59         d = self.d
       
    60         self.assertSetEquals(d, f)
       
    61         self.assertSetEquals(d.keys(), f.keys())
       
    62         self.assertSetEquals(d.values(), f.values())
       
    63         self.assertSetEquals(d.items(), f.items())
       
    64         self.assertSetEquals(d.iterkeys(), f.iterkeys())
       
    65         self.assertSetEquals(d.itervalues(), f.itervalues())
       
    66         self.assertSetEquals(d.iteritems(), f.iteritems())
       
    67 
       
    68     def test_iter_while_modifying_values(self):
       
    69         di = iter(self.d)
       
    70         while 1:
       
    71             try:
       
    72                 key = di.next()
       
    73                 self.d[key] = 'modified '+key
       
    74             except StopIteration:
       
    75                 break
       
    76 
       
    77         # it should behave the same as a dict.  modifying values
       
    78         # of existing keys should not break iteration.  (adding
       
    79         # or removing keys should)
       
    80         loops_left = len(self.f)
       
    81         fi = iter(self.f)
       
    82         while 1:
       
    83             try:
       
    84                 key = fi.next()
       
    85                 self.f[key] = 'modified '+key
       
    86                 loops_left -= 1
       
    87             except StopIteration:
       
    88                 break
       
    89         self.assertEqual(loops_left, 0)
       
    90 
       
    91         self.test_mapping_iteration_methods()
       
    92 
       
    93     def test_iter_abort_on_changed_size(self):
       
    94         def DictIterAbort():
       
    95             di = iter(self.d)
       
    96             while 1:
       
    97                 try:
       
    98                     di.next()
       
    99                     self.d['newkey'] = 'SPAM'
       
   100                 except StopIteration:
       
   101                     break
       
   102         self.assertRaises(RuntimeError, DictIterAbort)
       
   103 
       
   104         def DbIterAbort():
       
   105             fi = iter(self.f)
       
   106             while 1:
       
   107                 try:
       
   108                     fi.next()
       
   109                     self.f['newkey'] = 'SPAM'
       
   110                 except StopIteration:
       
   111                     break
       
   112         self.assertRaises(RuntimeError, DbIterAbort)
       
   113 
       
   114     def test_iteritems_abort_on_changed_size(self):
       
   115         def DictIteritemsAbort():
       
   116             di = self.d.iteritems()
       
   117             while 1:
       
   118                 try:
       
   119                     di.next()
       
   120                     self.d['newkey'] = 'SPAM'
       
   121                 except StopIteration:
       
   122                     break
       
   123         self.assertRaises(RuntimeError, DictIteritemsAbort)
       
   124 
       
   125         def DbIteritemsAbort():
       
   126             fi = self.f.iteritems()
       
   127             while 1:
       
   128                 try:
       
   129                     key, value = fi.next()
       
   130                     del self.f[key]
       
   131                 except StopIteration:
       
   132                     break
       
   133         self.assertRaises(RuntimeError, DbIteritemsAbort)
       
   134 
       
   135     def test_iteritems_while_modifying_values(self):
       
   136         di = self.d.iteritems()
       
   137         while 1:
       
   138             try:
       
   139                 k, v = di.next()
       
   140                 self.d[k] = 'modified '+v
       
   141             except StopIteration:
       
   142                 break
       
   143 
       
   144         # it should behave the same as a dict.  modifying values
       
   145         # of existing keys should not break iteration.  (adding
       
   146         # or removing keys should)
       
   147         loops_left = len(self.f)
       
   148         fi = self.f.iteritems()
       
   149         while 1:
       
   150             try:
       
   151                 k, v = fi.next()
       
   152                 self.f[k] = 'modified '+v
       
   153                 loops_left -= 1
       
   154             except StopIteration:
       
   155                 break
       
   156         self.assertEqual(loops_left, 0)
       
   157 
       
   158         self.test_mapping_iteration_methods()
       
   159 
       
   160     def test_first_next_looping(self):
       
   161         items = [self.f.first()]
       
   162         for i in xrange(1, len(self.f)):
       
   163             items.append(self.f.next())
       
   164         self.assertSetEquals(items, self.d.items())
       
   165 
       
   166     def test_previous_last_looping(self):
       
   167         items = [self.f.last()]
       
   168         for i in xrange(1, len(self.f)):
       
   169             items.append(self.f.previous())
       
   170         self.assertSetEquals(items, self.d.items())
       
   171 
       
   172     def test_first_while_deleting(self):
       
   173         # Test for bug 1725856
       
   174         self.assert_(len(self.d) >= 2, "test requires >=2 items")
       
   175         for _ in self.d:
       
   176             key = self.f.first()[0]
       
   177             del self.f[key]
       
   178         self.assertEqual([], self.f.items(), "expected empty db after test")
       
   179 
       
   180     def test_last_while_deleting(self):
       
   181         # Test for bug 1725856's evil twin
       
   182         self.assert_(len(self.d) >= 2, "test requires >=2 items")
       
   183         for _ in self.d:
       
   184             key = self.f.last()[0]
       
   185             del self.f[key]
       
   186         self.assertEqual([], self.f.items(), "expected empty db after test")
       
   187 
       
   188     def test_set_location(self):
       
   189         self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
       
   190 
       
   191     def test_contains(self):
       
   192         for k in self.d:
       
   193             self.assert_(k in self.f)
       
   194         self.assert_('not here' not in self.f)
       
   195 
       
   196     def test_has_key(self):
       
   197         for k in self.d:
       
   198             self.assert_(self.f.has_key(k))
       
   199         self.assert_(not self.f.has_key('not here'))
       
   200 
       
   201     def test_clear(self):
       
   202         self.f.clear()
       
   203         self.assertEqual(len(self.f), 0)
       
   204 
       
   205     def test__no_deadlock_first(self, debug=0):
       
   206         # do this so that testers can see what function we're in in
       
   207         # verbose mode when we deadlock.
       
   208         sys.stdout.flush()
       
   209 
       
   210         # in pybsddb's _DBWithCursor this causes an internal DBCursor
       
   211         # object is created.  Other test_ methods in this class could
       
   212         # inadvertently cause the deadlock but an explicit test is needed.
       
   213         if debug: print "A"
       
   214         k,v = self.f.first()
       
   215         if debug: print "B", k
       
   216         self.f[k] = "deadlock.  do not pass go.  do not collect $200."
       
   217         if debug: print "C"
       
   218         # if the bsddb implementation leaves the DBCursor open during
       
   219         # the database write and locking+threading support is enabled
       
   220         # the cursor's read lock will deadlock the write lock request..
       
   221 
       
   222         # test the iterator interface
       
   223         if True:
       
   224             if debug: print "D"
       
   225             i = self.f.iteritems()
       
   226             k,v = i.next()
       
   227             if debug: print "E"
       
   228             self.f[k] = "please don't deadlock"
       
   229             if debug: print "F"
       
   230             while 1:
       
   231                 try:
       
   232                     k,v = i.next()
       
   233                 except StopIteration:
       
   234                     break
       
   235             if debug: print "F2"
       
   236 
       
   237             i = iter(self.f)
       
   238             if debug: print "G"
       
   239             while i:
       
   240                 try:
       
   241                     if debug: print "H"
       
   242                     k = i.next()
       
   243                     if debug: print "I"
       
   244                     self.f[k] = "deadlocks-r-us"
       
   245                     if debug: print "J"
       
   246                 except StopIteration:
       
   247                     i = None
       
   248             if debug: print "K"
       
   249 
       
   250         # test the legacy cursor interface mixed with writes
       
   251         self.assert_(self.f.first()[0] in self.d)
       
   252         k = self.f.next()[0]
       
   253         self.assert_(k in self.d)
       
   254         self.f[k] = "be gone with ye deadlocks"
       
   255         self.assert_(self.f[k], "be gone with ye deadlocks")
       
   256 
       
   257     def test_for_cursor_memleak(self):
       
   258         # do the bsddb._DBWithCursor iterator internals leak cursors?
       
   259         nc1 = len(self.f._cursor_refs)
       
   260         # create iterator
       
   261         i = self.f.iteritems()
       
   262         nc2 = len(self.f._cursor_refs)
       
   263         # use the iterator (should run to the first yield, creating the cursor)
       
   264         k, v = i.next()
       
   265         nc3 = len(self.f._cursor_refs)
       
   266         # destroy the iterator; this should cause the weakref callback
       
   267         # to remove the cursor object from self.f._cursor_refs
       
   268         del i
       
   269         nc4 = len(self.f._cursor_refs)
       
   270 
       
   271         self.assertEqual(nc1, nc2)
       
   272         self.assertEqual(nc1, nc4)
       
   273         self.assert_(nc3 == nc1+1)
       
   274 
       
   275     def test_popitem(self):
       
   276         k, v = self.f.popitem()
       
   277         self.assert_(k in self.d)
       
   278         self.assert_(v in self.d.values())
       
   279         self.assert_(k not in self.f)
       
   280         self.assertEqual(len(self.d)-1, len(self.f))
       
   281 
       
   282     def test_pop(self):
       
   283         k = 'w'
       
   284         v = self.f.pop(k)
       
   285         self.assertEqual(v, self.d[k])
       
   286         self.assert_(k not in self.f)
       
   287         self.assert_(v not in self.f.values())
       
   288         self.assertEqual(len(self.d)-1, len(self.f))
       
   289 
       
   290     def test_get(self):
       
   291         self.assertEqual(self.f.get('NotHere'), None)
       
   292         self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
       
   293         self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
       
   294 
       
   295     def test_setdefault(self):
       
   296         self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
       
   297         self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
       
   298 
       
   299     def test_update(self):
       
   300         new = dict(y='life', u='of', i='brian')
       
   301         self.f.update(new)
       
   302         self.d.update(new)
       
   303         for k, v in self.d.iteritems():
       
   304             self.assertEqual(self.f[k], v)
       
   305 
       
   306     def test_keyordering(self):
       
   307         if self.openmethod[0] is not bsddb.btopen:
       
   308             return
       
   309         keys = self.d.keys()
       
   310         keys.sort()
       
   311         self.assertEqual(self.f.first()[0], keys[0])
       
   312         self.assertEqual(self.f.next()[0], keys[1])
       
   313         self.assertEqual(self.f.last()[0], keys[-1])
       
   314         self.assertEqual(self.f.previous()[0], keys[-2])
       
   315         self.assertEqual(list(self.f), keys)
       
   316 
       
   317 class TestBTree(TestBSDDB):
       
   318     fname = test_support.TESTFN
       
   319     openmethod = [bsddb.btopen]
       
   320 
       
   321 class TestBTree_InMemory(TestBSDDB):
       
   322     fname = None
       
   323     openmethod = [bsddb.btopen]
       
   324 
       
   325 class TestBTree_InMemory_Truncate(TestBSDDB):
       
   326     fname = None
       
   327     openflag = 'n'
       
   328     openmethod = [bsddb.btopen]
       
   329 
       
   330 class TestHashTable(TestBSDDB):
       
   331     fname = test_support.TESTFN
       
   332     openmethod = [bsddb.hashopen]
       
   333 
       
   334 class TestHashTable_InMemory(TestBSDDB):
       
   335     fname = None
       
   336     openmethod = [bsddb.hashopen]
       
   337 
       
   338 ##         # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
       
   339 ##         #                                   appears broken... at least on
       
   340 ##         #                                   Solaris Intel - rmasse 1/97
       
   341 
       
   342 def test_main(verbose=None):
       
   343     test_support.run_unittest(
       
   344         TestBTree,
       
   345         TestHashTable,
       
   346         TestBTree_InMemory,
       
   347         TestHashTable_InMemory,
       
   348         TestBTree_InMemory_Truncate,
       
   349     )
       
   350 
       
   351 if __name__ == "__main__":
       
   352     test_main(verbose=True)