# -*- encoding: latin-1 -*-
#============================================================================
#Name : dropgenerator.py
#Part of : Helium
#Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
#All rights reserved.
#This component and the accompanying materials are made available
#under the terms of the License "Eclipse Public License v1.0"
#which accompanies this distribution, and is available
#at the URL "http://www.eclipse.org/legal/epl-v10.html".
#
#Initial Contributors:
#Nokia Corporation - initial contribution.
#
#Contributors:
#
#Description:
#===============================================================================
#w0142 => * and ** were used
#F0401 => pylint didn't find "path" module
#C0302 => Too many lines
import codecs
from xml.parsers.expat import ExpatError
from xml.etree import ElementTree as et
from xml.sax.saxutils import quoteattr
from path import path
import logging
import os
import re
import zipfile
import amara
import atsconfigparser
# pylint: disable-msg=W0404
from ntpath import sep as atssep
import ntpath as atspath
import jinja2
# Module-level logger; everything in this file logs under the 'ats' name.
_logger = logging.getLogger('ats')
# Shortcuts for the ElementTree factories used throughout this file.
E = et.Element
SE = et.SubElement
# Module-global accumulator of CTC data network paths; it is extended by
# Ats3TestDropGenerator.generate_steps_ctcdata.
CTC_PATHS_LIST = []
class Ats3TestDropGenerator(object):
"""
Generate test drop zip file for ATS3.
Generates drop zip files from a TestPlan instance. The main
responsibility of this class is to serialize the plan into a valid XML
file and build a zip file for the drop.
Creates one <set> for each component's tests.
Stif harness, normal operation
------------------------------
- create logging dir for stif makedir (to C:\logs\TestFramework)
- install data files install (to E:\testing\data)
- install configuration (.cfg) files " (to E:\testing\conf)
- install testmodule (.dll) files " (to C:\sys\bin)
- install engine ini (testframework.ini) " (to C:\testframework)
- execute cases from the engine ini run-cases
- fetch logs fetch-log
Stif harness, SIS package installation
--------------------------------------
- like above but with data and config files replaced by sis files
- install sis to the device install-software
"""
# Log directories, one per harness.  The components are joined with the
# separator of the machine generating the drop (os.sep).
STIF_LOG_DIR = os.sep.join(("c:", "logs", "testframework"))
TEF_LOG_DIR = os.sep.join(("c:", "logs", "testexecute"))
MTF_LOG_DIR = os.sep.join(("c:", "logs", "testresults"))
STIFUNIT_LOG_DIR = os.sep.join(("c:", "logs", "testframework"))
EUNIT_LOG_DIR = os.sep.join(("c:", "Shared", "EUnit", "logs"))
# Alternative Qt location kept for reference: c: / private / Qt / logs.
# The value in use differs from EUNIT_LOG_DIR only in the case of "shared".
QT_LOG_DIR = os.sep.join(("c:", "shared", "EUnit", "logs"))
CTC_LOG_DIR = os.sep.join(("c:", "data", "ctc"))
def __init__(self):
    """Create a generator whose drop is rooted at the "ATS3Drop" directory."""
    # Attributes shared by every generated element; filled in by generate_xml.
    self.defaults = {}
    # Per-set drop directory; assigned while the plan is being generated.
    self.drop_path = None
    self.drop_path_root = path("ATS3Drop")
def generate(self, test_plan, output_file, config_file=None):
    """Generate a test drop file.

    Builds the test XML for ``test_plan`` and packs it into ``output_file``.
    When ``config_file`` is given, the serialized XML is first passed through
    atsconfigparser.converttestxml and re-parsed.

    Returns whatever generate_drop returns.
    """
    tree = self.generate_xml(test_plan)
    if not config_file:
        return self.generate_drop(test_plan, tree, output_file)
    serialized = et.tostring(tree.getroot(), "ISO-8859-1")
    converted = atsconfigparser.converttestxml(config_file, serialized)
    tree = et.ElementTree(et.XML(converted))
    return self.generate_drop(test_plan, tree, output_file)
def generate_drop(self, test_plan, xml, output_file):
"""Generate test drop zip file."""
zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED)
try:
for drop_file, src_file in self.drop_files(test_plan):
_logger.info(" + Adding: %s" % src_file.strip())
try:
zfile.write(src_file.strip(), drop_file.encode('utf-8'))
except OSError, expr:
_logger.error(expr)
doc = amara.parse(et.tostring(xml.getroot(), "ISO-8859-1"))
_logger.debug("XML output: %s\n" % doc.xml(indent=u"yes", encoding="ISO-8859-1"))
zfile.writestr("test.xml", doc.xml(indent="yes", encoding="ISO-8859-1"))
finally:
zfile.close()
return zfile
def generate_xml(self, test_plan):
"""Generate test drop XML."""
self.defaults = {"enabled": "true",
"passrate": "100",
"significant": "false"}
root = E("test")
root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan))
if test_plan["diamonds_build_url"]:
root.append(
et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan))
generate_target(test_plan, root)
root.append(self.generate_plan(test_plan))
for post_action in generate_post_actions(test_plan):
root.append(post_action)
root.append(self.generate_files(test_plan))
etree = et.ElementTree(root)
return etree
def generate_plan(self, test_plan):
    """Generate the test <plan> with multiple <set>s.

    The plan holds one <session>; each entry of test_plan.sets becomes a
    <set> with a master device target and a single <case>.  When
    test_plan.custom_dir is set, the customization files are inserted around
    the sets and around each case.
    """
    custom_dir = test_plan.custom_dir
    plan_name = "%s Plan" % test_plan["testrun_name"]
    plan = E("plan", name=plan_name, harness=test_plan["harness"], **self.defaults)
    session = SE(plan, "session", name="session",
                 harness=test_plan["harness"], **self.defaults)
    if custom_dir is not None:
        insert_custom_file(session, custom_dir.joinpath("preset_custom.xml"))
    # One set for each component.
    for setd in test_plan.sets:
        self.drop_path = self.drop_path_root.joinpath(setd["name"])
        set_name = setd["name"] + "-" + setd["component_path"]
        set_elem = SE(session, "set", name=set_name,
                      harness=setd["test_harness"], **self.defaults)
        target = SE(set_elem, "target")
        SE(target, "device", rank="master", alias="DEFAULT_%s" % setd["test_harness"])
        if custom_dir is not None:
            insert_custom_file(set_elem, custom_dir.joinpath("precase_custom.xml"))
        case = SE(set_elem, "case", name="%s case" % setd["name"],
                  harness=setd["test_harness"], **self.defaults)
        self.generate_steps(setd, case, test_plan)
        if custom_dir is not None:
            insert_custom_file(set_elem, custom_dir.joinpath("postcase_custom.xml"))
    if custom_dir is not None:
        insert_custom_file(session, custom_dir.joinpath("postset_custom.xml"))
    return plan
def generate_steps_logdir(self, setd, case):
    """Add the "makedir" log-directory step and the install steps of a set.

    The directory is chosen from the harness (and from the MTF / Qt checks).
    Install steps are generated from the SIS files, from "src_dst", or from
    the data / config / test module file lists, in that order of preference.

    Returns the set dictionary to use afterwards; for SIS sets this is a
    copy with an empty "src_dst" entry.
    """
    is_qt = self.check_qt_harness(setd)
    harness = setd["test_harness"]
    if is_qt:
        step_name = "Create QT log dir"
    else:
        step_name = "Create %s log dir" % harness
    step = SE(case, "step", name=step_name, harness=harness, **self.defaults)
    SE(step, "command").text = "makedir"

    log_dir = None
    if harness == "STIF":
        log_dir = self.STIF_LOG_DIR
    elif harness == "GENERIC":
        if self.check_mtf_harness(setd):
            log_dir = self.MTF_LOG_DIR
        else:
            log_dir = self.TEF_LOG_DIR
    elif harness == "EUNIT":
        if is_qt:
            log_dir = self.QT_LOG_DIR
        else:
            log_dir = self.EUNIT_LOG_DIR
    elif harness == "STIFUNIT":
        log_dir = self.STIFUNIT_LOG_DIR
    # Unknown harnesses get a makedir step without any <params>.
    if log_dir is not None:
        SE(SE(step, "params"), "param", dir=log_dir)

    if "sis_files" in setd and setd["sis_files"]:
        # Added to pass the Sis tests, if removed - gives KeyError
        setd = dict(setd, src_dst=[])
        sis_target = r"c:" + os.sep + "testframework"
        for sis_file in setd["sis_files"]:
            self.generate_install_step(case, "sis", sis_file.name, "sis",
                                       sis_target, setd["test_harness"])
        return setd

    if setd["src_dst"] != []:
        self.generate_install_step(case, "", "", "", r"",
                                   setd["test_harness"], setd["src_dst"])
        return setd

    # (step type, set key, drop sub-directory, destination directory)
    install_plan = (
        ("data", "data_files", "data", r"e:\testing\data"),
        ("conf", "config_files", "conf", r"e:\testing\conf"),
        ("testmodule", "testmodule_files", "testmodules", r"c:\sys\bin"),
    )
    for step_type, key, src_dir, dst_dir in install_plan:
        for item in setd[key]:
            self.generate_install_step(case, step_type, item.name, src_dir,
                                       dst_dir, setd["test_harness"])
    return setd
def generate_steps_engineini(self, setd, case):
    """Engine ini install ( if one exists )

    The install step is generated for SIS sets and for sets whose "src_dst"
    list is empty; sets described through "src_dst" get no step here.
    """
    uses_sis = "sis_files" in setd and setd["sis_files"]
    # "src_dst" is only consulted for non-SIS sets.
    if not uses_sis and setd["src_dst"] != []:
        return
    self.generate_install_step(case, "engine_ini",
                               setd["engine_ini_file"].name,
                               "init",
                               r"c:" + os.sep + "testframework",
                               setd["test_harness"])
def generate_steps_sisfiles(self, setd, case, test_plan):
    """Generate one "install-software" step for each SIS file of the set."""
    for sis_file in setd["sis_files"]:
        step = SE(case, "step",
                  name="Install SIS to the device: %s" % sis_file.name,
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "install-software"
        params = SE(step, "params")
        sis_name = path(r"c:" + os.sep + "testframework").joinpath(sis_file.name)
        # Each pair becomes its own <param key="value"/> element, in order.
        options = [
            ("timeout", test_plan["test_timeout"]),
            ("overWriteAllowed", "true"),
            ("upgradeData", "true"),
            ("downloadAllowed", "false"),
            ("packageInfoAllowed", "true"),
            ("untrustedAllowed", "true"),
            ("ignoreOCSPWarnings", "true"),
            ("userCapGranted", "true"),
            ("optionalItemsAllowed", "true"),
            ("killApp", "true"),
            ("installDrive", "C"),
            ("upgradeAllowed", "true"),
            ("OCSP_Done", "true"),
            ("sisPackageName", sis_name.normpath()),
        ]
        for key, value in options:
            param = SE(params, "param")
            param.set(key, value)
def generate_steps_tracestart(self, setd, case, pmds):
    """Tracing steps are added (Trace Start)

    Only the first trace activation file and the first PMD file of the set
    are referenced by the generated parameters.
    """
    step = SE(case, "step", name="Start tracing",
              harness=setd["test_harness"], **self.defaults)
    SE(step, "command").text = "trace-start"
    params = SE(step, "params")
    if "trace_activation_files" in setd and setd["trace_activation_files"]:
        activation_file = setd["trace_activation_files"][0]
        # find out the group to activate
        node = et.parse(activation_file).getroot()
        for tag in ("Configurations", "TraceActivation", "Configuration"):
            node = node.find(tag)
        trace_group = node.get("Name")
        # NOTE(review): drop_files stores these files under "trace_init",
        # while this parameter points at "trace_activation" -- confirm.
        SE(params, "param",
           ta=self.drop_path.joinpath(r"trace_activation",
                                      setd["trace_activation_files"][0].name))
        SE(params, "param", tgrp=trace_group)
    if "pmd_files" in setd and setd["pmd_files"]:
        SE(params, "param", pmd=pmds.joinpath(setd["pmd_files"][0].name))
    SE(params, "param", log=setd["trace_path"])
    SE(params, "param", timeout="60")
    for attribute, value in (('date-format', "yyyyMMdd"), ('time-format', "HHmmss")):
        SE(params, "param").set(attribute, value)
def generate_steps_createstep(self, setd, case, test_plan):
    """Generate the test execution steps of a single set.

    EUNIT sets, and STIF / STIFUNIT / GENERIC sets described through
    "src_dst", are delegated to generate_run_steps.  The remaining STIF /
    STIFUNIT / GENERIC sets get "run-cases" steps from the engine ini file,
    else from the config files, else from the test modules.  Other harnesses
    produce no steps.
    """
    harness = setd["test_harness"]
    if harness == "EUNIT":
        self.generate_run_steps(case, setd, test_plan["test_timeout"],
                                test_plan["eunitexerunner_flags"])
        return
    if harness not in ("STIF", "STIFUNIT", "GENERIC"):
        return
    if setd["src_dst"] != []:
        self.generate_run_steps(case, setd, test_plan["test_timeout"],
                                test_plan["eunitexerunner_flags"])
        return

    def new_run_cases(label):
        """Append a run-cases step and return its <params> element."""
        step = SE(case, "step", name="Execute test: %s" % label,
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "run-cases"
        return SE(step, "params")

    engine_ini = setd["engine_ini_file"]
    if engine_ini is not None:
        # Test case execution. If ini file exists, use that
        params = new_run_cases(engine_ini.name)
        SE(params, "param", filter="*")
        SE(params, "param", timeout=test_plan["test_timeout"])
        SE(params, "param",
           engineini=path(r"c:" + os.sep + "testframework") / engine_ini.name)
    elif setd["config_files"] != []:
        # if no inifile, but cfg files defined, use those
        for config_file in setd["config_files"]:
            params = new_run_cases(config_file.name)
            SE(params, "param", module="TESTSCRIPTER")
            testcase = SE(params, "param")
            testcase.set('testcase-file', path(r"e:\testing\conf") / config_file.name)
            SE(params, "param", filter="*")
            SE(params, "param", timeout=test_plan["test_timeout"])
    else:
        # if no ini or cfg files, use dll directly
        for testmodule_file in setd["testmodule_files"]:
            params = new_run_cases(testmodule_file.name)
            SE(params, "param", module=testmodule_file.name)
            SE(params, "param", filter="*")
            SE(params, "param", timeout=test_plan["test_timeout"])
def generate_steps_tracestop(self, setd, case, pmds):
    """Tracing steps are added (Trace Stop)

    Appends a "trace-stop" step followed by a "trace-convert" step; the
    conversion references the first PMD file of the set, when there is one.
    """
    harness = setd["test_harness"]
    stop_step = SE(case, "step", name="Stop tracing", harness=harness, **self.defaults)
    SE(stop_step, "command").text = "trace-stop"
    SE(SE(stop_step, "params"), "param", timeout="60")

    convert_step = SE(case, "step", name="Convert tracing",
                      harness=setd["test_harness"], **self.defaults)
    SE(convert_step, "command").text = "trace-convert"
    params = SE(convert_step, "params")
    if "pmd_files" in setd and setd["pmd_files"]:
        SE(params, "param", pmd=pmds.joinpath(setd["pmd_files"][0].name))
    SE(params, "param", log=setd["trace_path"])
    SE(params, "param", timeout="60")
    for attribute, value in (('date-format', "yyyyMMdd"), ('time-format', "HHmmss")):
        SE(params, "param").set(attribute, value)
def generate_steps_ctcdata(self, setd, case, test_plan):
    """Generate the steps that save and fetch the CTC data.

    Two "execute" steps run ctcman.exe with "writelocal" and "writefile".
    When test_plan["ctc_run_process_params"] is not blank, an extra
    "fetch-log" step copies ctcdata.txt to a network path built from the
    diamonds build id, the drop id and the set name; that path is also
    appended to the module-level CTC_PATHS_LIST.  A final "fetch-log" step
    fetches and deletes ctcdata.txt.
    """
    for action in ("writelocal", "writefile"):
        step = SE(case, "step", name="Save CTC data",
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "execute"
        params = SE(step, "params")
        SE(params, "param", parameters=action)
        SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))

    run_process_params = test_plan["ctc_run_process_params"]
    if run_process_params.strip() != "":
        # preparing local-path for CTC step
        # getting '39865' as diamonds ID out of
        # 'http://diamonds.nmp.nokia.com/diamonds/builds/39865/'
        # The previous rfind("/", 0) test was true for practically every
        # URL, so a URL without the trailing slash produced "builds".
        # Dropping trailing slashes first handles both spellings.
        build_url = test_plan["diamonds_build_url"]
        diamonds_id = build_url.rstrip("/").rsplit("/", 1)[-1]
        # separating network id and drop number from
        # 10.11.3.2\share#ats\drop2.zip#3
        # 'drop2' from the other part of the string conjuncted with a # sign
        fields = run_process_params.rsplit("#", 2)
        ats_network = r"\\" + fields[0]  # network host
        temp_drop_id = path(fields[1].rsplit(".", 1)[0]).normpath()  # drop ID
        if atssep in temp_drop_id:
            drop_id = temp_drop_id.rsplit(atssep, 1)[1]
        else:
            drop_id = temp_drop_id
        ats_network_path = atspath.join(ats_network, "ctc_helium", diamonds_id,
                                        drop_id, setd["name"], "ctcdata")
        step = SE(case, "step", name="Fetch CTC data for post commands execution",
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "fetch-log"
        params = SE(step, "params")
        SE(params, "param", delete="false")
        local_path = SE(params, "param")
        local_path.set('local-path', ats_network_path)
        SE(params, "param", path=path(self.CTC_LOG_DIR).joinpath(r"ctcdata.txt"))
        # creating list of ctcdata.txt files for runProcess postaction
        CTC_PATHS_LIST.append(ats_network_path)

    step = SE(case, "step", name="Fetch and clean CTC data",
              harness=setd["test_harness"], **self.defaults)
    SE(step, "command").text = "fetch-log"
    params = SE(step, "params")
    SE(params, "param", delete="true")
    SE(params, "param", path=path(self.CTC_LOG_DIR).joinpath(r"ctcdata.txt"))
def generate_steps_logfetching(self, setd, case):
    """Generate the "fetch-log" step that collects the test module logs.

    The fetched path is "<log dir>/*", with the log directory chosen from
    the harness; an unrecognized harness gets no path parameter.
    """
    harness = setd["test_harness"]
    step = SE(case, "step", name="Fetch test module logs",
              harness=harness, **self.defaults)
    SE(step, "command").text = "fetch-log"
    params = SE(step, "params")
    SE(params, "param", type="text")
    SE(params, "param", delete="true")

    log_dir = None
    if harness == "STIF":
        log_dir = self.STIF_LOG_DIR
    elif harness == "GENERIC":
        if self.check_mtf_harness(setd):
            log_dir = self.MTF_LOG_DIR
        else:
            log_dir = self.TEF_LOG_DIR
    elif harness == "STIFUNIT":
        log_dir = self.STIFUNIT_LOG_DIR
    elif harness == "EUNIT":
        if self.check_qt_harness(setd):
            log_dir = self.QT_LOG_DIR
        else:
            log_dir = self.EUNIT_LOG_DIR
    if log_dir is not None:
        SE(params, "param", path=path(log_dir).joinpath(r"*"))
def get_sorted_images(self, setd):
    """Return the images of a set in flashing order.

    The names of setd["image_files"] are ordered as: names containing
    "core", then "rofs2", then "rofs3", then everything else.  A name that
    matches several of the tags is listed once per matching tag.

    Returns a list of image names.  When the first ordered name contains
    "rofs", the original setd["image_files"] sequence (the file objects, not
    their names) is returned unchanged instead.  An empty image list yields
    an empty list rather than raising IndexError.
    """
    image_files = setd["image_files"]
    names = [image_file.name for image_file in image_files]
    ordered = []
    for tag in ('core', 'rofs2', 'rofs3'):
        ordered.extend([name for name in names if tag in name])
    ordered.extend([name for name in names
                    if 'core' not in name
                    and 'rofs2' not in name
                    and 'rofs3' not in name])
    # Guard against sets without images before looking at the first entry.
    if ordered and "rofs" in ordered[0]:
        # NOTE(review): this branch returns file objects while the other one
        # returns plain names -- confirm callers cope with both.
        return image_files
    return ordered
def generate_steps(self, setd, case, test_plan):
    """Generate the test plan <step>s.

    Order: flash elements, optional CTC start, log directory and install
    steps, engine ini, SIS install, trace start, execution steps, trace
    stop, CTC data, log fetching.  Customization files are inserted around
    the steps and around the execution when test_plan.custom_dir is set.
    """
    custom_dir = test_plan.custom_dir

    def include_custom(file_name):
        """Insert a customization file into the case, if customizing."""
        if custom_dir is not None:
            insert_custom_file(case, custom_dir.joinpath(file_name))

    # Flash images.
    images = self.drop_path_root.joinpath("images")
    pmds = self.drop_path_root.joinpath("pmds")
    for image_file in self.get_sorted_images(setd):
        flash = SE(case, "flash", images=images.joinpath(image_file))
        flash.set("target-alias", "DEFAULT_%s" % setd["test_harness"])

    include_custom("prestep_custom.xml")

    if setd["ctc_enabled"] == "True":
        step = SE(case, "step", name="Create CTC log dir",
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "makedir"
        SE(SE(step, "params"), "param", dir=self.CTC_LOG_DIR)
        step = SE(case, "step", name="CTC start",
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "execute"
        SE(SE(step, "params"), "param", file=path(r"z:\sys\bin\ctcman.exe"))

    # Log dir and install steps; may hand back an adjusted set dictionary.
    setd = self.generate_steps_logdir(setd, case)

    # Engine ini install ( if one exists )
    if setd["engine_ini_file"] is not None:
        self.generate_steps_engineini(setd, case)

    # If sis files
    if "sis_files" in setd and setd["sis_files"]:
        self.generate_steps_sisfiles(setd, case, test_plan)

    # If tracing enabled, Start Tracing:
    if "trace_path" in setd and setd["trace_path"] != "":
        self.generate_steps_tracestart(setd, case, pmds)

    # core steps of a set
    include_custom("prerun_custom.xml")
    self.generate_steps_createstep(setd, case, test_plan)
    include_custom("postrun_custom.xml")

    # If tracing enabled, Stop Tracing
    if "trace_path" in setd and setd["trace_path"] != "":
        self.generate_steps_tracestop(setd, case, pmds)

    # install CTC data
    if setd["ctc_enabled"] == "True":
        self.generate_steps_ctcdata(setd, case, test_plan)

    # Log file fetching.
    self.generate_steps_logfetching(setd, case)

    include_custom("poststep_custom.xml")
def generate_runsteps_tef(self, setd, case, src_dst, time_out):
    """Generate the "execute" run steps of a GENERIC (TEF / MTF / RTest) set.

    ``src_dst`` entries are indexed as (source, destination, type).  An
    entry whose type contains "testscript" is run through testexecute.exe,
    or testframework.exe for "testscript:mtf"; an entry of type
    "testmodule:rtest" is executed directly.  Other entries are ignored.
    """
    for file1 in src_dst:
        file_type = file1[2]
        if 'testscript' in file_type:
            filename = file1[1]
            filename = filename[filename.rfind(os.sep)+1:]
            if file_type == "testscript:mtf":
                runner = "testframework.exe"
                log_dir = self.MTF_LOG_DIR
                parser = "MTFResultParser"
            else:
                runner = "testexecute.exe"
                log_dir = self.TEF_LOG_DIR
                parser = "TEFTestResultParser"
            step = SE(case, "step", name="Execute test: %s" % filename,
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "execute"
            params = SE(step, "params")
            SE(params, "param", file=runner)
            SE(params, "param", parameters=file1[1])
            SE(params, "param",
               {'result-file': log_dir + os.sep + filename.replace('.script', '.htm')})
            SE(params, "param", timeout=time_out)
            SE(params, "param", parser=parser)
        elif file_type == 'testmodule:rtest':
            filename = file1[1]
            filename = filename[filename.rfind(os.sep)+1:]
            # Swap only the trailing extension.  The previous
            # replace(<extension>, 'htm') rewrote every occurrence of the
            # extension text, e.g. "exe_test.exe" became "htm_test.htm".
            result_name = filename.rsplit(".", 1)[0] + ".htm"
            step = SE(case, "step", name="Execute test: %s" % filename,
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "execute"
            params = SE(step, "params")
            SE(params, "param", file=file1[1])
            SE(params, "param", {'result-file': self.TEF_LOG_DIR + os.sep + result_name})
            SE(params, "param", timeout=time_out)
            SE(params, "param", parser="RTestResultParser")
def generate_runsteps_stif(self, setd, case, src_dst, time_out):
    """Generate the "run-cases" steps of a STIF / STIFUNIT set.

    ``src_dst`` entries are indexed as (source, destination, type).  The
    steps come from the first available of:

    - an engine ini: an "engine_ini" entry named testframework.ini, or, when
      no testframework.ini is present at all, an "engine_ini" entry whose
      source file contains "[engine_defaults]";
    - the "conf" entries, run through TESTSCRIPTER;
    - the "testmodule" entries, run directly.
    """
    ini = cfg = dll = has_tf_ini = False
    ini_file = None
    # Two separate lists: the former "cfg_files = dll_files = []" bound both
    # names to one list, so test modules leaked into the config file steps.
    cfg_files = []
    dll_files = []
    for tf_ini in src_dst:
        if "testframework.ini" in tf_ini[1].lower():
            has_tf_ini = True
    for file1 in src_dst:
        if "testframework.ini" in file1[1].lower() and file1[2] == "engine_ini" and has_tf_ini:
            ini = True
            ini_file = file1
        elif file1[2] == "engine_ini" and not has_tf_ini:
            pipe_ini = open(file1[0], 'r')
            try:
                ini_text = pipe_ini.read()
            finally:
                # The handle used to be left open.
                pipe_ini.close()
            if "[engine_defaults]" in ini_text.lower():
                ini = True
                ini_file = file1
        elif file1[2] == "conf":
            if not ini:
                cfg = True
                cfg_files.append(file1)
        elif file1[2] == "testmodule":
            if not cfg and not ini:
                dll = True
                dll_files.append(file1)

    if ini:
        filename = ini_file[1]
        filename = filename[filename.rfind(os.sep)+1:]
        step = SE(case, "step", name="Execute test: %s" % filename,
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "run-cases"
        params = SE(step, "params")
        SE(params, "param", filter="*")
        SE(params, "param", timeout=time_out)
        SE(params, "param", engineini=ini_file[1])
    elif cfg:
        for conf_file in cfg_files:
            # Kept from the original: entries pointing at a .dll are never
            # run as TESTSCRIPTER config files.
            if ".dll" in conf_file[1].lower():
                continue
            filename = conf_file[1]
            filename = filename[filename.rfind(os.sep)+1:]
            step = SE(case, "step", name="Execute test: %s" % filename,
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "run-cases"
            params = SE(step, "params")
            SE(params, "param", module="TESTSCRIPTER")
            elem = SE(params, "param")
            elem.set('testcase-file', conf_file[1])
            SE(params, "param", filter="*")
            SE(params, "param", timeout=time_out)
    elif dll:
        for dll_file in dll_files:
            filename = dll_file[1]
            filename = filename[filename.rfind(os.sep)+1:]
            step = SE(case, "step", name="Execute test: %s" % filename,
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "run-cases"
            params = SE(step, "params")
            SE(params, "param", module=filename)
            SE(params, "param", filter="*")
            SE(params, "param", timeout=time_out)
def generate_runsteps_eunit(self, setd, case, src_dst, time_out, eunit_flags):
    """Generate the "execute" run steps of an EUNIT set.

    ``src_dst`` entries are indexed as (source, destination, type).  Only
    destinations ending in ".dll" or ".exe" are considered: "testmodule"
    entries are run through EUNITEXERUNNER.EXE with ``eunit_flags``, and
    "testmodule:qt" entries are executed directly with light-XML output.
    """
    for sdst in src_dst:
        if "." not in sdst[1]:
            continue
        # Text after the LAST dot.  The previous rsplit(".")[1] took the
        # second dot-separated component, so "my.test.dll" was skipped.
        fileextension = sdst[1].rsplit(".", 1)[1].lower()
        filename = sdst[1]
        filename = filename[filename.rfind(os.sep)+1:]
        if fileextension != "dll" and fileextension != "exe":
            continue
        re_dll = re.compile(r'[.]+%s' % fileextension, re.IGNORECASE)
        no_dll = re_dll.sub('', filename)
        no_dll_xml = ''.join([no_dll, u'_log.xml'])
        if sdst[2] == "testmodule":
            # for EUnit or other executables
            eunit_exe = "EUNITEXERUNNER.EXE"
            if re_dll.search(filename):
                step = SE(case, "step", name="Execute test: %s" % filename,
                          harness=setd["test_harness"], **self.defaults)
                SE(step, "command").text = "execute"
                params = SE(step, "params")
                SE(params, "param",
                   file=path(r"z:" + os.sep + "sys" + os.sep + "bin") / eunit_exe)
                elem = SE(params, "param")
                elem.set('result-file', path(self.EUNIT_LOG_DIR) / no_dll_xml)
                SE(params, "param",
                   parameters="%s /F %s /l xml %s" % (eunit_flags, no_dll, filename))
                SE(params, "param", timeout=time_out)
        elif sdst[2] == "testmodule:qt":
            # for QtTest.lib executables
            step = SE(case, "step", name="Execute Qt-test: %s" % filename,
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "execute"
            params = SE(step, "params")
            SE(params, "param", file=path(sdst[1]))
            SE(params, "param",
               parameters=r"-lightxml -o %s\%s" % (path(self.QT_LOG_DIR), no_dll_xml))
            elem = SE(params, "param")
            elem.set('result-file', path(self.QT_LOG_DIR) / no_dll_xml)
            SE(params, "param", parser="QTestResultParser")
            elem = SE(params, "param")
            elem.set('delete-result', "true")
            # Attribute dictionary instead of an async= keyword argument:
            # the generated element is the same, and "async" is a reserved
            # word in newer Python versions.
            SE(params, "param", {"async": "false"})
            SE(params, "param", timeout=time_out)
def generate_run_steps(self, case, setd, time_out, eunit_flags):
    """Dispatch run-step generation on the harness of the set.

    STIF and STIFUNIT share one generator; GENERIC and EUNIT have their own.
    Any other harness produces no steps.
    """
    src_dst = setd["src_dst"]
    harness = setd["test_harness"]
    if harness in ("STIF", "STIFUNIT"):
        self.generate_runsteps_stif(setd, case, src_dst, time_out)
    elif harness == "GENERIC":
        self.generate_runsteps_tef(setd, case, src_dst, time_out)
    elif harness == "EUNIT":
        self.generate_runsteps_eunit(setd, case, src_dst, time_out, eunit_flags)
def generate_install_step(self, case, step_type, filename, src_dir,
                          dst_dir, case_harness, src_dst=None):
    """Generate install <step>.

    Without ``src_dst`` a single step installs ``filename`` from
    ``src_dir`` of the drop to ``dst_dir``.  With ``src_dst`` (entries
    indexed as (source, destination, type)) one step is generated per entry
    and the other file arguments are ignored; entries that are test modules
    or point at a .dll come first, then all remaining entries.
    """
    if src_dst is None or src_dst == []:
        step = SE(case, "step", name="Install %s: %s" % (step_type, filename),
                  harness=case_harness, **self.defaults)
        SE(step, "command").text = "install"
        params = SE(step, "params")
        SE(params, "param", src=self.drop_path.joinpath(src_dir, filename))
        SE(params, "param", dst=path(dst_dir).joinpath(filename))
        return

    # Two passes over the same list: modules first, everything else after.
    for modules_pass in (True, False):
        for entry in src_dst:
            dst = entry[1]
            type_ = entry[2]
            is_module = "testmodule" in type_ or ".dll" in dst
            if is_module != modules_pass:
                continue
            # The drop mirrors the destination path with the colon removed.
            drop_dir = dst.replace(":", "")
            drop_dir = path(drop_dir[:drop_dir.rfind(os.sep)])
            base_name = dst[dst.rfind(os.sep)+1:]
            step = SE(case, "step", name="Install %s: %s" % (type_, base_name),
                      harness=case_harness, **self.defaults)
            SE(step, "command").text = "install"
            params = SE(step, "params")
            SE(params, "param", src=self.drop_path.joinpath(drop_dir, base_name))
            SE(params, "param", dst=path(dst))
def drop_files(self, test_plan):
    """Yield (path inside the drop, source file path) pairs for all sets.

    Each drop path is yielded only once.  Images and PMD files are stored
    next to the set directories; everything else goes below the directory
    of the set.  "src_dst" entries are stored under their destination path
    with the colon removed.
    """
    seen = set()
    pkg_files = []
    for setd in test_plan.sets:
        drop_path = self.drop_path_root.joinpath(setd["name"])
        uses_sis = "sis_files" in setd and setd["sis_files"]
        if not uses_sis and setd["src_dst"] != []:
            for entry in setd["src_dst"]:
                src = entry[0]
                dst = entry[1]
                pkg_files.append((drop_path, dst.replace(":", ""), src))

        has_pmds = "pmd_files" in setd and setd["pmd_files"]
        groups = [(drop_path.parent, "images", setd["image_files"])]
        if has_pmds:
            groups.append((drop_path.parent, "pmds", setd["pmd_files"]))
        if uses_sis:
            groups.append((drop_path, "sis", setd["sis_files"]))
            groups.append((drop_path, "init", [setd["engine_ini_file"]]))
        elif setd["src_dst"] == []:
            groups.append((drop_path, "data", setd["data_files"]))
            groups.append((drop_path, "conf", setd["config_files"]))
            groups.append((drop_path, "testmodules", setd["testmodule_files"]))
            groups.append((drop_path, "init", [setd["engine_ini_file"]]))
        if has_pmds:
            groups.append((drop_path, "trace_init", setd["trace_activation_files"]))

        for drop_dir, sub_dir, files in groups:
            for file_path in files:
                # The engine ini entry may be None.
                if file_path is None:
                    continue
                drop_file = drop_dir.joinpath(sub_dir, file_path.name).normpath()
                if drop_file in seen:
                    continue
                seen.add(drop_file)
                yield (drop_file, file_path.normpath())

        # pkg_files accumulates over all sets seen so far; "seen" keeps the
        # earlier entries from being yielded again.
        for drop_dir, sub_dir, source in pkg_files:
            drop_file = drop_dir.joinpath(sub_dir.replace('\\', os.sep)).normpath()
            file_path = path(source)
            if drop_file not in seen:
                seen.add(drop_file)
                yield (drop_file, file_path.normpath())
def generate_files(self, test_plan):
    """Generate the <files> section.

    Returns a <files> element with one <file> child per drop file; the text
    of each child is the path of the file inside the drop.
    """
    section = E("files")
    for entry in self.drop_files(test_plan):
        file_elem = SE(section, "file")
        file_elem.text = entry[0]
    return section
def check_mtf_harness(self, _setd_):
    """Return True when any "src_dst" entry has the type "testscript:mtf"."""
    found = False
    for entry in _setd_['src_dst']:
        if entry[2] == "testscript:mtf":
            found = True
            break
    return found
def check_qt_harness(self, _setd_):
    """Return True when the set contains a Qt test module.

    A set counts as a Qt test when one of its "src_dst" entries has the
    type "testmodule:qt".  Sets installed from SIS files (a non-empty
    "sis_files" entry) are never reported as Qt tests, as before.

    The SIS test now matches the "key present and non-empty" check used by
    the other methods of this class: previously a present but empty
    "sis_files" entry prevented "src_dst" from being inspected at all.
    """
    if "sis_files" in _setd_ and _setd_["sis_files"]:
        return False
    for _srcdst_ in _setd_.get("src_dst") or []:
        if _srcdst_[2] == "testmodule:qt":
            return True
    return False
def generate_target(test_plan, root):
    """Generate the <target> element for the harness of the plan.

    "MULTI_HARNESS" produces STIF and EUNIT devices; STIF, EUNIT, STIFUNIT
    and GENERIC produce a single device each.  Any other harness value
    leaves ``root`` untouched.
    """
    devices_by_harness = {
        "MULTI_HARNESS": ["STIF", "EUNIT"],
        "STIF": ["STIF"],
        "EUNIT": ["EUNIT"],
        "STIFUNIT": ["STIFUNIT"],
        "GENERIC": ["GENERIC"],
    }
    devices = devices_by_harness.get(test_plan["harness"])
    if devices is not None:
        input_targets(test_plan, root, devices)
def input_targets(test_plan, root, harness_type):
    """Append a <target> with one <device> per harness into the XML.

    Every device carries HARNESS and TYPE properties; HWID is added when
    test_plan["device_hwid"] is not empty, and TRACE_ENABLED when
    test_plan["trace_enabled"] spells "true" in any letter case.
    """
    target = E("target")
    for harness in harness_type:
        device = SE(target, "device", rank="none", alias="DEFAULT_%s" % harness)
        SE(device, "property", name="HARNESS", value=harness)
        SE(device, "property", name="TYPE", value=test_plan["device_type"])
        if test_plan["device_hwid"] != "":
            SE(device, "property", name="HWID", value=test_plan["device_hwid"])
        trace_flag = test_plan["trace_enabled"]
        if trace_flag != "" and trace_flag.lower() == "true":
            SE(device, "property", name="TRACE_ENABLED",
               value=test_plan["trace_enabled"])
    root.append(target)
def insert_custom_file(xmltree, filename):
"""
Inserts into the given XML tree the given customization file
Broken input XML inserts a comment to the XML tree
"""
try:
custom_action_file = codecs.open(filename, "r", "iso-8859-15")
loop = ''
cust = unicode(custom_action_file.read(1))
try:
# try to read the file and addcharacter by character until the
# elementtree is happy and then reset the loop and continue until the file is
# completely processed. Known issue: file ending in comment will cause a warning.
while cust:
if loop != '' :
# if we have something left from the previous try
cust = loop + cust
# _logger.debug("what is cust \n %s \n" % cust)
try:
xmltree.append(et.XML(cust.encode("ISO-8859-15")))
except ExpatError, err:
# _logger.debug("Error %s in XML when prosessing file %s \n Line and column refer to section:\n%s\n" % ( err, filename, loop))
loop = cust
else:
# clear the loop variable
loop = ''
cust = unicode(custom_action_file.read(1))
except Exception, err:
_logger.error("Error %s in XML when prosessing %s\n" % ( err, filename))
xmltree.append(et.Comment("Error in XML file when prosessing %s\n" % ( filename)))
if loop != '' :
# we should have used all the input and cleared loop variable
_logger.warning("Issues in customization file %s in XML when prosessing issue %s \n Line and column refer to section:\n%s\n" % ( filename, err, loop))
custom_action_file.close()
except IOError, err:
# _logger.debug("This is for debugging only. Do not treat this as anything else. Anything is OK... The data: %s when prosessing %s\n" % (err, filename))
pass
else:
_logger.info("Included file %s" % ( filename))
def generate_post_actions(test_plan):
    """Generate post actions.

    Returns a list with one <postAction> element per
    (type, parameters) pair of test_plan.post_actions.  When
    test_plan.custom_dir is set, the contents of prepostaction.xml and
    postpostaction.xml are added before and after those elements.
    """
    actions = []
    customized = test_plan.custom_dir is not None
    if customized:
        insert_custom_file(actions, test_plan.custom_dir.joinpath("prepostaction.xml"))
    for action_type, parameters in test_plan.post_actions:
        post_action = E("postAction")
        SE(post_action, "type").text = action_type
        params_elem = SE(post_action, "params")
        for param_name, param_value in parameters:
            SE(params_elem, "param", name=param_name, value=param_value)
        actions.append(post_action)
    if customized:
        insert_custom_file(actions, test_plan.custom_dir.joinpath("postpostaction.xml"))
    return actions
class Ats3TemplateTestDropGenerator(Ats3TestDropGenerator):
    """
    Test drop generator that renders the test XML from a Jinja2 template.

    Only generate_xml is overridden; the remaining methods are helpers that
    are handed to the template through the "atsself" variable.
    """

    # Log directories, always written with backslashes.
    STIF_LOG_DIR = r"c:\logs\testframework"
    TEF_LOG_DIR = r"c:\logs\testexecute"
    MTF_LOG_DIR = r"c:\logs\testresults"
    STIFUNIT_LOG_DIR = r"c:\logs\testframework"
    EUNIT_LOG_DIR = r"c:\Shared\EUnit\logs"
    # Alternative Qt location kept for reference: c:\private\Qt\logs
    QT_LOG_DIR = r"c:\shared\EUnit\logs"
    CTC_LOG_DIR = r"c:\data\ctc"

    def stif_init_file(self, src_dst):
        """Return the "src_dst" entry acting as STIF engine ini, or None.

        Entries are indexed as (source, destination, type).  An "engine_ini"
        entry named testframework.ini is used; when no entry is named
        testframework.ini, an "engine_ini" entry whose source file contains
        "[engine_defaults]" is used instead.  The last matching entry wins.
        """
        has_tf_ini = False
        for tf_ini in src_dst:
            if "testframework.ini" in tf_ini[1].lower():
                has_tf_ini = True
        ini_file = None
        for file1 in src_dst:
            if "testframework.ini" in file1[1].lower() and file1[2] == "engine_ini" and has_tf_ini:
                ini_file = file1
            elif file1[2] == "engine_ini" and not has_tf_ini:
                pipe_ini = open(file1[0], 'r')
                try:
                    ini_text = pipe_ini.read()
                finally:
                    # The handle used to be left open.
                    pipe_ini.close()
                if "[engine_defaults]" in ini_text.lower():
                    ini_file = file1
        return ini_file

    def ctcnetworkpath(self, setd, test_plan):
        """Return the network path the CTC data of a set is fetched to."""
        # preparing local-path for CTC step
        # getting '39865' as diamonds ID out of
        # 'http://diamonds.nmp.nokia.com/diamonds/builds/39865/'
        # The previous rfind("/", 0) test was true for practically every
        # URL, so a URL without the trailing slash produced "builds".
        # Dropping trailing slashes first handles both spellings.
        build_url = test_plan["diamonds_build_url"]
        diamonds_id = build_url.rstrip("/").rsplit("/", 1)[-1]
        # separating network id and drop number from
        # 10.11.3.2\share#ats\drop2.zip#3
        # 'drop2' from the other part of the string conjuncted with a # sign
        fields = test_plan["ctc_run_process_params"].rsplit("#", 2)
        ats_network = r"\\" + fields[0]  # network host
        temp_drop_id = path(fields[1].rsplit(".", 1)[0]).normpath()  # drop ID
        if atssep in temp_drop_id:
            drop_id = temp_drop_id.rsplit(atssep, 1)[1]
        else:
            drop_id = temp_drop_id
        return atspath.join(ats_network, "ctc_helium", diamonds_id, drop_id,
                            setd["name"], "ctcdata")

    def getlogdir(self, setd):
        """Return the log directory for the harness of the set.

        Returns None when the harness is not STIF, STIFUNIT, GENERIC or
        EUNIT.
        """
        harness = setd["test_harness"]
        if harness == "STIF":
            return self.STIF_LOG_DIR
        if harness == "STIFUNIT":
            return self.STIFUNIT_LOG_DIR
        if harness == "GENERIC":
            if self.check_mtf_harness(setd):
                return self.MTF_LOG_DIR
            return self.TEF_LOG_DIR
        if harness == "EUNIT":
            if self.check_qt_harness(setd):
                return self.QT_LOG_DIR
            return self.EUNIT_LOG_DIR
        return None

    def generate_xml(self, test_plan):
        """Render ats4_template.xml for the plan and return it as an ElementTree.

        Templates are looked up below HELIUM_HOME first and then, when the
        plan has one, in test_plan.custom_dir.

        Raises KeyError when the HELIUM_HOME environment variable is not set.
        """
        helium_home = os.environ['HELIUM_HOME']
        loaders = [jinja2.FileSystemLoader(
            os.path.join(helium_home, 'tools/common/python/lib/ats3/templates'))]
        # custom_dir is optional everywhere else in this file; a loader
        # cannot be created for None.
        if test_plan.custom_dir is not None:
            loaders.append(jinja2.FileSystemLoader(test_plan.custom_dir))
        env = jinja2.Environment(loader=jinja2.ChoiceLoader(loaders))
        template_file = open(
            os.path.join(helium_home, 'tools/common/python/lib/ats3/ats4_template.xml'))
        try:
            template = env.from_string(template_file.read())
        finally:
            # The handle used to be left open.
            template_file.close()
        xmltext = template.render(test_plan=test_plan, os=os, atspath=atspath,
                                  atsself=self).encode('ISO-8859-1')
        return et.ElementTree(et.XML(xmltext))