diff -r 7685cec9fd3c -r f2ddfa555b0f doc/api/python/ats3.aste-pysrc.html --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/doc/api/python/ats3.aste-pysrc.html Fri Sep 11 11:54:49 2009 +0100 @@ -0,0 +1,979 @@ + + + + + ats3.aste + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + Package ats3 :: + Module aste + + + + + + +
[hide private]
[frames] | no frames]
+
+

Source Code for Module ats3.aste

+
+  1  # -*- encoding: latin-1 -*- 
+  2   
+  3  #============================================================================  
+  4  #Name        : aste.py  
+  5  #Part of     : Helium  
+  6   
+  7  #Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). 
+  8  #All rights reserved. 
+  9  #This component and the accompanying materials are made available 
+ 10  #under the terms of the License "Eclipse Public License v1.0" 
+ 11  #which accompanies this distribution, and is available 
+ 12  #at the URL "http://www.eclipse.org/legal/epl-v10.html". 
+ 13  # 
+ 14  #Initial Contributors: 
+ 15  #Nokia Corporation - initial contribution. 
+ 16  # 
+ 17  #Contributors: 
+ 18  # 
+ 19  #Description: 
+ 20  #=============================================================================== 
+ 21   
+ 22  """ASTE test drop generation.""" 
+ 23   
+ 24  from optparse import OptionParser 
+ 25  from textwrap import dedent 
+ 26  from xml.etree import ElementTree as et 
+ 27  from xml.sax.saxutils import quoteattr 
+ 28  import logging 
+ 29  import os 
+ 30  import re 
+ 31  import sys 
+ 32  import tempfile 
+ 33  import zipfile 
+ 34   
+ 35  from path import path 
+ 36  import amara 
+ 37   
+ 38  _logger = logging.getLogger('ats3') 
+ 39   
+ 40  # Shortcuts 
+ 41  E = et.Element 
+ 42  SE = et.SubElement 
+ 43  TEMP_PATH = None 
+ 44   
+
45 -class Configuration(object): +
46 """ + 47 ASTE drop generation configuration. + 48 """ + 49 +
50 - def __init__(self, opts): +
51 """ + 52 Initialize from optparse configuration options. + 53 """ + 54 self._opts = opts + 55 # Customize some attributes from how optparse leaves them. + 56 self.build_drive = path(self._opts.build_drive) + 57 self.file_store = path(self._opts.file_store) + 58 self.flash_images = self.split_paths(self._opts.flash_images) +
59 +
60 - def split_paths(self, arg, delim=","): +
61 """ + 62 Split the string by delim, removing extra whitespace. + 63 """ + 64 return [path(part.strip()) + 65 for part in arg.split(delim) if part.strip()] +
66 +
67 - def __getattr__(self, attr): +
68 return getattr(self._opts, attr) +
69 +
70 - def __str__(self): +
71 dump = "Configuration:\n" + 72 seen = set() + 73 for key, value in vars(self).items(): + 74 if not key.startswith("_"): + 75 dump += "\t%s = %s\n" % (key, value) + 76 seen.add(key) + 77 for key, value in vars(self._opts).items(): + 78 if key not in seen: + 79 dump += "\t%s = %s\n" % (key, value) + 80 seen.add(key) + 81 return dump +
82 + 83 +
84 -class AsteTestPlan(object): +
85 """ + 86 Tells ASTE server what to test and how. + 87 + 88 The ASTE test plan from which the test.xml file can be written. The test + 89 plan requires TestAsset(s) to perform the tests + 90 """ + 91 + 92 EMAIL_SUBJECT = (u"ATS3 report for §RUN_NAME§ §RUN_START_DATE§ " + 93 u"§RUN_START_TIME§") + 94 REPORT_PATH = u"§RUN_NAME§\§RUN_START_DATE§_§RUN_START_TIME§" + 95 +
96 - def __init__(self, config): +
97 self.diamonds_build_url = config.diamonds_build_url + 98 self.testrun_name = config.testrun_name + 99 self.harness = "ASTE" +100 self.device_type = config.device_type +101 self.test_type = config.test_type +102 self.device_hwid = config.device_hwid +103 self.plan_name = config.plan_name +104 self.report_email = config.report_email +105 self.file_store = config.file_store +106 self.test_timeout = config.test_timeout +107 self.testasset_location = config.testasset_location +108 self.testasset_caseids = config.testasset_caseids +109 self.software_version = config.software_version +110 self.device_language = config.device_language +111 self.software_release = config.software_release +112 self.sets = [] +113 self.temp_directory = path(tempfile.mkdtemp()) +
114 +
115 - def insert_set(self, image_files=None, test_timeout=None, ): +
116 """ +117 Insert a test set into the test plan. +118 """ +119 if image_files is None: +120 image_files = [] +121 if test_timeout is None: +122 test_timeout = [] +123 test_harness = self.harness +124 setd = dict(name="set%d" % len(self.sets), image_files=image_files) +125 setd = dict(setd, test_timeout=test_timeout, test_harness=test_harness) +126 +127 self.sets.append(setd) +
128 +129 @property +
130 - def post_actions(self): +
131 """ATS3 and ASTE post actions.""" +132 actions = [] +133 report_path = self.file_store.joinpath(self.REPORT_PATH) +134 email_action = ("SendEmailAction", +135 (("subject", self.EMAIL_SUBJECT), +136 ("type", "ATS3_REPORT"), +137 ("send-files", "true"), +138 ("to", self.report_email))) +139 ats3_report = ("FileStoreAction", +140 (("to-folder", report_path.joinpath("ATS3_REPORT")), +141 ("report-type", "ATS_REPORT"), +142 ("date-format", "yyyyMMdd"), +143 ("time-format", "HHmmss"))) +144 aste_report = ("FileStoreAction", +145 (("to-folder", report_path.joinpath("ASTE_REPORT")), +146 ("report-type", "ASTE_REPORT"), +147 ("run-log", "true"), +148 ("date-format", "yyyyMMdd"), +149 ("time-format", "HHmmss"))) +150 +151 diamonds_action = ("DiamondsAction", ()) +152 if self.report_email: +153 actions.append(email_action) +154 if self.file_store: +155 actions.append(ats3_report) +156 actions.append(aste_report) +157 if self.diamonds_build_url: +158 actions.append(diamonds_action) +159 return actions +
160 +
161 - def __getitem__(self, key): +
162 return self.__dict__[key] +
163 +164 +165 +
166 -class AsteTestDropGenerator(object): +
167 """ +168 Generate test drop zip file for ATS3. +169 +170 Generates drop zip files file from a TestPlan instance. The main +171 responsibility of this class is to serialize the plan into a valid XML +172 file and build a zip file for the drop. +173 +174 Creates one <set> for ASTE tests. +175 +176 ASTE harness, normal operation +177 ------------------------------ +178 +179 - create logging dir for aste makedir (to C:\logs\TestFramework) +180 - execute asset from the testasset.zip execute-asset +181 - fetch logs fetch-log +182 +183 """ +184 +185 ASTE_LOG_DIR = r"c:\logs\testframework" +186 +
187 - def __init__(self): +
188 self.drop_path_root = path("ATS3Drop") +189 self.drop_path = None +190 self.defaults = {} +
191 +
192 - def generate(self, test_plan, output_file): +
193 """Generate a test drop file.""" +194 xml = self.generate_xml(test_plan) +195 return self.generate_drop(test_plan, xml, output_file) +
196 +
197 - def generate_drop(self, test_plan, xml, output_file): +
198 """Generate test drop zip file.""" +199 zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED) +200 try: +201 for drop_file, src_file in self.drop_files(test_plan): +202 _logger.info(" + Adding: %s" % src_file.strip()) +203 zfile.write(src_file.strip(), drop_file.encode('utf-8')) +204 doc = amara.parse(et.tostring(xml.getroot())) +205 _logger.debug("XML output: %s" % doc.xml(indent=u"yes")) +206 zfile.writestr("test.xml", doc.xml(indent="yes")) +207 finally: +208 zfile.close() +209 return zfile +
210 +
211 - def generate_xml(self, test_plan): +
212 """Generate test drop XML.""" +213 self.defaults = {"enabled": "true", +214 "passrate": "100", +215 "significant": "false", +216 "harness": "%s" % test_plan["harness"]} +217 root = E("test") +218 root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan)) +219 if test_plan["diamonds_build_url"]: +220 root.append( +221 et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan)) +222 self.generate_target(test_plan, root) +223 root.append(self.generate_plan(test_plan)) +224 for post_action in self.generate_post_actions(test_plan): +225 root.append(post_action) +226 root.append(self.generate_files(test_plan)) +227 etree = et.ElementTree(root) +228 return etree +
229 +
230 - def generate_target(self, test_plan, root): +
231 """Append target(s) into the XML""" +232 target = E("target") +233 device = SE(target, "device", rank="none", alias="DEFAULT") +234 SE(device, "property", name="harness", value=test_plan["harness"]) +235 SE(device, "property", name="hardware", value=test_plan["device_type"]) +236 SE(device, "setting", name="softwareVersion", value=test_plan["software_version"]) +237 SE(device, "setting", name="softwareRelease", value=test_plan["software_release"]) +238 SE(device, "setting", name="language", value=test_plan["device_language"]) +239 +240 if test_plan["device_hwid"] != "": +241 SE(device, "property", name="HWID", value=test_plan["device_hwid"]) +242 root.append(target) +
243 +
244 - def generate_plan(self, test_plan): +
245 """Generate the test <plan> with multiple <set>s.""" +246 plan = E("plan", name="Plan %s %s" % (test_plan["test_type"], test_plan["device_type"]), **self.defaults) +247 session = SE(plan, "session", name="session", **self.defaults) +248 # One set for each component. +249 for setd in test_plan.sets: +250 self.drop_path = self.drop_path_root.joinpath(setd["name"]) +251 elem = SE(session, "set", name=setd["name"], **self.defaults) +252 SE(SE(elem, "target"), "device", rank="master", alias="DEFAULT") +253 case = SE(elem, "case", name="%s case" % setd["name"], **self.defaults) +254 self.generate_steps(setd, case, test_plan) +255 return plan +
256 +
257 - def generate_steps(self, setd, case, test_plan): +
258 """Generate the test plan <step>s.""" +259 # Flash images. +260 images = self.drop_path_root.joinpath("images") +261 for image_file in setd["image_files"]: +262 flash = SE(case, "flash", images=images.joinpath(image_file.name)) +263 flash.set("target-alias", "DEFAULT") +264 +265 # If tracing enabled: +266 self.generate_execute_asset_steps(case, test_plan) +
267 +
268 - def generate_execute_asset_steps(self, case, test_plan): +
269 """Executes steps for TestAsset""" +270 time_out = test_plan["test_timeout"] +271 step = SE(case, "step", +272 name="Execute asset zip step" , **self.defaults) +273 SE(step, "command").text = "execute-asset" +274 params = SE(step, "params") +275 SE(params, "param", repeat="1") +276 elem = SE(params, "param") +277 elem.set("asset-source", path(r"ATS3Drop" + os.sep + "TestAssets" + os.sep + "TestAsset.zip")) +278 elem = SE(params, "param") +279 elem.set("testcase-ids", test_plan["testasset_caseids"]) +280 SE(params, "param", timeout=time_out) +
281 +282 +
283 - def generate_post_actions(self, test_plan): +
284 """Generate post actions.""" +285 actions = [] +286 for action_type, parameters in test_plan.post_actions: +287 action = E("postAction") +288 SE(action, "type").text = action_type +289 params = SE(action, "params") +290 for name, value in parameters: +291 SE(params, "param", name=name, value=value) +292 actions.append(action) +293 return actions +
294 +
295 - def generate_testasset_zip(self, test_plan, output_file=None): +
296 """Generate TestAsset.zip for the ASTE server""" +297 filename = test_plan["temp_directory"].joinpath(r"TestAsset.zip") +298 if output_file != None: +299 filename = output_file +300 testasset_location = path(test_plan["testasset_location"]) +301 if re.search(r"[.]zip", testasset_location): +302 return testasset_location +303 else: +304 zfile = zipfile.ZipFile(filename, "w", zipfile.ZIP_DEFLATED) +305 try: +306 for file_ in list(testasset_location.walkfiles()): +307 file_mod = file_.replace(testasset_location, "") +308 zfile.write(file_, file_mod.encode('utf-8')) +309 finally: +310 zfile.close() +311 return filename +
312 +
313 - def drop_files(self, test_plan): +
314 """Yield a list of drop files.""" +315 drop_set = set() +316 drop_files = [] +317 zip_file = path(self.generate_testasset_zip(test_plan)) ##check here, I changed the variable name from "zipfile" to "zip_file" +318 for setd in test_plan.sets: +319 drop_path = self.drop_path_root.joinpath(setd["name"]) +320 drop_files = ((drop_path.parent, "images", setd["image_files"]), +321 (drop_path.parent, "TestAssets", [zip_file])) +322 for drop_dir, sub_dir, files in drop_files: +323 for file_path in files: +324 if file_path != None: +325 drop_file = drop_dir.joinpath(sub_dir, file_path.name) +326 drop_file = drop_file.normpath() +327 if drop_file not in drop_set: +328 drop_set.add(drop_file) +329 yield (drop_file, file_path.normpath()) +
330 +331 +
332 - def generate_files(self, test_plan): +
333 """Generate the <files> section.""" +334 files_elem = E("files") +335 for drop_file, _ in self.drop_files(test_plan): +336 SE(files_elem, "file").text = drop_file +337 return files_elem +
338 +
339 -class AsteComponentParser(object): +
340 """ +341 +342 Add information to the test_plan +343 +344 """ +345 +
346 - def __init__(self, config): +
347 self.flash_images = [path(p) for p in config.flash_images] +348 self.build_drive = config.build_drive +349 self.test_timeout = config.test_timeout +
350 +351 +
352 - def insert_test_set(self, test_plan): +
353 """Parse flash images and creates inserts into 'sets'""" +354 +355 test_plan.insert_set(image_files=self.flash_images, +356 test_timeout=list(self.test_timeout)) +
357 +358 +
359 -def create_drop(config): +
360 """Create a test drop.""" +361 _logger.debug("initialize test plan") +362 +363 test_plan = AsteTestPlan(config) +364 parser = AsteComponentParser(config) +365 parser.insert_test_set(test_plan) ######check here if something goes wrong, removed ", path(tsrc)" +366 generator = AsteTestDropGenerator() +367 _logger.info("generating drop file: %s" % config.drop_file) +368 generator.generate(test_plan, output_file=config.drop_file) +
369 +370 +
371 -def main(): +
372 """Main entry point.""" +373 cli = OptionParser(usage="%prog [options] TSRC1 [TSRC2 [TSRC3 ...]]") +374 cli.add_option("--build-drive", help="Build area root drive") +375 cli.add_option("--device-type", help="Device type (e.g. 'PRODUCT')", +376 default="unknown") +377 cli.add_option("--device-hwid", help="Device hwid", +378 default="") +379 cli.add_option("--diamonds-build-url", help="Diamonds build url") +380 cli.add_option("--drop-file", help="Name for the final drop zip file", +381 default="ATS3Drop.zip") +382 cli.add_option("--file-store", help="Destination path for reports.", +383 default="") +384 cli.add_option("--flash-images", help="Paths to the flash image files", +385 default="") +386 cli.add_option("--minimum-flash-images", help="Minimum amount of flash images", +387 default=2) +388 cli.add_option("--report-email", help="Email notification receivers", +389 default="") +390 cli.add_option("--plan-name", help="Name of the test plan", +391 default="plan") +392 cli.add_option("--test-timeout", help="Test execution timeout value (default: %default)", +393 default="60") +394 cli.add_option("--testrun-name", help="Name of the test run", +395 default="run") +396 cli.add_option("--testasset-location", help="Path to the ASTE test assets location", +397 default="") +398 cli.add_option("--testasset-caseids", help="Test case IDs to run", +399 default="") +400 cli.add_option("--software-version", help="Sofwtare version", +401 default="") +402 cli.add_option("--test-type", help="Sofwtare version", +403 default="smoke") +404 cli.add_option("--device-language", help="language name e.g. English", +405 default="English") +406 cli.add_option("--software-release", help="Software release or product name e.g. PPD 52.50", +407 default="") +408 cli.add_option("--verbose", help="Increase output verbosity", +409 action="store_true", default=False) +410 +411 opts, _ = cli.parse_args() +412 +413 if not opts.flash_images: +414 cli.error("no flash image files given") +415 if len(opts.flash_images.split(",")) < int(opts.minimum_flash_images): +416 cli.error("Not enough flash files: %i defined, %i needed" % (len(opts.flash_images.split(",")), int(opts.minimum_flash_images) )) +417 +418 if opts.verbose: +419 _logger.setLevel(logging.DEBUG) +420 logging.basicConfig(level=logging.DEBUG) +421 TEMP_PATH = tempfile.mkdtemp() +422 config = Configuration(opts) +423 create_drop(config) +
424 +425 if __name__ == "__main__": +426 main() +427 +
+
+ + + + + + + + + + + + + + + + + + + + + +
+ + + + +