|
1 # -*- coding: iso-8859-15 -*- |
|
2 |
|
3 import sys |
|
4 import os |
|
5 import shutil |
|
6 import tempfile |
|
7 import StringIO |
|
8 from hashlib import md5 |
|
9 import errno |
|
10 |
|
11 import unittest |
|
12 import tarfile |
|
13 |
|
14 from test import test_support |
|
15 |
|
16 # Check for our compression modules. |
|
17 try: |
|
18 import gzip |
|
19 gzip.GzipFile |
|
20 except (ImportError, AttributeError): |
|
21 gzip = None |
|
22 try: |
|
23 import bz2 |
|
24 except ImportError: |
|
25 bz2 = None |
|
26 |
|
27 def md5sum(data): |
|
28 return md5(data).hexdigest() |
|
29 |
|
30 def path(path): |
|
31 return test_support.findfile(path) |
|
32 |
|
33 TEMPDIR = os.path.join(tempfile.gettempdir(), "test_tarfile_tmp") |
|
34 tarname = path("testtar.tar") |
|
35 gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") |
|
36 bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") |
|
37 tmpname = os.path.join(TEMPDIR, "tmp.tar") |
|
38 |
|
39 md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" |
|
40 md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" |
|
41 |
|
42 |
|
43 class ReadTest(unittest.TestCase): |
|
44 |
|
45 tarname = tarname |
|
46 mode = "r:" |
|
47 |
|
48 def setUp(self): |
|
49 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") |
|
50 |
|
51 def tearDown(self): |
|
52 self.tar.close() |
|
53 |
|
54 |
|
55 class UstarReadTest(ReadTest): |
|
56 |
|
57 def test_fileobj_regular_file(self): |
|
58 tarinfo = self.tar.getmember("ustar/regtype") |
|
59 fobj = self.tar.extractfile(tarinfo) |
|
60 data = fobj.read() |
|
61 self.assert_((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), |
|
62 "regular file extraction failed") |
|
63 |
|
64 def test_fileobj_readlines(self): |
|
65 self.tar.extract("ustar/regtype", TEMPDIR) |
|
66 tarinfo = self.tar.getmember("ustar/regtype") |
|
67 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") |
|
68 fobj2 = self.tar.extractfile(tarinfo) |
|
69 |
|
70 lines1 = fobj1.readlines() |
|
71 lines2 = fobj2.readlines() |
|
72 self.assert_(lines1 == lines2, |
|
73 "fileobj.readlines() failed") |
|
74 self.assert_(len(lines2) == 114, |
|
75 "fileobj.readlines() failed") |
|
76 self.assert_(lines2[83] == \ |
|
77 "I will gladly admit that Python is not the fastest running scripting language.\n", |
|
78 "fileobj.readlines() failed") |
|
79 |
|
80 def test_fileobj_iter(self): |
|
81 self.tar.extract("ustar/regtype", TEMPDIR) |
|
82 tarinfo = self.tar.getmember("ustar/regtype") |
|
83 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") |
|
84 fobj2 = self.tar.extractfile(tarinfo) |
|
85 lines1 = fobj1.readlines() |
|
86 lines2 = [line for line in fobj2] |
|
87 self.assert_(lines1 == lines2, |
|
88 "fileobj.__iter__() failed") |
|
89 |
|
90 def test_fileobj_seek(self): |
|
91 self.tar.extract("ustar/regtype", TEMPDIR) |
|
92 fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") |
|
93 data = fobj.read() |
|
94 fobj.close() |
|
95 |
|
96 tarinfo = self.tar.getmember("ustar/regtype") |
|
97 fobj = self.tar.extractfile(tarinfo) |
|
98 |
|
99 text = fobj.read() |
|
100 fobj.seek(0) |
|
101 self.assert_(0 == fobj.tell(), |
|
102 "seek() to file's start failed") |
|
103 fobj.seek(2048, 0) |
|
104 self.assert_(2048 == fobj.tell(), |
|
105 "seek() to absolute position failed") |
|
106 fobj.seek(-1024, 1) |
|
107 self.assert_(1024 == fobj.tell(), |
|
108 "seek() to negative relative position failed") |
|
109 fobj.seek(1024, 1) |
|
110 self.assert_(2048 == fobj.tell(), |
|
111 "seek() to positive relative position failed") |
|
112 s = fobj.read(10) |
|
113 self.assert_(s == data[2048:2058], |
|
114 "read() after seek failed") |
|
115 fobj.seek(0, 2) |
|
116 self.assert_(tarinfo.size == fobj.tell(), |
|
117 "seek() to file's end failed") |
|
118 self.assert_(fobj.read() == "", |
|
119 "read() at file's end did not return empty string") |
|
120 fobj.seek(-tarinfo.size, 2) |
|
121 self.assert_(0 == fobj.tell(), |
|
122 "relative seek() to file's start failed") |
|
123 fobj.seek(512) |
|
124 s1 = fobj.readlines() |
|
125 fobj.seek(512) |
|
126 s2 = fobj.readlines() |
|
127 self.assert_(s1 == s2, |
|
128 "readlines() after seek failed") |
|
129 fobj.seek(0) |
|
130 self.assert_(len(fobj.readline()) == fobj.tell(), |
|
131 "tell() after readline() failed") |
|
132 fobj.seek(512) |
|
133 self.assert_(len(fobj.readline()) + 512 == fobj.tell(), |
|
134 "tell() after seek() and readline() failed") |
|
135 fobj.seek(0) |
|
136 line = fobj.readline() |
|
137 self.assert_(fobj.read() == data[len(line):], |
|
138 "read() after readline() failed") |
|
139 fobj.close() |
|
140 |
|
141 |
|
142 class MiscReadTest(ReadTest): |
|
143 |
|
144 def test_no_name_argument(self): |
|
145 fobj = open(self.tarname, "rb") |
|
146 tar = tarfile.open(fileobj=fobj, mode=self.mode) |
|
147 self.assertEqual(tar.name, os.path.abspath(fobj.name)) |
|
148 |
|
149 def test_no_name_attribute(self): |
|
150 data = open(self.tarname, "rb").read() |
|
151 fobj = StringIO.StringIO(data) |
|
152 self.assertRaises(AttributeError, getattr, fobj, "name") |
|
153 tar = tarfile.open(fileobj=fobj, mode=self.mode) |
|
154 self.assertEqual(tar.name, None) |
|
155 |
|
156 def test_empty_name_attribute(self): |
|
157 data = open(self.tarname, "rb").read() |
|
158 fobj = StringIO.StringIO(data) |
|
159 fobj.name = "" |
|
160 tar = tarfile.open(fileobj=fobj, mode=self.mode) |
|
161 self.assertEqual(tar.name, None) |
|
162 |
|
163 def test_fileobj_with_offset(self): |
|
164 # Skip the first member and store values from the second member |
|
165 # of the testtar. |
|
166 tar = tarfile.open(self.tarname, mode=self.mode) |
|
167 tar.next() |
|
168 t = tar.next() |
|
169 name = t.name |
|
170 offset = t.offset |
|
171 data = tar.extractfile(t).read() |
|
172 tar.close() |
|
173 |
|
174 # Open the testtar and seek to the offset of the second member. |
|
175 if self.mode.endswith(":gz"): |
|
176 _open = gzip.GzipFile |
|
177 elif self.mode.endswith(":bz2"): |
|
178 _open = bz2.BZ2File |
|
179 else: |
|
180 _open = open |
|
181 fobj = _open(self.tarname, "rb") |
|
182 fobj.seek(offset) |
|
183 |
|
184 # Test if the tarfile starts with the second member. |
|
185 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) |
|
186 t = tar.next() |
|
187 self.assertEqual(t.name, name) |
|
188 # Read to the end of fileobj and test if seeking back to the |
|
189 # beginning works. |
|
190 tar.getmembers() |
|
191 self.assertEqual(tar.extractfile(t).read(), data, |
|
192 "seek back did not work") |
|
193 tar.close() |
|
194 |
|
195 def test_fail_comp(self): |
|
196 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. |
|
197 if self.mode == "r:": |
|
198 return |
|
199 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) |
|
200 fobj = open(tarname, "rb") |
|
201 self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode) |
|
202 |
|
203 def test_v7_dirtype(self): |
|
204 # Test old style dirtype member (bug #1336623): |
|
205 # Old V7 tars create directory members using an AREGTYPE |
|
206 # header with a "/" appended to the filename field. |
|
207 tarinfo = self.tar.getmember("misc/dirtype-old-v7") |
|
208 self.assert_(tarinfo.type == tarfile.DIRTYPE, |
|
209 "v7 dirtype failed") |
|
210 |
|
211 def test_xstar_type(self): |
|
212 # The xstar format stores extra atime and ctime fields inside the |
|
213 # space reserved for the prefix field. The prefix field must be |
|
214 # ignored in this case, otherwise it will mess up the name. |
|
215 try: |
|
216 self.tar.getmember("misc/regtype-xstar") |
|
217 except KeyError: |
|
218 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") |
|
219 |
|
220 def test_check_members(self): |
|
221 for tarinfo in self.tar: |
|
222 self.assert_(int(tarinfo.mtime) == 07606136617, |
|
223 "wrong mtime for %s" % tarinfo.name) |
|
224 if not tarinfo.name.startswith("ustar/"): |
|
225 continue |
|
226 self.assert_(tarinfo.uname == "tarfile", |
|
227 "wrong uname for %s" % tarinfo.name) |
|
228 |
|
229 def test_find_members(self): |
|
230 self.assert_(self.tar.getmembers()[-1].name == "misc/eof", |
|
231 "could not find all members") |
|
232 |
|
233 def test_extract_hardlink(self): |
|
234 # Test hardlink extraction (e.g. bug #857297). |
|
235 tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") |
|
236 |
|
237 tar.extract("ustar/regtype", TEMPDIR) |
|
238 try: |
|
239 tar.extract("ustar/lnktype", TEMPDIR) |
|
240 except EnvironmentError, e: |
|
241 if e.errno == errno.ENOENT: |
|
242 self.fail("hardlink not extracted properly") |
|
243 |
|
244 data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read() |
|
245 self.assertEqual(md5sum(data), md5_regtype) |
|
246 |
|
247 try: |
|
248 tar.extract("ustar/symtype", TEMPDIR) |
|
249 except EnvironmentError, e: |
|
250 if e.errno == errno.ENOENT: |
|
251 self.fail("symlink not extracted properly") |
|
252 |
|
253 data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read() |
|
254 self.assertEqual(md5sum(data), md5_regtype) |
|
255 |
|
256 def test_extractall(self): |
|
257 # Test if extractall() correctly restores directory permissions |
|
258 # and times (see issue1735). |
|
259 if sys.platform == "win32": |
|
260 # Win32 has no support for utime() on directories or |
|
261 # fine grained permissions. |
|
262 return |
|
263 |
|
264 tar = tarfile.open(tarname, encoding="iso8859-1") |
|
265 directories = [t for t in tar if t.isdir()] |
|
266 tar.extractall(TEMPDIR, directories) |
|
267 for tarinfo in directories: |
|
268 path = os.path.join(TEMPDIR, tarinfo.name) |
|
269 self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777) |
|
270 self.assertEqual(tarinfo.mtime, os.path.getmtime(path)) |
|
271 tar.close() |
|
272 |
|
273 |
|
274 class StreamReadTest(ReadTest): |
|
275 |
|
276 mode="r|" |
|
277 |
|
278 def test_fileobj_regular_file(self): |
|
279 tarinfo = self.tar.next() # get "regtype" (can't use getmember) |
|
280 fobj = self.tar.extractfile(tarinfo) |
|
281 data = fobj.read() |
|
282 self.assert_((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), |
|
283 "regular file extraction failed") |
|
284 |
|
285 def test_provoke_stream_error(self): |
|
286 tarinfos = self.tar.getmembers() |
|
287 f = self.tar.extractfile(tarinfos[0]) # read the first member |
|
288 self.assertRaises(tarfile.StreamError, f.read) |
|
289 |
|
290 def test_compare_members(self): |
|
291 tar1 = tarfile.open(tarname, encoding="iso8859-1") |
|
292 tar2 = self.tar |
|
293 |
|
294 while True: |
|
295 t1 = tar1.next() |
|
296 t2 = tar2.next() |
|
297 if t1 is None: |
|
298 break |
|
299 self.assert_(t2 is not None, "stream.next() failed.") |
|
300 |
|
301 if t2.islnk() or t2.issym(): |
|
302 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) |
|
303 continue |
|
304 |
|
305 v1 = tar1.extractfile(t1) |
|
306 v2 = tar2.extractfile(t2) |
|
307 if v1 is None: |
|
308 continue |
|
309 self.assert_(v2 is not None, "stream.extractfile() failed") |
|
310 self.assert_(v1.read() == v2.read(), "stream extraction failed") |
|
311 |
|
312 tar1.close() |
|
313 |
|
314 |
|
315 class DetectReadTest(unittest.TestCase): |
|
316 |
|
317 def _testfunc_file(self, name, mode): |
|
318 try: |
|
319 tarfile.open(name, mode) |
|
320 except tarfile.ReadError: |
|
321 self.fail() |
|
322 |
|
323 def _testfunc_fileobj(self, name, mode): |
|
324 try: |
|
325 tarfile.open(name, mode, fileobj=open(name, "rb")) |
|
326 except tarfile.ReadError: |
|
327 self.fail() |
|
328 |
|
329 def _test_modes(self, testfunc): |
|
330 testfunc(tarname, "r") |
|
331 testfunc(tarname, "r:") |
|
332 testfunc(tarname, "r:*") |
|
333 testfunc(tarname, "r|") |
|
334 testfunc(tarname, "r|*") |
|
335 |
|
336 if gzip: |
|
337 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") |
|
338 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") |
|
339 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") |
|
340 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") |
|
341 |
|
342 testfunc(gzipname, "r") |
|
343 testfunc(gzipname, "r:*") |
|
344 testfunc(gzipname, "r:gz") |
|
345 testfunc(gzipname, "r|*") |
|
346 testfunc(gzipname, "r|gz") |
|
347 |
|
348 if bz2: |
|
349 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") |
|
350 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") |
|
351 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") |
|
352 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") |
|
353 |
|
354 testfunc(bz2name, "r") |
|
355 testfunc(bz2name, "r:*") |
|
356 testfunc(bz2name, "r:bz2") |
|
357 testfunc(bz2name, "r|*") |
|
358 testfunc(bz2name, "r|bz2") |
|
359 |
|
360 def test_detect_file(self): |
|
361 self._test_modes(self._testfunc_file) |
|
362 |
|
363 def test_detect_fileobj(self): |
|
364 self._test_modes(self._testfunc_fileobj) |
|
365 |
|
366 |
|
367 class MemberReadTest(ReadTest): |
|
368 |
|
369 def _test_member(self, tarinfo, chksum=None, **kwargs): |
|
370 if chksum is not None: |
|
371 self.assert_(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, |
|
372 "wrong md5sum for %s" % tarinfo.name) |
|
373 |
|
374 kwargs["mtime"] = 07606136617 |
|
375 kwargs["uid"] = 1000 |
|
376 kwargs["gid"] = 100 |
|
377 if "old-v7" not in tarinfo.name: |
|
378 # V7 tar can't handle alphabetic owners. |
|
379 kwargs["uname"] = "tarfile" |
|
380 kwargs["gname"] = "tarfile" |
|
381 for k, v in kwargs.iteritems(): |
|
382 self.assert_(getattr(tarinfo, k) == v, |
|
383 "wrong value in %s field of %s" % (k, tarinfo.name)) |
|
384 |
|
385 def test_find_regtype(self): |
|
386 tarinfo = self.tar.getmember("ustar/regtype") |
|
387 self._test_member(tarinfo, size=7011, chksum=md5_regtype) |
|
388 |
|
389 def test_find_conttype(self): |
|
390 tarinfo = self.tar.getmember("ustar/conttype") |
|
391 self._test_member(tarinfo, size=7011, chksum=md5_regtype) |
|
392 |
|
393 def test_find_dirtype(self): |
|
394 tarinfo = self.tar.getmember("ustar/dirtype") |
|
395 self._test_member(tarinfo, size=0) |
|
396 |
|
397 def test_find_dirtype_with_size(self): |
|
398 tarinfo = self.tar.getmember("ustar/dirtype-with-size") |
|
399 self._test_member(tarinfo, size=255) |
|
400 |
|
401 def test_find_lnktype(self): |
|
402 tarinfo = self.tar.getmember("ustar/lnktype") |
|
403 self._test_member(tarinfo, size=0, linkname="ustar/regtype") |
|
404 |
|
405 def test_find_symtype(self): |
|
406 tarinfo = self.tar.getmember("ustar/symtype") |
|
407 self._test_member(tarinfo, size=0, linkname="regtype") |
|
408 |
|
409 def test_find_blktype(self): |
|
410 tarinfo = self.tar.getmember("ustar/blktype") |
|
411 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) |
|
412 |
|
413 def test_find_chrtype(self): |
|
414 tarinfo = self.tar.getmember("ustar/chrtype") |
|
415 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) |
|
416 |
|
417 def test_find_fifotype(self): |
|
418 tarinfo = self.tar.getmember("ustar/fifotype") |
|
419 self._test_member(tarinfo, size=0) |
|
420 |
|
421 def test_find_sparse(self): |
|
422 tarinfo = self.tar.getmember("ustar/sparse") |
|
423 self._test_member(tarinfo, size=86016, chksum=md5_sparse) |
|
424 |
|
425 def test_find_umlauts(self): |
|
426 tarinfo = self.tar.getmember("ustar/umlauts-ÄÖÜäöüß") |
|
427 self._test_member(tarinfo, size=7011, chksum=md5_regtype) |
|
428 |
|
429 def test_find_ustar_longname(self): |
|
430 name = "ustar/" + "12345/" * 39 + "1234567/longname" |
|
431 self.assert_(name in self.tar.getnames()) |
|
432 |
|
433 def test_find_regtype_oldv7(self): |
|
434 tarinfo = self.tar.getmember("misc/regtype-old-v7") |
|
435 self._test_member(tarinfo, size=7011, chksum=md5_regtype) |
|
436 |
|
437 def test_find_pax_umlauts(self): |
|
438 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") |
|
439 tarinfo = self.tar.getmember("pax/umlauts-ÄÖÜäöüß") |
|
440 self._test_member(tarinfo, size=7011, chksum=md5_regtype) |
|
441 |
|
442 |
|
443 class LongnameTest(ReadTest): |
|
444 |
|
445 def test_read_longname(self): |
|
446 # Test reading of longname (bug #1471427). |
|
447 longname = self.subdir + "/" + "123/" * 125 + "longname" |
|
448 try: |
|
449 tarinfo = self.tar.getmember(longname) |
|
450 except KeyError: |
|
451 self.fail("longname not found") |
|
452 self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") |
|
453 |
|
454 def test_read_longlink(self): |
|
455 longname = self.subdir + "/" + "123/" * 125 + "longname" |
|
456 longlink = self.subdir + "/" + "123/" * 125 + "longlink" |
|
457 try: |
|
458 tarinfo = self.tar.getmember(longlink) |
|
459 except KeyError: |
|
460 self.fail("longlink not found") |
|
461 self.assert_(tarinfo.linkname == longname, "linkname wrong") |
|
462 |
|
463 def test_truncated_longname(self): |
|
464 longname = self.subdir + "/" + "123/" * 125 + "longname" |
|
465 tarinfo = self.tar.getmember(longname) |
|
466 offset = tarinfo.offset |
|
467 self.tar.fileobj.seek(offset) |
|
468 fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512)) |
|
469 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) |
|
470 |
|
471 def test_header_offset(self): |
|
472 # Test if the start offset of the TarInfo object includes |
|
473 # the preceding extended header. |
|
474 longname = self.subdir + "/" + "123/" * 125 + "longname" |
|
475 offset = self.tar.getmember(longname).offset |
|
476 fobj = open(tarname) |
|
477 fobj.seek(offset) |
|
478 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512)) |
|
479 self.assertEqual(tarinfo.type, self.longnametype) |
|
480 |
|
481 |
|
482 class GNUReadTest(LongnameTest): |
|
483 |
|
484 subdir = "gnu" |
|
485 longnametype = tarfile.GNUTYPE_LONGNAME |
|
486 |
|
487 def test_sparse_file(self): |
|
488 tarinfo1 = self.tar.getmember("ustar/sparse") |
|
489 fobj1 = self.tar.extractfile(tarinfo1) |
|
490 tarinfo2 = self.tar.getmember("gnu/sparse") |
|
491 fobj2 = self.tar.extractfile(tarinfo2) |
|
492 self.assert_(fobj1.read() == fobj2.read(), |
|
493 "sparse file extraction failed") |
|
494 |
|
495 |
|
496 class PaxReadTest(LongnameTest): |
|
497 |
|
498 subdir = "pax" |
|
499 longnametype = tarfile.XHDTYPE |
|
500 |
|
501 def test_pax_global_headers(self): |
|
502 tar = tarfile.open(tarname, encoding="iso8859-1") |
|
503 |
|
504 tarinfo = tar.getmember("pax/regtype1") |
|
505 self.assertEqual(tarinfo.uname, "foo") |
|
506 self.assertEqual(tarinfo.gname, "bar") |
|
507 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß") |
|
508 |
|
509 tarinfo = tar.getmember("pax/regtype2") |
|
510 self.assertEqual(tarinfo.uname, "") |
|
511 self.assertEqual(tarinfo.gname, "bar") |
|
512 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß") |
|
513 |
|
514 tarinfo = tar.getmember("pax/regtype3") |
|
515 self.assertEqual(tarinfo.uname, "tarfile") |
|
516 self.assertEqual(tarinfo.gname, "tarfile") |
|
517 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"ÄÖÜäöüß") |
|
518 |
|
519 def test_pax_number_fields(self): |
|
520 # All following number fields are read from the pax header. |
|
521 tar = tarfile.open(tarname, encoding="iso8859-1") |
|
522 tarinfo = tar.getmember("pax/regtype4") |
|
523 self.assertEqual(tarinfo.size, 7011) |
|
524 self.assertEqual(tarinfo.uid, 123) |
|
525 self.assertEqual(tarinfo.gid, 123) |
|
526 self.assertEqual(tarinfo.mtime, 1041808783.0) |
|
527 self.assertEqual(type(tarinfo.mtime), float) |
|
528 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) |
|
529 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) |
|
530 |
|
531 |
|
532 class WriteTestBase(unittest.TestCase): |
|
533 # Put all write tests in here that are supposed to be tested |
|
534 # in all possible mode combinations. |
|
535 |
|
536 def test_fileobj_no_close(self): |
|
537 fobj = StringIO.StringIO() |
|
538 tar = tarfile.open(fileobj=fobj, mode=self.mode) |
|
539 tar.addfile(tarfile.TarInfo("foo")) |
|
540 tar.close() |
|
541 self.assert_(fobj.closed is False, "external fileobjs must never closed") |
|
542 |
|
543 |
|
544 class WriteTest(WriteTestBase): |
|
545 |
|
546 mode = "w:" |
|
547 |
|
548 def test_100_char_name(self): |
|
549 # The name field in a tar header stores strings of at most 100 chars. |
|
550 # If a string is shorter than 100 chars it has to be padded with '\0', |
|
551 # which implies that a string of exactly 100 chars is stored without |
|
552 # a trailing '\0'. |
|
553 name = "0123456789" * 10 |
|
554 tar = tarfile.open(tmpname, self.mode) |
|
555 t = tarfile.TarInfo(name) |
|
556 tar.addfile(t) |
|
557 tar.close() |
|
558 |
|
559 tar = tarfile.open(tmpname) |
|
560 self.assert_(tar.getnames()[0] == name, |
|
561 "failed to store 100 char filename") |
|
562 tar.close() |
|
563 |
|
564 def test_tar_size(self): |
|
565 # Test for bug #1013882. |
|
566 tar = tarfile.open(tmpname, self.mode) |
|
567 path = os.path.join(TEMPDIR, "file") |
|
568 fobj = open(path, "wb") |
|
569 fobj.write("aaa") |
|
570 fobj.close() |
|
571 tar.add(path) |
|
572 tar.close() |
|
573 self.assert_(os.path.getsize(tmpname) > 0, |
|
574 "tarfile is empty") |
|
575 |
|
576 # The test_*_size tests test for bug #1167128. |
|
577 def test_file_size(self): |
|
578 tar = tarfile.open(tmpname, self.mode) |
|
579 |
|
580 path = os.path.join(TEMPDIR, "file") |
|
581 fobj = open(path, "wb") |
|
582 fobj.close() |
|
583 tarinfo = tar.gettarinfo(path) |
|
584 self.assertEqual(tarinfo.size, 0) |
|
585 |
|
586 fobj = open(path, "wb") |
|
587 fobj.write("aaa") |
|
588 fobj.close() |
|
589 tarinfo = tar.gettarinfo(path) |
|
590 self.assertEqual(tarinfo.size, 3) |
|
591 |
|
592 tar.close() |
|
593 |
|
594 def test_directory_size(self): |
|
595 path = os.path.join(TEMPDIR, "directory") |
|
596 os.mkdir(path) |
|
597 try: |
|
598 tar = tarfile.open(tmpname, self.mode) |
|
599 tarinfo = tar.gettarinfo(path) |
|
600 self.assertEqual(tarinfo.size, 0) |
|
601 finally: |
|
602 os.rmdir(path) |
|
603 |
|
604 def test_link_size(self): |
|
605 if hasattr(os, "link"): |
|
606 link = os.path.join(TEMPDIR, "link") |
|
607 target = os.path.join(TEMPDIR, "link_target") |
|
608 open(target, "wb").close() |
|
609 os.link(target, link) |
|
610 try: |
|
611 tar = tarfile.open(tmpname, self.mode) |
|
612 tarinfo = tar.gettarinfo(link) |
|
613 self.assertEqual(tarinfo.size, 0) |
|
614 finally: |
|
615 os.remove(target) |
|
616 os.remove(link) |
|
617 |
|
618 def test_symlink_size(self): |
|
619 if hasattr(os, "symlink"): |
|
620 path = os.path.join(TEMPDIR, "symlink") |
|
621 os.symlink("link_target", path) |
|
622 try: |
|
623 tar = tarfile.open(tmpname, self.mode) |
|
624 tarinfo = tar.gettarinfo(path) |
|
625 self.assertEqual(tarinfo.size, 0) |
|
626 finally: |
|
627 os.remove(path) |
|
628 |
|
629 def test_add_self(self): |
|
630 # Test for #1257255. |
|
631 dstname = os.path.abspath(tmpname) |
|
632 |
|
633 tar = tarfile.open(tmpname, self.mode) |
|
634 self.assert_(tar.name == dstname, "archive name must be absolute") |
|
635 |
|
636 tar.add(dstname) |
|
637 self.assert_(tar.getnames() == [], "added the archive to itself") |
|
638 |
|
639 cwd = os.getcwd() |
|
640 os.chdir(TEMPDIR) |
|
641 tar.add(dstname) |
|
642 os.chdir(cwd) |
|
643 self.assert_(tar.getnames() == [], "added the archive to itself") |
|
644 |
|
645 def test_exclude(self): |
|
646 tempdir = os.path.join(TEMPDIR, "exclude") |
|
647 os.mkdir(tempdir) |
|
648 try: |
|
649 for name in ("foo", "bar", "baz"): |
|
650 name = os.path.join(tempdir, name) |
|
651 open(name, "wb").close() |
|
652 |
|
653 def exclude(name): |
|
654 return os.path.isfile(name) |
|
655 |
|
656 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") |
|
657 tar.add(tempdir, arcname="empty_dir", exclude=exclude) |
|
658 tar.close() |
|
659 |
|
660 tar = tarfile.open(tmpname, "r") |
|
661 self.assertEqual(len(tar.getmembers()), 1) |
|
662 self.assertEqual(tar.getnames()[0], "empty_dir") |
|
663 finally: |
|
664 shutil.rmtree(tempdir) |
|
665 |
|
666 |
|
667 class StreamWriteTest(WriteTestBase): |
|
668 |
|
669 mode = "w|" |
|
670 |
|
671 def test_stream_padding(self): |
|
672 # Test for bug #1543303. |
|
673 tar = tarfile.open(tmpname, self.mode) |
|
674 tar.close() |
|
675 |
|
676 if self.mode.endswith("gz"): |
|
677 fobj = gzip.GzipFile(tmpname) |
|
678 data = fobj.read() |
|
679 fobj.close() |
|
680 elif self.mode.endswith("bz2"): |
|
681 dec = bz2.BZ2Decompressor() |
|
682 data = open(tmpname, "rb").read() |
|
683 data = dec.decompress(data) |
|
684 self.assert_(len(dec.unused_data) == 0, |
|
685 "found trailing data") |
|
686 else: |
|
687 fobj = open(tmpname, "rb") |
|
688 data = fobj.read() |
|
689 fobj.close() |
|
690 |
|
691 self.assert_(data.count("\0") == tarfile.RECORDSIZE, |
|
692 "incorrect zero padding") |
|
693 |
|
694 |
|
695 class GNUWriteTest(unittest.TestCase): |
|
696 # This testcase checks for correct creation of GNU Longname |
|
697 # and Longlink extended headers (cp. bug #812325). |
|
698 |
|
699 def _length(self, s): |
|
700 blocks, remainder = divmod(len(s) + 1, 512) |
|
701 if remainder: |
|
702 blocks += 1 |
|
703 return blocks * 512 |
|
704 |
|
705 def _calc_size(self, name, link=None): |
|
706 # Initial tar header |
|
707 count = 512 |
|
708 |
|
709 if len(name) > tarfile.LENGTH_NAME: |
|
710 # GNU longname extended header + longname |
|
711 count += 512 |
|
712 count += self._length(name) |
|
713 if link is not None and len(link) > tarfile.LENGTH_LINK: |
|
714 # GNU longlink extended header + longlink |
|
715 count += 512 |
|
716 count += self._length(link) |
|
717 return count |
|
718 |
|
719 def _test(self, name, link=None): |
|
720 tarinfo = tarfile.TarInfo(name) |
|
721 if link: |
|
722 tarinfo.linkname = link |
|
723 tarinfo.type = tarfile.LNKTYPE |
|
724 |
|
725 tar = tarfile.open(tmpname, "w") |
|
726 tar.format = tarfile.GNU_FORMAT |
|
727 tar.addfile(tarinfo) |
|
728 |
|
729 v1 = self._calc_size(name, link) |
|
730 v2 = tar.offset |
|
731 self.assert_(v1 == v2, "GNU longname/longlink creation failed") |
|
732 |
|
733 tar.close() |
|
734 |
|
735 tar = tarfile.open(tmpname) |
|
736 member = tar.next() |
|
737 self.failIf(member is None, "unable to read longname member") |
|
738 self.assert_(tarinfo.name == member.name and \ |
|
739 tarinfo.linkname == member.linkname, \ |
|
740 "unable to read longname member") |
|
741 |
|
742 def test_longname_1023(self): |
|
743 self._test(("longnam/" * 127) + "longnam") |
|
744 |
|
745 def test_longname_1024(self): |
|
746 self._test(("longnam/" * 127) + "longname") |
|
747 |
|
748 def test_longname_1025(self): |
|
749 self._test(("longnam/" * 127) + "longname_") |
|
750 |
|
751 def test_longlink_1023(self): |
|
752 self._test("name", ("longlnk/" * 127) + "longlnk") |
|
753 |
|
754 def test_longlink_1024(self): |
|
755 self._test("name", ("longlnk/" * 127) + "longlink") |
|
756 |
|
757 def test_longlink_1025(self): |
|
758 self._test("name", ("longlnk/" * 127) + "longlink_") |
|
759 |
|
760 def test_longnamelink_1023(self): |
|
761 self._test(("longnam/" * 127) + "longnam", |
|
762 ("longlnk/" * 127) + "longlnk") |
|
763 |
|
764 def test_longnamelink_1024(self): |
|
765 self._test(("longnam/" * 127) + "longname", |
|
766 ("longlnk/" * 127) + "longlink") |
|
767 |
|
768 def test_longnamelink_1025(self): |
|
769 self._test(("longnam/" * 127) + "longname_", |
|
770 ("longlnk/" * 127) + "longlink_") |
|
771 |
|
772 |
|
773 class HardlinkTest(unittest.TestCase): |
|
774 # Test the creation of LNKTYPE (hardlink) members in an archive. |
|
775 |
|
776 def setUp(self): |
|
777 self.foo = os.path.join(TEMPDIR, "foo") |
|
778 self.bar = os.path.join(TEMPDIR, "bar") |
|
779 |
|
780 fobj = open(self.foo, "wb") |
|
781 fobj.write("foo") |
|
782 fobj.close() |
|
783 |
|
784 os.link(self.foo, self.bar) |
|
785 |
|
786 self.tar = tarfile.open(tmpname, "w") |
|
787 self.tar.add(self.foo) |
|
788 |
|
789 def tearDown(self): |
|
790 self.tar.close() |
|
791 os.remove(self.foo) |
|
792 os.remove(self.bar) |
|
793 |
|
794 def test_add_twice(self): |
|
795 # The same name will be added as a REGTYPE every |
|
796 # time regardless of st_nlink. |
|
797 tarinfo = self.tar.gettarinfo(self.foo) |
|
798 self.assert_(tarinfo.type == tarfile.REGTYPE, |
|
799 "add file as regular failed") |
|
800 |
|
801 def test_add_hardlink(self): |
|
802 tarinfo = self.tar.gettarinfo(self.bar) |
|
803 self.assert_(tarinfo.type == tarfile.LNKTYPE, |
|
804 "add file as hardlink failed") |
|
805 |
|
806 def test_dereference_hardlink(self): |
|
807 self.tar.dereference = True |
|
808 tarinfo = self.tar.gettarinfo(self.bar) |
|
809 self.assert_(tarinfo.type == tarfile.REGTYPE, |
|
810 "dereferencing hardlink failed") |
|
811 |
|
812 |
|
813 class PaxWriteTest(GNUWriteTest): |
|
814 |
|
815 def _test(self, name, link=None): |
|
816 # See GNUWriteTest. |
|
817 tarinfo = tarfile.TarInfo(name) |
|
818 if link: |
|
819 tarinfo.linkname = link |
|
820 tarinfo.type = tarfile.LNKTYPE |
|
821 |
|
822 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) |
|
823 tar.addfile(tarinfo) |
|
824 tar.close() |
|
825 |
|
826 tar = tarfile.open(tmpname) |
|
827 if link: |
|
828 l = tar.getmembers()[0].linkname |
|
829 self.assert_(link == l, "PAX longlink creation failed") |
|
830 else: |
|
831 n = tar.getmembers()[0].name |
|
832 self.assert_(name == n, "PAX longname creation failed") |
|
833 |
|
834 def test_pax_global_header(self): |
|
835 pax_headers = { |
|
836 u"foo": u"bar", |
|
837 u"uid": u"0", |
|
838 u"mtime": u"1.23", |
|
839 u"test": u"äöü", |
|
840 u"äöü": u"test"} |
|
841 |
|
842 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, \ |
|
843 pax_headers=pax_headers) |
|
844 tar.addfile(tarfile.TarInfo("test")) |
|
845 tar.close() |
|
846 |
|
847 # Test if the global header was written correctly. |
|
848 tar = tarfile.open(tmpname, encoding="iso8859-1") |
|
849 self.assertEqual(tar.pax_headers, pax_headers) |
|
850 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) |
|
851 |
|
852 # Test if all the fields are unicode. |
|
853 for key, val in tar.pax_headers.iteritems(): |
|
854 self.assert_(type(key) is unicode) |
|
855 self.assert_(type(val) is unicode) |
|
856 if key in tarfile.PAX_NUMBER_FIELDS: |
|
857 try: |
|
858 tarfile.PAX_NUMBER_FIELDS[key](val) |
|
859 except (TypeError, ValueError): |
|
860 self.fail("unable to convert pax header field") |
|
861 |
|
862 def test_pax_extended_header(self): |
|
863 # The fields from the pax header have priority over the |
|
864 # TarInfo. |
|
865 pax_headers = {u"path": u"foo", u"uid": u"123"} |
|
866 |
|
867 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") |
|
868 t = tarfile.TarInfo() |
|
869 t.name = u"äöü" # non-ASCII |
|
870 t.uid = 8**8 # too large |
|
871 t.pax_headers = pax_headers |
|
872 tar.addfile(t) |
|
873 tar.close() |
|
874 |
|
875 tar = tarfile.open(tmpname, encoding="iso8859-1") |
|
876 t = tar.getmembers()[0] |
|
877 self.assertEqual(t.pax_headers, pax_headers) |
|
878 self.assertEqual(t.name, "foo") |
|
879 self.assertEqual(t.uid, 123) |
|
880 |
|
881 |
|
882 class UstarUnicodeTest(unittest.TestCase): |
|
883 # All *UnicodeTests FIXME |
|
884 |
|
885 format = tarfile.USTAR_FORMAT |
|
886 |
|
887 def test_iso8859_1_filename(self): |
|
888 self._test_unicode_filename("iso8859-1") |
|
889 |
|
890 def test_utf7_filename(self): |
|
891 self._test_unicode_filename("utf7") |
|
892 |
|
893 def test_utf8_filename(self): |
|
894 self._test_unicode_filename("utf8") |
|
895 |
|
896 def _test_unicode_filename(self, encoding): |
|
897 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") |
|
898 name = u"äöü" |
|
899 tar.addfile(tarfile.TarInfo(name)) |
|
900 tar.close() |
|
901 |
|
902 tar = tarfile.open(tmpname, encoding=encoding) |
|
903 self.assert_(type(tar.getnames()[0]) is not unicode) |
|
904 self.assertEqual(tar.getmembers()[0].name, name.encode(encoding)) |
|
905 tar.close() |
|
906 |
|
907 def test_unicode_filename_error(self): |
|
908 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") |
|
909 tarinfo = tarfile.TarInfo() |
|
910 |
|
911 tarinfo.name = "äöü" |
|
912 if self.format == tarfile.PAX_FORMAT: |
|
913 self.assertRaises(UnicodeError, tar.addfile, tarinfo) |
|
914 else: |
|
915 tar.addfile(tarinfo) |
|
916 |
|
917 tarinfo.name = u"äöü" |
|
918 self.assertRaises(UnicodeError, tar.addfile, tarinfo) |
|
919 |
|
920 tarinfo.name = "foo" |
|
921 tarinfo.uname = u"äöü" |
|
922 self.assertRaises(UnicodeError, tar.addfile, tarinfo) |
|
923 |
|
924 def test_unicode_argument(self): |
|
925 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") |
|
926 for t in tar: |
|
927 self.assert_(type(t.name) is str) |
|
928 self.assert_(type(t.linkname) is str) |
|
929 self.assert_(type(t.uname) is str) |
|
930 self.assert_(type(t.gname) is str) |
|
931 tar.close() |
|
932 |
|
933 def test_uname_unicode(self): |
|
934 for name in (u"äöü", "äöü"): |
|
935 t = tarfile.TarInfo("foo") |
|
936 t.uname = name |
|
937 t.gname = name |
|
938 |
|
939 fobj = StringIO.StringIO() |
|
940 tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1") |
|
941 tar.addfile(t) |
|
942 tar.close() |
|
943 fobj.seek(0) |
|
944 |
|
945 tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1") |
|
946 t = tar.getmember("foo") |
|
947 self.assertEqual(t.uname, "äöü") |
|
948 self.assertEqual(t.gname, "äöü") |
|
949 |
|
950 |
|
951 class GNUUnicodeTest(UstarUnicodeTest): |
|
952 |
|
953 format = tarfile.GNU_FORMAT |
|
954 |
|
955 |
|
956 class PaxUnicodeTest(UstarUnicodeTest): |
|
957 |
|
958 format = tarfile.PAX_FORMAT |
|
959 |
|
960 def _create_unicode_name(self, name): |
|
961 tar = tarfile.open(tmpname, "w", format=self.format) |
|
962 t = tarfile.TarInfo() |
|
963 t.pax_headers["path"] = name |
|
964 tar.addfile(t) |
|
965 tar.close() |
|
966 |
|
967 def test_error_handlers(self): |
|
968 # Test if the unicode error handlers work correctly for characters |
|
969 # that cannot be expressed in a given encoding. |
|
970 self._create_unicode_name(u"äöü") |
|
971 |
|
972 for handler, name in (("utf-8", u"äöü".encode("utf8")), |
|
973 ("replace", "???"), ("ignore", "")): |
|
974 tar = tarfile.open(tmpname, format=self.format, encoding="ascii", |
|
975 errors=handler) |
|
976 self.assertEqual(tar.getnames()[0], name) |
|
977 |
|
978 self.assertRaises(UnicodeError, tarfile.open, tmpname, |
|
979 encoding="ascii", errors="strict") |
|
980 |
|
981 def test_error_handler_utf8(self): |
|
982 # Create a pathname that has one component representable using |
|
983 # iso8859-1 and the other only in iso8859-15. |
|
984 self._create_unicode_name(u"äöü/¤") |
|
985 |
|
986 tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1", |
|
987 errors="utf-8") |
|
988 self.assertEqual(tar.getnames()[0], "äöü/" + u"¤".encode("utf8")) |
|
989 |
|
990 |
|
991 class AppendTest(unittest.TestCase): |
|
992 # Test append mode (cp. patch #1652681). |
|
993 |
|
994 def setUp(self): |
|
995 self.tarname = tmpname |
|
996 if os.path.exists(self.tarname): |
|
997 os.remove(self.tarname) |
|
998 |
|
999 def _add_testfile(self, fileobj=None): |
|
1000 tar = tarfile.open(self.tarname, "a", fileobj=fileobj) |
|
1001 tar.addfile(tarfile.TarInfo("bar")) |
|
1002 tar.close() |
|
1003 |
|
1004 def _create_testtar(self, mode="w:"): |
|
1005 src = tarfile.open(tarname, encoding="iso8859-1") |
|
1006 t = src.getmember("ustar/regtype") |
|
1007 t.name = "foo" |
|
1008 f = src.extractfile(t) |
|
1009 tar = tarfile.open(self.tarname, mode) |
|
1010 tar.addfile(t, f) |
|
1011 tar.close() |
|
1012 |
|
1013 def _test(self, names=["bar"], fileobj=None): |
|
1014 tar = tarfile.open(self.tarname, fileobj=fileobj) |
|
1015 self.assertEqual(tar.getnames(), names) |
|
1016 |
|
1017 def test_non_existing(self): |
|
1018 self._add_testfile() |
|
1019 self._test() |
|
1020 |
|
1021 def test_empty(self): |
|
1022 open(self.tarname, "w").close() |
|
1023 self._add_testfile() |
|
1024 self._test() |
|
1025 |
|
1026 def test_empty_fileobj(self): |
|
1027 fobj = StringIO.StringIO() |
|
1028 self._add_testfile(fobj) |
|
1029 fobj.seek(0) |
|
1030 self._test(fileobj=fobj) |
|
1031 |
|
1032 def test_fileobj(self): |
|
1033 self._create_testtar() |
|
1034 data = open(self.tarname).read() |
|
1035 fobj = StringIO.StringIO(data) |
|
1036 self._add_testfile(fobj) |
|
1037 fobj.seek(0) |
|
1038 self._test(names=["foo", "bar"], fileobj=fobj) |
|
1039 |
|
1040 def test_existing(self): |
|
1041 self._create_testtar() |
|
1042 self._add_testfile() |
|
1043 self._test(names=["foo", "bar"]) |
|
1044 |
|
1045 def test_append_gz(self): |
|
1046 if gzip is None: |
|
1047 return |
|
1048 self._create_testtar("w:gz") |
|
1049 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") |
|
1050 |
|
1051 def test_append_bz2(self): |
|
1052 if bz2 is None: |
|
1053 return |
|
1054 self._create_testtar("w:bz2") |
|
1055 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") |
|
1056 |
|
1057 |
|
1058 class LimitsTest(unittest.TestCase): |
|
1059 |
|
1060 def test_ustar_limits(self): |
|
1061 # 100 char name |
|
1062 tarinfo = tarfile.TarInfo("0123456789" * 10) |
|
1063 tarinfo.tobuf(tarfile.USTAR_FORMAT) |
|
1064 |
|
1065 # 101 char name that cannot be stored |
|
1066 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") |
|
1067 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) |
|
1068 |
|
1069 # 256 char name with a slash at pos 156 |
|
1070 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") |
|
1071 tarinfo.tobuf(tarfile.USTAR_FORMAT) |
|
1072 |
|
1073 # 256 char name that cannot be stored |
|
1074 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") |
|
1075 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) |
|
1076 |
|
1077 # 512 char name |
|
1078 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") |
|
1079 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) |
|
1080 |
|
1081 # 512 char linkname |
|
1082 tarinfo = tarfile.TarInfo("longlink") |
|
1083 tarinfo.linkname = "123/" * 126 + "longname" |
|
1084 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) |
|
1085 |
|
1086 # uid > 8 digits |
|
1087 tarinfo = tarfile.TarInfo("name") |
|
1088 tarinfo.uid = 010000000 |
|
1089 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) |
|
1090 |
|
1091 def test_gnu_limits(self): |
|
1092 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") |
|
1093 tarinfo.tobuf(tarfile.GNU_FORMAT) |
|
1094 |
|
1095 tarinfo = tarfile.TarInfo("longlink") |
|
1096 tarinfo.linkname = "123/" * 126 + "longname" |
|
1097 tarinfo.tobuf(tarfile.GNU_FORMAT) |
|
1098 |
|
1099 # uid >= 256 ** 7 |
|
1100 tarinfo = tarfile.TarInfo("name") |
|
1101 tarinfo.uid = 04000000000000000000L |
|
1102 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) |
|
1103 |
|
1104 def test_pax_limits(self): |
|
1105 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") |
|
1106 tarinfo.tobuf(tarfile.PAX_FORMAT) |
|
1107 |
|
1108 tarinfo = tarfile.TarInfo("longlink") |
|
1109 tarinfo.linkname = "123/" * 126 + "longname" |
|
1110 tarinfo.tobuf(tarfile.PAX_FORMAT) |
|
1111 |
|
1112 tarinfo = tarfile.TarInfo("name") |
|
1113 tarinfo.uid = 04000000000000000000L |
|
1114 tarinfo.tobuf(tarfile.PAX_FORMAT) |
|
1115 |
|
1116 |
|
1117 class GzipMiscReadTest(MiscReadTest): |
|
1118 tarname = gzipname |
|
1119 mode = "r:gz" |
|
1120 class GzipUstarReadTest(UstarReadTest): |
|
1121 tarname = gzipname |
|
1122 mode = "r:gz" |
|
1123 class GzipStreamReadTest(StreamReadTest): |
|
1124 tarname = gzipname |
|
1125 mode = "r|gz" |
|
1126 class GzipWriteTest(WriteTest): |
|
1127 mode = "w:gz" |
|
1128 class GzipStreamWriteTest(StreamWriteTest): |
|
1129 mode = "w|gz" |
|
1130 |
|
1131 |
|
1132 class Bz2MiscReadTest(MiscReadTest): |
|
1133 tarname = bz2name |
|
1134 mode = "r:bz2" |
|
1135 class Bz2UstarReadTest(UstarReadTest): |
|
1136 tarname = bz2name |
|
1137 mode = "r:bz2" |
|
1138 class Bz2StreamReadTest(StreamReadTest): |
|
1139 tarname = bz2name |
|
1140 mode = "r|bz2" |
|
1141 class Bz2WriteTest(WriteTest): |
|
1142 mode = "w:bz2" |
|
1143 class Bz2StreamWriteTest(StreamWriteTest): |
|
1144 mode = "w|bz2" |
|
1145 |
|
1146 def test_main(): |
|
1147 if not os.path.exists(TEMPDIR): |
|
1148 os.mkdir(TEMPDIR) |
|
1149 |
|
1150 tests = [ |
|
1151 UstarReadTest, |
|
1152 MiscReadTest, |
|
1153 StreamReadTest, |
|
1154 DetectReadTest, |
|
1155 MemberReadTest, |
|
1156 GNUReadTest, |
|
1157 PaxReadTest, |
|
1158 WriteTest, |
|
1159 StreamWriteTest, |
|
1160 GNUWriteTest, |
|
1161 PaxWriteTest, |
|
1162 UstarUnicodeTest, |
|
1163 GNUUnicodeTest, |
|
1164 PaxUnicodeTest, |
|
1165 AppendTest, |
|
1166 LimitsTest, |
|
1167 ] |
|
1168 |
|
1169 if hasattr(os, "link"): |
|
1170 tests.append(HardlinkTest) |
|
1171 |
|
1172 fobj = open(tarname, "rb") |
|
1173 data = fobj.read() |
|
1174 fobj.close() |
|
1175 |
|
1176 if gzip: |
|
1177 # Create testtar.tar.gz and add gzip-specific tests. |
|
1178 tar = gzip.open(gzipname, "wb") |
|
1179 tar.write(data) |
|
1180 tar.close() |
|
1181 |
|
1182 tests += [ |
|
1183 GzipMiscReadTest, |
|
1184 GzipUstarReadTest, |
|
1185 GzipStreamReadTest, |
|
1186 GzipWriteTest, |
|
1187 GzipStreamWriteTest, |
|
1188 ] |
|
1189 |
|
1190 if bz2: |
|
1191 # Create testtar.tar.bz2 and add bz2-specific tests. |
|
1192 tar = bz2.BZ2File(bz2name, "wb") |
|
1193 tar.write(data) |
|
1194 tar.close() |
|
1195 |
|
1196 tests += [ |
|
1197 Bz2MiscReadTest, |
|
1198 Bz2UstarReadTest, |
|
1199 Bz2StreamReadTest, |
|
1200 Bz2WriteTest, |
|
1201 Bz2StreamWriteTest, |
|
1202 ] |
|
1203 |
|
1204 try: |
|
1205 test_support.run_unittest(*tests) |
|
1206 finally: |
|
1207 if os.path.exists(TEMPDIR): |
|
1208 shutil.rmtree(TEMPDIR) |
|
1209 |
|
1210 if __name__ == "__main__": |
|
1211 test_main() |