Package ats3 :: Module dropgenerator
[hide private]
[frames | no frames]

Source Code for Module ats3.dropgenerator

  1  # -*- encoding: latin-1 -*- 
  2   
  3  #============================================================================  
  4  #Name        : dropgenerator.py  
  5  #Part of     : Helium  
  6   
  7  #Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). 
  8  #All rights reserved. 
  9  #This component and the accompanying materials are made available 
 10  #under the terms of the License "Eclipse Public License v1.0" 
 11  #which accompanies this distribution, and is available 
 12  #at the URL "http://www.eclipse.org/legal/epl-v10.html". 
 13  # 
 14  #Initial Contributors: 
 15  #Nokia Corporation - initial contribution. 
 16  # 
 17  #Contributors: 
 18  # 
 19  #Description: 
 20  #=============================================================================== 
 21   
 22   
 23  #w0142 => * and ** were used 
 24  #F0401 => pylint didn't find "path" module 
 25  #C0302 => Too many lines 
 26   
 27  import codecs 
 28  from  xml.parsers.expat import ExpatError 
 29   
 30  from xml.etree import ElementTree as et 
 31  from xml.sax.saxutils import quoteattr 
 32  from path import path 
 33  import logging 
 34  import os 
 35  import re 
 36  import zipfile 
 37  import amara 
 38  import atsconfigparser 
 39   
 40  _logger = logging.getLogger('ats3') 
 41   
 42  # Shortcuts 
 43  E = et.Element 
 44  SE = et.SubElement 
 45   
 46   
47 -class Ats3TestDropGenerator(object):
48 """ 49 Generate test drop zip file for ATS3. 50 51 Generates drom zip files file from a TestPlan instance. The main 52 responsibility of this class is to serialize the plan into a valid XML 53 file and build a zip file for the drop. 54 55 Creates one <set> for each component's tests. 56 57 Stif harness, normal operation 58 ------------------------------ 59 60 - create logging dir for stif makedir (to C:\logs\TestFramework) 61 - install data files install (to E:\testing\data) 62 - install configuration (.cfg) files " (to E:\testing\conf) 63 - install testmodule (.dll) files " (to C:\sys\bin) 64 - install engine ini (testframework.ini) " (to C:\testframework) 65 - execute cases from the engine ini run-cases 66 - fetch logs fetch-log 67 68 Stif harness, SIS package installation 69 -------------------------------------- 70 71 - like above but with data and config files replaced by sis files 72 - install sis to the device install-software 73 74 """ 75 76 STIF_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testframework" 77 STIFUNIT_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testframework" 78 EUNIT_LOG_DIR = r"c:" + os.sep + "Shared" + os.sep + "EUnit" + os.sep + "logs" 79 CTC_LOG_DIR = r"c:" + os.sep + "data" + os.sep + "ctc" 80
81 - def __init__(self):
82 self.drop_path_root = path("ATS3Drop") 83 self.drop_path = None 84 self.defaults = {}
85
86 - def generate(self, test_plan, output_file, config_file=None):
87 """Generate a test drop file.""" 88 xml = self.generate_xml(test_plan) 89 90 if config_file: 91 xmltext = et.tostring(xml.getroot(), "ISO-8859-1") 92 xmltext = atsconfigparser.converttestxml(config_file, xmltext) 93 xml = et.ElementTree(et.XML(xmltext)) 94 95 return self.generate_drop(test_plan, xml, output_file)
96
97 - def generate_drop(self, test_plan, xml, output_file):
98 """Generate test drop zip file.""" 99 zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED) 100 try: 101 for drop_file, src_file in self.drop_files(test_plan): 102 _logger.info(" + Adding: %s" % src_file.strip()) 103 try: 104 zfile.write(src_file.strip(), drop_file.encode('utf-8')) 105 except WindowsError, expr: 106 _logger.error(expr) 107 doc = amara.parse(et.tostring(xml.getroot(), "ISO-8859-1")) 108 _logger.debug("XML output: %s\n" % doc.xml(indent=u"yes", encoding="ISO-8859-1")) 109 zfile.writestr("test.xml", doc.xml(indent="yes", encoding="ISO-8859-1")) 110 finally: 111 zfile.close() 112 113 return zfile
114
115 - def generate_xml(self, test_plan):
116 """Generate test drop XML.""" 117 self.defaults = {"enabled": "true", 118 "passrate": "100", 119 "significant": "false"} 120 root = E("test") 121 root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan)) 122 if test_plan["diamonds_build_url"]: 123 root.append( 124 et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan)) 125 generate_target(test_plan, root) 126 root.append(self.generate_plan(test_plan)) 127 for post_action in generate_post_actions(test_plan): 128 root.append(post_action) 129 root.append(self.generate_files(test_plan)) 130 etree = et.ElementTree(root) 131 return etree
132
133 - def generate_plan(self, test_plan):
134 """Generate the test <plan> with multiple <set>s.""" 135 plan = E("plan", name="%s Plan" % test_plan["testrun_name"], 136 harness=test_plan["harness"], **self.defaults) 137 session = SE(plan, "session", name="session", harness=test_plan["harness"], **self.defaults) 138 139 if not test_plan.custom_dir is None: 140 insert_custom_file(session, test_plan.custom_dir.joinpath("preset_custom.xml")) 141 142 # One set for each component. 143 for setd in test_plan.sets: 144 self.drop_path = self.drop_path_root.joinpath(setd["name"]) 145 elem = SE(session, "set", name=setd["name"]+"-"+setd["component_path"], harness=setd["test_harness"], **self.defaults) 146 SE(SE(elem, "target"), "device", rank="master", alias="DEFAULT_%s" % setd["test_harness"]) 147 148 if not test_plan.custom_dir is None: 149 insert_custom_file(elem, test_plan.custom_dir.joinpath("precase_custom.xml")) 150 151 case = SE(elem, "case", name="%s case" % setd["name"], 152 harness=setd["test_harness"], **self.defaults) 153 self.generate_steps(setd, case, test_plan) 154 if not test_plan.custom_dir is None: 155 insert_custom_file(elem, test_plan.custom_dir.joinpath("postcase_custom.xml")) 156 157 if not test_plan.custom_dir is None: 158 insert_custom_file(session, test_plan.custom_dir.joinpath("postset_custom.xml")) 159 160 return plan
161
162 - def generate_steps_logdir(self, setd, case):
163 """generates STIF log dir.""" 164 step = SE(case, "step", name="Create %s log dir" % setd["test_harness"], harness=setd["test_harness"], **self.defaults) 165 SE(step, "command").text = "makedir" 166 if setd["test_harness"] == "STIF": 167 SE(SE(step, "params"), "param", dir=self.STIF_LOG_DIR) 168 elif setd["test_harness"] == "EUNIT": 169 SE(SE(step, "params"), "param", dir=self.EUNIT_LOG_DIR) 170 elif setd["test_harness"] == "STIFUNIT": 171 SE(SE(step, "params"), "param", dir=self.STIFUNIT_LOG_DIR) 172 173 if setd.has_key("sis_files") and setd["sis_files"]: 174 setd = dict(setd, src_dst=[]) # Added to pass the Sis tests, if removed - gives KeyError 175 for sis_file in setd["sis_files"]: 176 self.generate_install_step(case, "sis", sis_file.name, "sis", 177 r"c:" + os.sep + "testframework", setd["test_harness"]) 178 else: 179 if setd["src_dst"] != []: 180 self.generate_install_step(case, "", "", 181 "", r"", setd["test_harness"], setd["src_dst"]) 182 else: 183 # Data file install. 184 for data_file in setd["data_files"]: 185 self.generate_install_step(case, "data", data_file.name, "data", 186 r"e:\testing\data", setd["test_harness"]) 187 188 # Configuration file install. 189 for conf_file in setd["config_files"]: 190 self.generate_install_step(case, "conf", conf_file.name, "conf", 191 r"e:\testing\conf", setd["test_harness"]) 192 193 # Test module install. 194 for test_file in setd["testmodule_files"]: 195 self.generate_install_step(case, "testmodule", test_file.name, 196 "testmodules", r"c:\sys\bin", setd["test_harness"]) 197 return setd
198
199 - def generate_steps_engineini(self, setd, case):
200 """Engine ini install ( if one exists )""" 201 if setd.has_key("sis_files") and setd["sis_files"]: 202 self.generate_install_step(case, "engine_ini", 203 setd["engine_ini_file"].name, 204 "init", 205 r"c:" + os.sep + "testframework", setd["test_harness"]) 206 else: 207 if setd["src_dst"] == []: 208 self.generate_install_step(case, "engine_ini", 209 setd["engine_ini_file"].name, 210 "init", 211 r"c:" + os.sep + "testframework", setd["test_harness"])
212
213 - def generate_steps_sisfiles(self, setd, case, test_plan):
214 """generating steps for sis files""" 215 for sis_file in setd["sis_files"]: 216 step = SE(case, "step", name="Install SIS to the device: %s" % \ 217 sis_file.name, harness=setd["test_harness"], **self.defaults) 218 SE(step, "command").text = "install-software" 219 params = SE(step, "params") 220 sis_name = path(r"c:" + os.sep + "testframework").joinpath(sis_file.name) 221 for key, value in (("timeout", test_plan["test_timeout"]), 222 ("overWriteAllowed", "true"), 223 ("upgradeData", "true"), 224 ("downloadAllowed", "false"), 225 ("packageInfoAllowed", "true"), 226 ("untrustedAllowed", "true"), 227 ("ignoreOCSPWarnings", "true"), 228 ("userCapGranted", "true"), 229 ("optionalItemsAllowed", "true"), 230 ("killApp", "true"), 231 ("installDrive", "C"), 232 ("upgradeAllowed", "true"), 233 ("OCSP_Done", "true"), 234 ("sisPackageName", sis_name.normpath())): 235 SE(params, "param").set(key, value)
236
237 - def generate_steps_tracestart(self, setd, case, pmds):
238 """Tracing steps are added (Trace Start)""" 239 step = SE(case, "step", 240 name="Start tracing", harness=setd["test_harness"], 241 **self.defaults) 242 SE(step, "command").text = "trace-start" 243 params = SE(step, "params") 244 if setd.has_key("trace_activation_files") and setd["trace_activation_files"]: 245 #find out the group to activate 246 trace_group = et.parse(setd["trace_activation_files"][0]).getroot().find("Configurations").find("TraceActivation").find("Configuration").get("Name") 247 SE(params, "param", ta=self.drop_path.joinpath(r"trace_activation", setd["trace_activation_files"][0].name)) 248 SE(params, "param", tgrp=trace_group ) 249 if setd.has_key("pmd_files") and setd["pmd_files"]: 250 SE(params, "param", pmd=pmds.joinpath(setd["pmd_files"][0].name)) 251 SE(params, "param", log=setd["trace_path"]) 252 SE(params, "param", timeout="60") 253 elem = SE(params, "param") 254 elem.set('date-format', "yyyyMMdd") 255 elem = SE(params, "param") 256 elem.set('time-format', "HHmmss")
257
258 - def generate_steps_createstep(self, setd, case, test_plan):
259 """generates core steps for a single set""" 260 if setd["test_harness"] == "STIF" or setd["test_harness"] == "STIFUNIT": 261 if setd["src_dst"] == []: 262 # Test case execution. If ini file exists, use that 263 if setd["engine_ini_file"] != None: 264 step = SE(case, "step", 265 name="Execute test: %s" % setd["engine_ini_file"].name, 266 harness=setd["test_harness"], **self.defaults) 267 SE(step, "command").text = "run-cases" 268 params = SE(step, "params") 269 SE(params, "param", filter="*") 270 SE(params, "param", timeout=test_plan["test_timeout"]) 271 ini_name = setd["engine_ini_file"].name 272 SE(params, "param", engineini=path(r"c:" + os.sep + "testframework") / ini_name) 273 274 # if no inifile, but cfg files defined, use those 275 elif setd["config_files"]!=[]: 276 for config_file in setd["config_files"]: 277 step = SE(case, "step", 278 name="Execute test: %s" % config_file.name, 279 harness=setd["test_harness"], **self.defaults) 280 SE(step, "command").text = "run-cases" 281 params = SE(step, "params") 282 SE(params, "param", module="TESTSCRIPTER") 283 elem = SE(params, "param" ) 284 elem.set('testcase-file', path(r"e:\testing\conf") / config_file.name ) 285 SE(params, "param", filter="*") 286 SE(params, "param", timeout=test_plan["test_timeout"]) 287 288 # if no ini or cfg files, use dll directly 289 else: 290 for testmodule_file in setd["testmodule_files"]: 291 step = SE(case, "step", 292 name="Execute test: %s" % testmodule_file.name, harness=setd["test_harness"], 293 **self.defaults) 294 SE(step, "command").text = "run-cases" 295 params = SE(step, "params") 296 SE(params, "param", module=testmodule_file.name) 297 SE(params, "param", filter="*") 298 SE(params, "param", timeout=test_plan["test_timeout"]) 299 elif setd["src_dst"] != []: 300 self.generate_run_steps(case, setd, test_plan["test_timeout"], test_plan["eunitexerunner_flags"]) 301 elif setd["test_harness"] == "EUNIT": 302 self.generate_run_steps(case, setd, test_plan["test_timeout"], 
test_plan["eunitexerunner_flags"])
303
304 - def generate_steps_tracestop(self, setd, case, pmds):
305 """Tracing steps are added (Trace Stop)""" 306 step = SE(case, "step", name="Stop tracing", 307 harness=setd["test_harness"], **self.defaults) 308 SE(step, "command").text = "trace-stop" 309 params = SE(step, "params") 310 SE(params, "param", timeout="60") 311 312 step = SE(case, "step", name="Convert tracing", 313 harness=setd["test_harness"], **self.defaults) 314 SE(step, "command").text = "trace-convert" 315 params = SE(step, "params") 316 if setd.has_key("pmd_files") and setd["pmd_files"]: 317 SE(params, "param", pmd=pmds.joinpath(setd["pmd_files"][0].name)) 318 SE(params, "param", log=setd["trace_path"]) 319 SE(params, "param", timeout="60") 320 elem = SE(params, "param") 321 elem.set('date-format', "yyyyMMdd") 322 elem = SE(params, "param") 323 elem.set('time-format', "HHmmss")
324
325 - def generate_steps_ctcdata(self, setd, case, test_plan):
326 """generates steps for installing CTC data""" 327 step = SE(case, "step", name="Save CTC data", harness=setd["test_harness"], **self.defaults) 328 SE(step, "command").text = "execute" 329 params = SE(step, "params") 330 SE(params, "param", parameters="writelocal") 331 SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe")) 332 step = SE(case, "step", name="Save CTC data", harness=setd["test_harness"], **self.defaults) 333 SE(step, "command").text = "execute" 334 params = SE(step, "params") 335 SE(params, "param", parameters="writefile") 336 SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe")) 337 step = SE(case, "step", name="Fetch and clean CTC data", harness=setd["test_harness"], **self.defaults) 338 SE(step, "command").text = "fetch-log" 339 params = SE(step, "params") 340 SE(params, "param", delete="true") 341 SE(params, "param", path=path(self.CTC_LOG_DIR).joinpath(r"ctcdata.txt"))
342
343 - def generate_steps_logfetching(self, setd, case):
344 """generates steps for fetching log file""" 345 step = SE(case, "step", name="Fetch test module logs", harness=setd["test_harness"], **self.defaults) 346 SE(step, "command").text = "fetch-log" 347 params = SE(step, "params") 348 SE(params, "param", type="text") 349 SE(params, "param", delete="true") 350 if setd["test_harness"] == "STIF": 351 SE(params, "param", path=path(self.STIF_LOG_DIR).joinpath(r"*")) 352 elif setd["test_harness"] == "STIFUNIT": 353 SE(params, "param", path=path(self.STIFUNIT_LOG_DIR).joinpath(r"*")) 354 elif setd["test_harness"] == "EUNIT": 355 SE(params, "param", path=path(self.EUNIT_LOG_DIR).joinpath(r"*"))
356
357 - def generate_steps(self, setd, case, test_plan):
358 """Generate the test plan <step>s.""" 359 # Flash images. 360 images = self.drop_path_root.joinpath("images") 361 pmds = self.drop_path_root.joinpath("pmds") 362 for image_file in setd["image_files"]: 363 flash = SE(case, "flash", images=images.joinpath(image_file.name)) 364 flash.set("target-alias", "DEFAULT_%s" % setd["test_harness"]) 365 366 if not test_plan.custom_dir is None: 367 insert_custom_file(case, test_plan.custom_dir.joinpath("prestep_custom.xml")) 368 369 if setd["ctc_enabled"] == "True": 370 step = SE(case, "step", name="Create CTC log dir", harness=setd["test_harness"], **self.defaults) 371 SE(step, "command").text = "makedir" 372 params = SE(step, "params") 373 SE(params, "param", dir=self.CTC_LOG_DIR) 374 step = SE(case, "step", name="CTC start", harness=setd["test_harness"], **self.defaults) 375 SE(step, "command").text = "execute" 376 params = SE(step, "params") 377 SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe")) 378 379 # STIF log dir. 380 setd = self.generate_steps_logdir(setd, case) 381 382 # Engine ini install ( if one exists ) 383 if setd["engine_ini_file"] != None: 384 self.generate_steps_engineini(setd, case) 385 386 #If sis files 387 if setd.has_key("sis_files") and setd["sis_files"]: 388 self.generate_steps_sisfiles(setd, case, test_plan) 389 390 # If tracing enabled, Start Tracing: 391 if setd.has_key("trace_path") and setd["trace_path"] != "": 392 self.generate_steps_tracestart(setd, case, pmds) 393 394 #core steps of a step 395 396 if not test_plan.custom_dir is None: 397 insert_custom_file(case, test_plan.custom_dir.joinpath("prerun_custom.xml")) 398 self.generate_steps_createstep(setd, case, test_plan) 399 400 if not test_plan.custom_dir is None: 401 insert_custom_file(case, test_plan.custom_dir.joinpath("postrun_custom.xml")) 402 403 # If tracing enabled, Stop Tracing 404 if setd.has_key("trace_path") and setd["trace_path"] != "": 405 self.generate_steps_tracestop(setd, case, pmds) 406 407 #install CTC data 408 if 
setd["ctc_enabled"] == "True": 409 self.generate_steps_ctcdata(setd, case, test_plan) 410 411 # Log file fetching. 412 self.generate_steps_logfetching(setd, case) 413 414 if not test_plan.custom_dir is None: 415 insert_custom_file(case, test_plan.custom_dir.joinpath("poststep_custom.xml"))
416 417
418 - def generate_runsteps_stif(self, setd, case, src_dst, time_out):
419 """generates runsteps for stif""" 420 ini = cfg = dll = has_tf_ini = False 421 ini_file = None 422 cfg_files = dll_files = [] 423 424 for tf_ini in src_dst: 425 if "testframework.ini" in tf_ini[1].lower(): 426 has_tf_ini = True 427 428 for file1 in src_dst: 429 430 if "testframework.ini" in file1[1].lower() and file1[2] == "engine_ini" and has_tf_ini: 431 ini = True 432 ini_file = file1 433 434 elif file1[2] == "engine_ini" and not has_tf_ini: 435 pipe_ini = open(file1[0], 'r') 436 if "[engine_defaults]" in str(pipe_ini.readlines()).lower(): 437 ini = True 438 ini_file = file1 439 elif file1[2] == "conf": 440 if not ini: 441 cfg = True 442 cfg_files.append(file1) 443 elif file1[2] == "testmodule": 444 if not cfg and not ini: 445 dll = True 446 dll_files.append(file1) 447 if ini: 448 filename = ini_file[1] 449 filename = filename[ini_file[1].rfind(os.sep)+1:] 450 step = SE(case, "step", 451 name="Execute test: %s" % filename, 452 harness=setd["test_harness"], **self.defaults) 453 SE(step, "command").text = "run-cases" 454 params = SE(step, "params") 455 SE(params, "param", filter="*") 456 SE(params, "param", timeout=time_out) 457 SE(params, "param", engineini=ini_file[1]) 458 elif cfg: 459 for conf_file in cfg_files: 460 filename = conf_file[1] 461 filename = filename[conf_file[1].rfind(os.sep)+1:] 462 step = SE(case, "step", 463 name="Execute test: %s" % filename, 464 harness=setd["test_harness"], **self.defaults) 465 SE(step, "command").text = "run-cases" 466 params = SE(step, "params") 467 SE(params, "param", module="TESTSCRIPTER") 468 elem = SE(params, "param" ) 469 elem.set('testcase-file', conf_file[1] ) 470 SE(params, "param", filter="*") 471 SE(params, "param", timeout=time_out) 472 elif dll: 473 for dll_file in dll_files: 474 filename = dll_file[1] 475 filename = filename[dll_file[1].rfind(os.sep)+1:] 476 step = SE(case, "step", 477 name="Execute test: %s" % filename, harness=setd["test_harness"], 478 **self.defaults) 479 SE(step, "command").text = 
"run-cases" 480 params = SE(step, "params") 481 SE(params, "param", module=filename) 482 SE(params, "param", filter="*") 483 SE(params, "param", timeout=time_out)
484
485 - def generate_runsteps_eunit(self, setd, case, src_dst, time_out, eunit_flags):
486 """generates runsteps for eunit""" 487 for sdst in src_dst: 488 if sdst[2] == "testmodule": 489 if "." in sdst[1]: 490 fileextension = sdst[1].rsplit(".")[1].lower() 491 filename = sdst[1] 492 filename = filename[filename.rfind(os.sep)+1:] 493 eunit_exe = "EUNITEXERUNNER.EXE" 494 if fileextension == "dll" or fileextension == "exe": 495 re_dll = re.compile(r'[.]+%s' % fileextension, re.IGNORECASE) 496 no_dll = re_dll.sub('', filename) 497 no_dll_xml = ''.join([no_dll, u'_log.xml']) 498 if re_dll.search(filename): 499 step = SE(case, "step", name = "Execute test: %s" % filename, harness=setd["test_harness"], 500 **self.defaults) 501 SE(step, "command").text = "execute" 502 params = SE(step, "params") 503 SE(params, "param", file=path(r"z:" + os.sep + "sys" + os.sep + "bin") / eunit_exe) 504 elem = SE(params, "param") 505 elem.set('result-file', path(self.EUNIT_LOG_DIR) / no_dll_xml) 506 SE(params, "param", parameters="%s /F %s /l xml %s" % (eunit_flags, no_dll, filename)) 507 SE(params, "param", timeout=time_out)
508
509 - def generate_run_steps(self, case, setd, time_out, eunit_flags):
510 """Generates run-steps""" 511 src_dst = setd["src_dst"] 512 513 if setd["test_harness"] == "STIF": 514 self.generate_runsteps_stif(setd, case, src_dst, time_out) 515 516 if setd["test_harness"] == "STIFUNIT": 517 self.generate_runsteps_stif(setd, case, src_dst, time_out) 518 519 if setd["test_harness"] == "EUNIT": 520 self.generate_runsteps_eunit(setd, case, src_dst, time_out, eunit_flags)
521
522 - def generate_install_step(self, case, step_type, filename, src_dir, 523 dst_dir, case_harness, src_dst=None):
524 """Generate install <step>.""" 525 if src_dst == None or src_dst == []: 526 src_dst = [] 527 step = SE(case, "step", name="Install %s: %s" % (step_type, filename), 528 harness=case_harness, **self.defaults) 529 SE(step, "command").text = "install" 530 params = SE(step, "params") 531 SE(params, "param", src=self.drop_path.joinpath(src_dir, filename)) 532 SE(params, "param", dst=path(dst_dir).joinpath(filename)) 533 else: 534 for sdst in src_dst: 535 _, dst, type_ = sdst 536 if "testmodule" in type_ or ".dll" in dst: 537 src_dir = dst.replace(":","") 538 src_dir = path(src_dir[:src_dir.rfind(os.sep)]) 539 step_type = type_ 540 filename = dst[dst.rfind(os.sep)+1:] 541 step = SE(case, "step", name="Install %s: %s" % (step_type, filename), 542 harness=case_harness, **self.defaults) 543 SE(step, "command").text = "install" 544 params = SE(step, "params") 545 SE(params, "param", src=self.drop_path.joinpath(src_dir, filename)) 546 SE(params, "param", dst=path(dst)) 547 for sdst in src_dst: 548 _, dst, type_ = sdst 549 if "testmodule" not in type_ and ".dll" not in dst: 550 src_dir = dst.replace(":","") 551 src_dir = path(src_dir[:src_dir.rfind(os.sep)]) 552 step_type = type_ 553 filename = dst[dst.rfind(os.sep)+1:] 554 step = SE(case, "step", name="Install %s: %s" % (step_type, filename), 555 harness=case_harness, **self.defaults) 556 SE(step, "command").text = "install" 557 params = SE(step, "params") 558 SE(params, "param", src=self.drop_path.joinpath(src_dir, filename)) 559 SE(params, "param", dst=path(dst))
560
561 - def drop_files(self, test_plan):
562 """Yield a list of drop files.""" 563 drop_set = set() 564 drop_files = [] 565 pkg_files = [] 566 for setd in test_plan.sets: 567 drop_path = self.drop_path_root.joinpath(setd["name"]) 568 if setd.has_key("sis_files") and setd["sis_files"]: 569 if setd.has_key("pmd_files") and setd["pmd_files"]: 570 drop_files = ((drop_path.parent, "images", setd["image_files"]), 571 (drop_path.parent, "pmds", setd["pmd_files"]), 572 (drop_path, "sis", setd["sis_files"]), 573 (drop_path, "init", [setd["engine_ini_file"]]), 574 (drop_path, "trace_init", setd["trace_activation_files"])) 575 else: 576 drop_files = ((drop_path.parent, "images", setd["image_files"]), 577 (drop_path, "sis", setd["sis_files"]), 578 (drop_path, "init", [setd["engine_ini_file"]])) 579 elif setd["src_dst"] == []: 580 if setd.has_key("pmd_files") and setd["pmd_files"]: 581 drop_files = ((drop_path.parent, "images", setd["image_files"]), 582 (drop_path.parent, "pmds", setd["pmd_files"]), 583 (drop_path, "data", setd["data_files"]), 584 (drop_path, "conf", setd["config_files"]), 585 (drop_path, "testmodules", setd["testmodule_files"]), 586 (drop_path, "init", [setd["engine_ini_file"]]), 587 (drop_path, "trace_init", setd["trace_activation_files"])) 588 else: 589 drop_files = ((drop_path.parent, "images", setd["image_files"]), 590 (drop_path, "data", setd["data_files"]), 591 (drop_path, "conf", setd["config_files"]), 592 (drop_path, "testmodules", setd["testmodule_files"]), 593 (drop_path, "init", [setd["engine_ini_file"]])) 594 elif setd["src_dst"] != []: 595 for src, dst, _ in setd["src_dst"]: 596 dst2 = dst.replace(":","") 597 pkg_files.append((drop_path, dst2, src)) 598 if setd.has_key("pmd_files") and setd["pmd_files"]: 599 drop_files = ((drop_path.parent, "images", setd["image_files"]), 600 (drop_path.parent, "pmds", setd["pmd_files"]), 601 (drop_path, "trace_init", setd["trace_activation_files"])) 602 else: 603 drop_files = ((drop_path.parent, "images", setd["image_files"]),) 604 for drop_dir, 
sub_dir, files in drop_files: 605 for file_path in files: 606 if file_path != None: 607 drop_file = drop_dir.joinpath(sub_dir, file_path.name) 608 drop_file = drop_file.normpath() 609 if drop_file not in drop_set: 610 drop_set.add(drop_file) 611 yield (drop_file, file_path.normpath()) 612 for drop_dir, sub_dir, files in pkg_files: 613 drop_file = drop_dir.joinpath(sub_dir) 614 drop_file = drop_file.normpath() 615 file_path = path(files) 616 if drop_file not in drop_set: 617 drop_set.add(drop_file) 618 yield (drop_file, file_path.normpath())
619
620 - def generate_files(self, test_plan):
621 """Generate the <files> section.""" 622 files_elem = E("files") 623 for drop_file, _ in self.drop_files(test_plan): 624 SE(files_elem, "file").text = drop_file 625 return files_elem
626
def generate_target(test_plan, root):
    """
    Add the <target> element matching the plan's harness to root.

    MULTI_HARNESS produces devices for STIF and EUNIT; STIF, EUNIT and
    STIFUNIT produce a single device each. Any other harness adds nothing.
    """
    harness_devices = {
        "MULTI_HARNESS": ["STIF", "EUNIT"],
        "STIF": ["STIF"],
        "EUNIT": ["EUNIT"],
        "STIFUNIT": ["STIFUNIT"],
    }
    devices = harness_devices.get(test_plan["harness"])
    if devices is not None:
        input_targets(test_plan, root, devices)
638
def input_targets(test_plan, root, harness_type):
    """
    Append a <target> with one <device> per harness name to root.

    Every device gets HARNESS and TYPE properties; HWID is added when the
    plan's "device_hwid" is not empty and TRACE_ENABLED when the plan's
    "trace_enabled" reads "true" (case-insensitive).
    """
    target = E("target")
    for harness_name in harness_type:
        device = SE(target, "device", rank="none",
                    alias="DEFAULT_%s" % harness_name)

        def add_property(prop_name, prop_value, parent=device):
            """Attach one <property name=... value=...> to the device."""
            SE(parent, "property", name=prop_name, value=prop_value)

        add_property("HARNESS", harness_name)
        add_property("TYPE", test_plan["device_type"])
        if test_plan["device_hwid"] != "":
            add_property("HWID", test_plan["device_hwid"])
        if (test_plan["trace_enabled"] != ""
                and test_plan["trace_enabled"].lower() == "true"):
            add_property("TRACE_ENABLED", test_plan["trace_enabled"])
    root.append(target)
652 653
def insert_custom_file(xmltree, filename):
    """
    Insert the elements of a customization file into xmltree.

    The file may hold several top-level elements, so it is read character by
    character: the text collected so far is parsed after every character and,
    as soon as it forms a complete element, that element is appended to
    xmltree and collecting starts again. This re-parses the pending text for
    each character read.

    A file that cannot be opened is silently ignored (customization files are
    optional). An unexpected error while processing adds an XML comment to
    xmltree and is logged. Text left over at the end of the file (for example
    a trailing comment) is reported with a warning.

    xmltree only needs an append() method.
    """
    # ElementTree 1.3+ reports malformed input with ParseError, which is not
    # derived from ExpatError; older releases let ExpatError through.
    incomplete_errors = (ExpatError, getattr(et, "ParseError", ExpatError))

    try:
        custom_action_file = codecs.open(filename, "r", "iso-8859-15")
    except IOError:
        # Best effort: a missing customization file is not an error.
        return

    pending = ''
    last_error = None
    try:
        try:
            char = custom_action_file.read(1)
            while char:
                # Prepend whatever did not parse on the previous attempts.
                candidate = pending + char
                # NOTE(review): the bytes handed to the parser carry no
                # encoding declaration unless the file itself provides one.
                try:
                    xmltree.append(et.XML(candidate.encode("ISO-8859-15")))
                except incomplete_errors as err:
                    # Not a complete element yet; keep collecting.
                    last_error = err
                    pending = candidate
                else:
                    pending = ''
                char = custom_action_file.read(1)
        except Exception as err:
            last_error = err
            _logger.error("Error %s in XML when prosessing %s\n", err, filename)
            xmltree.append(et.Comment("Error in XML file when prosessing %s\n" % (filename)))
    finally:
        custom_action_file.close()

    if pending != '':
        # we should have used all the input and cleared the pending text
        _logger.warning("Issues in customization file %s in XML when prosessing issue %s \n Line and column refer to section:\n%s\n",
                        filename, last_error, pending)

    _logger.info("Included file %s", filename)
695 696
def generate_post_actions(test_plan):
    """
    Build the list of <postAction> elements for the plan.

    Each (type, parameters) pair of test_plan.post_actions becomes one
    element. When the plan has a custom directory, the contents of
    "prepostaction.xml" and "postpostaction.xml" are inserted before and
    after the generated actions.
    """
    def include_custom(file_name, into):
        """Insert the named customization file when a custom dir is set."""
        if test_plan.custom_dir is not None:
            insert_custom_file(into, test_plan.custom_dir.joinpath(file_name))

    actions = []
    include_custom("prepostaction.xml", actions)

    for action_type, parameters in test_plan.post_actions:
        post_action = E("postAction")
        SE(post_action, "type").text = action_type
        params_elem = SE(post_action, "params")
        for param_name, param_value in parameters:
            SE(params_elem, "param", name=param_name, value=param_value)
        actions.append(post_action)

    include_custom("postpostaction.xml", actions)
    return actions
716