Package ats3 :: Module aste
[hide private]
[frames] | no frames]

Source Code for Module ats3.aste

  1  # -*- encoding: latin-1 -*- 
  2   
  3  #============================================================================  
  4  #Name        : aste.py  
  5  #Part of     : Helium  
  6   
  7  #Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). 
  8  #All rights reserved. 
  9  #This component and the accompanying materials are made available 
 10  #under the terms of the License "Eclipse Public License v1.0" 
 11  #which accompanies this distribution, and is available 
 12  #at the URL "http://www.eclipse.org/legal/epl-v10.html". 
 13  # 
 14  #Initial Contributors: 
 15  #Nokia Corporation - initial contribution. 
 16  # 
 17  #Contributors: 
 18  # 
 19  #Description: 
 20  #=============================================================================== 
 21   
 22  """ASTE test drop generation.""" 
 23   
 24  from optparse import OptionParser 
 25  from textwrap import dedent 
 26  from xml.etree import ElementTree as et 
 27  from xml.sax.saxutils import quoteattr 
 28  import logging 
 29  import os 
 30  import re 
 31  import sys 
 32  import tempfile 
 33  import zipfile 
 34   
 35  from path import path 
 36  import amara 
 37   
 38  _logger = logging.getLogger('ats3') 
 39   
 40  # Shortcuts 
 41  E = et.Element 
 42  SE = et.SubElement 
 43  TEMP_PATH = None 
 44   
45 -class Configuration(object):
46 """ 47 ASTE drop generation configuration. 48 """ 49
50 - def __init__(self, opts):
51 """ 52 Initialize from optparse configuration options. 53 """ 54 self._opts = opts 55 # Customize some attributes from how optparse leaves them. 56 self.build_drive = path(self._opts.build_drive) 57 self.file_store = path(self._opts.file_store) 58 self.flash_images = self.split_paths(self._opts.flash_images)
59
60 - def split_paths(self, arg, delim=","):
61 """ 62 Split the string by delim, removing extra whitespace. 63 """ 64 return [path(part.strip()) 65 for part in arg.split(delim) if part.strip()]
66
67 - def __getattr__(self, attr):
68 return getattr(self._opts, attr)
69
70 - def __str__(self):
71 dump = "Configuration:\n" 72 seen = set() 73 for key, value in vars(self).items(): 74 if not key.startswith("_"): 75 dump += "\t%s = %s\n" % (key, value) 76 seen.add(key) 77 for key, value in vars(self._opts).items(): 78 if key not in seen: 79 dump += "\t%s = %s\n" % (key, value) 80 seen.add(key) 81 return dump
82 83
84 -class AsteTestPlan(object):
85 """ 86 Tells ASTE server what to test and how. 87 88 The ASTE test plan from which the test.xml file can be written. The test 89 plan requires TestAsset(s) to perform the tests 90 """ 91 92 EMAIL_SUBJECT = (u"ATS3 report for §RUN_NAME§ §RUN_START_DATE§ " 93 u"§RUN_START_TIME§") 94 REPORT_PATH = u"§RUN_NAME§\§RUN_START_DATE§_§RUN_START_TIME§" 95
96 - def __init__(self, config):
97 self.diamonds_build_url = config.diamonds_build_url 98 self.testrun_name = config.testrun_name 99 self.harness = "ASTE" 100 self.device_type = config.device_type 101 self.test_type = config.test_type 102 self.device_hwid = config.device_hwid 103 self.plan_name = config.plan_name 104 self.report_email = config.report_email 105 self.file_store = config.file_store 106 self.test_timeout = config.test_timeout 107 self.testasset_location = config.testasset_location 108 self.testasset_caseids = config.testasset_caseids 109 self.software_version = config.software_version 110 self.device_language = config.device_language 111 self.software_release = config.software_release 112 self.sets = [] 113 self.temp_directory = path(tempfile.mkdtemp())
114
115 - def insert_set(self, image_files=None, test_timeout=None, ):
116 """ 117 Insert a test set into the test plan. 118 """ 119 if image_files is None: 120 image_files = [] 121 if test_timeout is None: 122 test_timeout = [] 123 test_harness = self.harness 124 setd = dict(name="set%d" % len(self.sets), image_files=image_files) 125 setd = dict(setd, test_timeout=test_timeout, test_harness=test_harness) 126 127 self.sets.append(setd)
128 129 @property
130 - def post_actions(self):
131 """ATS3 and ASTE post actions.""" 132 actions = [] 133 report_path = self.file_store.joinpath(self.REPORT_PATH) 134 email_action = ("SendEmailAction", 135 (("subject", self.EMAIL_SUBJECT), 136 ("type", "ATS3_REPORT"), 137 ("send-files", "true"), 138 ("to", self.report_email))) 139 ats3_report = ("FileStoreAction", 140 (("to-folder", report_path.joinpath("ATS3_REPORT")), 141 ("report-type", "ATS_REPORT"), 142 ("date-format", "yyyyMMdd"), 143 ("time-format", "HHmmss"))) 144 aste_report = ("FileStoreAction", 145 (("to-folder", report_path.joinpath("ASTE_REPORT")), 146 ("report-type", "ASTE_REPORT"), 147 ("run-log", "true"), 148 ("date-format", "yyyyMMdd"), 149 ("time-format", "HHmmss"))) 150 151 diamonds_action = ("DiamondsAction", ()) 152 if self.report_email: 153 actions.append(email_action) 154 if self.file_store: 155 actions.append(ats3_report) 156 actions.append(aste_report) 157 if self.diamonds_build_url: 158 actions.append(diamonds_action) 159 return actions
160
161 - def __getitem__(self, key):
162 return self.__dict__[key]
163 164 165
166 -class AsteTestDropGenerator(object):
167 """ 168 Generate test drop zip file for ATS3. 169 170 Generates drop zip files file from a TestPlan instance. The main 171 responsibility of this class is to serialize the plan into a valid XML 172 file and build a zip file for the drop. 173 174 Creates one <set> for ASTE tests. 175 176 ASTE harness, normal operation 177 ------------------------------ 178 179 - create logging dir for aste makedir (to C:\logs\TestFramework) 180 - execute asset from the testasset.zip execute-asset 181 - fetch logs fetch-log 182 183 """ 184 185 ASTE_LOG_DIR = r"c:\logs\testframework" 186
187 - def __init__(self):
188 self.drop_path_root = path("ATS3Drop") 189 self.drop_path = None 190 self.defaults = {}
191
192 - def generate(self, test_plan, output_file):
193 """Generate a test drop file.""" 194 xml = self.generate_xml(test_plan) 195 return self.generate_drop(test_plan, xml, output_file)
196
197 - def generate_drop(self, test_plan, xml, output_file):
198 """Generate test drop zip file.""" 199 zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED) 200 try: 201 for drop_file, src_file in self.drop_files(test_plan): 202 _logger.info(" + Adding: %s" % src_file.strip()) 203 zfile.write(src_file.strip(), drop_file.encode('utf-8')) 204 doc = amara.parse(et.tostring(xml.getroot())) 205 _logger.debug("XML output: %s" % doc.xml(indent=u"yes")) 206 zfile.writestr("test.xml", doc.xml(indent="yes")) 207 finally: 208 zfile.close() 209 return zfile
210
211 - def generate_xml(self, test_plan):
212 """Generate test drop XML.""" 213 self.defaults = {"enabled": "true", 214 "passrate": "100", 215 "significant": "false", 216 "harness": "%s" % test_plan["harness"]} 217 root = E("test") 218 root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan)) 219 if test_plan["diamonds_build_url"]: 220 root.append( 221 et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan)) 222 self.generate_target(test_plan, root) 223 root.append(self.generate_plan(test_plan)) 224 for post_action in self.generate_post_actions(test_plan): 225 root.append(post_action) 226 root.append(self.generate_files(test_plan)) 227 etree = et.ElementTree(root) 228 return etree
229
230 - def generate_target(self, test_plan, root):
231 """Append target(s) into the XML""" 232 target = E("target") 233 device = SE(target, "device", rank="none", alias="DEFAULT") 234 SE(device, "property", name="harness", value=test_plan["harness"]) 235 SE(device, "property", name="hardware", value=test_plan["device_type"]) 236 SE(device, "setting", name="softwareVersion", value=test_plan["software_version"]) 237 SE(device, "setting", name="softwareRelease", value=test_plan["software_release"]) 238 SE(device, "setting", name="language", value=test_plan["device_language"]) 239 240 if test_plan["device_hwid"] != "": 241 SE(device, "property", name="HWID", value=test_plan["device_hwid"]) 242 root.append(target)
243
244 - def generate_plan(self, test_plan):
245 """Generate the test <plan> with multiple <set>s.""" 246 plan = E("plan", name="Plan %s %s" % (test_plan["test_type"], test_plan["device_type"]), **self.defaults) 247 session = SE(plan, "session", name="session", **self.defaults) 248 # One set for each component. 249 for setd in test_plan.sets: 250 self.drop_path = self.drop_path_root.joinpath(setd["name"]) 251 elem = SE(session, "set", name=setd["name"], **self.defaults) 252 SE(SE(elem, "target"), "device", rank="master", alias="DEFAULT") 253 case = SE(elem, "case", name="%s case" % setd["name"], **self.defaults) 254 self.generate_steps(setd, case, test_plan) 255 return plan
256
257 - def generate_steps(self, setd, case, test_plan):
258 """Generate the test plan <step>s.""" 259 # Flash images. 260 images = self.drop_path_root.joinpath("images") 261 for image_file in setd["image_files"]: 262 flash = SE(case, "flash", images=images.joinpath(image_file.name)) 263 flash.set("target-alias", "DEFAULT") 264 265 # If tracing enabled: 266 self.generate_execute_asset_steps(case, test_plan)
267
268 - def generate_execute_asset_steps(self, case, test_plan):
269 """Executes steps for TestAsset""" 270 time_out = test_plan["test_timeout"] 271 step = SE(case, "step", 272 name="Execute asset zip step" , **self.defaults) 273 SE(step, "command").text = "execute-asset" 274 params = SE(step, "params") 275 SE(params, "param", repeat="1") 276 elem = SE(params, "param") 277 elem.set("asset-source", path(r"ATS3Drop" + os.sep + "TestAssets" + os.sep + "TestAsset.zip")) 278 elem = SE(params, "param") 279 elem.set("testcase-ids", test_plan["testasset_caseids"]) 280 SE(params, "param", timeout=time_out)
281 282
283 - def generate_post_actions(self, test_plan):
284 """Generate post actions.""" 285 actions = [] 286 for action_type, parameters in test_plan.post_actions: 287 action = E("postAction") 288 SE(action, "type").text = action_type 289 params = SE(action, "params") 290 for name, value in parameters: 291 SE(params, "param", name=name, value=value) 292 actions.append(action) 293 return actions
294
295 - def generate_testasset_zip(self, test_plan, output_file=None):
296 """Generate TestAsset.zip for the ASTE server""" 297 filename = test_plan["temp_directory"].joinpath(r"TestAsset.zip") 298 if output_file != None: 299 filename = output_file 300 testasset_location = path(test_plan["testasset_location"]) 301 if re.search(r"[.]zip", testasset_location): 302 return testasset_location 303 else: 304 zfile = zipfile.ZipFile(filename, "w", zipfile.ZIP_DEFLATED) 305 try: 306 for file_ in list(testasset_location.walkfiles()): 307 file_mod = file_.replace(testasset_location, "") 308 zfile.write(file_, file_mod.encode('utf-8')) 309 finally: 310 zfile.close() 311 return filename
312
313 - def drop_files(self, test_plan):
314 """Yield a list of drop files.""" 315 drop_set = set() 316 drop_files = [] 317 zip_file = path(self.generate_testasset_zip(test_plan)) ##check here, I changed the variable name from "zipfile" to "zip_file" 318 for setd in test_plan.sets: 319 drop_path = self.drop_path_root.joinpath(setd["name"]) 320 drop_files = ((drop_path.parent, "images", setd["image_files"]), 321 (drop_path.parent, "TestAssets", [zip_file])) 322 for drop_dir, sub_dir, files in drop_files: 323 for file_path in files: 324 if file_path != None: 325 drop_file = drop_dir.joinpath(sub_dir, file_path.name) 326 drop_file = drop_file.normpath() 327 if drop_file not in drop_set: 328 drop_set.add(drop_file) 329 yield (drop_file, file_path.normpath())
330 331
332 - def generate_files(self, test_plan):
333 """Generate the <files> section.""" 334 files_elem = E("files") 335 for drop_file, _ in self.drop_files(test_plan): 336 SE(files_elem, "file").text = drop_file 337 return files_elem
338
339 -class AsteComponentParser(object):
340 """ 341 342 Add information to the test_plan 343 344 """ 345
346 - def __init__(self, config):
347 self.flash_images = [path(p) for p in config.flash_images] 348 self.build_drive = config.build_drive 349 self.test_timeout = config.test_timeout
350 351
352 - def insert_test_set(self, test_plan):
353 """Parse flash images and creates inserts into 'sets'""" 354 355 test_plan.insert_set(image_files=self.flash_images, 356 test_timeout=list(self.test_timeout))
357 358
359 -def create_drop(config):
360 """Create a test drop.""" 361 _logger.debug("initialize test plan") 362 363 test_plan = AsteTestPlan(config) 364 parser = AsteComponentParser(config) 365 parser.insert_test_set(test_plan) ######check here if something goes wrong, removed ", path(tsrc)" 366 generator = AsteTestDropGenerator() 367 _logger.info("generating drop file: %s" % config.drop_file) 368 generator.generate(test_plan, output_file=config.drop_file)
369 370
371 -def main():
372 """Main entry point.""" 373 cli = OptionParser(usage="%prog [options] TSRC1 [TSRC2 [TSRC3 ...]]") 374 cli.add_option("--build-drive", help="Build area root drive") 375 cli.add_option("--device-type", help="Device type (e.g. 'PRODUCT')", 376 default="unknown") 377 cli.add_option("--device-hwid", help="Device hwid", 378 default="") 379 cli.add_option("--diamonds-build-url", help="Diamonds build url") 380 cli.add_option("--drop-file", help="Name for the final drop zip file", 381 default="ATS3Drop.zip") 382 cli.add_option("--file-store", help="Destination path for reports.", 383 default="") 384 cli.add_option("--flash-images", help="Paths to the flash image files", 385 default="") 386 cli.add_option("--minimum-flash-images", help="Minimum amount of flash images", 387 default=2) 388 cli.add_option("--report-email", help="Email notification receivers", 389 default="") 390 cli.add_option("--plan-name", help="Name of the test plan", 391 default="plan") 392 cli.add_option("--test-timeout", help="Test execution timeout value (default: %default)", 393 default="60") 394 cli.add_option("--testrun-name", help="Name of the test run", 395 default="run") 396 cli.add_option("--testasset-location", help="Path to the ASTE test assets location", 397 default="") 398 cli.add_option("--testasset-caseids", help="Test case IDs to run", 399 default="") 400 cli.add_option("--software-version", help="Sofwtare version", 401 default="") 402 cli.add_option("--test-type", help="Sofwtare version", 403 default="smoke") 404 cli.add_option("--device-language", help="language name e.g. English", 405 default="English") 406 cli.add_option("--software-release", help="Software release or product name e.g. PPD 52.50", 407 default="") 408 cli.add_option("--verbose", help="Increase output verbosity", 409 action="store_true", default=False) 410 411 opts, _ = cli.parse_args() 412 413 if not opts.flash_images: 414 cli.error("no flash image files given") 415 if len(opts.flash_images.split(",")) < int(opts.minimum_flash_images): 416 cli.error("Not enough flash files: %i defined, %i needed" % (len(opts.flash_images.split(",")), int(opts.minimum_flash_images) )) 417 418 if opts.verbose: 419 _logger.setLevel(logging.DEBUG) 420 logging.basicConfig(level=logging.DEBUG) 421 TEMP_PATH = tempfile.mkdtemp() 422 config = Configuration(opts) 423 create_drop(config)
424 425 if __name__ == "__main__": 426 main() 427