author Alex Gilkes <>
Wed, 28 Oct 2009 14:39:48 +0000
changeset 1 be27ed110b50
child 179 d8ac696cc51f
permissions -rw-r--r--
Bringing in Helium, imaker and cmaker

# -*- encoding: latin-1 -*-

#Name        : 
#Part of     : Helium 

#Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
#All rights reserved.
#This component and the accompanying materials are made available
#under the terms of the License "Eclipse Public License v1.0"
#which accompanies this distribution, and is available
#at the URL "http://www.eclipse.org/legal/epl-v10.html".
#Initial Contributors:
#Nokia Corporation - initial contribution.

#w0142 => * and ** were used
#F0401 => pylint didn't find "path" module
#C0302 => Too many lines

import codecs
from  xml.parsers.expat import ExpatError

from xml.etree import ElementTree as et
from xml.sax.saxutils import quoteattr
from path import path
import logging
import os
import re
import zipfile
import amara
import atsconfigparser

# pylint: disable-msg=W0404
from ntpath import sep as atssep
import ntpath as atspath
from ntpath import sep as sossep

# Module-level logger; everything in this file logs under the 'ats' name.
_logger = logging.getLogger('ats')

# Shortcuts
# E creates a standalone element, SE creates a child under an existing element.
E = et.Element
SE = et.SubElement


class Ats3TestDropGenerator(object):
    r"""Generate test drop zip file for ATS3.

    Generates drop zip files from a TestPlan instance. The main
    responsibility of this class is to serialize the plan into a valid XML
    file and build a zip file for the drop.
    Creates one <set> for each component's tests.

    Stif harness, normal operation
    - create logging dir for stif             makedir (to C:\logs\TestFramework)
    - install data files                      install (to E:\testing\data)
    - install configuration (.cfg) files      "       (to E:\testing\conf)
    - install testmodule (.dll) files         "       (to C:\sys\bin)
    - install engine ini (testframework.ini)  "       (to C:\testframework)
    - execute cases from the engine ini       run-cases
    - fetch logs                              fetch-log

    Stif harness, SIS package installation
    - like above but with data and config files replaced by sis files
    - install sis to the device               install-software

    """

    # Log directories handed to the generated "makedir" and "fetch-log" steps,
    # one per harness. They are joined with os.sep, so the separator follows
    # the platform this script runs on.
    # NOTE(review): the module also imports ntpath's sep as atssep; presumably
    # these paths are meant to be backslash-separated -- confirm before running
    # this on a non-Windows host.
    STIF_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testframework"
    TEF_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testexecute"
    MTF_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testresults"
    # Same directory as STIF_LOG_DIR.
    STIFUNIT_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testframework"
    EUNIT_LOG_DIR = r"c:" + os.sep + "Shared" + os.sep + "EUnit" + os.sep + "logs"
    #QT_LOG_DIR = r"c:" + os.sep + "private" + os.sep + "Qt" + os.sep + "logs"
    # Qt logs currently go to the EUnit location (note the lower-case "shared").
    QT_LOG_DIR = r"c:" + os.sep + "shared" + os.sep + "EUnit" + os.sep + "logs"
    CTC_LOG_DIR = r"c:" + os.sep + "data" + os.sep + "ctc"

    def __init__(self):
        """Create a generator with no plan-specific state yet.

        defaults holds the attributes shared by generated XML elements (filled
        in by generate_xml), drop_path is set per test set by generate_plan,
        and drop_path_root is the top-level directory name used in the drop.
        """
        self.defaults = {}
        self.drop_path = None
        self.drop_path_root = path("ATS3Drop")

    def generate(self, test_plan, output_file, config_file=None):
        """Generate a test drop file.

        Builds the test XML for test_plan and packages it with the drop files
        into output_file. When config_file is given, the serialized XML is
        first passed through atsconfigparser.converttestxml and re-parsed.
        Returns whatever generate_drop returns.
        """
        tree = self.generate_xml(test_plan)
        if not config_file:
            return self.generate_drop(test_plan, tree, output_file)

        serialized = et.tostring(tree.getroot(), "ISO-8859-1")
        converted = atsconfigparser.converttestxml(config_file, serialized)
        tree = et.ElementTree(et.XML(converted))
        return self.generate_drop(test_plan, tree, output_file)

    def generate_drop(self, test_plan, xml, output_file):
        """Generate test drop zip file.

        Opens output_file as a deflated zip archive, writes each
        (drop_file, src_file) pair yielded by drop_files() into it, then adds
        the indented plan XML as "test.xml". Returns the ZipFile object.

        NOTE(review): this copy of the method is incomplete. The "try:" lines
        that the indentation and the "except" imply, the call that received
        the "+ Adding" message, the body of the OSError handler and any
        closing of the archive are not visible here. Restore them from the
        upstream file before using this code.
        """
        zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED)
            for drop_file, src_file in self.drop_files(test_plan):
      "   + Adding: %s" % src_file.strip())
                    # The archive member name is the UTF-8 encoded drop path.
                    zfile.write(src_file.strip(), drop_file.encode('utf-8'))
                except OSError, expr:
            # Re-parse with amara only to obtain an indented serialization.
            doc = amara.parse(et.tostring(xml.getroot(), "ISO-8859-1"))
            _logger.debug("XML output: %s\n" % doc.xml(indent=u"yes", encoding="ISO-8859-1"))
            zfile.writestr("test.xml", doc.xml(indent="yes", encoding="ISO-8859-1"))

        return zfile

    def generate_xml(self, test_plan):
        """Generate test drop XML.

        Resets self.defaults, creates the <test> root with a <name> element
        taken from test_plan["testrun_name"], asks generate_target() to add
        the targets, and returns the result wrapped in an ElementTree.

        NOTE(review): this copy of the method is incomplete. The statement
        that wraps the <buildid> element, the body of the post-action loop and
        presumably the calls that add the plan and files sections are not
        visible here. Restore them from the upstream file.
        """
        # Attributes applied to every plan/session/set/case/step element.
        self.defaults = {"enabled": "true", 
                         "passrate": "100", 
                         "significant": "false"}
        root = E("test")
        root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan))
        if test_plan["diamonds_build_url"]:
                et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan))
        generate_target(test_plan, root)
        for post_action in generate_post_actions(test_plan):
        etree = et.ElementTree(root)
        return etree
    def generate_plan(self, test_plan):
        """Build the <plan> element: one <session> holding a <set> per test set.

        Every <set> gets a master <device> target aliased to its harness and a
        single <case> whose steps come from generate_steps(). When
        test_plan.custom_dir is set, customization files are inserted before
        and after the sets and around each case. Updates self.drop_path for
        every set as a side effect. Returns the <plan> element.
        """
        def add_custom(parent, file_name):
            # Customization is optional; skip it when no directory is configured.
            if test_plan.custom_dir is not None:
                insert_custom_file(parent, test_plan.custom_dir.joinpath(file_name))

        plan = E("plan", name="%s Plan" % test_plan["testrun_name"],
                 harness=test_plan["harness"], **self.defaults)
        session = SE(plan, "session", name="session",
                     harness=test_plan["harness"], **self.defaults)

        add_custom(session, "preset_custom.xml")

        # One set for each component.
        for setd in test_plan.sets:
            self.drop_path = self.drop_path_root.joinpath(setd["name"])
            set_name = setd["name"] + "-" + setd["component_path"]
            set_elem = SE(session, "set", name=set_name,
                          harness=setd["test_harness"], **self.defaults)
            target = SE(set_elem, "target")
            SE(target, "device", rank="master",
               alias="DEFAULT_%s" % setd["test_harness"])

            add_custom(set_elem, "precase_custom.xml")
            case = SE(set_elem, "case", name="%s case" % setd["name"],
                      harness=setd["test_harness"], **self.defaults)
            self.generate_steps(setd, case, test_plan)
            add_custom(set_elem, "postcase_custom.xml")

        add_custom(session, "postset_custom.xml")

        return plan


    def generate_steps_logdir(self, setd, case):
        """Add the "makedir" step for the harness log directory and the install steps.

        The first half appends a step whose command is "makedir" and whose
        param names the log directory for setd["test_harness"]. The second
        half emits install steps through generate_install_step(). Returns
        setd, which is replaced by a copy carrying an empty "src_dst" when the
        set has SIS files.

        NOTE(review): this copy of the method is incomplete. The "else:" lines
        separating the QT/default step, the MTF/TEF and QT/EUNIT directories
        and the sis/src_dst/plain-file branches are not visible, and the file
        name argument of several generate_install_step() calls is missing
        (the ",," sequences). Restore them from the upstream file.
        """
        _qt_test_ = check_qt_harness(setd)
        if _qt_test_:
            step = SE(case, "step", name="Create QT log dir", harness=setd["test_harness"], **self.defaults)
            step = SE(case, "step", name="Create %s log dir" % setd["test_harness"], harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "makedir"
        if setd["test_harness"] == "STIF":
            SE(SE(step, "params"), "param", dir=self.STIF_LOG_DIR)
        if setd["test_harness"] == "GENERIC":
            if check_mtf_harness(setd):
                SE(SE(step, "params"), "param", dir=self.MTF_LOG_DIR)
                SE(SE(step, "params"), "param", dir=self.TEF_LOG_DIR)
        elif setd["test_harness"] == "EUNIT":
            if _qt_test_:
                SE(SE(step, "params"), "param", dir=self.QT_LOG_DIR)
                SE(SE(step, "params"), "param", dir=self.EUNIT_LOG_DIR)
        elif setd["test_harness"] == "STIFUNIT":
            SE(SE(step, "params"), "param", dir=self.STIFUNIT_LOG_DIR)
        if setd.has_key("sis_files") and setd["sis_files"]:
            setd = dict(setd, src_dst=[]) # Added to pass the Sis tests, if removed - gives KeyError
            for sis_file in setd["sis_files"]:
                self.generate_install_step(case, "sis",, "sis", 
                                           r"c:" + os.sep + "testframework", setd["test_harness"])
            if setd["src_dst"] != []:
                # All type/name/dir arguments are blank here: the steps are
                # derived from the src_dst entries instead.
                self.generate_install_step(case, "", "", 
                                               "", r"", setd["test_harness"], setd["src_dst"])
                # Data file install.
                for data_file in setd["data_files"]:                                
                    self.generate_install_step(case, "data",, "data", 
                                               r"e:\testing\data", setd["test_harness"])

                # Configuration file install.
                for conf_file in setd["config_files"]:
                    self.generate_install_step(case, "conf",, "conf", 
                                               r"e:\testing\conf", setd["test_harness"])

                # Test module install.
                for test_file in setd["testmodule_files"]:
                    self.generate_install_step(case, "testmodule",, 
                                               "testmodules", r"c:\sys\bin", setd["test_harness"]) 
        return setd

    def generate_steps_engineini(self, setd, case):
        """Engine ini install ( if one exists )

        Emits the install step for the engine ini file into the
        "testframework" directory on drive c:.

        NOTE(review): this copy of the method is incomplete. Both calls pass
        four positional arguments, while generate_install_step() needs six
        (the file name and source directory are missing), and the "else:"
        that presumably separates the SIS case from the src_dst case is not
        visible. Restore them from the upstream file.
        """
        if setd.has_key("sis_files") and setd["sis_files"]:
            self.generate_install_step(case, "engine_ini",
                                       r"c:" + os.sep + "testframework", setd["test_harness"])
            if setd["src_dst"] == []:
                self.generate_install_step(case, "engine_ini",
                                       r"c:" + os.sep + "testframework", setd["test_harness"])

    def generate_steps_sisfiles(self, setd, case, test_plan):
        """generating steps for sis files

        For every entry in setd["sis_files"] an "install-software" step is
        appended to case, with one <param> per installer option listed below.
        The timeout comes from test_plan["test_timeout"] and the package name
        is a path under the "testframework" directory on drive c:.

        NOTE(review): this copy of the method is incomplete. The value
        formatted into the step name and the argument of joinpath() (most
        likely the SIS file name) are missing, which leaves two calls
        unterminated. Restore them from the upstream file.
        """
        for sis_file in setd["sis_files"]:
            step = SE(case, "step", name="Install SIS to the device: %s" % \
            , harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "install-software"
            params = SE(step, "params")
            sis_name = path(r"c:" + os.sep + "testframework").joinpath(
            # Each pair becomes <param key="value"/>.
            for key, value in (("timeout", test_plan["test_timeout"]),
                               ("overWriteAllowed", "true"),
                               ("upgradeData", "true"),
                               ("downloadAllowed", "false"),
                               ("packageInfoAllowed", "true"),
                               ("untrustedAllowed", "true"),
                               ("ignoreOCSPWarnings", "true"),
                               ("userCapGranted", "true"),
                               ("optionalItemsAllowed", "true"),
                               ("killApp", "true"),
                               ("installDrive", "C"),
                               ("upgradeAllowed", "true"),
                               ("OCSP_Done", "true"),
                               ("sisPackageName", sis_name.normpath())):
                SE(params, "param").set(key, value)

    def generate_steps_tracestart(self, setd, case, pmds):
        """Tracing steps are added (Trace Start).

        Appends a "trace-start" step to case. Its params are, in order: the
        trace activation file and its group (only when the set lists
        "trace_activation_files"), the first PMD file under pmds (only when
        the set lists "pmd_files"), the log location from setd["trace_path"],
        a 60 second timeout and the date/time formats.

        setd -- mapping describing the test set
        case -- <case> element that receives the step
        pmds -- drop directory of the PMD files; must offer joinpath()
        """
        # The step carries the shared default attributes, like every other step.
        step = SE(case, "step", 
                  name="Start tracing", harness=setd["test_harness"],
                  **self.defaults)
        SE(step, "command").text = "trace-start"
        params = SE(step, "params")
        if setd.has_key("trace_activation_files") and setd["trace_activation_files"]:
            activation_file = setd["trace_activation_files"][0]
            #find out the group to activate
            configuration = et.parse(activation_file).getroot().find("Configurations").find("TraceActivation").find("Configuration")
            trace_group = configuration.get("Name")
            SE(params, "param", ta=self.drop_path.joinpath(r"trace_activation", activation_file.name))
            SE(params, "param", tgrp=trace_group)
        if setd.has_key("pmd_files") and setd["pmd_files"]:
            SE(params, "param", pmd=pmds.joinpath(setd["pmd_files"][0].name))
        SE(params, "param", log=setd["trace_path"])
        SE(params, "param", timeout="60")
        # Attribute names with a hyphen cannot be passed as keyword arguments.
        elem = SE(params, "param")
        elem.set('date-format', "yyyyMMdd")
        elem = SE(params, "param")
        elem.set('time-format', "HHmmss")

    def generate_steps_createstep(self, setd, case, test_plan):
        """generates core steps for a single set

        For the STIF, STIFUNIT and GENERIC harnesses with an empty
        setd["src_dst"], "run-cases" steps are written here directly: from the
        engine ini file when there is one, otherwise from the config files.
        With a non-empty src_dst, and always for EUNIT, the work is delegated
        to generate_run_steps().

        NOTE(review): this copy of the method is incomplete. The file names
        formatted into the step names and the testcase-file path are missing,
        as are the "else:" in front of the test module loop, the closing
        "**self.defaults)" of that step and the "module" param. Restore them
        from the upstream file.
        """
        if setd["test_harness"] == "STIF" or setd["test_harness"] == "STIFUNIT" or setd["test_harness"] == "GENERIC":
            if setd["src_dst"] == []:
                # Test case execution. If ini file exists, use that
                if setd["engine_ini_file"] != None:
                    step = SE(case, "step", 
                              name="Execute test: %s" % setd["engine_ini_file"].name, 
                              harness=setd["test_harness"], **self.defaults)
                    SE(step, "command").text = "run-cases"
                    params = SE(step, "params")
                    SE(params, "param", filter="*")
                    SE(params, "param", timeout=test_plan["test_timeout"])
                    ini_name = setd["engine_ini_file"].name
                    SE(params, "param", engineini=path(r"c:" + os.sep + "testframework") / ini_name)            
                # if no inifile, but cfg files defined, use those
                elif setd["config_files"]!=[]:
                    for config_file in setd["config_files"]:
                        step = SE(case, "step", 
                                  name="Execute test: %s" %, 
                                  harness=setd["test_harness"], **self.defaults)
                        SE(step, "command").text = "run-cases"
                        params = SE(step, "params")
                        SE(params, "param", module="TESTSCRIPTER")
                        elem = SE(params, "param" )
                        elem.set('testcase-file', path(r"e:\testing\conf") / )
                        SE(params, "param", filter="*")
                        SE(params, "param", timeout=test_plan["test_timeout"])

                # if no ini or cfg files, use dll directly
                    for testmodule_file in setd["testmodule_files"]:
                        step = SE(case, "step", 
                                  name="Execute test: %s" %, harness=setd["test_harness"], 
                        SE(step, "command").text = "run-cases"
                        params = SE(step, "params")
                        SE(params, "param",
                        SE(params, "param", filter="*")
                        SE(params, "param", timeout=test_plan["test_timeout"])
            elif setd["src_dst"] != []:
                self.generate_run_steps(case, setd, test_plan["test_timeout"], test_plan["eunitexerunner_flags"])
        elif setd["test_harness"] == "EUNIT":
            self.generate_run_steps(case, setd, test_plan["test_timeout"], test_plan["eunitexerunner_flags"])

    def generate_steps_tracestop(self, setd, case, pmds):
        """Append the two steps that end tracing: "trace-stop" and "trace-convert".

        The stop step only carries a 60 second timeout. The convert step gets
        the first PMD file under pmds (when the set lists "pmd_files"), the
        log location from setd["trace_path"], a 60 second timeout and the
        date/time formats, in that order.
        """
        stop_step = SE(case, "step", name="Stop tracing",
                       harness=setd["test_harness"], **self.defaults)
        SE(stop_step, "command").text = "trace-stop"
        stop_params = SE(stop_step, "params")
        SE(stop_params, "param", timeout="60")

        convert_step = SE(case, "step", name="Convert tracing",
                          harness=setd["test_harness"], **self.defaults)
        SE(convert_step, "command").text = "trace-convert"
        convert_params = SE(convert_step, "params")
        if setd.has_key("pmd_files") and setd["pmd_files"]:
            first_pmd = setd["pmd_files"][0]
            SE(convert_params, "param", pmd=pmds.joinpath(first_pmd.name))
        SE(convert_params, "param", log=setd["trace_path"])
        SE(convert_params, "param", timeout="60")
        # Hyphenated attribute names have to be set after creation.
        for attribute, pattern in (('date-format', "yyyyMMdd"),
                                   ('time-format', "HHmmss")):
            SE(convert_params, "param").set(attribute, pattern)

    def generate_steps_ctcdata(self, setd, case, test_plan):
        """generates steps for installing CTC data

        Appends two "execute" steps that run ctcman.exe with "writelocal" and
        "writefile". When test_plan["ats_network_drive"] is not blank, a
        "fetch-log" step is added that copies ctcdata.txt to a network path
        built from the drive setting, the diamonds build id, the drop id and
        the set name. A final "fetch-log" step always fetches ctcdata.txt
        with delete="true".

        NOTE(review): lines appear to be missing from this copy. The second
        diamonds_id assignment always overwrites the first (an "else:" is
        presumably missing), and ctc_helium_path_list is never filled before
        it is added to the module-level CTC_PATHS_LIST. Compare with the
        upstream file.
        """
        global CTC_PATHS_LIST
        ctc_helium_path_list = []
        step = SE(case, "step", name="Save CTC data", harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "execute"
        params = SE(step, "params")
        SE(params, "param", parameters="writelocal")
        SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))
        step = SE(case, "step", name="Save CTC data", harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "execute"
        params = SE(step, "params")
        SE(params, "param", parameters="writefile")
        SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))
        if test_plan["ats_network_drive"].strip() != "":
            #preparing local-path for CTC step
            #getting '39865' as diamonds ID out of ''
            # rfind() returns -1 (true) when there is no "/" and 0 (false)
            # only when the sole "/" is the first character.
            if test_plan["diamonds_build_url"].rfind("/", 0):
                diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 2)[1]
                diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 1)[1]
            #separating network id and drop number from\share#ats\
            #'drop2' from the other part of the string conjuncted with a # sign
            ats_network = r"\\" + test_plan["ats_network_drive"].rsplit("#", 1)[0] #network host
            temp_drop_id = path(test_plan["ats_network_drive"].rsplit("#", 1)[1].rsplit(".", 1)[0]).normpath() #drop ID
            # atssep/atspath come from ntpath, so this path is always built
            # with backslashes.
            drop_id = temp_drop_id.rsplit(atssep, 1)[1]
            ats_network_path = atspath.join(ats_network, "ctc_helium" , diamonds_id, drop_id, setd["name"], "ctcdata")
            step = SE(case, "step", name="Fetch CTC data for post commands execution", harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "fetch-log"
            params = SE(step, "params")
            SE(params, "param", delete="false")
            elem = SE(params, "param")
            elem.set('local-path', ats_network_path)
            SE(params, "param", path=path(self.CTC_LOG_DIR).joinpath(r"ctcdata.txt"))

            CTC_PATHS_LIST += ctc_helium_path_list #creating list of ctcdata.txt files for runProcess postaction
        step = SE(case, "step", name="Fetch and clean CTC data", harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "fetch-log"
        params = SE(step, "params")
        SE(params, "param", delete="true")
        SE(params, "param", path=path(self.CTC_LOG_DIR).joinpath(r"ctcdata.txt"))
    def generate_steps_logfetching(self, setd, case):
        """generates steps for fetching log file

        Appends a "fetch-log" step to case with type="text", delete="true"
        and exactly one path param selecting everything in the log directory
        of the set's harness:

        STIF      -> STIF_LOG_DIR
        GENERIC   -> MTF_LOG_DIR for MTF sets, otherwise TEF_LOG_DIR
        STIFUNIT  -> STIFUNIT_LOG_DIR
        EUNIT     -> QT_LOG_DIR for Qt sets, otherwise EUNIT_LOG_DIR

        Any other harness gets no path param.
        """
        harness = setd["test_harness"]
        step = SE(case, "step", name="Fetch test module logs", harness=harness, **self.defaults)
        SE(step, "command").text = "fetch-log"
        params = SE(step, "params")
        SE(params, "param", type="text")
        SE(params, "param", delete="true")

        # The MTF/TEF and Qt/EUnit directories are alternatives: choose one,
        # so plain TEF and EUnit sets also get their logs fetched.
        log_dir = None
        if harness == "STIF":
            log_dir = self.STIF_LOG_DIR
        elif harness == "GENERIC":
            if check_mtf_harness(setd):
                log_dir = self.MTF_LOG_DIR
            else:
                log_dir = self.TEF_LOG_DIR
        elif harness == "STIFUNIT":
            log_dir = self.STIFUNIT_LOG_DIR
        elif harness == "EUNIT":
            if check_qt_harness(setd):
                log_dir = self.QT_LOG_DIR
            else:
                log_dir = self.EUNIT_LOG_DIR

        if log_dir is not None:
            SE(params, "param", path=path(log_dir).joinpath(r"*"))

    def generate_steps(self, setd, case, test_plan):
        """Generate the test plan <step>s.

        Fills case in this order: <flash> elements for the images, optional
        custom pre-step file, CTC log dir and start steps (when
        setd["ctc_enabled"] is "True"), log dir and install steps, engine ini
        install, SIS install, trace start, the test execution steps wrapped by
        the optional prerun/postrun custom files, trace stop, CTC data steps,
        log fetching and the optional custom post-step file.

        NOTE(review): the four image loops are truncated in this copy. The
        operand of each "in" test and the loop bodies that presumably append
        to sorted_images (core, rofs2, rofs3, then everything else) are
        missing. Restore them from the upstream file.
        """
        # Flash images.
        images = self.drop_path_root.joinpath("images")
        pmds = self.drop_path_root.joinpath("pmds")
        sorted_images = []
        for image_file in setd["image_files"]:
            if 'core' in
        for image_file in setd["image_files"]:
            if 'rofs2' in
        for image_file in setd["image_files"]:
            if 'rofs3' in
        for image_file in setd["image_files"]:
            if 'core' not in and 'rofs2' not in and 'rofs3' not in
        for image_file in sorted_images:
            flash = SE(case, "flash", images=images.joinpath(image_file))
            flash.set("target-alias", "DEFAULT_%s" % setd["test_harness"])

        if not test_plan.custom_dir is None:
            insert_custom_file(case, test_plan.custom_dir.joinpath("prestep_custom.xml"))

        # ctc_enabled is compared as the string "True", not as a boolean.
        if setd["ctc_enabled"] == "True":
            step = SE(case, "step", name="Create CTC log dir", harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "makedir"
            params = SE(step, "params")
            SE(params, "param", dir=self.CTC_LOG_DIR)
            step = SE(case, "step", name="CTC start", harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "execute"
            params = SE(step, "params")
            SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))
        # STIF log dir.
        # generate_steps_logdir may return a modified copy of setd.
        setd = self.generate_steps_logdir(setd, case)

        # Engine ini install ( if one exists )
        if setd["engine_ini_file"] != None:
            self.generate_steps_engineini(setd, case)
        #If sis files
        if setd.has_key("sis_files") and setd["sis_files"]:
            self.generate_steps_sisfiles(setd, case, test_plan)    

        # If tracing enabled, Start Tracing:
        if setd.has_key("trace_path") and setd["trace_path"] != "":
            self.generate_steps_tracestart(setd, case, pmds)

        #core steps of a step

        if not test_plan.custom_dir is None:
            insert_custom_file(case, test_plan.custom_dir.joinpath("prerun_custom.xml"))
        self.generate_steps_createstep(setd, case, test_plan)

        if not test_plan.custom_dir is None:
            insert_custom_file(case, test_plan.custom_dir.joinpath("postrun_custom.xml"))
        # If tracing enabled, Stop Tracing
        if setd.has_key("trace_path") and setd["trace_path"] != "":
            self.generate_steps_tracestop(setd, case, pmds)

        #install CTC data
        if setd["ctc_enabled"] == "True":
            self.generate_steps_ctcdata(setd, case, test_plan)
        # Log file fetching.
        self.generate_steps_logfetching(setd, case)

        if not test_plan.custom_dir is None:
            insert_custom_file(case, test_plan.custom_dir.joinpath("poststep_custom.xml"))

    def generate_runsteps_tef(self, setd, case, src_dst, time_out):
        """generates runsteps for tef

        Walks the src_dst entries; index 1 is used as the file to run and
        index 2 as its type. Entries whose type contains 'testscript' become
        "execute" steps that run testexecute.exe (testframework.exe for
        "testscript:mtf") with the script as parameter and the
        TEFTestResultParser. Entries whose type contains 'testmodule' are
        executed directly with the RTestResultParser.

        NOTE(review): this copy of the method is incomplete. Both step
        creations lack their closing "**self.defaults)" and the "else:"
        between the MTF and TEF result-file params is missing. Restore them
        from the upstream file.
        """
        for file1 in src_dst:
            if 'testscript' in file1[2]:
                # Keep only the part after the last os.sep.
                filename = file1[1]
                filename = filename[file1[1].rfind(os.sep)+1:]
                harness = "testexecute.exe"
                if file1[2] == "testscript:mtf":
                    harness = "testframework.exe"
                step = SE(case, "step", 
                              name="Execute test: %s" %  filename, harness=setd["test_harness"], 
                SE(step, "command").text = "execute"
                params = SE(step, "params")
                SE(params, "param", file=harness)
                SE(params, "param", parameters=file1[1])
                if file1[2] == "testscript:mtf":
                    SE(params, "param", {'result-file': self.MTF_LOG_DIR + os.sep + filename.replace('.script', '.htm')})
                    SE(params, "param", {'result-file': self.TEF_LOG_DIR + os.sep + filename.replace('.script', '.htm')})
                SE(params, "param", timeout=time_out)
                SE(params, "param", parser="TEFTestResultParser")
            if 'testmodule' in file1[2]:
                filename = file1[1]
                filename = filename[file1[1].rfind(os.sep)+1:]
                step = SE(case, "step", 
                              name="Execute test: %s" %  filename, harness=setd["test_harness"], 
                SE(step, "command").text = "execute"
                params = SE(step, "params")
                SE(params, "param", file=file1[1])
                # replace() swaps every occurrence of the extension text in
                # the name, not only the trailing one.
                SE(params, "param", {'result-file': self.TEF_LOG_DIR + os.sep + filename.replace(filename.split(".")[-1], 'htm')})
                SE(params, "param", timeout=time_out)
                SE(params, "param", parser="RTestResultParser")
    def generate_runsteps_stif(self, setd, case, src_dst, time_out):
        """generates runsteps for stif

        Scans src_dst (index 0 is opened as a local file, index 1 is the
        destination path, index 2 the type) to decide how the cases are run:
        through an engine ini file if one is found, otherwise through config
        files, otherwise through the test module DLLs. It then appends the
        matching "run-cases" step(s) to case.

        NOTE(review): this copy of the method is incomplete. Nothing visible
        ever appends to cfg_files or dll_files, the 'if ".dll" in ...' line
        has no body, and the DLL step lacks its closing "**self.defaults)".
        Restore them from the upstream file.
        NOTE(review): cfg_files and dll_files are bound to the same list
        object, and the ini file opened below is never closed -- confirm
        whether upstream behaves the same.
        """
        ini = cfg = dll = has_tf_ini = False
        ini_file = None
        cfg_files = dll_files = []

        # First pass: is there a testframework.ini among the destinations?
        for tf_ini in src_dst:
            if "testframework.ini" in tf_ini[1].lower():
                has_tf_ini = True
        for file1 in src_dst:
            if "testframework.ini" in file1[1].lower() and file1[2] == "engine_ini" and has_tf_ini:
                ini = True
                ini_file = file1
            elif file1[2] == "engine_ini" and not has_tf_ini:
                # Without testframework.ini, an engine_ini entry only counts
                # when the file contains an [engine_defaults] section.
                pipe_ini = open(file1[0], 'r')
                if "[engine_defaults]" in str(pipe_ini.readlines()).lower():
                    ini = True
                    ini_file = file1
            elif file1[2] == "conf":
                if not ini:
                    cfg = True
            elif file1[2] == "testmodule":
                if not cfg and not ini:
                    dll = True
        if ini:
            filename = ini_file[1]
            filename = filename[ini_file[1].rfind(os.sep)+1:]
            step = SE(case, "step",
                      name="Execute test: %s" % filename, 
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "run-cases"
            params = SE(step, "params")
            SE(params, "param", filter="*")
            SE(params, "param", timeout=time_out)
            SE(params, "param", engineini=ini_file[1]) 
        elif cfg:
            for conf_file in cfg_files:
                if ".dll" in conf_file[1].lower():
                filename = conf_file[1]
                filename = filename[conf_file[1].rfind(os.sep)+1:]
                step = SE(case, "step", 
                              name="Execute test: %s" % filename, 
                              harness=setd["test_harness"], **self.defaults)
                SE(step, "command").text = "run-cases"
                params = SE(step, "params")
                SE(params, "param", module="TESTSCRIPTER")
                elem = SE(params, "param" )
                elem.set('testcase-file', conf_file[1] )
                SE(params, "param", filter="*")
                SE(params, "param", timeout=time_out)
        elif dll:
            for dll_file in dll_files:
                filename = dll_file[1]
                filename = filename[dll_file[1].rfind(os.sep)+1:]
                step = SE(case, "step", 
                              name="Execute test: %s" %  filename, harness=setd["test_harness"], 
                SE(step, "command").text = "run-cases"
                params = SE(step, "params")
                SE(params, "param", module=filename)
                SE(params, "param", filter="*")
                SE(params, "param", timeout=time_out)

    def generate_runsteps_eunit(self, setd, case, src_dst, time_out, eunit_flags):
        """generates runsteps for eunit

        For each src_dst entry (index 1 is the destination path, index 2 the
        type) an "execute" step is appended to case: "testmodule" entries run
        through EUNITEXERUNNER.EXE with eunit_flags, "testmodule:qt" entries
        are run directly with the QTestResultParser. The result file is named
        after the module with its .dll/.exe extension replaced by "_log.xml".

        NOTE(review): this copy of the method is incomplete. The statement
        that introduces the extra indentation of the EUnit step is missing,
        both step creations lack their closing "**self.defaults)", and one
        "elem" in the Qt branch is created but never given an attribute.
        Restore them from the upstream file.
        NOTE(review): "async" is a reserved word from Python 3.7 on, so the
        async="false" keyword argument only parses on older interpreters.
        """

        for sdst in src_dst:
            if "." in sdst[1]:
                # rsplit(".") without a limit splits on every dot, so this
                # takes the text after the first dot.
                fileextension = sdst[1].rsplit(".")[1].lower()
            filename = sdst[1]
            filename = filename[filename.rfind(os.sep)+1:]
            if fileextension == "dll" or fileextension == "exe":
                re_dll = re.compile(r'[.]+%s' % fileextension, re.IGNORECASE)
                no_dll = re_dll.sub('', filename)
                no_dll_xml = ''.join([no_dll, u'_log.xml'])

            #for EUnit or other executables
            if sdst[2] == "testmodule":
                eunit_exe = "EUNITEXERUNNER.EXE"
                    step = SE(case, "step", name = "Execute test: %s" % filename, harness=setd["test_harness"],
                    SE(step, "command").text = "execute"
                    params = SE(step, "params")
                    SE(params, "param", file=path(r"z:" + os.sep + "sys" + os.sep + "bin") / eunit_exe)
                    elem = SE(params, "param")
                    elem.set('result-file', path(self.EUNIT_LOG_DIR) / no_dll_xml)
                    SE(params, "param", parameters="%s /F %s /l xml %s" % (eunit_flags, no_dll, filename))
                    SE(params, "param", timeout=time_out)
            #for QtTest.lib executables
            elif sdst[2] == "testmodule:qt":
                step = SE(case, "step", name = "Execute Qt-test: %s" % filename, harness=setd["test_harness"],
                SE(step, "command").text = "execute"
                params = SE(step, "params")
                SE(params, "param", file=path(sdst[1]))
                SE(params, "param", parameters=r"-lightxml -o %s\%s" % (path(self.QT_LOG_DIR),  no_dll_xml))
                elem = SE(params, "param")
                elem.set('result-file', path(self.QT_LOG_DIR) / no_dll_xml)
                SE(params, "param", parser="QTestResultParser")
                elem = SE(params, "param")
                SE(params, "param", async="false")
                SE(params, "param", timeout=time_out)


    def generate_run_steps(self, case, setd, time_out, eunit_flags):
        """Dispatch run-step generation to the generator for the set's harness.

        STIF and STIFUNIT share the STIF generator, GENERIC uses the TEF
        generator and EUNIT the EUnit generator (the only one that receives
        eunit_flags). Any other harness produces no steps.
        """
        src_dst = setd["src_dst"]
        harness = setd["test_harness"]
        if harness in ("STIF", "STIFUNIT"):
            self.generate_runsteps_stif(setd, case, src_dst, time_out)
        elif harness == "GENERIC":
            self.generate_runsteps_tef(setd, case, src_dst, time_out)
        elif harness == "EUNIT":
            self.generate_runsteps_eunit(setd, case, src_dst, time_out, eunit_flags)

    def generate_install_step(self, case, step_type, filename, src_dir, 
                              dst_dir, case_harness, src_dst=None):
        """Generate install <step>.

        Without src_dst (None or empty), one "install" step is appended to
        case: it copies filename from src_dir inside the drop to dst_dir.

        With a non-empty src_dst, step_type, filename, src_dir and dst_dir are
        ignored and one step is generated per entry. Index 1 of an entry is
        the destination path and index 2 its type. The location inside the
        drop is the destination directory with the drive colon removed.
        Test modules (type containing "testmodule", or ".dll" in the
        destination) are installed before all other files.

        case         -- <case> element that receives the steps
        case_harness -- value of the step's "harness" attribute
        """
        def add_step(kind, name, src, dst):
            # Append one install step with its src/dst params.
            step = SE(case, "step", name="Install %s: %s" % (kind, name),
                      harness=case_harness, **self.defaults)
            SE(step, "command").text = "install"
            params = SE(step, "params")
            SE(params, "param", src=src)
            SE(params, "param", dst=dst)

        if src_dst is None or src_dst == []:
            add_step(step_type, filename,
                     self.drop_path.joinpath(src_dir, filename),
                     path(dst_dir).joinpath(filename))
            return

        def is_module(entry):
            return "testmodule" in entry[2] or ".dll" in entry[1]

        # Test modules first, then everything else, each in original order.
        ordered = ([entry for entry in src_dst if is_module(entry)] +
                   [entry for entry in src_dst if not is_module(entry)])
        for entry in ordered:
            dst = entry[1]
            drop_dir = dst.replace(":", "")
            drop_dir = path(drop_dir[:drop_dir.rfind(os.sep)])
            name = dst[dst.rfind(os.sep)+1:]
            add_step(entry[2], name,
                     self.drop_path.joinpath(drop_dir, name), path(dst))

    def drop_files(self, test_plan):
        """Yield a list of drop files.

        A generator of (path inside the drop, normalized source path) pairs
        for every set in test_plan.sets. Images and PMD files go under the
        parent of the set's drop directory, everything else under the set's
        own directory. For sets with "src_dst" entries the path inside the
        drop is the destination path with the drive colon removed.

        NOTE(review): this copy of the method is incomplete. The "else:"
        lines between the with-PMD and without-PMD tuples are missing (so the
        second assignment always wins), the joinpath() call in the first
        yield loop is not terminated, and nothing visible ever adds to
        drop_set, so the duplicate check never filters anything. Restore them
        from the upstream file.
        NOTE(review): pkg_files is created once outside the loop over sets,
        so entries from earlier sets are yielded again for later sets unless
        drop_set filters them -- confirm against upstream.
        """
        drop_set = set()
        drop_files = []
        pkg_files = []
        for setd in test_plan.sets:
            drop_path = self.drop_path_root.joinpath(setd["name"])
            if setd.has_key("sis_files") and setd["sis_files"]:
                if setd.has_key("pmd_files") and setd["pmd_files"]:
                    drop_files = ((drop_path.parent, "images", setd["image_files"]),
                                  (drop_path.parent, "pmds", setd["pmd_files"]),
                                  (drop_path, "sis", setd["sis_files"]),
                                  (drop_path, "init", [setd["engine_ini_file"]]),
                                  (drop_path, "trace_init", setd["trace_activation_files"]))
                    drop_files = ((drop_path.parent, "images", setd["image_files"]),
                                  (drop_path, "sis", setd["sis_files"]),
                                  (drop_path, "init", [setd["engine_ini_file"]]))
            elif setd["src_dst"] == []:
                if setd.has_key("pmd_files") and setd["pmd_files"]:
                    drop_files = ((drop_path.parent, "images", setd["image_files"]),
                                  (drop_path.parent, "pmds", setd["pmd_files"]),
                                  (drop_path, "data", setd["data_files"]),
                                  (drop_path, "conf", setd["config_files"]),
                                  (drop_path, "testmodules", setd["testmodule_files"]),
                                  (drop_path, "init", [setd["engine_ini_file"]]),
                                  (drop_path, "trace_init", setd["trace_activation_files"]))
                    drop_files = ((drop_path.parent, "images", setd["image_files"]),
                                  (drop_path, "data", setd["data_files"]),
                                  (drop_path, "conf", setd["config_files"]),
                                  (drop_path, "testmodules", setd["testmodule_files"]),
                                  (drop_path, "init", [setd["engine_ini_file"]]))
            elif setd["src_dst"] != []:
                for x in setd["src_dst"]:
                    src = x[0]
                    dst = x[1]
                    dst2 = dst.replace(":","")
                    pkg_files.append((drop_path, dst2, src))
                if setd.has_key("pmd_files") and setd["pmd_files"]:
                    drop_files = ((drop_path.parent, "images", setd["image_files"]),
                                  (drop_path.parent, "pmds", setd["pmd_files"]),
                                  (drop_path, "trace_init", setd["trace_activation_files"]))
                    drop_files = ((drop_path.parent, "images", setd["image_files"]),)
            for drop_dir, sub_dir, files in drop_files:
                for file_path in files:
                    # engine_ini_file may be None; such entries are skipped.
                    if file_path != None:
                        drop_file = drop_dir.joinpath(sub_dir,
                        drop_file = drop_file.normpath()
                        if drop_file not in drop_set:
                            yield (drop_file, file_path.normpath())
            for drop_dir, sub_dir, files in pkg_files:
                # Here "files" is a single source path, not a list.
                drop_file = drop_dir.joinpath(sub_dir)
                drop_file = drop_file.normpath()
                file_path = path(files)
                if drop_file not in drop_set:
                    yield (drop_file, file_path.normpath())

    def generate_files(self, test_plan):
        """Build the <files> element listing every path placed in the drop.

        Adds one <file> child per pair yielded by drop_files(), using the
        drop-side path as its text, and returns the <files> element.
        """
        listing = E("files")
        for drop_side_path, _source in self.drop_files(test_plan):
            entry = SE(listing, "file")
            entry.text = drop_side_path
        return listing

def generate_target(test_plan, root):
    """Generate targets

    Reads test_plan["harness"] and passes the matching list of harness
    names on to input_targets() together with test_plan and root.
    A harness value that is not listed below results in no call at all.
    """
    # Harness setting -> harness names handed to input_targets().
    harness_names = {
        "MULTI_HARNESS": ["STIF", "EUNIT"],
        "STIF": ["STIF"],
        "EUNIT": ["EUNIT"],
        "STIFUNIT": ["STIFUNIT"],
        "GENERIC": ["GENERIC"],
    }
    selected = harness_names.get(test_plan["harness"])
    if selected is not None:
        input_targets(test_plan, root, selected)
def input_targets(test_plan, root, harness_type):
    """Append target(s) into the XML

    Builds one "target" element holding a "device" child per entry of
    harness_type and appends that target to root.

    test_plan    -- mapping read for the keys "device_type", "device_hwid"
                    and "trace_enabled"
    root         -- element that receives the new "target" child
    harness_type -- iterable of harness names; each one becomes a device
                    with alias "DEFAULT_<name>"

    Every device gets HARNESS and TYPE properties. HWID is added only when
    "device_hwid" is not the empty string, and TRACE_ENABLED only when
    "trace_enabled" equals "true" ignoring case.
    """
    target = E("target")
    for har in harness_type:
        device = SE(target, "device", rank="none", alias="DEFAULT_%s" % har)
        SE(device, "property", name="HARNESS", value=har)
        SE(device, "property", name="TYPE", value=test_plan["device_type"])
        if test_plan["device_hwid"] != "":
            SE(device, "property", name="HWID", value=test_plan["device_hwid"])
        trace_enabled = test_plan["trace_enabled"]
        if trace_enabled != "" and trace_enabled.lower() == "true":
            SE(device, "property", name="TRACE_ENABLED", value=trace_enabled)
    # Without this the freshly built target would be thrown away and root
    # would stay untouched.
    root.append(target)

def insert_custom_file(xmltree, filename):
    Inserts into the given XML tree the given customization file
    Broken input XML inserts a comment to the XML tree
        custom_action_file =, "r", "iso-8859-15")
        loop = ''
        cust = unicode(
            # try to read the file  and addcharacter by character until the 
            # elementtree is happy and then reset the loop and continue until the file is 
            # completely processed. Known issue: file ending in comment will cause a warning.
            while cust:
                if loop != '' :
                  # if we have something left from the previous try
                    cust = loop + cust
#                _logger.debug("what is cust  \n %s \n" % cust)
                except ExpatError, err:
#                    _logger.debug("Error %s in XML when prosessing file %s \n Line and column refer to section:\n%s\n" % ( err, filename, loop))
                    loop = cust
                # clear the loop variable 
                    loop = ''
                cust = unicode(
        except Exception, err:
            _logger.error("Error %s in XML when prosessing %s\n" % ( err, filename))
            xmltree.append(et.Comment("Error in XML file when prosessing %s\n" % ( filename)))

        if loop != '' :
            # we should have used all the input and cleared loop variable
            _logger.warning("Issues in customization file %s in XML when prosessing issue %s \n Line and column refer to section:\n%s\n" % ( filename, err,  loop))

    except IOError, err:
#        _logger.debug("This is for debugging only. Do not treat this as anything else. Anything is OK... The data: %s when prosessing %s\n" % (err, filename))
    else:"Included file %s" % ( filename))

def check_qt_harness(_setd_):
    """Tell whether the set contains a file tagged "testmodule:qt".

    The entries are taken from _setd_["sis_files"] when that key exists and
    from _setd_["src_dst"] otherwise. The item at index 2 of each entry is
    compared with "testmodule:qt".

    Returns True on the first match, False when no entry matches.
    """
    if "sis_files" in _setd_:
        _dict_key_ = "sis_files"
    else:
        _dict_key_ = "src_dst"
    for _srcdst_ in _setd_[_dict_key_]:
        if _srcdst_[2] == "testmodule:qt":
            return True
    return False

def check_mtf_harness(_setd_):
    """Tell whether any entry of _setd_['src_dst'] is tagged "testscript:mtf".

    The item at index 2 of each entry is compared with "testscript:mtf";
    scanning stops at the first match. Returns True or False.
    """
    return any(entry[2] == "testscript:mtf" for entry in _setd_['src_dst'])

def generate_post_actions(test_plan):
    """Generate post actions.

    Returns a list with one "postAction" element per entry of
    test_plan.post_actions. Each entry is an (action_type, parameters)
    pair, where parameters is an iterable of (name, value) pairs that
    become "param" children of the action's "params" element.

    When test_plan.custom_dir is not None, the content of its
    "prepostaction.xml" is inserted before the generated actions and the
    content of "postpostaction.xml" after them, via insert_custom_file().
    """
    actions = []
    if test_plan.custom_dir is not None:
        insert_custom_file(actions, test_plan.custom_dir.joinpath("prepostaction.xml"))

    for action_type, parameters in test_plan.post_actions:
        action = E("postAction")
        SE(action, "type").text = action_type
        params = SE(action, "params")
        for name, value in parameters:
            SE(params, "param", name=name, value=value)
        # Collect the finished element; otherwise it would be lost.
        actions.append(action)

    if test_plan.custom_dir is not None:
        insert_custom_file(actions, test_plan.custom_dir.joinpath("postpostaction.xml"))

    return actions