|
1 #! /usr/bin/env python |
|
2 """Test script for the bsddb C module by Roger E. Masse |
|
3 Adapted to unittest format and expanded scope by Raymond Hettinger |
|
4 """ |
|
5 import os, sys |
|
6 import copy |
|
7 import bsddb |
|
8 import dbhash # Just so we know it's imported |
|
9 import unittest |
|
10 from test import test_support |
|
11 |
|
12 class TestBSDDB(unittest.TestCase): |
|
13 openflag = 'c' |
|
14 |
|
15 def setUp(self): |
|
16 self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768) |
|
17 self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='') |
|
18 for k, v in self.d.iteritems(): |
|
19 self.f[k] = v |
|
20 |
|
21 def tearDown(self): |
|
22 self.f.sync() |
|
23 self.f.close() |
|
24 if self.fname is None: |
|
25 return |
|
26 try: |
|
27 os.remove(self.fname) |
|
28 except os.error: |
|
29 pass |
|
30 |
|
31 def test_getitem(self): |
|
32 for k, v in self.d.iteritems(): |
|
33 self.assertEqual(self.f[k], v) |
|
34 |
|
35 def test_len(self): |
|
36 self.assertEqual(len(self.f), len(self.d)) |
|
37 |
|
38 def test_change(self): |
|
39 self.f['r'] = 'discovered' |
|
40 self.assertEqual(self.f['r'], 'discovered') |
|
41 self.assert_('r' in self.f.keys()) |
|
42 self.assert_('discovered' in self.f.values()) |
|
43 |
|
44 def test_close_and_reopen(self): |
|
45 if self.fname is None: |
|
46 # if we're using an in-memory only db, we can't reopen it |
|
47 # so finish here. |
|
48 return |
|
49 self.f.close() |
|
50 self.f = self.openmethod[0](self.fname, 'w') |
|
51 for k, v in self.d.iteritems(): |
|
52 self.assertEqual(self.f[k], v) |
|
53 |
|
54 def assertSetEquals(self, seqn1, seqn2): |
|
55 self.assertEqual(set(seqn1), set(seqn2)) |
|
56 |
|
57 def test_mapping_iteration_methods(self): |
|
58 f = self.f |
|
59 d = self.d |
|
60 self.assertSetEquals(d, f) |
|
61 self.assertSetEquals(d.keys(), f.keys()) |
|
62 self.assertSetEquals(d.values(), f.values()) |
|
63 self.assertSetEquals(d.items(), f.items()) |
|
64 self.assertSetEquals(d.iterkeys(), f.iterkeys()) |
|
65 self.assertSetEquals(d.itervalues(), f.itervalues()) |
|
66 self.assertSetEquals(d.iteritems(), f.iteritems()) |
|
67 |
|
68 def test_iter_while_modifying_values(self): |
|
69 di = iter(self.d) |
|
70 while 1: |
|
71 try: |
|
72 key = di.next() |
|
73 self.d[key] = 'modified '+key |
|
74 except StopIteration: |
|
75 break |
|
76 |
|
77 # it should behave the same as a dict. modifying values |
|
78 # of existing keys should not break iteration. (adding |
|
79 # or removing keys should) |
|
80 loops_left = len(self.f) |
|
81 fi = iter(self.f) |
|
82 while 1: |
|
83 try: |
|
84 key = fi.next() |
|
85 self.f[key] = 'modified '+key |
|
86 loops_left -= 1 |
|
87 except StopIteration: |
|
88 break |
|
89 self.assertEqual(loops_left, 0) |
|
90 |
|
91 self.test_mapping_iteration_methods() |
|
92 |
|
93 def test_iter_abort_on_changed_size(self): |
|
94 def DictIterAbort(): |
|
95 di = iter(self.d) |
|
96 while 1: |
|
97 try: |
|
98 di.next() |
|
99 self.d['newkey'] = 'SPAM' |
|
100 except StopIteration: |
|
101 break |
|
102 self.assertRaises(RuntimeError, DictIterAbort) |
|
103 |
|
104 def DbIterAbort(): |
|
105 fi = iter(self.f) |
|
106 while 1: |
|
107 try: |
|
108 fi.next() |
|
109 self.f['newkey'] = 'SPAM' |
|
110 except StopIteration: |
|
111 break |
|
112 self.assertRaises(RuntimeError, DbIterAbort) |
|
113 |
|
114 def test_iteritems_abort_on_changed_size(self): |
|
115 def DictIteritemsAbort(): |
|
116 di = self.d.iteritems() |
|
117 while 1: |
|
118 try: |
|
119 di.next() |
|
120 self.d['newkey'] = 'SPAM' |
|
121 except StopIteration: |
|
122 break |
|
123 self.assertRaises(RuntimeError, DictIteritemsAbort) |
|
124 |
|
125 def DbIteritemsAbort(): |
|
126 fi = self.f.iteritems() |
|
127 while 1: |
|
128 try: |
|
129 key, value = fi.next() |
|
130 del self.f[key] |
|
131 except StopIteration: |
|
132 break |
|
133 self.assertRaises(RuntimeError, DbIteritemsAbort) |
|
134 |
|
135 def test_iteritems_while_modifying_values(self): |
|
136 di = self.d.iteritems() |
|
137 while 1: |
|
138 try: |
|
139 k, v = di.next() |
|
140 self.d[k] = 'modified '+v |
|
141 except StopIteration: |
|
142 break |
|
143 |
|
144 # it should behave the same as a dict. modifying values |
|
145 # of existing keys should not break iteration. (adding |
|
146 # or removing keys should) |
|
147 loops_left = len(self.f) |
|
148 fi = self.f.iteritems() |
|
149 while 1: |
|
150 try: |
|
151 k, v = fi.next() |
|
152 self.f[k] = 'modified '+v |
|
153 loops_left -= 1 |
|
154 except StopIteration: |
|
155 break |
|
156 self.assertEqual(loops_left, 0) |
|
157 |
|
158 self.test_mapping_iteration_methods() |
|
159 |
|
160 def test_first_next_looping(self): |
|
161 items = [self.f.first()] |
|
162 for i in xrange(1, len(self.f)): |
|
163 items.append(self.f.next()) |
|
164 self.assertSetEquals(items, self.d.items()) |
|
165 |
|
166 def test_previous_last_looping(self): |
|
167 items = [self.f.last()] |
|
168 for i in xrange(1, len(self.f)): |
|
169 items.append(self.f.previous()) |
|
170 self.assertSetEquals(items, self.d.items()) |
|
171 |
|
172 def test_first_while_deleting(self): |
|
173 # Test for bug 1725856 |
|
174 self.assert_(len(self.d) >= 2, "test requires >=2 items") |
|
175 for _ in self.d: |
|
176 key = self.f.first()[0] |
|
177 del self.f[key] |
|
178 self.assertEqual([], self.f.items(), "expected empty db after test") |
|
179 |
|
180 def test_last_while_deleting(self): |
|
181 # Test for bug 1725856's evil twin |
|
182 self.assert_(len(self.d) >= 2, "test requires >=2 items") |
|
183 for _ in self.d: |
|
184 key = self.f.last()[0] |
|
185 del self.f[key] |
|
186 self.assertEqual([], self.f.items(), "expected empty db after test") |
|
187 |
|
188 def test_set_location(self): |
|
189 self.assertEqual(self.f.set_location('e'), ('e', self.d['e'])) |
|
190 |
|
191 def test_contains(self): |
|
192 for k in self.d: |
|
193 self.assert_(k in self.f) |
|
194 self.assert_('not here' not in self.f) |
|
195 |
|
196 def test_has_key(self): |
|
197 for k in self.d: |
|
198 self.assert_(self.f.has_key(k)) |
|
199 self.assert_(not self.f.has_key('not here')) |
|
200 |
|
201 def test_clear(self): |
|
202 self.f.clear() |
|
203 self.assertEqual(len(self.f), 0) |
|
204 |
|
205 def test__no_deadlock_first(self, debug=0): |
|
206 # do this so that testers can see what function we're in in |
|
207 # verbose mode when we deadlock. |
|
208 sys.stdout.flush() |
|
209 |
|
210 # in pybsddb's _DBWithCursor this causes an internal DBCursor |
|
211 # object is created. Other test_ methods in this class could |
|
212 # inadvertently cause the deadlock but an explicit test is needed. |
|
213 if debug: print "A" |
|
214 k,v = self.f.first() |
|
215 if debug: print "B", k |
|
216 self.f[k] = "deadlock. do not pass go. do not collect $200." |
|
217 if debug: print "C" |
|
218 # if the bsddb implementation leaves the DBCursor open during |
|
219 # the database write and locking+threading support is enabled |
|
220 # the cursor's read lock will deadlock the write lock request.. |
|
221 |
|
222 # test the iterator interface |
|
223 if True: |
|
224 if debug: print "D" |
|
225 i = self.f.iteritems() |
|
226 k,v = i.next() |
|
227 if debug: print "E" |
|
228 self.f[k] = "please don't deadlock" |
|
229 if debug: print "F" |
|
230 while 1: |
|
231 try: |
|
232 k,v = i.next() |
|
233 except StopIteration: |
|
234 break |
|
235 if debug: print "F2" |
|
236 |
|
237 i = iter(self.f) |
|
238 if debug: print "G" |
|
239 while i: |
|
240 try: |
|
241 if debug: print "H" |
|
242 k = i.next() |
|
243 if debug: print "I" |
|
244 self.f[k] = "deadlocks-r-us" |
|
245 if debug: print "J" |
|
246 except StopIteration: |
|
247 i = None |
|
248 if debug: print "K" |
|
249 |
|
250 # test the legacy cursor interface mixed with writes |
|
251 self.assert_(self.f.first()[0] in self.d) |
|
252 k = self.f.next()[0] |
|
253 self.assert_(k in self.d) |
|
254 self.f[k] = "be gone with ye deadlocks" |
|
255 self.assert_(self.f[k], "be gone with ye deadlocks") |
|
256 |
|
257 def test_for_cursor_memleak(self): |
|
258 # do the bsddb._DBWithCursor iterator internals leak cursors? |
|
259 nc1 = len(self.f._cursor_refs) |
|
260 # create iterator |
|
261 i = self.f.iteritems() |
|
262 nc2 = len(self.f._cursor_refs) |
|
263 # use the iterator (should run to the first yield, creating the cursor) |
|
264 k, v = i.next() |
|
265 nc3 = len(self.f._cursor_refs) |
|
266 # destroy the iterator; this should cause the weakref callback |
|
267 # to remove the cursor object from self.f._cursor_refs |
|
268 del i |
|
269 nc4 = len(self.f._cursor_refs) |
|
270 |
|
271 self.assertEqual(nc1, nc2) |
|
272 self.assertEqual(nc1, nc4) |
|
273 self.assert_(nc3 == nc1+1) |
|
274 |
|
275 def test_popitem(self): |
|
276 k, v = self.f.popitem() |
|
277 self.assert_(k in self.d) |
|
278 self.assert_(v in self.d.values()) |
|
279 self.assert_(k not in self.f) |
|
280 self.assertEqual(len(self.d)-1, len(self.f)) |
|
281 |
|
282 def test_pop(self): |
|
283 k = 'w' |
|
284 v = self.f.pop(k) |
|
285 self.assertEqual(v, self.d[k]) |
|
286 self.assert_(k not in self.f) |
|
287 self.assert_(v not in self.f.values()) |
|
288 self.assertEqual(len(self.d)-1, len(self.f)) |
|
289 |
|
290 def test_get(self): |
|
291 self.assertEqual(self.f.get('NotHere'), None) |
|
292 self.assertEqual(self.f.get('NotHere', 'Default'), 'Default') |
|
293 self.assertEqual(self.f.get('q', 'Default'), self.d['q']) |
|
294 |
|
295 def test_setdefault(self): |
|
296 self.assertEqual(self.f.setdefault('new', 'dog'), 'dog') |
|
297 self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r']) |
|
298 |
|
299 def test_update(self): |
|
300 new = dict(y='life', u='of', i='brian') |
|
301 self.f.update(new) |
|
302 self.d.update(new) |
|
303 for k, v in self.d.iteritems(): |
|
304 self.assertEqual(self.f[k], v) |
|
305 |
|
306 def test_keyordering(self): |
|
307 if self.openmethod[0] is not bsddb.btopen: |
|
308 return |
|
309 keys = self.d.keys() |
|
310 keys.sort() |
|
311 self.assertEqual(self.f.first()[0], keys[0]) |
|
312 self.assertEqual(self.f.next()[0], keys[1]) |
|
313 self.assertEqual(self.f.last()[0], keys[-1]) |
|
314 self.assertEqual(self.f.previous()[0], keys[-2]) |
|
315 self.assertEqual(list(self.f), keys) |
|
316 |
|
class TestBTree(TestBSDDB):
    """Run the shared tests on a bsddb.btopen database stored at TESTFN."""

    # One-element list; the base class calls self.openmethod[0](...).
    # NOTE(review): presumably wrapped so the function is not bound as a method.
    openmethod = [bsddb.btopen]
    fname = test_support.TESTFN
|
320 |
|
class TestBTree_InMemory(TestBSDDB):
    """Run the shared tests on an in-memory bsddb.btopen database."""

    # One-element list; the base class calls self.openmethod[0](...).
    openmethod = [bsddb.btopen]
    # No file name: the base class treats None as an in-memory database.
    fname = None
|
324 |
|
class TestBTree_InMemory_Truncate(TestBSDDB):
    """Run the shared tests on an in-memory bsddb.btopen database opened with 'n'."""

    # One-element list; the base class calls self.openmethod[0](...).
    openmethod = [bsddb.btopen]
    # Overrides the base class default flag 'c'.
    # NOTE(review): per the class name, 'n' is the truncating mode - confirm.
    openflag = 'n'
    # No file name: the base class treats None as an in-memory database.
    fname = None
|
329 |
|
class TestHashTable(TestBSDDB):
    """Run the shared tests on a bsddb.hashopen database stored at TESTFN."""

    # One-element list; the base class calls self.openmethod[0](...).
    openmethod = [bsddb.hashopen]
    fname = test_support.TESTFN
|
333 |
|
class TestHashTable_InMemory(TestBSDDB):
    """Run the shared tests on an in-memory bsddb.hashopen database."""

    # One-element list; the base class calls self.openmethod[0](...).
    openmethod = [bsddb.hashopen]
    # No file name: the base class treats None as an in-memory database.
    fname = None
|
337 |
|
## bsddb.rnopen ('Record Numbers') is not tested:
## 'put' for RECNO for bsddb 1.85 appears broken... at least on
## Solaris Intel - rmasse 1/97
|
341 |
|
def test_main(verbose=None):
    """Run every concrete bsddb test case through test_support.

    ``verbose`` is accepted but is not used by this function.
    """
    cases = (
        TestBTree,
        TestHashTable,
        TestBTree_InMemory,
        TestHashTable_InMemory,
        TestBTree_InMemory_Truncate,
    )
    test_support.run_unittest(*cases)
|
350 |
|
# Script entry point: run the whole suite when executed directly.
if __name__ == "__main__":
    test_main(verbose=True)