|
1 # |
|
2 # Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). |
|
3 # All rights reserved. |
|
4 # This component and the accompanying materials are made available |
|
5 # under the terms of "Eclipse Public License v1.0" |
|
6 # which accompanies this distribution, and is available |
|
7 # at the URL "http://www.eclipse.org/legal/epl-v10.html". |
|
8 # |
|
9 # Initial Contributors: |
|
10 # Nokia Corporation - initial contribution. |
|
11 # |
|
12 # Contributors: |
|
13 # |
|
14 # Description: |
|
15 # |
|
16 |
|
17 |
|
18 """ |
|
19 Test the CPF root file parsing routines |
|
20 """ |
|
21 |
|
22 import zipfile |
|
23 import unittest |
|
24 import string |
|
25 import sys,os,shutil |
|
26 |
|
27 ROOT_PATH = os.path.dirname(os.path.abspath(__file__)) |
|
28 |
|
29 import __init__ |
|
30 from cone.public import exceptions, api |
|
31 from cone.storage import zipstorage |
|
32 |
|
33 datazip = os.path.join(ROOT_PATH,"data.zip") |
|
34 |
|
35 #class TestZipStorageDummy(unittest.TestCase): |
|
36 # def test_open_storage(self): |
|
37 # storage = zipstorage.ZipStorage("TestProlog.zip","r") |
|
38 # res = storage.open_resource("TestProlog") |
|
39 # storage.close() |
|
40 |
|
41 class TestZipStorage(unittest.TestCase): |
|
42 def test_create_new_storage(self): |
|
43 storage = zipstorage.ZipStorage("new.zip","w") |
|
44 storage.close() |
|
45 self.assertTrue(os.path.exists("new.zip")) |
|
46 os.unlink("new.zip") |
|
47 |
|
48 def test_open_existing_storage(self): |
|
49 storage = zipstorage.ZipStorage(datazip,"r") |
|
50 storage.close() |
|
51 |
|
52 def test_open_existing_storage_and_close_twice(self): |
|
53 storage = zipstorage.ZipStorage(datazip,"r") |
|
54 storage.close() |
|
55 try: |
|
56 storage.close() |
|
57 self.fail('closing twice succeeds!') |
|
58 except exceptions.StorageException: |
|
59 pass |
|
60 |
|
61 def test_open_nonexisting_storage_fails(self): |
|
62 try: |
|
63 storage = zipstorage.ZipStorage("foo.zip","r") |
|
64 storage.close() |
|
65 self.fail("opening a non existing ZipStorage succeeded?") |
|
66 except zipstorage.ZipException: |
|
67 self.assertTrue(True) |
|
68 |
|
69 def test_open_a_non_zipfile_fails(self): |
|
70 try: |
|
71 storage = zipstorage.ZipStorage("data/onefile/test.txt","r") |
|
72 storage.close() |
|
73 self.fail("opening a non zipfile for ZipStorage succeeded?") |
|
74 except zipstorage.ZipException,e: |
|
75 self.assertTrue(True) |
|
76 |
|
77 class TestStorage(unittest.TestCase): |
|
78 def setUp(self): |
|
79 shutil.copyfile(datazip,"temptests.zip") |
|
80 self.storage = zipstorage.ZipStorage("temptests.zip","a") |
|
81 |
|
82 def tearDown(self): |
|
83 self.storage.close() |
|
84 os.unlink("temptests.zip") |
|
85 |
|
86 def test_open_resource_existing_file_for_reading(self): |
|
87 res = self.storage.open_resource("data/simple.confml","w") |
|
88 self.assertTrue(res) |
|
89 self.assertTrue(isinstance(res,api.Resource)) |
|
90 |
|
91 def test_open_resource_new_file(self): |
|
92 storage = zipstorage.ZipStorage("testnewfile.zip","w") |
|
93 res = storage.open_resource("data/newfile.txt","w") |
|
94 res.write("test write") |
|
95 self.assertTrue(res) |
|
96 self.assertTrue(isinstance(res,api.Resource)) |
|
97 res.close() |
|
98 self.assertEquals(storage.list_resources("",True), ['data/newfile.txt']) |
|
99 storage.close() |
|
100 os.unlink("testnewfile.zip") |
|
101 |
|
102 |
|
103 def test_open_resource_nonexisting(self): |
|
104 try: |
|
105 res = self.storage.open_resource("iamnothere.txt") |
|
106 self.fail("Opening of a non existing file succeeds!??") |
|
107 except exceptions.NotResource: |
|
108 self.assertTrue(True) |
|
109 |
|
110 def test_list_resources_nonrecurse(self): |
|
111 storage = zipstorage.ZipStorage("testnonrecurse.zip","w") |
|
112 res = storage.open_resource("data/morestuff.confml","w") |
|
113 res.close() |
|
114 res = storage.open_resource("data/prodX.confml","w") |
|
115 res.close() |
|
116 file_array = storage.list_resources("data") |
|
117 self.assertEquals(file_array[0],"data/morestuff.confml") |
|
118 self.assertEquals(file_array[1],"data/prodX.confml") |
|
119 storage.close() |
|
120 os.unlink("testnonrecurse.zip") |
|
121 |
|
122 def test_list_resources_recurse(self): |
|
123 storage = zipstorage.ZipStorage("testrecurse.zip","w") |
|
124 res = storage.open_resource("data/foo/morestuff.confml","w") |
|
125 res.close() |
|
126 res = storage.open_resource("data/prodX.confml","w") |
|
127 res.close() |
|
128 res = storage.open_resource("data/ncp11/confml/jallaa.confml","w") |
|
129 res.close() |
|
130 file_array = storage.list_resources("data",True) |
|
131 self.assertEquals(file_array,['data/foo/morestuff.confml', 'data/prodX.confml', 'data/ncp11/confml/jallaa.confml']) |
|
132 storage.close() |
|
133 os.unlink("testrecurse.zip") |
|
134 |
|
135 def test_is_resource_true(self): |
|
136 res = self.storage.open_resource("data/simple.confml","w") |
|
137 res.close() |
|
138 self.assertTrue(self.storage.is_resource("data/simple.confml")) |
|
139 |
|
140 def test_is_resource_true_for_dotted_file(self): |
|
141 res = self.storage.open_resource(".metadata","w") |
|
142 res.close() |
|
143 self.assertTrue(self.storage.is_resource(".metadata")) |
|
144 |
|
145 def test_is_resource_true_with_slash(self): |
|
146 res = self.storage.open_resource("data/simple.confml","w") |
|
147 res.close() |
|
148 self.assertTrue(self.storage.is_resource("/data/simple.confml")) |
|
149 |
|
150 def test_is_resource_false(self): |
|
151 self.assertFalse(self.storage.is_resource("data")) |
|
152 |
|
153 def test_metadata_writing(self): |
|
154 fs = zipstorage.ZipStorage("testtemp.zip","w") |
|
155 fs.set_active_configuration('testing.confml') |
|
156 fs.close() |
|
157 fs = zipstorage.ZipStorage("testtemp.zip","r") |
|
158 self.assertEquals(fs.get_active_configuration(),'testing.confml') |
|
159 fs.close() |
|
160 os.unlink("testtemp.zip") |
|
161 |
|
162 def test_open_resource_new_file_and_overwrite(self): |
|
163 storage = api.Storage.open("testoverwrite.zip","w") |
|
164 res = storage.open_resource("data/newfile.txt","w") |
|
165 res.write("test write") |
|
166 self.assertTrue(res) |
|
167 self.assertTrue(isinstance(res,api.Resource)) |
|
168 res.close() |
|
169 res = storage.open_resource("data/newfile.txt","w") |
|
170 res.write("Hahaaa") |
|
171 res.close() |
|
172 storage.close() |
|
173 storage = api.Storage.open("testoverwrite.zip","r") |
|
174 self.assertEquals(storage.open_resource("data/newfile.txt").read(), "Hahaaa") |
|
175 storage.close() |
|
176 os.unlink("testoverwrite.zip") |
|
177 |
|
178 def test_delete_resource(self): |
|
179 storage = api.Storage.open("testdelete.zip","w") |
|
180 res = storage.open_resource("data/newfile.txt","w") |
|
181 res.write("test write") |
|
182 res.close() |
|
183 res = storage.open_resource("readme.txt","w") |
|
184 res.write("test 2") |
|
185 res.close() |
|
186 storage.close() |
|
187 storage2 = api.Storage.open("testdelete.zip","a") |
|
188 #self.assertEquals(storage2.list_resources("",True), ['.metadata', 'data/newfile.txt', 'readme.txt']) |
|
189 self.assertEquals(storage2.open_resource("data/newfile.txt").read(),"test write") |
|
190 storage2.delete_resource("data/newfile.txt") |
|
191 self.assertEquals(len(storage2.list_resources("",True)),2) |
|
192 storage2.close() |
|
193 storage3 = api.Storage.open("testdelete.zip","a") |
|
194 self.assertEquals(storage3.list_resources("",True), ['readme.txt','.metadata']) |
|
195 storage3.close() |
|
196 os.unlink("testdelete.zip") |
|
197 |
|
198 |
|
199 def test_create_folder(self): |
|
200 storage = api.Storage.open("empty_folder.zip","w") |
|
201 res = storage.open_resource("test.txt","w") |
|
202 res.write('test') |
|
203 res.close() |
|
204 storage.create_folder("data") |
|
205 storage.create_folder("data2/folder1") |
|
206 storage.create_folder("data3\\") |
|
207 storage.close() |
|
208 |
|
209 storage2 = api.Storage.open("empty_folder.zip","a") |
|
210 self.assertEquals(storage2.is_folder("data"),True) |
|
211 self.assertEquals(storage2.is_folder("data2/folder1"),True) |
|
212 self.assertEquals(storage2.is_folder("data3"),True) |
|
213 self.assertEquals(storage2.list_resources('.',True),['test.txt','.metadata']) |
|
214 self.assertEquals(storage2.list_resources(''),['test.txt','.metadata']) |
|
215 storage2.close() |
|
216 os.unlink("empty_folder.zip") |
|
217 |
|
218 class TestZipStorageListResources(unittest.TestCase): |
|
219 |
|
220 def _run_test_list_resources(self, zip_file): |
|
221 full_path = os.path.join(ROOT_PATH, 'list_resources_data', zip_file) |
|
222 zs = zipstorage.ZipStorage(full_path, 'r') |
|
223 res_list = zs.list_resources('/', recurse=True, empty_folders=True) |
|
224 |
|
225 expected = [ |
|
226 ('folder', 'test'), |
|
227 ('folder', 'test/layer'), |
|
228 ('folder', 'test/layer/confml'), |
|
229 ('folder', 'test/layer/content'), |
|
230 ('folder', 'test/layer/content/empty'), |
|
231 ('folder', 'test/layer/content/something'), |
|
232 ('resource', 'test/layer/content/something/x.txt'), |
|
233 ('folder', 'test/layer/doc'), |
|
234 ('folder', 'test/layer/implml'), |
|
235 ('resource', 'test/layer/root.confml'), |
|
236 ('resource', 'test/root.confml')] |
|
237 for type, res in expected: |
|
238 if type == 'resource': |
|
239 self.assertTrue(zs.is_resource(res), "zs.is_resource('%s') returns False" % res) |
|
240 elif type == 'folder': |
|
241 self.assertTrue(zs.is_folder(res), "zs.is_folder('%s') returns False" % res) |
|
242 else: |
|
243 raise RuntimeError('Invalid type') |
|
244 |
|
245 def test_list_resources_7zip_zipped(self): |
|
246 self._run_test_list_resources('7zip.zip') |
|
247 |
|
248 def test_list_resources_winzip_zipped(self): |
|
249 self._run_test_list_resources('winzip.zip') |
|
250 |
|
251 def test_list_resources_carbide_ct_zipped(self): |
|
252 self._run_test_list_resources('carbide.ct.cpf') |
|
253 |
|
254 if __name__ == '__main__': |
|
255 unittest.main() |