|
1 #-*- coding: ISO-8859-1 -*- |
|
2 # pysqlite2/test/hooks.py: tests for various SQLite-specific hooks |
|
3 # |
|
4 # Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de> |
|
5 # |
|
6 # This file is part of pysqlite. |
|
7 # |
|
8 # This software is provided 'as-is', without any express or implied |
|
9 # warranty. In no event will the authors be held liable for any damages |
|
10 # arising from the use of this software. |
|
11 # |
|
12 # Permission is granted to anyone to use this software for any purpose, |
|
13 # including commercial applications, and to alter it and redistribute it |
|
14 # freely, subject to the following restrictions: |
|
15 # |
|
16 # 1. The origin of this software must not be misrepresented; you must not |
|
17 # claim that you wrote the original software. If you use this software |
|
18 # in a product, an acknowledgment in the product documentation would be |
|
19 # appreciated but is not required. |
|
20 # 2. Altered source versions must be plainly marked as such, and must not be |
|
21 # misrepresented as being the original software. |
|
22 # 3. This notice may not be removed or altered from any source distribution. |
|
23 |
|
24 import os, unittest |
|
25 import sqlite3 as sqlite |
|
26 |
|
class CollationTests(unittest.TestCase):
    """Tests for user-defined collations registered with create_collation()."""

    def setUp(self):
        # Every test works on its own private in-memory database.
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        # Close explicitly instead of leaving the connection to the
        # garbage collector.
        self.con.close()

    @staticmethod
    def _compare(x, y):
        """
        Three-way comparison: return -1, 0 or 1 when x is smaller than,
        equal to or greater than y.  Stand-in for the cmp() builtin, which
        only exists on Python 2.
        """
        return (x > y) - (x < y)

    def CheckCreateCollationNotCallable(self):
        """
        Passing a non-callable as the collation must raise a TypeError
        carrying the expected message.
        """
        try:
            self.con.create_collation("X", 42)
        except TypeError as e:
            self.assertEqual(e.args[0], "parameter must be callable")
        else:
            self.fail("should have raised a TypeError")

    def CheckCreateCollationNotAscii(self):
        """
        A collation name containing a non-ASCII character must be rejected
        with a ProgrammingError.
        """
        try:
            self.con.create_collation("collä", self._compare)
        except sqlite.ProgrammingError:
            pass
        else:
            self.fail("should have raised a ProgrammingError")

    def CheckCollationIsUsed(self):
        """
        A registered collation must drive "order by ... collate", and
        unregistering it must make the same query fail.
        """
        # sqlite_version_info is the version of the SQLite library itself;
        # version_info is the version of the Python module and says nothing
        # about the library the comment below refers to.
        if sqlite.sqlite_version_info < (3, 2, 1):
            # old SQLite versions crash on this test
            return

        compare = self._compare

        def mycoll(x, y):
            # reverse order
            return -compare(x, y)

        con = self.con
        con.create_collation("mycoll", mycoll)
        sql = """
            select x from (
            select 'a' as x
            union
            select 'b' as x
            union
            select 'c' as x
            ) order by x collate mycoll
            """
        result = con.execute(sql).fetchall()
        self.assertEqual([row[0] for row in result], ["c", "b", "a"],
                         "the expected order was not returned")

        # Passing None removes the collation again; the query must now fail.
        con.create_collation("mycoll", None)
        try:
            con.execute(sql).fetchall()
        except sqlite.OperationalError as e:
            self.assertEqual(e.args[0].lower(),
                             "no such collation sequence: mycoll")
        else:
            self.fail("should have raised an OperationalError")

    def CheckCollationRegisterTwice(self):
        """
        Register two different collation functions under the same name.
        Verify that the last one is actually used.
        """
        compare = self._compare
        con = self.con
        con.create_collation("mycoll", compare)
        con.create_collation("mycoll", lambda x, y: -compare(x, y))
        result = con.execute("""
            select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
            """).fetchall()
        # The second (reversing) function must win, so 'b' sorts first.
        self.assertEqual([row[0] for row in result], ["b", "a"],
                         "wrong collation function is used")

    def CheckDeregisterCollation(self):
        """
        Register a collation, then deregister it. Make sure an error is raised if we try
        to use it.
        """
        con = self.con
        con.create_collation("mycoll", self._compare)
        con.create_collation("mycoll", None)
        try:
            con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
        except sqlite.OperationalError as e:
            self.assertTrue(
                e.args[0].startswith("no such collation sequence"),
                "wrong OperationalError raised")
        else:
            self.fail("should have raised an OperationalError")
|
107 |
|
class ProgressTests(unittest.TestCase):
    """Tests for the progress handler installed with set_progress_handler()."""

    def setUp(self):
        # Every test works on its own private in-memory database.
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        # Close explicitly instead of leaving the connection to the
        # garbage collector.
        self.con.close()

    def CheckProgressHandlerUsed(self):
        """
        Test that the progress handler is invoked once it is set.
        """
        progress_calls = []

        def progress():
            progress_calls.append(None)
            return 0

        self.con.set_progress_handler(progress, 1)
        self.con.execute("""
            create table foo(a, b)
            """)
        self.assertTrue(progress_calls)

    def CheckOpcodeCount(self):
        """
        Test that the opcode argument is respected.
        """
        con = self.con
        progress_calls = []

        def progress():
            progress_calls.append(None)
            return 0

        con.set_progress_handler(progress, 1)
        curs = con.cursor()
        curs.execute("""
            create table foo (a, b)
            """)
        first_count = len(progress_calls)

        # Start counting from scratch for the second statement.
        del progress_calls[:]
        con.set_progress_handler(progress, 2)
        curs.execute("""
            create table bar (a, b)
            """)
        second_count = len(progress_calls)
        # A larger opcode interval must result in fewer handler calls.
        self.assertTrue(first_count > second_count)

    def CheckCancelOperation(self):
        """
        Test that returning a non-zero value stops the operation in progress.
        """
        def progress():
            return 1

        self.con.set_progress_handler(progress, 1)
        curs = self.con.cursor()
        self.assertRaises(
            sqlite.OperationalError,
            curs.execute,
            "create table bar (a, b)")

    def CheckClearHandler(self):
        """
        Test that setting the progress handler to None clears the previously set handler.
        """
        con = self.con
        # Record calls in a list: assigning to a plain variable inside the
        # nested function would only bind a new local there, and the check
        # below could then never fail.
        progress_calls = []

        def progress():
            progress_calls.append(None)
            return 0

        con.set_progress_handler(progress, 1)
        con.set_progress_handler(None, 1)
        con.execute("select 1 union select 2 union select 3").fetchall()
        self.assertEqual(len(progress_calls), 0,
                         "progress handler was not cleared")
|
177 |
|
def suite():
    """
    Return one TestSuite holding the "Check"-prefixed test methods of
    CollationTests followed by those of ProgressTests.
    """
    loader = unittest.TestLoader()
    # The tests in this file are named Check*, not test*.
    loader.testMethodPrefix = "Check"
    per_class = [
        loader.loadTestsFromTestCase(case_class)
        for case_class in (CollationTests, ProgressTests)
    ]
    return unittest.TestSuite(per_class)
|
182 |
|
def test():
    """Run everything returned by suite() through a default TextTestRunner."""
    unittest.TextTestRunner().run(suite())
|
186 |
|
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test()