|
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
|
4 |
|
import errno
import os
import stat
import sys
import unittest
import warnings

from test import test_support
|
10 |
|
# Ignore RuntimeWarnings raised from this module whose message starts with
# "tempnam" or "tmpnam".
warnings.filterwarnings("ignore", message="tempnam",
                        category=RuntimeWarning, module=__name__)
warnings.filterwarnings("ignore", message="tmpnam",
                        category=RuntimeWarning, module=__name__)
|
13 |
|
# Tests that create (and clean up) the file named by test_support.TESTFN.
class FileTests(unittest.TestCase):

    def tearDown(self):
        # Remove TESTFN if it is present.
        testfn = test_support.TESTFN
        if os.path.exists(testfn):
            os.unlink(testfn)

    # Every test starts from the same clean state it is left in.
    setUp = tearDown

    def test_access(self):
        # A file we have just created must be reported as writable.
        fd = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        writable = os.access(test_support.TESTFN, os.W_OK)
        self.assertTrue(writable)

    def test_closerange(self):
        low = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # closerange() would mess up other file descriptors (perhaps even
        # the three standard ones).
        high = os.dup(low)
        try:
            attempts = 0
            while high != low + 1:
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    sys.stderr.write(
                        "couldn't allocate two consecutive fds, "
                        "skipping test_closerange\n")
                    return
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # Close a range holding one fd that is open (low) and one that
        # isn't any more (high).
        os.closerange(low, low + 2)
        self.assertRaises(OSError, os.write, low, "a")

    def test_rename(self):
        # The reference count of the path must be the same before and
        # after an os.rename() call that fails with TypeError.
        path = unicode(test_support.TESTFN)
        refs_before = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        refs_after = sys.getrefcount(path)
        self.assertEqual(refs_before, refs_after)
|
56 |
|
57 |
|
class TemporaryFileTests(unittest.TestCase):
    # Tests for os.tempnam(), os.tmpfile() and os.tmpnam().  Files created
    # through check_tempfile() are recorded in self.files and removed again
    # in tearDown().

    def setUp(self):
        self.files = []
        os.mkdir(test_support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(test_support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                         "file already exists for temporary file")
        # make sure we can create the file; close it explicitly instead of
        # relying on garbage collection to release the handle.
        open(name, "w").close()
        self.files.append(name)

    def test_tempnam(self):
        if not hasattr(os, "tempnam"):
            return
        warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                r"test_os$")
        self.check_tempfile(os.tempnam())

        name = os.tempnam(test_support.TESTFN)
        self.check_tempfile(name)

        name = os.tempnam(test_support.TESTFN, "pfx")
        self.assertTrue(os.path.basename(name)[:3] == "pfx")
        self.check_tempfile(name)

    def test_tmpfile(self):
        if not hasattr(os, "tmpfile"):
            return
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        if sys.platform == 'win32':
            name = '\\python_test_os_test_tmpfile.txt'
            if os.path.exists(name):
                os.remove(name)
            try:
                fp = open(name, 'w')
            except IOError as first:
                # open() failed, assert tmpfile() fails in the same way.
                # Although open() raises an IOError and os.tmpfile() raises an
                # OSError(), 'args' will be (13, 'Permission denied') in both
                # cases.
                try:
                    fp = os.tmpfile()
                except OSError as second:
                    self.assertEqual(first.args, second.args)
                else:
                    # Release the unexpected file before reporting failure.
                    fp.close()
                    self.fail("expected os.tmpfile() to raise OSError")
                return
            else:
                # open() worked, therefore, tmpfile() should work.  Close our
                # dummy file and proceed with the test as normal.
                fp.close()
                os.remove(name)

        fp = os.tmpfile()
        try:
            fp.write("foobar")
            fp.seek(0, 0)
            s = fp.read()
        finally:
            # Close the temporary file even if a write/read step fails.
            fp.close()
        self.assertTrue(s == "foobar")

    def test_tmpnam(self):
        if not hasattr(os, "tmpnam"):
            return
        warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                r"test_os$")
        name = os.tmpnam()
        if sys.platform in ("win32",):
            # The Windows tmpnam() seems useless.  From the MS docs:
            #
            #     The character string that tmpnam creates consists of
            #     the path prefix, defined by the entry P_tmpdir in the
            #     file STDIO.H, followed by a sequence consisting of the
            #     digit characters '0' through '9'; the numerical value
            #     of this string is in the range 1 - 65,535.  Changing the
            #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
            #     change the operation of tmpnam.
            #
            # The really bizarre part is that, at least under MSVC6,
            # P_tmpdir is "\\".  That is, the path returned refers to
            # the root of the current drive.  That's a terrible place to
            # put temp files, and, depending on privileges, the user
            # may not even be able to open a file in the root directory.
            self.assertFalse(os.path.exists(name),
                             "file already exists for temporary file")
        else:
            self.check_tempfile(name)
|
165 |
|
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    # setUp() creates the directory TESTFN holding a three-byte file "f1"
    # (self.fname); tearDown() removes both again.

    def setUp(self):
        os.mkdir(test_support.TESTFN)
        self.fname = os.path.join(test_support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write("ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(test_support.TESTFN)

    def test_stat_attributes(self):
        if not hasattr(os, "stat"):
            return

        result = os.stat(self.fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] != 'ST_':
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are compared after truncation to int.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertTrue(attr in members)

        # Indexing far past the end must fail.
        self.assertRaises(IndexError, result.__getitem__, 200)

        # Make sure that assignment fails
        self.assertRaises(TypeError, setattr, result, "st_mode", 1)
        self.assertRaises((AttributeError, TypeError),
                          setattr, result, "st_rdev", 1)
        self.assertRaises(AttributeError, setattr, result, "parrot", 1)

        # Use the stat_result constructor with a too-short tuple.
        self.assertRaises(TypeError, os.stat_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.stat_result((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a genuine error.  Swallowing it would
            # only surface later as a NameError on `result`.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        self.assertRaises(TypeError, setattr, result, "f_bfree", 1)
        self.assertRaises(AttributeError, setattr, result, "parrot", 1)

        # Use the constructor with a too-short tuple.
        self.assertRaises(TypeError, os.statvfs_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.statvfs_result((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(test_support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(test_support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name reported by
            # GetVolumeInformationA for the drive holding `path`, or None
            # when the call fails.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_string_buffer("", 100)
            if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(test_support.TESTFN) == "NTFS":
            def test_1565150(self):
                # A fractional modification time must survive a
                # utime()/stat() round trip.
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
|
325 |
|
from test import mapping_tests


class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Stash a copy of the real environment and start from an empty one.
        self.__save = dict(os.environ)
        os.environ.clear()

    def tearDown(self):
        # Put the stashed environment back.
        os.environ.clear()
        os.environ.update(self.__save)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        # A variable set through os.environ.update() must be visible to a
        # child shell.  Does nothing when /bin/sh is absent.
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        pipe = os.popen("/bin/sh -c 'echo $HELLO'")
        output = pipe.read()
        self.assertEqual(output.strip(), "World")
|
349 |
|
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(test_support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        link_path = join(sub2_path, "link")
        t2_path = join(test_support.TESTFN, "TEST2")
        file_paths = (
            join(walk_path, "tmp1"),
            join(sub1_path, "tmp2"),
            join(sub2_path, "tmp3"),
            join(t2_path, "tmp4"),
        )

        # Create stuff.
        for directory in (sub11_path, sub2_path, t2_path):
            os.makedirs(directory)
        for path in file_paths:
            f = open(path, "w")
            f.write("I'm " + path + " and proud of it. Blame test_os.\n")
            f.close()
        has_symlink = hasattr(os, "symlink")
        if has_symlink:
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_dirs = ["link"]
        else:
            sub2_dirs = []
        sub2_tree = (sub2_path, sub2_dirs, ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        top_dirs = walked[0][1]
        flipped = top_dirs[0] != "SUB1"
        top_dirs.sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        top_dirs = walked[3][1]
        flipped = top_dirs[0] != "SUB1"
        top_dirs.sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if has_symlink:
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                if os.path.islink(dirname):
                    # A symlink to a directory is removed like a file.
                    os.remove(dirname)
                else:
                    os.rmdir(dirname)
        os.rmdir(test_support.TESTFN)
|
457 |
|
class MakedirTests(unittest.TestCase):
    def setUp(self):
        os.mkdir(test_support.TESTFN)

    def test_makedir(self):
        join = os.path.join
        base = test_support.TESTFN
        os.makedirs(join(base, 'dir1', 'dir2', 'dir3'))  # Should work
        os.makedirs(join(base, 'dir1', 'dir2', 'dir3', 'dir4'))

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        trailing_dot = join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5',
                            os.curdir)
        os.makedirs(trailing_dot)
        embedded_dot = join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(embedded_dot)

    def tearDown(self):
        path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so walk upwards until we reach a
        # directory that exists (stopping at TESTFN at the latest).
        while True:
            if os.path.exists(path) or path == test_support.TESTFN:
                break
            path = os.path.dirname(path)

        os.removedirs(path)
|
490 |
|
class DevNullTests(unittest.TestCase):
    def test_devnull(self):
        # Data written to os.devnull must be discarded: reading the device
        # back yields an empty string.  Both handles are closed by the
        # `with` blocks even if the write or the assertion fails.
        with open(os.devnull, 'w') as sink:
            sink.write('hello')
        with open(os.devnull, 'r') as source:
            self.assertEqual(source.read(), '')
|
499 |
|
class URandomTests(unittest.TestCase):
    def test_urandom(self):
        # (requested size, expected length) pairs, checked in this order.
        # For the float requests see http://bugs.python.org/issue3708
        cases = (
            (1, 1),
            (10, 10),
            (100, 100),
            (1000, 1000),
            (0.9, 0),
            (1.1, 1),
            (2.0, 2),
        )
        try:
            for requested, expected in cases:
                self.assertEqual(len(os.urandom(requested)), expected)
        except NotImplementedError:
            # os.urandom() is not available here; nothing to check.
            pass
|
513 |
|
class Win32ErrorTests(unittest.TestCase):
    # Each test checks that a failing os call raises WindowsError.  Unless
    # a test creates it, test_support.TESTFN is not expected to exist.

    def test_rename(self):
        self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(WindowsError, os.remove, test_support.TESTFN)

    def test_chdir(self):
        self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)

    def test_mkdir(self):
        # mkdir only fails if the name is already taken, so occupy TESTFN
        # with a regular file first.  (This test used to call os.chdir and
        # never exercised os.mkdir at all.)
        f = open(test_support.TESTFN, "w")
        try:
            self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)
        finally:
            f.close()
            os.unlink(test_support.TESTFN)

    def test_utime(self):
        self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)

    def test_access(self):
        # NOTE(review): despite its name this test calls os.utime with an
        # invalid `times` argument, not os.access.  os.access is documented
        # to return a boolean, so it may not be able to raise here; confirm
        # the intent before renaming or changing this test.
        self.assertRaises(WindowsError, os.utime, test_support.TESTFN, 0)

    def test_chmod(self):
        # This test used to call os.utime; exercise os.chmod as named.
        self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)


# On every other platform replace the class with an empty test case so the
# name still exists for test_main() but nothing Windows-specific runs.
if sys.platform != 'win32':
    class Win32ErrorTests(unittest.TestCase):
        pass
|
539 |
|
def test_main():
    # Hand the test case classes listed below to the regrtest runner.
    test_classes = (
        FileTests,
        TemporaryFileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        Win32ErrorTests,
    )
    test_support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()