symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_all.py
changeset 1 2fb8b9db1c86
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/symbian-qemu-0.9.1-12/python-2.6.1/Lib/bsddb/test/test_all.py	Fri Jul 31 15:01:17 2009 +0100
@@ -0,0 +1,525 @@
+"""Run all test cases.
+"""
+
+import sys
+import os
+import unittest
+try:
+    # For Pythons w/distutils pybsddb
+    import bsddb3 as bsddb
+except ImportError:
+    # For Python 2.3
+    import bsddb
+
+
+if sys.version_info[0] >= 3 :
+    charset = "iso8859-1"  # Full 8 bit
+
+    class cursor_py3k(object) :
+        def __init__(self, db, *args, **kwargs) :
+            self._dbcursor = db.cursor(*args, **kwargs)
+
+        def __getattr__(self, v) :
+            return getattr(self._dbcursor, v)
+
+        def _fix(self, v) :
+            if v == None : return None
+            key, value = v
+            if isinstance(key, bytes) :
+                key = key.decode(charset)
+            return (key, value.decode(charset))
+
+        def __next__(self) :
+            v = getattr(self._dbcursor, "next")()
+            return self._fix(v)
+
+        next = __next__
+
+        def previous(self) :
+            v = self._dbcursor.previous()
+            return self._fix(v)
+
+        def last(self) :
+            v = self._dbcursor.last()
+            return self._fix(v)
+
+        def set(self, k) :
+            if isinstance(k, str) :
+                k = bytes(k, charset)
+            v = self._dbcursor.set(k)
+            return self._fix(v)
+
+        def set_recno(self, num) :
+            v = self._dbcursor.set_recno(num)
+            return self._fix(v)
+
+        def set_range(self, k, dlen=-1, doff=-1) :
+            if isinstance(k, str) :
+                k = bytes(k, charset)
+            v = self._dbcursor.set_range(k, dlen=dlen, doff=doff)
+            return self._fix(v)
+
+        def dup(self, flags=0) :
+            cursor = self._dbcursor.dup(flags)
+            return dup_cursor_py3k(cursor)
+
+        def next_dup(self) :
+            v = self._dbcursor.next_dup()
+            return self._fix(v)
+
+        def next_nodup(self) :
+            v = self._dbcursor.next_nodup()
+            return self._fix(v)
+
+        def put(self, key, value, flags=0, dlen=-1, doff=-1) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            if isinstance(value, str) :
+                value = bytes(value, charset)
+            return self._dbcursor.put(key, value, flags=flags, dlen=dlen,
+                    doff=doff)
+
+        def current(self, flags=0, dlen=-1, doff=-1) :
+            v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff)
+            return self._fix(v)
+
+        def first(self) :
+            v = self._dbcursor.first()
+            return self._fix(v)
+
+        def pget(self, key=None, data=None, flags=0) :
+            # Incorrect because key can be a bare number,
+            # but enough to pass testsuite
+            if isinstance(key, int) and (data==None) and (flags==0) :
+                flags = key
+                key = None
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            if isinstance(data, int) and (flags==0) :
+                flags = data
+                data = None
+            if isinstance(data, str) :
+                data = bytes(data, charset)
+            v=self._dbcursor.pget(key=key, data=data, flags=flags)
+            if v != None :
+                v1, v2, v3 = v
+                if isinstance(v1, bytes) :
+                    v1 = v1.decode(charset)
+                if isinstance(v2, bytes) :
+                    v2 = v2.decode(charset)
+
+                v = (v1, v2, v3.decode(charset))
+
+            return v
+
+        def join_item(self) :
+            v = self._dbcursor.join_item()
+            if v != None :
+                v = v.decode(charset)
+            return v
+
+        def get(self, *args, **kwargs) :
+            l = len(args)
+            if l == 2 :
+                k, f = args
+                if isinstance(k, str) :
+                    k = bytes(k, "iso8859-1")
+                args = (k, f)
+            elif l == 3 :
+                k, d, f = args
+                if isinstance(k, str) :
+                    k = bytes(k, charset)
+                if isinstance(d, str) :
+                    d = bytes(d, charset)
+                args =(k, d, f)
+
+            v = self._dbcursor.get(*args, **kwargs)
+            if v != None :
+                k, v = v
+                if isinstance(k, bytes) :
+                    k = k.decode(charset)
+                v = (k, v.decode(charset))
+            return v
+
+        def get_both(self, key, value) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            if isinstance(value, str) :
+                value = bytes(value, charset)
+            v=self._dbcursor.get_both(key, value)
+            return self._fix(v)
+
+    class dup_cursor_py3k(cursor_py3k) :
+        def __init__(self, dbcursor) :
+            self._dbcursor = dbcursor
+
+    class DB_py3k(object) :
+        def __init__(self, *args, **kwargs) :
+            args2=[]
+            for i in args :
+                if isinstance(i, DBEnv_py3k) :
+                    i = i._dbenv
+                args2.append(i)
+            args = tuple(args2)
+            for k, v in kwargs.items() :
+                if isinstance(v, DBEnv_py3k) :
+                    kwargs[k] = v._dbenv
+
+            self._db = bsddb._db.DB_orig(*args, **kwargs)
+
+        def __contains__(self, k) :
+            if isinstance(k, str) :
+                k = bytes(k, charset)
+            return getattr(self._db, "has_key")(k)
+
+        def __getitem__(self, k) :
+            if isinstance(k, str) :
+                k = bytes(k, charset)
+            v = self._db[k]
+            if v != None :
+                v = v.decode(charset)
+            return v
+
+        def __setitem__(self, k, v) :
+            if isinstance(k, str) :
+                k = bytes(k, charset)
+            if isinstance(v, str) :
+                v = bytes(v, charset)
+            self._db[k] = v
+
+        def __delitem__(self, k) :
+            if isinstance(k, str) :
+                k = bytes(k, charset)
+            del self._db[k]
+
+        def __getattr__(self, v) :
+            return getattr(self._db, v)
+
+        def __len__(self) :
+            return len(self._db)
+
+        def has_key(self, k, txn=None) :
+            if isinstance(k, str) :
+                k = bytes(k, charset)
+            return self._db.has_key(k, txn=txn)
+
+        def put(self, key, value, txn=None, flags=0, dlen=-1, doff=-1) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            if isinstance(value, str) :
+                value = bytes(value, charset)
+            return self._db.put(key, value, flags=flags, txn=txn, dlen=dlen,
+                    doff=doff)
+
+        def append(self, value, txn=None) :
+            if isinstance(value, str) :
+                value = bytes(value, charset)
+            return self._db.append(value, txn=txn)
+
+        def get_size(self, key) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            return self._db.get_size(key)
+
+        def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            if default != "MagicCookie" :  # Magic for 'test_get_none.py'
+                v=self._db.get(key, default=default, txn=txn, flags=flags,
+                        dlen=dlen, doff=doff)
+            else :
+                v=self._db.get(key, txn=txn, flags=flags,
+                        dlen=dlen, doff=doff)
+            if (v != None) and isinstance(v, bytes) :
+                v = v.decode(charset)
+            return v
+
+        def pget(self, key, txn=None) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            v=self._db.pget(key, txn=txn)
+            if v != None :
+                v1, v2 = v
+                if isinstance(v1, bytes) :
+                    v1 = v1.decode(charset)
+
+                v = (v1, v2.decode(charset))
+            return v
+
+        def get_both(self, key, value, txn=None, flags=0) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            if isinstance(value, str) :
+                value = bytes(value, charset)
+            v=self._db.get_both(key, value, txn=txn, flags=flags)
+            if v != None :
+                v = v.decode(charset)
+            return v
+
+        def delete(self, key, txn=None) :
+            if isinstance(key, str) :
+                key = bytes(key, charset)
+            return self._db.delete(key, txn=txn)
+
+        def keys(self) :
+            k = self._db.keys()
+            if len(k) and isinstance(k[0], bytes) :
+                return [i.decode(charset) for i in self._db.keys()]
+            else :
+                return k
+
+        def items(self) :
+            data = self._db.items()
+            if not len(data) : return data
+            data2 = []
+            for k, v in data :
+                if isinstance(k, bytes) :
+                    k = k.decode(charset)
+                data2.append((k, v.decode(charset)))
+            return data2
+
+        def associate(self, secondarydb, callback, flags=0, txn=None) :
+            class associate_callback(object) :
+                def __init__(self, callback) :
+                    self._callback = callback
+
+                def callback(self, key, data) :
+                    if isinstance(key, str) :
+                        key = key.decode(charset)
+                    data = data.decode(charset)
+                    key = self._callback(key, data)
+                    if (key != bsddb._db.DB_DONOTINDEX) and isinstance(key,
+                            str) :
+                        key = bytes(key, charset)
+                    return key
+
+            return self._db.associate(secondarydb._db,
+                    associate_callback(callback).callback, flags=flags, txn=txn)
+
+        def cursor(self, txn=None, flags=0) :
+            return cursor_py3k(self._db, txn=txn, flags=flags)
+
+        def join(self, cursor_list) :
+            cursor_list = [i._dbcursor for i in cursor_list]
+            return dup_cursor_py3k(self._db.join(cursor_list))
+
+    class DBEnv_py3k(object) :
+        def __init__(self, *args, **kwargs) :
+            self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs)
+
+        def __getattr__(self, v) :
+            return getattr(self._dbenv, v)
+
+    class DBSequence_py3k(object) :
+        def __init__(self, db, *args, **kwargs) :
+            self._db=db
+            self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs)
+
+        def __getattr__(self, v) :
+            return getattr(self._dbsequence, v)
+
+        def open(self, key, *args, **kwargs) :
+            return self._dbsequence.open(bytes(key, charset), *args, **kwargs)
+
+        def get_key(self) :
+            return  self._dbsequence.get_key().decode(charset)
+
+        def get_dbp(self) :
+            return self._db
+
+    import string
+    string.letters=[chr(i) for i in xrange(65,91)]
+
+    bsddb._db.DBEnv_orig = bsddb._db.DBEnv
+    bsddb._db.DB_orig = bsddb._db.DB
+    bsddb._db.DBSequence_orig = bsddb._db.DBSequence
+
+    def do_proxy_db_py3k(flag) :
+        flag2 = do_proxy_db_py3k.flag
+        do_proxy_db_py3k.flag = flag
+        if flag :
+            bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k
+            bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k
+            bsddb._db.DBSequence = DBSequence_py3k
+        else :
+            bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig
+            bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig
+            bsddb._db.DBSequence = bsddb._db.DBSequence_orig
+        return flag2
+
+    do_proxy_db_py3k.flag = False
+    do_proxy_db_py3k(True)
+
+try:
+    # For Pythons w/distutils pybsddb
+    from bsddb3 import db, dbtables, dbutils, dbshelve, \
+            hashopen, btopen, rnopen, dbobj
+except ImportError:
+    # For Python 2.3
+    from bsddb import db, dbtables, dbutils, dbshelve, \
+            hashopen, btopen, rnopen, dbobj
+
+try:
+    from bsddb3 import test_support
+except ImportError:
+    if sys.version_info[0] < 3 :
+        from test import test_support
+    else :
+        from test import support as test_support
+
+
+try:
+    if sys.version_info[0] < 3 :
+        from threading import Thread, currentThread
+        del Thread, currentThread
+    else :
+        from threading import Thread, current_thread
+        del Thread, current_thread
+    have_threads = True
+except ImportError:
+    have_threads = False
+
+verbose = 0
+if 'verbose' in sys.argv:
+    verbose = 1
+    sys.argv.remove('verbose')
+
+if 'silent' in sys.argv:  # take care of old flag, just in case
+    verbose = 0
+    sys.argv.remove('silent')
+
+
+def print_versions():
+    print
+    print '-=' * 38
+    print db.DB_VERSION_STRING
+    print 'bsddb.db.version():   %s' % (db.version(), )
+    print 'bsddb.db.__version__: %s' % db.__version__
+    print 'bsddb.db.cvsid:       %s' % db.cvsid
+    print 'py module:            %s' % bsddb.__file__
+    print 'extension module:     %s' % bsddb._bsddb.__file__
+    print 'python version:       %s' % sys.version
+    print 'My pid:               %s' % os.getpid()
+    print '-=' * 38
+
+
+def get_new_path(name) :
+    get_new_path.mutex.acquire()
+    try :
+        import os
+        path=os.path.join(get_new_path.prefix,
+                name+"_"+str(os.getpid())+"_"+str(get_new_path.num))
+        get_new_path.num+=1
+    finally :
+        get_new_path.mutex.release()
+    return path
+
+def get_new_environment_path() :
+    path=get_new_path("environment")
+    import os
+    try:
+        os.makedirs(path,mode=0700)
+    except os.error:
+        test_support.rmtree(path)
+        os.makedirs(path)
+    return path
+
+def get_new_database_path() :
+    path=get_new_path("database")
+    import os
+    if os.path.exists(path) :
+        os.remove(path)
+    return path
+
+
+# This path can be overriden via "set_test_path_prefix()".
+import os, os.path
+get_new_path.prefix=os.path.join(os.sep,"tmp","z-Berkeley_DB")
+get_new_path.num=0
+
+def get_test_path_prefix() :
+    return get_new_path.prefix
+
+def set_test_path_prefix(path) :
+    get_new_path.prefix=path
+
+def remove_test_path_directory() :
+    test_support.rmtree(get_new_path.prefix)
+
+if have_threads :
+    import threading
+    get_new_path.mutex=threading.Lock()
+    del threading
+else :
+    class Lock(object) :
+        def acquire(self) :
+            pass
+        def release(self) :
+            pass
+    get_new_path.mutex=Lock()
+    del Lock
+
+
+
+class PrintInfoFakeTest(unittest.TestCase):
+    def testPrintVersions(self):
+        print_versions()
+
+
+# This little hack is for when this module is run as main and all the
+# other modules import it so they will still be able to get the right
+# verbose setting.  It's confusing but it works.
+if sys.version_info[0] < 3 :
+    import test_all
+    test_all.verbose = verbose
+else :
+    import sys
+    print >>sys.stderr, "Work to do!"
+
+
+def suite(module_prefix='', timing_check=None):
+    test_modules = [
+        'test_associate',
+        'test_basics',
+        'test_compare',
+        'test_compat',
+        'test_cursor_pget_bug',
+        'test_dbobj',
+        'test_dbshelve',
+        'test_dbtables',
+        'test_distributed_transactions',
+        'test_early_close',
+        'test_get_none',
+        'test_join',
+        'test_lock',
+        'test_misc',
+        'test_pickle',
+        'test_queue',
+        'test_recno',
+        'test_replication',
+        'test_sequence',
+        'test_thread',
+        ]
+
+    alltests = unittest.TestSuite()
+    for name in test_modules:
+        #module = __import__(name)
+        # Do it this way so that suite may be called externally via
+        # python's Lib/test/test_bsddb3.
+        module = __import__(module_prefix+name, globals(), locals(), name)
+
+        alltests.addTest(module.test_suite())
+        if timing_check:
+            alltests.addTest(unittest.makeSuite(timing_check))
+    return alltests
+
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(PrintInfoFakeTest))
+    return suite
+
+
+if __name__ == '__main__':
+    print_versions()
+    unittest.main(defaultTest='suite')