|
1 """Test largefile support on system where this makes sense. |
|
2 """ |
|
3 |
|
4 import os |
|
5 import stat |
|
6 import sys |
|
7 import unittest |
|
8 from test.test_support import run_unittest, TESTFN, verbose, requires, \ |
|
9 TestSkipped, unlink |
|
10 |
|
11 try: |
|
12 import signal |
|
13 # The default handler for SIGXFSZ is to abort the process. |
|
14 # By ignoring it, system calls exceeding the file size resource |
|
15 # limit will raise IOError instead of crashing the interpreter. |
|
16 oldhandler = signal.signal(signal.SIGXFSZ, signal.SIG_IGN) |
|
17 except (ImportError, AttributeError): |
|
18 pass |
|
19 |
|
20 # create >2GB file (2GB = 2147483648 bytes) |
|
21 size = 2500000000L |
|
22 |
|
23 |
|
24 class TestCase(unittest.TestCase): |
|
25 """Test that each file function works as expected for a large |
|
26 (i.e. > 2GB, do we have to check > 4GB) files. |
|
27 |
|
28 NOTE: the order of execution of the test methods is important! test_seek |
|
29 must run first to create the test file. File cleanup must also be handled |
|
30 outside the test instances because of this. |
|
31 |
|
32 """ |
|
33 |
|
34 def test_seek(self): |
|
35 if verbose: |
|
36 print 'create large file via seek (may be sparse file) ...' |
|
37 with open(TESTFN, 'wb') as f: |
|
38 f.write('z') |
|
39 f.seek(0) |
|
40 f.seek(size) |
|
41 f.write('a') |
|
42 f.flush() |
|
43 if verbose: |
|
44 print 'check file size with os.fstat' |
|
45 self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1) |
|
46 |
|
47 def test_osstat(self): |
|
48 if verbose: |
|
49 print 'check file size with os.stat' |
|
50 self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1) |
|
51 |
|
52 def test_seek_read(self): |
|
53 if verbose: |
|
54 print 'play around with seek() and read() with the built largefile' |
|
55 with open(TESTFN, 'rb') as f: |
|
56 self.assertEqual(f.tell(), 0) |
|
57 self.assertEqual(f.read(1), 'z') |
|
58 self.assertEqual(f.tell(), 1) |
|
59 f.seek(0) |
|
60 self.assertEqual(f.tell(), 0) |
|
61 f.seek(0, 0) |
|
62 self.assertEqual(f.tell(), 0) |
|
63 f.seek(42) |
|
64 self.assertEqual(f.tell(), 42) |
|
65 f.seek(42, 0) |
|
66 self.assertEqual(f.tell(), 42) |
|
67 f.seek(42, 1) |
|
68 self.assertEqual(f.tell(), 84) |
|
69 f.seek(0, 1) |
|
70 self.assertEqual(f.tell(), 84) |
|
71 f.seek(0, 2) # seek from the end |
|
72 self.assertEqual(f.tell(), size + 1 + 0) |
|
73 f.seek(-10, 2) |
|
74 self.assertEqual(f.tell(), size + 1 - 10) |
|
75 f.seek(-size-1, 2) |
|
76 self.assertEqual(f.tell(), 0) |
|
77 f.seek(size) |
|
78 self.assertEqual(f.tell(), size) |
|
79 # the 'a' that was written at the end of file above |
|
80 self.assertEqual(f.read(1), 'a') |
|
81 f.seek(-size-1, 1) |
|
82 self.assertEqual(f.read(1), 'z') |
|
83 self.assertEqual(f.tell(), 1) |
|
84 |
|
85 def test_lseek(self): |
|
86 if verbose: |
|
87 print 'play around with os.lseek() with the built largefile' |
|
88 with open(TESTFN, 'rb') as f: |
|
89 self.assertEqual(os.lseek(f.fileno(), 0, 0), 0) |
|
90 self.assertEqual(os.lseek(f.fileno(), 42, 0), 42) |
|
91 self.assertEqual(os.lseek(f.fileno(), 42, 1), 84) |
|
92 self.assertEqual(os.lseek(f.fileno(), 0, 1), 84) |
|
93 self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0) |
|
94 self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10) |
|
95 self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0) |
|
96 self.assertEqual(os.lseek(f.fileno(), size, 0), size) |
|
97 # the 'a' that was written at the end of file above |
|
98 self.assertEqual(f.read(1), 'a') |
|
99 |
|
100 def test_truncate(self): |
|
101 if verbose: |
|
102 print 'try truncate' |
|
103 with open(TESTFN, 'r+b') as f: |
|
104 # this is already decided before start running the test suite |
|
105 # but we do it anyway for extra protection |
|
106 if not hasattr(f, 'truncate'): |
|
107 raise TestSkipped, "open().truncate() not available on this system" |
|
108 f.seek(0, 2) |
|
109 # else we've lost track of the true size |
|
110 self.assertEqual(f.tell(), size+1) |
|
111 # Cut it back via seek + truncate with no argument. |
|
112 newsize = size - 10 |
|
113 f.seek(newsize) |
|
114 f.truncate() |
|
115 self.assertEqual(f.tell(), newsize) # else pointer moved |
|
116 f.seek(0, 2) |
|
117 self.assertEqual(f.tell(), newsize) # else wasn't truncated |
|
118 # Ensure that truncate(smaller than true size) shrinks |
|
119 # the file. |
|
120 newsize -= 1 |
|
121 f.seek(42) |
|
122 f.truncate(newsize) |
|
123 self.assertEqual(f.tell(), 42) # else pointer moved |
|
124 f.seek(0, 2) |
|
125 self.assertEqual(f.tell(), newsize) # else wasn't truncated |
|
126 |
|
127 # XXX truncate(larger than true size) is ill-defined |
|
128 # across platform; cut it waaaaay back |
|
129 f.seek(0) |
|
130 f.truncate(1) |
|
131 self.assertEqual(f.tell(), 0) # else pointer moved |
|
132 self.assertEqual(len(f.read()), 1) # else wasn't truncated |
|
133 |
|
134 |
|
135 def test_main(): |
|
136 # On Windows and Mac OSX this test comsumes large resources; It |
|
137 # takes a long time to build the >2GB file and takes >2GB of disk |
|
138 # space therefore the resource must be enabled to run this test. |
|
139 # If not, nothing after this line stanza will be executed. |
|
140 if sys.platform[:3] == 'win' or sys.platform == 'darwin': |
|
141 requires('largefile', |
|
142 'test requires %s bytes and a long time to run' % str(size)) |
|
143 else: |
|
144 # Only run if the current filesystem supports large files. |
|
145 # (Skip this test on Windows, since we now always support |
|
146 # large files.) |
|
147 f = open(TESTFN, 'wb') |
|
148 try: |
|
149 # 2**31 == 2147483648 |
|
150 f.seek(2147483649L) |
|
151 # Seeking is not enough of a test: you must write and |
|
152 # flush, too! |
|
153 f.write("x") |
|
154 f.flush() |
|
155 except (IOError, OverflowError): |
|
156 f.close() |
|
157 unlink(TESTFN) |
|
158 raise TestSkipped, "filesystem does not have largefile support" |
|
159 else: |
|
160 f.close() |
|
161 suite = unittest.TestSuite() |
|
162 suite.addTest(TestCase('test_seek')) |
|
163 suite.addTest(TestCase('test_osstat')) |
|
164 suite.addTest(TestCase('test_seek_read')) |
|
165 suite.addTest(TestCase('test_lseek')) |
|
166 with open(TESTFN, 'w') as f: |
|
167 if hasattr(f, 'truncate'): |
|
168 suite.addTest(TestCase('test_truncate')) |
|
169 unlink(TESTFN) |
|
170 try: |
|
171 run_unittest(suite) |
|
172 finally: |
|
173 unlink(TESTFN) |
|
174 |
|
175 |
|
176 if __name__ == '__main__': |
|
177 test_main() |