|
1 from test.test_support import run_unittest |
|
2 import cgi |
|
3 import os |
|
4 import sys |
|
5 import tempfile |
|
6 import unittest |
|
7 from StringIO import StringIO |
|
8 |
|
class HackedSysModule:
    """Minimal stand-in for the ``sys`` module, handed to ``cgi`` below."""

    # Only the two attributes defined here are provided.
    stdin = sys.stdin

    # The regression test will have real values in sys.argv, which
    # would completely confuse the test of the cgi module, so the
    # stand-in exposes an empty argument list instead.
    argv = []
|
14 |
|
# Rebind the ``sys`` name inside the cgi module to the stub above, so cgi
# sees an empty argv rather than the test runner's real command line.
cgi.sys = HackedSysModule()
|
16 |
|
17 try: |
|
18 from cStringIO import StringIO |
|
19 except ImportError: |
|
20 from StringIO import StringIO |
|
21 |
|
class ComparableException:
    """Wrap an exception so it can be compared with an expected exception.

    The wrapper is equal to an exception instance when both have the same
    class and the same ``args``; it is never equal to a non-exception.
    Attributes not found on the wrapper are looked up on the wrapped
    exception, and ``str()`` gives the wrapped exception's message.
    """

    def __init__(self, err):
        # err: the exception instance being wrapped.
        self.err = err

    def __str__(self):
        return str(self.err)

    def __eq__(self, anExc):
        # Explicit equality, so ``==`` (and assertEqual) does not depend on
        # __cmp__ and the cmp() builtin being available.
        if not isinstance(anExc, Exception):
            return False
        return (self.err.__class__ == anExc.__class__
                and self.err.args == anExc.args)

    def __ne__(self, anExc):
        return not self.__eq__(anExc)

    def __cmp__(self, anExc):
        # Kept for callers that order or compare through cmp(): anything
        # that is not an exception sorts after the wrapper.
        if not isinstance(anExc, Exception):
            return -1
        x = cmp(self.err.__class__, anExc.__class__)
        if x != 0:
            return x
        return cmp(self.err.args, anExc.args)

    def __getattr__(self, attr):
        # __getattr__ only runs when normal lookup fails.  If ``err`` itself
        # is missing (instance created without __init__), delegating to
        # self.err would recurse forever, so fail cleanly instead.
        if attr == 'err':
            raise AttributeError(attr)
        return getattr(self.err, attr)
|
39 |
|
def do_test(buf, method):
    """Parse *buf* with ``cgi.parse`` in strict mode, as a GET or POST request.

    buf    -- the urlencoded data; used as QUERY_STRING for "GET" and as the
              request body (with a matching CONTENT_LENGTH) for "POST".
    method -- "GET" or "POST"; anything else raises ValueError.

    Returns whatever ``cgi.parse`` returns.  If parsing raises a
    StandardError, the error is not propagated: it is returned wrapped in a
    ComparableException so the caller can compare it with an expected
    exception.
    """
    if method == "GET":
        fp = None
        env = {
            'REQUEST_METHOD': 'GET',
            'QUERY_STRING': buf,
        }
    elif method == "POST":
        fp = StringIO(buf)
        env = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': 'application/x-www-form-urlencoded',
            'CONTENT_LENGTH': str(len(buf)),
        }
    else:
        # Call syntax instead of the old ``raise E, msg`` statement form.
        raise ValueError("unknown method: %s" % method)
    try:
        return cgi.parse(fp, env, strict_parsing=1)
    except StandardError as err:
        return ComparableException(err)
|
57 |
|
# Table driving CgiTests.test_strict.  Each entry is a pair
# (query string, expected result), where the expected result is either a
# dict mapping each field name to the list of its values, or the ValueError
# instance that strict parsing is expected to produce for that input.
parse_strict_test_cases = [
    ("", ValueError("bad query field: ''")),
    ("&", ValueError("bad query field: ''")),
    ("&&", ValueError("bad query field: ''")),
    (";", ValueError("bad query field: ''")),
    (";&;", ValueError("bad query field: ''")),
    # Should the next few really be valid?
    ("=", {}),
    ("=&=", {}),
    ("=;=", {}),
    # The rest seem to make sense
    ("=a", {'': ['a']}),
    ("&=a", ValueError("bad query field: ''")),
    ("=a&", ValueError("bad query field: ''")),
    ("=&a", ValueError("bad query field: 'a'")),
    ("b=a", {'b': ['a']}),
    ("b+=a", {'b ': ['a']}),
    ("a=b=a", {'a': ['b=a']}),
    ("a=+b=a", {'a': [' b=a']}),
    ("&b=a", ValueError("bad query field: ''")),
    ("b&=a", ValueError("bad query field: 'b'")),
    ("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}),
    ("a=a+b&a=b+a", {'a': ['a b', 'b a']}),
    # '&' and ';' are both accepted as field separators, alone or mixed.
    ("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
    ("x=1;y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
    ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
    # Two longer query strings with many fields.
    ("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b&expire=964546263&lobale=en-US&kid=130003.300038&ss=env",
     {'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'],
      'cuyer': ['r'],
      'expire': ['964546263'],
      'kid': ['130003.300038'],
      'lobale': ['en-US'],
      'order_id': ['0bb2e248638833d48cb7fed300000f1b'],
      'ss': ['env'],
      'view': ['bustomer'],
      }),

    ("group_id=5470&set=custom&_assigned_to=31392&_status=1&_category=100&SUBMIT=Browse",
     {'SUBMIT': ['Browse'],
      '_assigned_to': ['31392'],
      '_category': ['100'],
      '_status': ['1'],
      'group_id': ['5470'],
      'set': ['custom'],
      })
    ]
|
104 |
|
def norm(list):
    """Return a sorted copy of *list* if it is a list, else *list* itself.

    Used to compare collections whose order is not significant.  The
    argument is never modified, so normalising an expected value does not
    reorder the caller's data.
    """
    # The parameter shadows the ``list`` builtin (the name is kept for
    # callers), so the list type is spelled type([]) here.
    if isinstance(list, type([])):
        return sorted(list)
    return list
|
109 |
|
def first_elts(list):
    """Return the first element of every item in *list*."""
    return [item[0] for item in list]
|
112 |
|
def first_second_elts(list):
    """Return ``(key, first value)`` for every ``(key, values)`` pair in *list*."""
    return [(pair[0], pair[1][0]) for pair in list]
|
115 |
|
def gen_result(data, environ):
    """Parse *data* as a request body and return the fields as a plain dict.

    data    -- the request body, fed to ``cgi.FieldStorage`` as its input file.
    environ -- the CGI environment dict passed to ``cgi.FieldStorage``.

    Each key of the result is a field name.  A field that FieldStorage
    reports as a list maps to the list from ``getlist``; any other field
    maps to its ``value``.
    """
    body = StringIO(data)
    body.seek(0)
    form = cgi.FieldStorage(fp=body, environ=environ)

    fields = {}
    for name, field in dict(form).items():
        if type(field) is list:
            fields[name] = form.getlist(name)
        else:
            fields[name] = field.value
    return fields
|
126 |
|
class CgiTests(unittest.TestCase):
    # Tests for the cgi module: strict query parsing, the FormContentDict
    # family, logging, FieldStorage (readline chunking, multipart bodies,
    # query string merged with a request body) and the deprecated
    # parse_qs/parse_qsl wrappers.  Comments are used instead of docstrings
    # on the test methods so the runner keeps reporting method names.

    def test_strict(self):
        # Every entry of parse_strict_test_cases must parse to the expected
        # dict (or produce the expected error) for both GET and POST, and the
        # dict-like wrappers must agree with the expected dict.
        for orig, expect in parse_strict_test_cases:
            # Test basic parsing
            d = do_test(orig, "GET")
            self.assertEqual(d, expect, "Error parsing %s" % repr(orig))
            d = do_test(orig, "POST")
            self.assertEqual(d, expect, "Error parsing %s" % repr(orig))

            env = {'QUERY_STRING': orig}
            fcd = cgi.FormContentDict(env)
            sd = cgi.SvFormContentDict(env)
            fs = cgi.FieldStorage(environ=env)
            if isinstance(expect, dict):
                # test dict interface
                self.assertEqual(len(expect), len(fcd))
                self.assertEqual(norm(expect.keys()), norm(fcd.keys()))
                self.assertEqual(norm(expect.values()), norm(fcd.values()))
                self.assertEqual(norm(expect.items()), norm(fcd.items()))
                self.assertEqual(fcd.get("nonexistent field", "default"), "default")
                self.assertEqual(len(sd), len(fs))
                self.assertEqual(norm(sd.keys()), norm(fs.keys()))
                self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")
                # test individual fields
                for key in expect.keys():
                    expect_val = expect[key]
                    self.assertTrue(fcd.has_key(key))
                    self.assertEqual(norm(fcd[key]), norm(expect[key]))
                    self.assertEqual(fcd.get(key, "default"), fcd[key])
                    self.assertTrue(fs.has_key(key))
                    if len(expect_val) > 1:
                        single_value = 0
                    else:
                        single_value = 1
                    try:
                        val = sd[key]
                    except IndexError:
                        # Indexing sd raised IndexError: only acceptable
                        # when the field has several values.
                        self.assertFalse(single_value)
                        self.assertEqual(fs.getvalue(key), expect_val)
                    else:
                        self.assertTrue(single_value)
                        self.assertEqual(val, expect_val[0])
                        self.assertEqual(fs.getvalue(key), expect_val[0])
                    self.assertEqual(norm(sd.getlist(key)), norm(expect_val))
                    if single_value:
                        self.assertEqual(norm(sd.values()),
                                         first_elts(norm(expect.values())))
                        self.assertEqual(norm(sd.items()),
                                         first_second_elts(norm(expect.items())))

    def test_weird_formcontentdict(self):
        # Test the weird FormContentDict classes
        env = {'QUERY_STRING': "x=1&y=2.0&z=2-3.%2b0&1=1abc"}
        expect = {'x': 1, 'y': 2.0, 'z': '2-3.+0', '1': '1abc'}
        d = cgi.InterpFormContentDict(env)
        for k, v in expect.items():
            self.assertEqual(d[k], v)
        for k, v in d.items():
            self.assertEqual(expect[k], v)
        self.assertEqual(norm(expect.values()), norm(d.values()))

    def test_log(self):
        # The test repoints cgi.logfp / cgi.logfile (and initlog may rebind
        # cgi.log -- confirm against the cgi source); remember the module's
        # logging state so it can be put back afterwards.
        saved_state = (cgi.logfp, cgi.logfile, cgi.log)
        try:
            cgi.log("Testing")

            cgi.logfp = StringIO()
            cgi.initlog("%s", "Testing initlog 1")
            cgi.log("%s", "Testing log 2")
            self.assertEqual(cgi.logfp.getvalue(),
                             "Testing initlog 1\nTesting log 2\n")
            if os.path.exists("/dev/null"):
                cgi.logfp = None
                cgi.logfile = "/dev/null"
                cgi.initlog("%s", "Testing log 3")
                cgi.log("Testing log 4")
        finally:
            # Restore the previous state and close whatever log target this
            # test left installed, so no file handle is leaked.
            last_logfp = cgi.logfp
            cgi.logfp, cgi.logfile, cgi.log = saved_state
            if last_logfp is not None and last_logfp is not saved_state[0]:
                last_logfp.close()

    def test_fieldstorage_readline(self):
        # FieldStorage uses readline, which has the capacity to read all
        # contents of the input file into memory; we use readline's size argument
        # to prevent that for files that do not contain any newlines in
        # non-GET/HEAD requests
        class TestReadlineFile:
            # File wrapper that counts how many times readline() is called.
            def __init__(self, file):
                self.file = file
                self.numcalls = 0

            def readline(self, size=None):
                self.numcalls += 1
                if size:
                    return self.file.readline(size)
                else:
                    return self.file.readline()

            def __getattr__(self, name):
                # Forward everything else to the wrapped file; non-int
                # attributes are cached on the wrapper after first access.
                file = self.__dict__['file']
                a = getattr(file, name)
                if not isinstance(a, int):
                    setattr(self, name, a)
                return a

        f = TestReadlineFile(tempfile.TemporaryFile())
        try:
            f.write('x' * 256 * 1024)
            f.seek(0)
            env = {'REQUEST_METHOD': 'PUT'}
            # Only the side effect on f.numcalls matters, so the
            # FieldStorage instance itself is not kept.
            cgi.FieldStorage(fp=f, environ=env)
            # if we're not chunking properly, readline is only called twice
            # (by read_binary); if we are chunking properly, it will be called 5 times
            # as long as the chunksize is 1 << 16.
            self.assertTrue(f.numcalls > 2)
        finally:
            # Release the temporary file even when the assertion fails.
            f.close()

    def test_fieldstorage_multipart(self):
        # Test basic FieldStorage multipart parsing
        env = {
            'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': 'multipart/form-data; boundary=---------------------------721837373350705526688164684',
            'CONTENT_LENGTH': '558',
        }
        # The value line of the "submit" part starts with a literal space and
        # ends with an escaped one (\x20); the expected value is ' Add '.
        postdata = """-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="id"

1234
-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="title"


-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="file"; filename="test.txt"
Content-Type: text/plain

Testing 123.

-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="submit"

 Add\x20
-----------------------------721837373350705526688164684--
"""
        fs = cgi.FieldStorage(fp=StringIO(postdata), environ=env)
        self.assertEqual(len(fs.list), 4)
        expect = [{'name': 'id', 'filename': None, 'value': '1234'},
                  {'name': 'title', 'filename': None, 'value': ''},
                  {'name': 'file', 'filename': 'test.txt', 'value': 'Testing 123.\n'},
                  {'name': 'submit', 'filename': None, 'value': ' Add '}]
        # Both sequences have four entries (checked above), so pair them up.
        for field, attrs in zip(fs.list, expect):
            for k, exp in attrs.items():
                got = getattr(field, k)
                self.assertEqual(got, exp)

    # Fields expected by the testQSAnd* tests: key1 appears only in the query
    # string, key2 in both the query string and the body, key3/key4 only in
    # the body.
    _qs_result = {
        'key1': 'value1',
        'key2': ['value2x', 'value2y'],
        'key3': 'value3',
        'key4': 'value4'
    }

    def testQSAndUrlEncode(self):
        # Query string combined with a urlencoded POST body.
        data = "key2=value2x&key3=value3&key4=value4"
        environ = {
            'CONTENT_LENGTH': str(len(data)),
            'CONTENT_TYPE': 'application/x-www-form-urlencoded',
            'QUERY_STRING': 'key1=value1&key2=value2y',
            'REQUEST_METHOD': 'POST',
        }
        v = gen_result(data, environ)
        self.assertEqual(self._qs_result, v)

    def testQSAndFormData(self):
        # Query string combined with a multipart/form-data POST body.
        data = """
---123
Content-Disposition: form-data; name="key2"

value2y
---123
Content-Disposition: form-data; name="key3"

value3
---123
Content-Disposition: form-data; name="key4"

value4
---123--
"""
        environ = {
            'CONTENT_LENGTH': str(len(data)),
            'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
            'QUERY_STRING': 'key1=value1&key2=value2x',
            'REQUEST_METHOD': 'POST',
        }
        v = gen_result(data, environ)
        self.assertEqual(self._qs_result, v)

    def testQSAndFormDataFile(self):
        # Same as testQSAndFormData, plus a part carrying a filename.
        data = """
---123
Content-Disposition: form-data; name="key2"

value2y
---123
Content-Disposition: form-data; name="key3"

value3
---123
Content-Disposition: form-data; name="key4"

value4
---123
Content-Disposition: form-data; name="upload"; filename="fake.txt"
Content-Type: text/plain

this is the content of the fake file

---123--
"""
        environ = {
            'CONTENT_LENGTH': str(len(data)),
            'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
            'QUERY_STRING': 'key1=value1&key2=value2x',
            'REQUEST_METHOD': 'POST',
        }
        result = self._qs_result.copy()
        result.update({
            'upload': 'this is the content of the fake file\n'
        })
        v = gen_result(data, environ)
        self.assertEqual(result, v)

    def test_deprecated_parse_qs(self):
        # this func is moved to urlparse, this is just a sanity check
        self.assertEqual({'a': ['A1'], 'B': ['B3'], 'b': ['B2']},
                         cgi.parse_qs('a=A1&b=B2&B=B3'))

    def test_deprecated_parse_qsl(self):
        # this func is moved to urlparse, this is just a sanity check
        self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')],
                         cgi.parse_qsl('a=A1&b=B2&B=B3'))
|
356 |
|
357 |
|
def test_main():
    """Run the CgiTests test case through ``test.test_support.run_unittest``."""
    run_unittest(CgiTests)


# Allow the file to be executed directly as a script.
if __name__ == '__main__':
    test_main()