1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """ASTE test drop generation."""
23
24 from optparse import OptionParser
25 from textwrap import dedent
26 from xml.etree import ElementTree as et
27 from xml.sax.saxutils import quoteattr
28 import logging
29 import os
30 import re
31 import sys
32 import tempfile
33 import zipfile
34
35 from path import path
36 import amara
37
38 _logger = logging.getLogger('ats3')
39
40
41 E = et.Element
42 SE = et.SubElement
43 TEMP_PATH = None
44
46 """
47 ASTE drop generation configuration.
48 """
49
51 """
52 Initialize from optparse configuration options.
53 """
54 self._opts = opts
55
56 self.build_drive = path(self._opts.build_drive)
57 self.file_store = path(self._opts.file_store)
58 self.flash_images = self.split_paths(self._opts.flash_images)
59
61 """
62 Split the string by delim, removing extra whitespace.
63 """
64 return [path(part.strip())
65 for part in arg.split(delim) if part.strip()]
66
68 return getattr(self._opts, attr)
69
71 dump = "Configuration:\n"
72 seen = set()
73 for key, value in vars(self).items():
74 if not key.startswith("_"):
75 dump += "\t%s = %s\n" % (key, value)
76 seen.add(key)
77 for key, value in vars(self._opts).items():
78 if key not in seen:
79 dump += "\t%s = %s\n" % (key, value)
80 seen.add(key)
81 return dump
82
83
85 """
86 Tells ASTE server what to test and how.
87
88 The ASTE test plan from which the test.xml file can be written. The test
89 plan requires TestAsset(s) to perform the tests
90 """
91
92 EMAIL_SUBJECT = (u"ATS3 report for §RUN_NAME§ §RUN_START_DATE§ "
93 u"§RUN_START_TIME§")
94 REPORT_PATH = u"§RUN_NAME§\§RUN_START_DATE§_§RUN_START_TIME§"
95
97 self.diamonds_build_url = config.diamonds_build_url
98 self.testrun_name = config.testrun_name
99 self.harness = "ASTE"
100 self.device_type = config.device_type
101 self.test_type = config.test_type
102 self.device_hwid = config.device_hwid
103 self.plan_name = config.plan_name
104 self.report_email = config.report_email
105 self.file_store = config.file_store
106 self.test_timeout = config.test_timeout
107 self.testasset_location = config.testasset_location
108 self.testasset_caseids = config.testasset_caseids
109 self.software_version = config.software_version
110 self.device_language = config.device_language
111 self.software_release = config.software_release
112 self.sets = []
113 self.temp_directory = path(tempfile.mkdtemp())
114
def insert_set(self, image_files=None, test_timeout=None):
    """
    Insert a test set into the test plan.

    The set is stored as a dict appended to ``self.sets``. Its name is
    ``set<N>`` where N is the number of sets present before the insert.

    :param image_files: image files for the set; an empty list when omitted.
    :param test_timeout: timeout value(s) for the set; an empty list when
        omitted.
    """
    entry = {
        "name": "set%d" % len(self.sets),
        "image_files": [] if image_files is None else image_files,
        "test_timeout": [] if test_timeout is None else test_timeout,
        "test_harness": self.harness,
    }
    self.sets.append(entry)
128
@property
def post_actions(self):
    """
    ATS3 and ASTE post actions.

    Returns a list of ``(action_type, parameters)`` tuples, where
    ``parameters`` is a tuple of ``(name, value)`` pairs. Which actions are
    included depends on the plan:

    - an email action when ``report_email`` is set,
    - two file store actions (ATS3 and ASTE reports) when ``file_store``
      is set,
    - a Diamonds action when ``diamonds_build_url`` is set.
    """
    report_root = self.file_store.joinpath(self.REPORT_PATH)
    # Both file store actions end with the same date/time format parameters.
    stamp_formats = (("date-format", "yyyyMMdd"),
                     ("time-format", "HHmmss"))

    selected = []
    if self.report_email:
        selected.append(("SendEmailAction",
                         (("subject", self.EMAIL_SUBJECT),
                          ("type", "ATS3_REPORT"),
                          ("send-files", "true"),
                          ("to", self.report_email))))
    if self.file_store:
        selected.append(("FileStoreAction",
                         (("to-folder", report_root.joinpath("ATS3_REPORT")),
                          ("report-type", "ATS_REPORT")) + stamp_formats))
        selected.append(("FileStoreAction",
                         (("to-folder", report_root.joinpath("ASTE_REPORT")),
                          ("report-type", "ASTE_REPORT"),
                          ("run-log", "true")) + stamp_formats))
    if self.diamonds_build_url:
        selected.append(("DiamondsAction", ()))
    return selected
160
162 return self.__dict__[key]
163
164
165
167 """
168 Generate test drop zip file for ATS3.
169
170 Generates drop zip files file from a TestPlan instance. The main
171 responsibility of this class is to serialize the plan into a valid XML
172 file and build a zip file for the drop.
173
174 Creates one <set> for ASTE tests.
175
176 ASTE harness, normal operation
177 ------------------------------
178
179 - create logging dir for aste makedir (to C:\logs\TestFramework)
180 - execute asset from the testasset.zip execute-asset
181 - fetch logs fetch-log
182
183 """
184
185 ASTE_LOG_DIR = r"c:\logs\testframework"
186
188 self.drop_path_root = path("ATS3Drop")
189 self.drop_path = None
190 self.defaults = {}
191
192 - def generate(self, test_plan, output_file):
196
198 """Generate test drop zip file."""
199 zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED)
200 try:
201 for drop_file, src_file in self.drop_files(test_plan):
202 _logger.info(" + Adding: %s" % src_file.strip())
203 zfile.write(src_file.strip(), drop_file.encode('utf-8'))
204 doc = amara.parse(et.tostring(xml.getroot()))
205 _logger.debug("XML output: %s" % doc.xml(indent=u"yes"))
206 zfile.writestr("test.xml", doc.xml(indent="yes"))
207 finally:
208 zfile.close()
209 return zfile
210
212 """Generate test drop XML."""
213 self.defaults = {"enabled": "true",
214 "passrate": "100",
215 "significant": "false",
216 "harness": "%s" % test_plan["harness"]}
217 root = E("test")
218 root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan))
219 if test_plan["diamonds_build_url"]:
220 root.append(
221 et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan))
222 self.generate_target(test_plan, root)
223 root.append(self.generate_plan(test_plan))
224 for post_action in self.generate_post_actions(test_plan):
225 root.append(post_action)
226 root.append(self.generate_files(test_plan))
227 etree = et.ElementTree(root)
228 return etree
229
def generate_target(self, test_plan, root):
    """
    Append target(s) into the XML.

    Builds a ``<target>`` element holding one ``<device>`` (rank "none",
    alias "DEFAULT") described by the plan's harness, hardware, software
    version/release and language, and appends it to ``root``. A "HWID"
    property is added only when the plan's ``device_hwid`` is not an empty
    string.
    """
    target = E("target")
    device = SE(target, "device", rank="none", alias="DEFAULT")

    # (element tag, "name" attribute, test plan key supplying "value")
    described_by = (
        ("property", "harness", "harness"),
        ("property", "hardware", "device_type"),
        ("setting", "softwareVersion", "software_version"),
        ("setting", "softwareRelease", "software_release"),
        ("setting", "language", "device_language"),
    )
    for tag, attr_name, plan_key in described_by:
        SE(device, tag, name=attr_name, value=test_plan[plan_key])

    hwid = test_plan["device_hwid"]
    if hwid != "":
        SE(device, "property", name="HWID", value=hwid)
    root.append(target)
243
def generate_plan(self, test_plan):
    """
    Generate the test <plan> with multiple <set>s.

    The plan contains a single <session>; every set of the test plan adds a
    <set> with a master device target and one <case> whose steps come from
    ``generate_steps``. ``self.drop_path`` is updated to the drop path of
    the set being processed.

    Returns the <plan> element.
    """
    plan_name = "Plan %s %s" % (test_plan["test_type"],
                                test_plan["device_type"])
    plan = E("plan", name=plan_name, **self.defaults)
    session = SE(plan, "session", name="session", **self.defaults)

    for setd in test_plan.sets:
        set_name = setd["name"]
        self.drop_path = self.drop_path_root.joinpath(set_name)
        set_elem = SE(session, "set", name=set_name, **self.defaults)
        set_target = SE(set_elem, "target")
        SE(set_target, "device", rank="master", alias="DEFAULT")
        case = SE(set_elem, "case", name="%s case" % set_name,
                  **self.defaults)
        self.generate_steps(setd, case, test_plan)
    return plan
256
def generate_steps(self, setd, case, test_plan):
    """
    Generate the test plan <step>s.

    Adds one <flash> element per image file of the set, pointing to the
    image under the drop's "images" directory, followed by the steps that
    execute the test asset.
    """
    images_dir = self.drop_path_root.joinpath("images")
    for image in setd["image_files"]:
        flash_elem = SE(case, "flash", images=images_dir.joinpath(image.name))
        flash_elem.set("target-alias", "DEFAULT")

    self.generate_execute_asset_steps(case, test_plan)
267
def generate_execute_asset_steps(self, case, test_plan):
    """
    Executes steps for TestAsset.

    Adds an "execute-asset" <step> to ``case`` with four <param> elements:
    the repeat count, the asset zip location inside the drop, the test
    case ids and the timeout taken from the test plan.
    """
    time_out = test_plan["test_timeout"]
    asset_zip = path(os.sep.join((r"ATS3Drop", "TestAssets", "TestAsset.zip")))

    step = SE(case, "step", name="Execute asset zip step", **self.defaults)
    SE(step, "command").text = "execute-asset"

    params = SE(step, "params")
    SE(params, "param", repeat="1")
    # Attribute names containing "-" cannot be passed as keyword arguments.
    SE(params, "param").set("asset-source", asset_zip)
    SE(params, "param").set("testcase-ids", test_plan["testasset_caseids"])
    SE(params, "param", timeout=time_out)
281
282
def generate_post_actions(self, test_plan):
    """
    Generate post actions.

    Turns every ``(action_type, parameters)`` entry of
    ``test_plan.post_actions`` into a <postAction> element holding a <type>
    and a <params> child with one <param name=... value=...> per parameter.

    Returns the list of <postAction> elements.
    """
    def build_action(action_type, parameters):
        """Build a single <postAction> element."""
        node = E("postAction")
        SE(node, "type").text = action_type
        params_node = SE(node, "params")
        for param_name, param_value in parameters:
            SE(params_node, "param", name=param_name, value=param_value)
        return node

    return [build_action(action_type, parameters)
            for action_type, parameters in test_plan.post_actions]
294
def generate_testasset_zip(self, test_plan, output_file=None):
    """
    Generate TestAsset.zip for the ASTE server.

    :param test_plan: test plan providing ``testasset_location`` and, when
        no ``output_file`` is given, ``temp_directory``.
    :param output_file: optional destination of the generated archive;
        defaults to ``TestAsset.zip`` in the plan's temp directory.
    :return: ``testasset_location`` itself when it already names a zip
        file, otherwise the path of the newly written archive.
    """
    testasset_location = path(test_plan["testasset_location"])

    # Only the file suffix decides whether the location is already an
    # archive; ".zip" elsewhere in the path (e.g. a directory name) must
    # not short-circuit the packaging.
    if testasset_location.lower().endswith(".zip"):
        return testasset_location

    if output_file is not None:
        filename = output_file
    else:
        filename = test_plan["temp_directory"].joinpath(r"TestAsset.zip")

    root_length = len(testasset_location)
    zfile = zipfile.ZipFile(filename, "w", zipfile.ZIP_DEFLATED)
    try:
        for file_ in testasset_location.walkfiles():
            # Cut off the leading root only; str.replace() would also
            # remove the root text wherever else it occurs in the path.
            archive_name = file_[root_length:]
            zfile.write(file_, archive_name.encode('utf-8'))
    finally:
        zfile.close()
    return filename
312
def drop_files(self, test_plan):
    """
    Yield the drop files as ``(drop_file, source_file)`` pairs.

    For every set of the test plan the image files are placed under
    "images" and the test asset zip under "TestAssets", both below the
    parent of the set's drop path. Each drop file is yielded only once,
    even when several sets refer to it; ``None`` entries are ignored.
    """
    seen = set()
    zip_file = path(self.generate_testasset_zip(test_plan))
    for setd in test_plan.sets:
        drop_dir = self.drop_path_root.joinpath(setd["name"]).parent
        groups = (("images", setd["image_files"]),
                  ("TestAssets", [zip_file]))
        for sub_dir, files in groups:
            for file_path in files:
                if file_path is None:
                    continue
                drop_file = drop_dir.joinpath(sub_dir, file_path.name)
                drop_file = drop_file.normpath()
                if drop_file in seen:
                    continue
                seen.add(drop_file)
                yield (drop_file, file_path.normpath())
330
331
def generate_files(self, test_plan):
    """
    Generate the <files> section.

    Returns a <files> element with one <file> child per drop file, whose
    text is the file's path inside the drop.
    """
    files_elem = E("files")
    for entry in self.drop_files(test_plan):
        # Only the drop-side path is listed; the source path is not needed.
        SE(files_elem, "file").text = entry[0]
    return files_elem
338
340 """
341
342 Add information to the test_plan
343
344 """
345
347 self.flash_images = [path(p) for p in config.flash_images]
348 self.build_drive = config.build_drive
349 self.test_timeout = config.test_timeout
350
351
353 """Parse flash images and creates inserts into 'sets'"""
354
355 test_plan.insert_set(image_files=self.flash_images,
356 test_timeout=list(self.test_timeout))
357
358
369
370
def main():
    """
    Main entry point.

    Parses the command line, validates the flash image options, creates a
    temporary directory (stored in the module-level ``TEMP_PATH``) and
    creates the drop from the resulting configuration.

    Exits through ``OptionParser.error`` when no flash images are given or
    when fewer than ``--minimum-flash-images`` are given.
    """
    # Without this declaration the assignment below would only bind a
    # local name and the module-level TEMP_PATH would stay None.
    global TEMP_PATH

    cli = OptionParser(usage="%prog [options] TSRC1 [TSRC2 [TSRC3 ...]]")
    cli.add_option("--build-drive", help="Build area root drive")
    cli.add_option("--device-type", help="Device type (e.g. 'PRODUCT')",
                   default="unknown")
    cli.add_option("--device-hwid", help="Device hwid",
                   default="")
    cli.add_option("--diamonds-build-url", help="Diamonds build url")
    cli.add_option("--drop-file", help="Name for the final drop zip file",
                   default="ATS3Drop.zip")
    cli.add_option("--file-store", help="Destination path for reports.",
                   default="")
    cli.add_option("--flash-images", help="Paths to the flash image files",
                   default="")
    cli.add_option("--minimum-flash-images", help="Minimum amount of flash images",
                   default=2)
    cli.add_option("--report-email", help="Email notification receivers",
                   default="")
    cli.add_option("--plan-name", help="Name of the test plan",
                   default="plan")
    cli.add_option("--test-timeout", help="Test execution timeout value (default: %default)",
                   default="60")
    cli.add_option("--testrun-name", help="Name of the test run",
                   default="run")
    cli.add_option("--testasset-location", help="Path to the ASTE test assets location",
                   default="")
    cli.add_option("--testasset-caseids", help="Test case IDs to run",
                   default="")
    cli.add_option("--software-version", help="Software version",
                   default="")
    cli.add_option("--test-type", help="Test type (default: %default)",
                   default="smoke")
    cli.add_option("--device-language", help="language name e.g. English",
                   default="English")
    cli.add_option("--software-release", help="Software release or product name e.g. PPD 52.50",
                   default="")
    cli.add_option("--verbose", help="Increase output verbosity",
                   action="store_true", default=False)

    opts, _ = cli.parse_args()

    if not opts.flash_images:
        cli.error("no flash image files given")

    # Count only non-empty entries so that stray commas ("a.fpsx,") do not
    # satisfy the minimum.
    given = len([part for part in opts.flash_images.split(",") if part.strip()])
    needed = int(opts.minimum_flash_images)
    if given < needed:
        cli.error("Not enough flash files: %i defined, %i needed" % (given, needed))

    if opts.verbose:
        _logger.setLevel(logging.DEBUG)
        logging.basicConfig(level=logging.DEBUG)
    TEMP_PATH = tempfile.mkdtemp()
    config = Configuration(opts)
    create_drop(config)
424
425 if __name__ == "__main__":
426 main()
427