|
#
# Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
# All rights reserved.
# This component and the accompanying materials are made available
# under the terms of "Eclipse Public License v1.0"
# which accompanies this distribution, and is available
# at the URL "http://www.eclipse.org/legal/epl-v10.html".
#
# Initial Contributors:
# Nokia Corporation - initial contribution.
#
# Contributors:
#
# Description:
#
|
16 |
|
17 |
|
"""
Test the ZIP storage resource routines (opening, reading, writing and
querying resources through cone.storage.zipstorage).
"""
|
21 |
|
22 import zipfile |
|
23 import unittest |
|
24 import string |
|
25 import sys,os,shutil |
|
26 |
|
# Absolute path of the directory that contains this test module; the fixture
# archive and the temporary directory are located relative to it.
ROOT_PATH = os.path.split(os.path.abspath(__file__))[0]
|
28 |
|
29 import __init__ |
|
30 from cone.public.exceptions import * |
|
31 from cone.public.api import Resource |
|
32 from cone.storage import zipstorage |
|
33 |
|
# Fixture archive that the tests copy from, and the scratch directory that
# receives those copies (both live next to this module).
tempzip = os.path.join(ROOT_PATH, "zipres_test.zip")
TEMP_DIR = os.path.join(ROOT_PATH, "temp")
|
36 |
|
37 |
|
class TestFileResource(unittest.TestCase):
    """
    Tests for resources opened through zipstorage.ZipStorage.

    Tests that need existing archive content work on a copy of the fixture
    archive made by _prepare_tempzip(), not on the fixture file itself.
    """

    def _ensure_temp_dir(self):
        """
        Create TEMP_DIR if it does not exist yet.
        """
        if not os.path.exists(TEMP_DIR):
            os.makedirs(TEMP_DIR)

    def _prepare_tempzip(self, filename):
        """
        Prepare a temporary ZIP file with the given name.
        @param filename: Name of the copy to create inside TEMP_DIR.
        @return: Absolute path to the newly copied ZIP file.
        """
        new_zip = os.path.join(TEMP_DIR, filename)

        # Remove a copy left over from an earlier run so that every test
        # starts from the unmodified fixture content.
        if os.path.exists(new_zip):
            os.unlink(new_zip)

        self._ensure_temp_dir()

        shutil.copyfile(tempzip, new_zip)
        return new_zip

    def test_open_and_write_resource(self):
        """
        Data written through a resource must be readable from the archive.
        """
        ZIPFILE = self._prepare_tempzip('test1.zip')
        testdata = "Testing writing func!"

        store = zipstorage.ZipStorage(ZIPFILE, "a")
        try:
            res = store.open_resource("temp/testwrite.txt", "w")
            try:
                res.write(testdata)
            finally:
                res.close()
        finally:
            store.close()

        # Verify with the plain zipfile module, independently of ZipStorage,
        # and make sure the archive handle is released again.
        zf = zipfile.ZipFile(ZIPFILE, "r")
        try:
            data = zf.read("temp/testwrite.txt")
        finally:
            zf.close()
        self.assertEqual(data, testdata)

    def test_open_and_read_resource(self):
        """
        An existing archive member can be opened and read as a resource.
        """
        ZIPFILE = self._prepare_tempzip('test2.zip')
        store = zipstorage.ZipStorage(ZIPFILE, "a")
        try:
            res = store.open_resource("temp/testread.txt", "r")
            try:
                resdata = res.read()
            finally:
                res.close()
        finally:
            store.close()
        self.assertTrue(resdata.startswith("Hello"))

    def test_get_size(self):
        """
        get_size() reports the size of a resource opened for reading.
        """
        ZIPFILE = self._prepare_tempzip('test_getsize.zip')
        store = zipstorage.ZipStorage(ZIPFILE, "r")
        try:
            res = store.open_resource("temp/testread.txt", "r")
            try:
                self.assertEqual(res.get_size(), 28)
                # Try a second time just in case
                self.assertEqual(res.get_size(), 28)
            finally:
                res.close()
        finally:
            store.close()

    def test_get_size_largefile(self):
        """
        get_size() reports the size of the larger "largefile.bin" member.
        """
        ZIPFILE = self._prepare_tempzip('test_getsize_largefile.zip')
        store = zipstorage.ZipStorage(ZIPFILE, "r")
        try:
            res = store.open_resource("largefile.bin", "r")
            try:
                self.assertEqual(res.get_size(), 25000)
                # Try a second time just in case
                self.assertEqual(res.get_size(), 25000)
            finally:
                res.close()
        finally:
            store.close()

    def test_get_size_fails_in_write_mode(self):
        """
        get_size() must raise StorageException on a resource opened for writing.
        """
        # This test creates its archive directly instead of copying the
        # fixture, so the directory has to be created here; otherwise the
        # test only passes when another test has already created TEMP_DIR.
        self._ensure_temp_dir()
        ZIPFILE = os.path.join(TEMP_DIR, 'getsize_fails_in_write_mode.zip')
        store = zipstorage.ZipStorage(ZIPFILE, "w")
        try:
            res = store.open_resource("test_getsize.txt", "w")
            try:
                res.write("Writing foobar")
                self.assertRaises(StorageException, res.get_size)
            finally:
                res.close()
        finally:
            store.close()

    def test_get_content_info_and_read_data(self):
        """
        Querying the content info must not prevent reading the full data.
        """
        ZIPFILE = self._prepare_tempzip('test2.zip')
        store = zipstorage.ZipStorage(ZIPFILE, "a")
        try:
            res = store.open_resource("temp/testread.txt", "r")
            try:
                ci = res.get_content_info()
                self.assertEqual('text/plain', ci.content_type)
                # Read only after get_content_info() to check the data is
                # still complete.
                resdata = res.read()
                self.assertEqual('Hello!\r\nHow is my reading?\r\n', resdata)
            finally:
                res.close()
        finally:
            store.close()
|
116 |
|
117 |
|
# Run the tests in this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()