587
|
1 |
# -*- encoding: latin-1 -*-
|
|
2 |
|
|
3 |
#============================================================================
|
|
4 |
#Name : aste.py
|
|
5 |
#Part of : Helium
|
|
6 |
|
|
7 |
#Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
|
|
8 |
#All rights reserved.
|
|
9 |
#This component and the accompanying materials are made available
|
|
10 |
#under the terms of the License "Eclipse Public License v1.0"
|
|
11 |
#which accompanies this distribution, and is available
|
|
12 |
#at the URL "http://www.eclipse.org/legal/epl-v10.html".
|
|
13 |
#
|
|
14 |
#Initial Contributors:
|
|
15 |
#Nokia Corporation - initial contribution.
|
|
16 |
#
|
|
17 |
#Contributors:
|
|
18 |
#
|
|
19 |
#Description:
|
|
20 |
#===============================================================================
|
|
21 |
|
|
22 |
"""ASTE test drop generation."""
|
|
23 |
|
628
|
24 |
|
587
|
25 |
#W0142 => * and ** were used
|
|
26 |
#R* removed during refactoring
|
|
27 |
|
|
28 |
from optparse import OptionParser
|
|
29 |
from xml.etree import ElementTree as et
|
|
30 |
import logging
|
|
31 |
import os
|
|
32 |
import re
|
|
33 |
import tempfile
|
|
34 |
import zipfile
|
|
35 |
import pkg_resources
|
628
|
36 |
from path import path # pylint: disable=F0401
|
587
|
37 |
import amara
|
|
38 |
import ntpath as atspath
|
628
|
39 |
import jinja2 # pylint: disable=F0401
|
587
|
40 |
|
|
41 |
_logger = logging.getLogger('ats3')
|
|
42 |
|
|
43 |
# Shortcuts
|
|
44 |
E = et.Element
|
|
45 |
SE = et.SubElement
|
|
46 |
|
|
47 |
class Configuration(object):
|
|
48 |
"""
|
|
49 |
ASTE drop generation configuration.
|
|
50 |
"""
|
|
51 |
|
|
52 |
def __init__(self, opts):
|
|
53 |
"""
|
|
54 |
Initialize from optparse configuration options.
|
|
55 |
"""
|
|
56 |
self._opts = opts
|
|
57 |
# Customize some attributes from how optparse leaves them.
|
|
58 |
self.build_drive = path(self._opts.build_drive)
|
|
59 |
self.file_store = path(self._opts.file_store)
|
|
60 |
self.flash_images = self.split_paths(self._opts.flash_images)
|
|
61 |
|
|
62 |
def split_paths(self, arg, delim=","):
|
|
63 |
"""
|
|
64 |
Split the string by delim, removing extra whitespace.
|
|
65 |
"""
|
|
66 |
return [path(part.strip())
|
|
67 |
for part in arg.split(delim) if part.strip()]
|
|
68 |
|
|
69 |
def __getattr__(self, attr):
|
|
70 |
return getattr(self._opts, attr)
|
|
71 |
|
|
72 |
def __str__(self):
|
|
73 |
dump = "Configuration:\n"
|
|
74 |
seen = set()
|
|
75 |
for key, value in vars(self).items():
|
|
76 |
if not key.startswith("_"):
|
|
77 |
dump += "\t%s = %s\n" % (key, value)
|
|
78 |
seen.add(key)
|
|
79 |
for key, value in vars(self._opts).items():
|
|
80 |
if key not in seen:
|
|
81 |
dump += "\t%s = %s\n" % (key, value)
|
|
82 |
seen.add(key)
|
|
83 |
return dump
|
|
84 |
|
|
85 |
|
|
86 |
class AsteTestPlan(object):
|
|
87 |
"""
|
|
88 |
Tells ASTE server what to test and how.
|
|
89 |
|
|
90 |
The ASTE test plan from which the test.xml file can be written. The test
|
|
91 |
plan requires TestAsset(s) to perform the tests
|
|
92 |
"""
|
|
93 |
|
|
94 |
EMAIL_SUBJECT = (u"ATS3 report for §RUN_NAME§ §RUN_START_DATE§ "
|
|
95 |
u"§RUN_START_TIME§")
|
|
96 |
REPORT_PATH = u"§RUN_NAME§\§RUN_START_DATE§_§RUN_START_TIME§"
|
|
97 |
|
|
98 |
def __init__(self, config):
|
|
99 |
self.diamonds_build_url = config.diamonds_build_url
|
|
100 |
self.testrun_name = config.testrun_name
|
|
101 |
self.harness = "ASTE"
|
|
102 |
self.device_type = config.device_type
|
|
103 |
self.test_type = config.test_type
|
|
104 |
self.device_hwid = config.device_hwid
|
|
105 |
self.plan_name = config.plan_name
|
|
106 |
self.report_email = config.report_email
|
|
107 |
self.file_store = config.file_store
|
|
108 |
self.test_timeout = config.test_timeout
|
|
109 |
self.testasset_location = config.testasset_location
|
|
110 |
self.testasset_caseids = config.testasset_caseids
|
|
111 |
self.software_version = config.software_version
|
|
112 |
self.device_language = config.device_language
|
|
113 |
self.software_release = config.software_release
|
|
114 |
self.sets = []
|
|
115 |
self.temp_directory = path(tempfile.mkdtemp())
|
|
116 |
|
|
117 |
def insert_set(self, image_files=None, test_timeout=None, ):
|
|
118 |
"""
|
|
119 |
Insert a test set into the test plan.
|
|
120 |
"""
|
|
121 |
if image_files is None:
|
|
122 |
image_files = []
|
|
123 |
if test_timeout is None:
|
|
124 |
test_timeout = []
|
|
125 |
test_harness = self.harness
|
|
126 |
setd = dict(name="set%d" % len(self.sets), image_files=image_files)
|
|
127 |
setd = dict(setd, test_timeout=test_timeout, test_harness=test_harness)
|
|
128 |
|
|
129 |
self.sets.append(setd)
|
|
130 |
|
|
131 |
@property
|
|
132 |
def post_actions(self):
|
|
133 |
"""ATS3 and ASTE post actions."""
|
|
134 |
actions = []
|
|
135 |
report_path = self.file_store.joinpath(self.REPORT_PATH)
|
|
136 |
email_action = ("SendEmailAction",
|
|
137 |
(("subject", self.EMAIL_SUBJECT),
|
|
138 |
("type", "ATS3_REPORT"),
|
|
139 |
("send-files", "true"),
|
|
140 |
("to", self.report_email)))
|
|
141 |
ats3_report = ("FileStoreAction",
|
|
142 |
(("to-folder", report_path.joinpath("ATS3_REPORT")),
|
|
143 |
("report-type", "ATS_REPORT"),
|
|
144 |
("date-format", "yyyyMMdd"),
|
|
145 |
("time-format", "HHmmss")))
|
|
146 |
aste_report = ("FileStoreAction",
|
|
147 |
(("to-folder", report_path.joinpath("ASTE_REPORT")),
|
|
148 |
("report-type", "ASTE_REPORT"),
|
|
149 |
("run-log", "true"),
|
|
150 |
("date-format", "yyyyMMdd"),
|
|
151 |
("time-format", "HHmmss")))
|
|
152 |
|
|
153 |
diamonds_action = ("DiamondsAction", ())
|
|
154 |
if self.report_email:
|
|
155 |
actions.append(email_action)
|
|
156 |
if self.file_store:
|
|
157 |
actions.append(ats3_report)
|
|
158 |
actions.append(aste_report)
|
|
159 |
if self.diamonds_build_url:
|
|
160 |
actions.append(diamonds_action)
|
|
161 |
return actions
|
|
162 |
|
|
163 |
def __getitem__(self, key):
|
|
164 |
return self.__dict__[key]
|
|
165 |
|
|
166 |
|
|
167 |
|
|
168 |
class AsteTestDropGenerator(object):
|
|
169 |
"""
|
|
170 |
Generate test drop zip file for ATS3.
|
|
171 |
|
|
172 |
Generates drop zip files file from a TestPlan instance. The main
|
|
173 |
responsibility of this class is to serialize the plan into a valid XML
|
|
174 |
file and build a zip file for the drop.
|
|
175 |
|
|
176 |
Creates one <set> for ASTE tests.
|
|
177 |
|
|
178 |
ASTE harness, normal operation
|
|
179 |
------------------------------
|
|
180 |
|
|
181 |
- create logging dir for aste makedir (to C:\logs\TestFramework)
|
|
182 |
- execute asset from the testasset.zip execute-asset
|
|
183 |
- fetch logs fetch-log
|
|
184 |
|
|
185 |
"""
|
|
186 |
|
|
187 |
ASTE_LOG_DIR = r"c:\logs\testframework"
|
|
188 |
|
|
189 |
def __init__(self):
|
|
190 |
self.drop_path_root = path("ATS3Drop")
|
|
191 |
self.drop_path = None
|
|
192 |
self.defaults = {}
|
|
193 |
|
|
194 |
def generate(self, test_plan, output_file):
|
|
195 |
"""Generate a test drop file."""
|
|
196 |
xml = self.generate_xml(test_plan)
|
|
197 |
return self.generate_drop(test_plan, xml, output_file)
|
|
198 |
|
|
199 |
def generate_drop(self, test_plan, xml, output_file):
|
|
200 |
"""Generate test drop zip file."""
|
|
201 |
zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED)
|
|
202 |
try:
|
|
203 |
for drop_file, src_file in self.drop_files(test_plan):
|
|
204 |
_logger.info(" + Adding: %s" % src_file.strip())
|
|
205 |
zfile.write(src_file.strip(), drop_file.encode('utf-8'))
|
|
206 |
doc = amara.parse(et.tostring(xml.getroot()))
|
|
207 |
_logger.debug("XML output: %s" % doc.xml(indent=u"yes"))
|
|
208 |
zfile.writestr("test.xml", doc.xml(indent="yes"))
|
|
209 |
finally:
|
|
210 |
zfile.close()
|
|
211 |
return zfile
|
|
212 |
|
|
213 |
def generate_xml(self, test_plan):
|
|
214 |
"""Generate test drop XML."""
|
|
215 |
self.defaults = {"enabled": "true",
|
|
216 |
"passrate": "100",
|
|
217 |
"significant": "false",
|
|
218 |
"harness": "%s" % test_plan["harness"]}
|
|
219 |
root = E("test")
|
|
220 |
root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan))
|
|
221 |
if test_plan["diamonds_build_url"]:
|
|
222 |
root.append(
|
|
223 |
et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan))
|
|
224 |
self.generate_target(test_plan, root)
|
|
225 |
root.append(self.generate_plan(test_plan))
|
|
226 |
for post_action in self.generate_post_actions(test_plan):
|
|
227 |
root.append(post_action)
|
|
228 |
root.append(self.generate_files(test_plan))
|
|
229 |
etree = et.ElementTree(root)
|
|
230 |
return etree
|
|
231 |
|
|
232 |
def generate_target(self, test_plan, root):
|
|
233 |
"""Append target(s) into the XML"""
|
|
234 |
target = E("target")
|
|
235 |
device = SE(target, "device", rank="none", alias="DEFAULT")
|
|
236 |
SE(device, "property", name="harness", value=test_plan["harness"])
|
|
237 |
SE(device, "property", name="hardware", value=test_plan["device_type"])
|
|
238 |
SE(device, "setting", name="softwareVersion", value=test_plan["software_version"])
|
|
239 |
SE(device, "setting", name="softwareRelease", value=test_plan["software_release"])
|
|
240 |
SE(device, "setting", name="language", value=test_plan["device_language"])
|
|
241 |
|
|
242 |
if test_plan["device_hwid"] != "":
|
|
243 |
SE(device, "property", name="HWID", value=test_plan["device_hwid"])
|
|
244 |
root.append(target)
|
|
245 |
|
|
246 |
def generate_plan(self, test_plan):
|
|
247 |
"""Generate the test <plan> with multiple <set>s."""
|
|
248 |
plan = E("plan", name="Plan %s %s" % (test_plan["test_type"], test_plan["device_type"]), **self.defaults)
|
|
249 |
session = SE(plan, "session", name="session", **self.defaults)
|
|
250 |
# One set for each component.
|
|
251 |
for setd in test_plan.sets:
|
|
252 |
self.drop_path = self.drop_path_root.joinpath(setd["name"])
|
|
253 |
elem = SE(session, "set", name=setd["name"], **self.defaults)
|
|
254 |
SE(SE(elem, "target"), "device", rank="master", alias="DEFAULT")
|
|
255 |
case = SE(elem, "case", name="%s case" % setd["name"], **self.defaults)
|
|
256 |
self.generate_steps(setd, case, test_plan)
|
|
257 |
return plan
|
|
258 |
|
|
259 |
def generate_steps(self, setd, case, test_plan):
|
|
260 |
"""Generate the test plan <step>s."""
|
|
261 |
# Flash images.
|
|
262 |
images = self.drop_path_root.joinpath("images")
|
|
263 |
for image_file in setd["image_files"]:
|
|
264 |
flash = SE(case, "flash", images=images.joinpath(image_file.name))
|
|
265 |
flash.set("target-alias", "DEFAULT")
|
|
266 |
|
|
267 |
# If tracing enabled:
|
|
268 |
self.generate_execute_asset_steps(case, test_plan)
|
|
269 |
|
|
270 |
def generate_execute_asset_steps(self, case, test_plan):
|
|
271 |
"""Executes steps for TestAsset"""
|
|
272 |
time_out = test_plan["test_timeout"]
|
|
273 |
step = SE(case, "step",
|
|
274 |
name="Execute asset zip step" , **self.defaults)
|
|
275 |
SE(step, "command").text = "execute-asset"
|
|
276 |
params = SE(step, "params")
|
|
277 |
SE(params, "param", repeat="1")
|
|
278 |
elem = SE(params, "param")
|
|
279 |
elem.set("asset-source", path(r"ATS3Drop" + os.sep + "TestAssets" + os.sep + "TestAsset.zip"))
|
|
280 |
elem = SE(params, "param")
|
|
281 |
elem.set("testcase-ids", test_plan["testasset_caseids"])
|
|
282 |
SE(params, "param", timeout=time_out)
|
|
283 |
|
|
284 |
|
|
285 |
def generate_post_actions(self, test_plan):
|
|
286 |
"""Generate post actions."""
|
|
287 |
actions = []
|
|
288 |
for action_type, parameters in test_plan.post_actions:
|
|
289 |
action = E("postAction")
|
|
290 |
SE(action, "type").text = action_type
|
|
291 |
params = SE(action, "params")
|
|
292 |
for name, value in parameters:
|
|
293 |
SE(params, "param", name=name, value=value)
|
|
294 |
actions.append(action)
|
|
295 |
return actions
|
|
296 |
|
|
297 |
def generate_testasset_zip(self, test_plan, output_file=None):
|
|
298 |
"""Generate TestAsset.zip for the ASTE server"""
|
|
299 |
filename = test_plan["temp_directory"].joinpath(r"TestAsset.zip")
|
|
300 |
if output_file != None:
|
|
301 |
filename = output_file
|
|
302 |
testasset_location = path(test_plan["testasset_location"])
|
|
303 |
if re.search(r"[.]zip", testasset_location):
|
|
304 |
return testasset_location
|
|
305 |
else:
|
|
306 |
zfile = zipfile.ZipFile(filename, "w", zipfile.ZIP_DEFLATED)
|
|
307 |
try:
|
|
308 |
for file_ in list(testasset_location.walkfiles()):
|
|
309 |
file_mod = file_.replace(testasset_location, "")
|
|
310 |
zfile.write(file_, file_mod.encode('utf-8'))
|
|
311 |
finally:
|
|
312 |
zfile.close()
|
|
313 |
return filename
|
|
314 |
|
|
315 |
def drop_files(self, test_plan):
|
|
316 |
"""Yield a list of drop files."""
|
|
317 |
drop_set = set()
|
|
318 |
drop_files = []
|
|
319 |
zip_file = path(self.generate_testasset_zip(test_plan)) ##check here, I changed the variable name from "zipfile" to "zip_file"
|
|
320 |
for setd in test_plan.sets:
|
|
321 |
drop_path = self.drop_path_root.joinpath(setd["name"])
|
|
322 |
drop_files = ((drop_path.parent, "images", setd["image_files"]),
|
|
323 |
(drop_path.parent, "TestAssets", [zip_file]))
|
|
324 |
for drop_dir, sub_dir, files in drop_files:
|
|
325 |
for file_path in files:
|
|
326 |
if file_path != None:
|
|
327 |
drop_file = drop_dir.joinpath(sub_dir, file_path.name)
|
|
328 |
drop_file = drop_file.normpath()
|
|
329 |
if drop_file not in drop_set:
|
|
330 |
drop_set.add(drop_file)
|
|
331 |
yield (drop_file, file_path.normpath())
|
|
332 |
|
|
333 |
|
|
334 |
def generate_files(self, test_plan):
|
|
335 |
"""Generate the <files> section."""
|
|
336 |
files_elem = E("files")
|
|
337 |
for drop_file, _ in self.drop_files(test_plan):
|
|
338 |
SE(files_elem, "file").text = drop_file
|
|
339 |
return files_elem
|
|
340 |
|
|
341 |
|
|
342 |
class AsteComponentParser(object):
|
|
343 |
"""
|
|
344 |
Add information to the test_plan
|
|
345 |
"""
|
|
346 |
def __init__(self, config):
|
|
347 |
self.flash_images = [path(p) for p in config.flash_images]
|
|
348 |
self.build_drive = config.build_drive
|
|
349 |
self.test_timeout = config.test_timeout
|
|
350 |
|
|
351 |
def insert_test_set(self, test_plan):
|
|
352 |
"""Parse flash images and creates inserts into 'sets'"""
|
|
353 |
test_plan.insert_set(image_files=self.flash_images,
|
|
354 |
test_timeout=list(self.test_timeout))
|
|
355 |
|
|
356 |
|
|
357 |
class AsteTemplateTestDropGenerator(AsteTestDropGenerator):
|
|
358 |
""" ASTE template Drop generator tester"""
|
|
359 |
def getlogdir(self, setd):
|
|
360 |
""" get the logger directory"""
|
|
361 |
setd = setd #just to fool pylint
|
|
362 |
return self.ASTE_LOG_DIR
|
|
363 |
|
|
364 |
def aslfiles(self, test_plan):
|
|
365 |
"""get the list of .asl files"""
|
|
366 |
files = []
|
|
367 |
|
|
368 |
testasset_location = path(test_plan["testasset_location"])
|
|
369 |
for file_ in list(testasset_location.walkfiles()):
|
|
370 |
if file_.endswith('.asl'):
|
|
371 |
files.append(file_.replace(testasset_location + os.sep, ""))
|
|
372 |
return files
|
|
373 |
|
|
374 |
def generate_xml(self, test_plan):
|
|
375 |
""" generate an XML file"""
|
|
376 |
loader = jinja2.ChoiceLoader([jinja2.PackageLoader(__name__, 'templates')])
|
|
377 |
env = jinja2.Environment(loader=loader)
|
628
|
378 |
template = env.from_string(pkg_resources.resource_string(__name__, 'aste_template.xml'))# pylint: disable=E1101
|
587
|
379 |
|
|
380 |
xmltext = template.render(test_plan=test_plan, os=os, atspath=atspath, atsself=self).encode('ISO-8859-1')
|
|
381 |
return et.ElementTree(et.XML(xmltext))
|
|
382 |
|
|
383 |
def create_drop(config):
|
|
384 |
"""Create a test drop."""
|
|
385 |
_logger.debug("initialize test plan")
|
|
386 |
|
|
387 |
test_plan = AsteTestPlan(config)
|
|
388 |
parser = AsteComponentParser(config)
|
|
389 |
parser.insert_test_set(test_plan) ######check here if something goes wrong, removed ", path(tsrc)"
|
|
390 |
if config.ats4_enabled.lower() == 'true':
|
|
391 |
generator = AsteTemplateTestDropGenerator()
|
|
392 |
else:
|
|
393 |
generator = AsteTestDropGenerator()
|
|
394 |
_logger.info("generating drop file: %s" % config.drop_file)
|
|
395 |
generator.generate(test_plan, output_file=config.drop_file)
|
|
396 |
|
|
397 |
|
|
398 |
def main():
|
|
399 |
"""Main entry point."""
|
|
400 |
cli = OptionParser(usage="%prog [options] TSRC1 [TSRC2 [TSRC3 ...]]")
|
|
401 |
cli.add_option("--build-drive", help="Build area root drive")
|
|
402 |
cli.add_option("--device-type", help="Device type (e.g. 'PRODUCT')",
|
|
403 |
default="unknown")
|
|
404 |
cli.add_option("--device-hwid", help="Device hwid",
|
|
405 |
default="")
|
|
406 |
cli.add_option("--diamonds-build-url", help="Diamonds build url")
|
|
407 |
cli.add_option("--drop-file", help="Name for the final drop zip file",
|
|
408 |
default="ATS3Drop.zip")
|
|
409 |
cli.add_option("--file-store", help="Destination path for reports.",
|
|
410 |
default="")
|
|
411 |
cli.add_option("--flash-images", help="Paths to the flash image files",
|
|
412 |
default="")
|
|
413 |
cli.add_option("--minimum-flash-images", help="Minimum amount of flash images",
|
|
414 |
default=2)
|
|
415 |
cli.add_option("--report-email", help="Email notification receivers",
|
|
416 |
default="")
|
|
417 |
cli.add_option("--plan-name", help="Name of the test plan",
|
|
418 |
default="plan")
|
|
419 |
cli.add_option("--test-timeout", help="Test execution timeout value (default: %default)",
|
|
420 |
default="60")
|
|
421 |
cli.add_option("--testrun-name", help="Name of the test run",
|
|
422 |
default="run")
|
|
423 |
cli.add_option("--testasset-location", help="Path to the ASTE test assets location",
|
|
424 |
default="")
|
|
425 |
cli.add_option("--testasset-caseids", help="Test case IDs to run",
|
|
426 |
default="")
|
|
427 |
cli.add_option("--software-version", help="Sofwtare version",
|
|
428 |
default="")
|
|
429 |
cli.add_option("--test-type", help="Sofwtare version",
|
|
430 |
default="smoke")
|
|
431 |
cli.add_option("--device-language", help="language name e.g. English",
|
|
432 |
default="English")
|
|
433 |
cli.add_option("--software-release", help="Software release or product name e.g. PPD 52.50",
|
|
434 |
default="")
|
|
435 |
cli.add_option("--ats4-enabled", help="ATS4 enabled", default="False")
|
|
436 |
cli.add_option("--verbose", help="Increase output verbosity",
|
|
437 |
action="store_true", default=False)
|
|
438 |
|
|
439 |
opts, _ = cli.parse_args()
|
|
440 |
|
|
441 |
if not opts.flash_images:
|
|
442 |
cli.error("no flash image files given")
|
|
443 |
if len(opts.flash_images.split(",")) < int(opts.minimum_flash_images):
|
|
444 |
cli.error("Not enough flash files: %i defined, %i needed" % (len(opts.flash_images.split(",")), int(opts.minimum_flash_images) ))
|
|
445 |
|
|
446 |
if opts.verbose:
|
|
447 |
_logger.setLevel(logging.DEBUG)
|
|
448 |
logging.basicConfig(level=logging.DEBUG)
|
|
449 |
config = Configuration(opts)
|
|
450 |
create_drop(config)
|
|
451 |
|
|
452 |
if __name__ == "__main__":
|
|
453 |
main()
|