buildframework/helium/sf/python/pythoncore/lib/ats3/dropgenerator.py
author wbernard
Thu, 22 Jul 2010 17:08:43 +0300
branchhelium-9.0
changeset 618 df88fead2976
parent 587 85df38eb4012
permissions -rw-r--r--
helium_9.0.5-4399343f4f50

# -*- encoding: latin-1 -*-

#============================================================================ 
#Name        : dropgenerator.py 
#Part of     : Helium 

#Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
#All rights reserved.
#This component and the accompanying materials are made available
#under the terms of the License "Eclipse Public License v1.0"
#which accompanies this distribution, and is available
#at the URL "http://www.eclipse.org/legal/epl-v10.html".
#
#Initial Contributors:
#Nokia Corporation - initial contribution.
#
#Contributors:
#
#Description:
#===============================================================================

""" Generate test drop zip file for ATS3"""

# pylint: disable-msg=W0142,R0912,R0201,R0915,R0913,R0904
# pylint: disable-msg=C0302
# pylint: disable-msg=W0404,W0603

#W0142 => * and ** were used
#C0302 => Too many lines
#R* removed when refactored
#W => use of global statement

import codecs
from  xml.parsers.expat import ExpatError

from xml.etree import ElementTree as et
import pkg_resources
from path import path # pylint: disable-msg=F0401
import logging
import os
import re
import zipfile
import amara
import atsconfigparser

# pylint: disable-msg=W0404
from ntpath import sep as atssep
import ntpath as atspath

import jinja2 # pylint: disable-msg=F0401

# Module logger, registered under the name 'ats'.
_logger = logging.getLogger('ats')

# Shortcuts for the ElementTree element factories used throughout this module.
E = et.Element
SE = et.SubElement

# Module-wide list of CTC data paths; Ats3TestDropGenerator.generate_steps_ctcdata
# appends to it.
CTC_PATHS_LIST = []

class Ats3TestDropGenerator(object):
    """
    Generate test drop zip file for ATS3.

    Generates drop zip files from a TestPlan instance. The main
    responsibility of this class is to serialize the plan into a valid XML
    file and build a zip file for the drop.
    
    Creates one <set> for each component's tests.

    Stif harness, normal operation
    ------------------------------
    
    - create logging dir for stif             makedir (to C:\logs\TestFramework)
    - install data files                      install (to E:\testing\data)
    - install configuration (.cfg) files      "       (to E:\testing\conf)
    - install testmodule (.dll) files         "       (to C:\sys\bin)
    - install engine ini (testframework.ini)  "       (to C:\testframework)
    - execute cases from the engine ini       run-cases
    - fetch logs                              fetch-log

    Stif harness, SIS package installation
    --------------------------------------
    
    - like above but with data and config files replaced by sis files
    - install sis to the device               install-software

    """

    # Log directories referenced by the "makedir" and "fetch-log" steps.
    # NOTE(review): they are assembled with os.sep, so the separator follows
    # the platform this script runs on -- confirm that is intended.
    # STIF harness.
    STIF_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testframework"
    # GENERIC harness (used when check_mtf_harness is false).
    TEF_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testexecute"
    # GENERIC harness (used when check_mtf_harness is true).
    MTF_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testresults"
    # STIFUNIT harness; same directory as STIF_LOG_DIR.
    STIFUNIT_LOG_DIR = r"c:" + os.sep + "logs" + os.sep + "testframework"
    # EUNIT harness.
    EUNIT_LOG_DIR = r"c:" + os.sep + "Shared" + os.sep + "EUnit" + os.sep + "logs"
    #QT_LOG_DIR = r"c:" + os.sep + "private" + os.sep + "Qt" + os.sep + "logs"
    # EUNIT harness when check_qt_harness is true; differs from EUNIT_LOG_DIR
    # only in the case of "shared".
    QT_LOG_DIR = r"c:" + os.sep + "shared" + os.sep + "EUnit" + os.sep + "logs"
    # Directory the CTC steps create and fetch ctcdata.txt from.
    CTC_LOG_DIR = r"c:" + os.sep + "data" + os.sep + "ctc"

    def __init__(self):
        """Set the drop root folder; the per-set drop path and the XML
        default attributes start out empty."""
        # Root folder name used for paths inside the drop.
        self.drop_path_root = path("ATS3Drop")
        # Assigned per set by generate_plan.
        self.drop_path = None
        # Filled in by generate_xml.
        self.defaults = {}

    def generate(self, test_plan, output_file, config_file=None):
        """
        Build the test drop for test_plan and write it to output_file.

        When config_file is given, the generated test XML is serialised,
        passed through atsconfigparser.converttestxml and parsed again
        before it is packaged.

        Returns the result of generate_drop.
        """
        tree = self.generate_xml(test_plan)
        if not config_file:
            return self.generate_drop(test_plan, tree, output_file)

        serialized = et.tostring(tree.getroot(), "ISO-8859-1")
        converted = atsconfigparser.converttestxml(config_file, serialized)
        tree = et.ElementTree(et.XML(converted))
        return self.generate_drop(test_plan, tree, output_file)

    def generate_drop(self, test_plan, xml, output_file):
        """Generate test drop zip file."""
        zfile = zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED)
        try:
            for drop_file, src_file in self.drop_files(test_plan):
                if not src_file.startswith('\\\\'):
                    _logger.info("   + Adding: %s" % src_file.strip())
                    try:
                        zfile.write(src_file.strip(), drop_file.encode('utf-8'))
                    except OSError, expr:
                        _logger.error(expr)
            doc = amara.parse(et.tostring(xml.getroot(), "ISO-8859-1"))
            _logger.debug("XML output: %s\n" % doc.xml(indent=u"yes", encoding="ISO-8859-1"))
            zfile.writestr("test.xml", doc.xml(indent="yes", encoding="ISO-8859-1"))
        finally:
            zfile.close()

        return zfile

    def generate_xml(self, test_plan):
        """Generate test drop XML."""
        self.defaults = {"enabled": "true", 
                         "passrate": "100", 
                         "significant": "false"}
        root = E("test")
        root.append(et.XML("<name>%(testrun_name)s</name>" % test_plan))
        if test_plan["diamonds_build_url"]:
            root.append(
                et.XML("<buildid>%(diamonds_build_url)s</buildid>" % test_plan))
        generate_target(test_plan, root)
        root.append(self.generate_plan(test_plan))
        for post_action in generate_post_actions(test_plan):
            root.append(post_action)
        root.append(self.generate_files(test_plan))
        etree = et.ElementTree(root)
        return etree
            
    def generate_plan(self, test_plan):
        """
        Build the <plan> element: one <session> holding one <set> per entry
        of test_plan.sets, each with a target device and a single <case>.

        When test_plan.custom_dir is set, the custom XML files found there
        are inserted before/after the sets and before/after each case.
        self.drop_path is updated for every set as a side effect.
        """
        plan_name = "%s Plan" % test_plan["testrun_name"]
        plan_harness = test_plan["harness"]
        plan = E("plan", name=plan_name, harness=plan_harness, **self.defaults)
        session = SE(plan, "session", name="session", harness=plan_harness,
                     **self.defaults)

        def add_custom(parent, filename):
            """Insert the named custom file under parent if a custom dir exists."""
            if test_plan.custom_dir is not None:
                insert_custom_file(parent, test_plan.custom_dir.joinpath(filename))

        add_custom(session, "preset_custom.xml")

        # One set for each component.
        for setd in test_plan.sets:
            self.drop_path = self.drop_path_root.joinpath(setd["name"])
            set_harness = setd["test_harness"]
            set_elem = SE(session, "set",
                          name=setd["name"] + "-" + setd["component_path"],
                          harness=set_harness, **self.defaults)
            target = SE(set_elem, "target")
            SE(target, "device", rank="master", alias="DEFAULT_%s" % set_harness)

            add_custom(set_elem, "precase_custom.xml")
            case = SE(set_elem, "case", name="%s case" % setd["name"],
                      harness=set_harness, **self.defaults)
            self.generate_steps(setd, case, test_plan)
            add_custom(set_elem, "postcase_custom.xml")

        add_custom(session, "postset_custom.xml")
        return plan

    def generate_steps_logdir(self, setd, case):
        """
        Add the "makedir" step for the harness log directory, followed by
        the install steps for the files of the set.

        Install steps come from the SIS files when there are any, otherwise
        from the src_dst mapping when it is not empty, otherwise from the
        data, configuration and test module file lists.

        Returns the set dictionary; when SIS files are present this is a
        copy whose "src_dst" entry is an empty list.
        """
        is_qt = self.check_qt_harness(setd)
        harness = setd["test_harness"]
        if is_qt:
            step_name = "Create QT log dir"
        else:
            step_name = "Create %s log dir" % harness
        step = SE(case, "step", name=step_name, harness=harness, **self.defaults)
        SE(step, "command").text = "makedir"

        # Pick the directory for the harness; unknown harnesses get no params.
        log_dir = None
        if harness == "STIF":
            log_dir = self.STIF_LOG_DIR
        elif harness == "GENERIC":
            if self.check_mtf_harness(setd):
                log_dir = self.MTF_LOG_DIR
            else:
                log_dir = self.TEF_LOG_DIR
        elif harness == "EUNIT":
            if is_qt:
                log_dir = self.QT_LOG_DIR
            else:
                log_dir = self.EUNIT_LOG_DIR
        elif harness == "STIFUNIT":
            log_dir = self.STIFUNIT_LOG_DIR
        if log_dir is not None:
            SE(SE(step, "params"), "param", dir=log_dir)

        if setd.has_key("sis_files") and setd["sis_files"]:
            # Added to pass the Sis tests, if removed - gives KeyError
            setd = dict(setd, src_dst=[])
            sis_dst = r"c:" + os.sep + "testframework"
            for sis_file in setd["sis_files"]:
                self.generate_install_step(case, "sis", sis_file.name, "sis",
                                           sis_dst, harness)
            return setd

        if setd["src_dst"] != []:
            self.generate_install_step(case, "", "", "", r"", harness,
                                       setd["src_dst"])
            return setd

        # (set key, step type, drop sub-directory, destination on the device)
        install_plan = (
            ("data_files", "data", "data", r"e:\testing\data"),
            ("config_files", "conf", "conf", r"e:\testing\conf"),
            ("testmodule_files", "testmodule", "testmodules", r"c:\sys\bin"),
        )
        for key, step_type, src_dir, dst_dir in install_plan:
            for item in setd[key]:
                self.generate_install_step(case, step_type, item.name, src_dir,
                                           dst_dir, harness)
        return setd

    def generate_steps_engineini(self, setd, case):
        """
        Add the install step for the engine ini file.

        The step is generated when the set has SIS files, or when it has
        none and its src_dst mapping is empty; otherwise nothing is added.
        """
        uses_sis = setd.has_key("sis_files") and setd["sis_files"]
        if uses_sis or setd["src_dst"] == []:
            target_dir = r"c:" + os.sep + "testframework"
            self.generate_install_step(case, "engine_ini",
                                       setd["engine_ini_file"].name,
                                       "init", target_dir,
                                       setd["test_harness"])

    def generate_steps_sisfiles(self, setd, case, test_plan):
        """
        Add one "install-software" step per SIS file of the set.

        Each step carries a fixed list of installer options, the timeout
        taken from test_plan["test_timeout"] and the package path under
        the testframework folder on c:.
        """
        install_dir = path(r"c:" + os.sep + "testframework")
        for sis_file in setd["sis_files"]:
            step_name = "Install SIS to the device: %s" % sis_file.name
            step = SE(case, "step", name=step_name,
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "install-software"
            params = SE(step, "params")
            package = install_dir.joinpath(sis_file.name)

            options = [
                ("timeout", test_plan["test_timeout"]),
                ("overWriteAllowed", "true"),
                ("upgradeData", "true"),
                ("downloadAllowed", "false"),
                ("packageInfoAllowed", "true"),
                ("untrustedAllowed", "true"),
                ("ignoreOCSPWarnings", "true"),
                ("userCapGranted", "true"),
                ("optionalItemsAllowed", "true"),
                ("killApp", "true"),
                ("installDrive", "C"),
                ("upgradeAllowed", "true"),
                ("OCSP_Done", "true"),
                ("sisPackageName", package.normpath()),
            ]
            # One <param> element per option, in the order listed above.
            for option, setting in options:
                SE(params, "param").set(option, setting)

    def generate_steps_tracestart(self, setd, case, pmds):
        """
        Add the "trace-start" step.

        The first trace activation file (if any) supplies the activation
        file parameter and the trace group name, and the first pmd file
        (if any) is referenced below the pmds folder. The log path, a
        60 second timeout and the date/time formats are always added.
        """
        step = SE(case, "step", name="Start tracing",
                  harness=setd["test_harness"], **self.defaults)
        SE(step, "command").text = "trace-start"
        params = SE(step, "params")

        if setd.has_key("trace_activation_files") and setd["trace_activation_files"]:
            activation_file = setd["trace_activation_files"][0]
            # find out the group to activate
            activation_root = et.parse(activation_file).getroot()
            configuration = activation_root.find("Configurations") \
                                           .find("TraceActivation") \
                                           .find("Configuration")
            trace_group = configuration.get("Name")
            SE(params, "param",
               ta=self.drop_path.joinpath(r"trace_activation", activation_file.name))
            SE(params, "param", tgrp=trace_group)

        if setd.has_key("pmd_files") and setd["pmd_files"]:
            SE(params, "param", pmd=pmds.joinpath(setd["pmd_files"][0].name))

        SE(params, "param", log=setd["trace_path"])
        SE(params, "param", timeout="60")
        for attribute, fmt in (("date-format", "yyyyMMdd"),
                               ("time-format", "HHmmss")):
            SE(params, "param").set(attribute, fmt)

    def generate_steps_createstep(self, setd, case, test_plan):
        """
        Add the steps that execute the tests of one set.

        EUNIT sets, and STIF/STIFUNIT/GENERIC sets with a src_dst mapping,
        are delegated to generate_run_steps. Otherwise a "run-cases" step is
        generated from the engine ini file if there is one, else one per
        configuration file, else one per test module. Other harnesses get
        no steps.
        """
        harness = setd["test_harness"]

        def run_cases_step(target_name):
            """Create a run-cases step for target_name and return its <params>."""
            step = SE(case, "step", name="Execute test: %s" % target_name,
                      harness=harness, **self.defaults)
            SE(step, "command").text = "run-cases"
            return SE(step, "params")

        if harness == "EUNIT":
            self.generate_run_steps(case, setd, test_plan["test_timeout"],
                                    test_plan["eunitexerunner_flags"])
            return
        if harness not in ("STIF", "STIFUNIT", "GENERIC"):
            return
        if setd["src_dst"] != []:
            self.generate_run_steps(case, setd, test_plan["test_timeout"],
                                    test_plan["eunitexerunner_flags"])
            return

        ini_file = setd["engine_ini_file"]
        if ini_file is not None:
            # Test case execution. If ini file exists, use that
            params = run_cases_step(ini_file.name)
            SE(params, "param", filter="*")
            SE(params, "param", timeout=test_plan["test_timeout"])
            SE(params, "param",
               engineini=path(r"c:" + os.sep + "testframework") / ini_file.name)
        elif setd["config_files"] != []:
            # if no inifile, but cfg files defined, use those
            for config_file in setd["config_files"]:
                params = run_cases_step(config_file.name)
                SE(params, "param", module="TESTSCRIPTER")
                SE(params, "param").set('testcase-file',
                                        path(r"e:\testing\conf") / config_file.name)
                SE(params, "param", filter="*")
                SE(params, "param", timeout=test_plan["test_timeout"])
        else:
            # if no ini or cfg files, use dll directly
            for testmodule_file in setd["testmodule_files"]:
                params = run_cases_step(testmodule_file.name)
                SE(params, "param", module=testmodule_file.name)
                SE(params, "param", filter="*")
                SE(params, "param", timeout=test_plan["test_timeout"])

    def generate_steps_tracestop(self, setd, case, pmds):
        """
        Add the "trace-stop" step followed by the "trace-convert" step.

        The convert step references the first pmd file of the set (if any)
        below the pmds folder, the trace log path, a 60 second timeout and
        the date/time formats.
        """
        harness = setd["test_harness"]

        stop_step = SE(case, "step", name="Stop tracing",
                       harness=harness, **self.defaults)
        SE(stop_step, "command").text = "trace-stop"
        SE(SE(stop_step, "params"), "param", timeout="60")

        convert_step = SE(case, "step", name="Convert tracing",
                          harness=harness, **self.defaults)
        SE(convert_step, "command").text = "trace-convert"
        convert_params = SE(convert_step, "params")
        if setd.has_key("pmd_files") and setd["pmd_files"]:
            SE(convert_params, "param",
               pmd=pmds.joinpath(setd["pmd_files"][0].name))
        SE(convert_params, "param", log=setd["trace_path"])
        SE(convert_params, "param", timeout="60")
        for attribute, fmt in (("date-format", "yyyyMMdd"),
                               ("time-format", "HHmmss")):
            SE(convert_params, "param").set(attribute, fmt)

    def generate_steps_ctcdata(self, setd, case, test_plan):
        """
        Generate the steps that save and fetch the CTC data.

        Two "execute" steps run ctcman.exe with "writelocal" and then
        "writefile". When test_plan["ctc_run_process_params"] is not blank,
        an extra "fetch-log" step copies ctcdata.txt to a network path built
        from the Diamonds build id, the drop id and the set name, and that
        path is appended to the module-level CTC_PATHS_LIST. A final
        "fetch-log" step fetches ctcdata.txt and deletes it.

        Raises Exception when the CTC parameters are given but
        test_plan["diamonds_build_url"] is empty.
        """
        harness = setd["test_harness"]

        for ctcman_argument in ("writelocal", "writefile"):
            step = SE(case, "step", name="Save CTC data", harness=harness,
                      **self.defaults)
            SE(step, "command").text = "execute"
            params = SE(step, "params")
            SE(params, "param", parameters=ctcman_argument)
            SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))

        ctc_params = test_plan["ctc_run_process_params"]
        if ctc_params.strip() != "":
            #preparing local-path for CTC step
            #getting '39865' as diamonds ID out of 'http://diamonds.server.domain.invalid/diamonds/builds/39865/'
            build_url = test_plan["diamonds_build_url"]
            if not build_url:
                raise Exception('Diamonds needed for CTC')
            # The id is the last path segment whether or not the URL ends
            # with a slash. The previous rfind("/", 0) test was true for any
            # URL containing a slash, so a URL without a trailing slash
            # produced the segment before the id.
            diamonds_id = build_url.rstrip("/").rsplit("/", 1)[-1]

            #separating network id and drop number from 10.11.3.2\share#ats\drop2.zip#3
            #'drop2' from the other part of the string conjuncted with a # sign
            ats_network = r"\\" + ctc_params.rsplit("#", 2)[0] #network host
            temp_drop_id = path(ctc_params.rsplit("#", 2)[1].rsplit(".", 1)[0]).normpath() #drop ID
            if atssep in temp_drop_id:
                drop_id = temp_drop_id.rsplit(atssep, 1)[1]
            else:
                drop_id = temp_drop_id

            ats_network_path = atspath.join(ats_network, "ctc_helium",
                                            diamonds_id, drop_id,
                                            setd["name"], "ctcdata")

            step = SE(case, "step",
                      name="Fetch CTC data for post commands execution",
                      harness=harness, **self.defaults)
            SE(step, "command").text = "fetch-log"
            params = SE(step, "params")
            SE(params, "param", delete="false")
            elem = SE(params, "param")
            elem.set('local-path', ats_network_path)
            SE(params, "param", path=path(self.CTC_LOG_DIR).joinpath(r"ctcdata.txt"))

            #creating list of ctcdata.txt files for runProcess postaction
            CTC_PATHS_LIST.append(ats_network_path)

        step = SE(case, "step", name="Fetch and clean CTC data",
                  harness=harness, **self.defaults)
        SE(step, "command").text = "fetch-log"
        params = SE(step, "params")
        SE(params, "param", delete="true")
        SE(params, "param", path=path(self.CTC_LOG_DIR).joinpath(r"ctcdata.txt"))
        
    def generate_steps_logfetching(self, setd, case):
        """
        Add the "fetch-log" step that collects and deletes the test module
        logs from the log directory of the set's harness.

        For a harness without a known log directory the step is still
        created, but without a path parameter.
        """
        harness = setd["test_harness"]
        step = SE(case, "step", name="Fetch test module logs",
                  harness=harness, **self.defaults)
        SE(step, "command").text = "fetch-log"
        params = SE(step, "params")
        SE(params, "param", type="text")
        SE(params, "param", delete="true")

        if harness == "STIF":
            log_dir = self.STIF_LOG_DIR
        elif harness == "GENERIC":
            if self.check_mtf_harness(setd):
                log_dir = self.MTF_LOG_DIR
            else:
                log_dir = self.TEF_LOG_DIR
        elif harness == "STIFUNIT":
            log_dir = self.STIFUNIT_LOG_DIR
        elif harness == "EUNIT":
            if self.check_qt_harness(setd):
                log_dir = self.QT_LOG_DIR
            else:
                log_dir = self.EUNIT_LOG_DIR
        else:
            return

        # Fetch everything found in the harness log directory.
        SE(params, "param", path=path(log_dir).joinpath(r"*"))

    
    def get_sorted_images(self, setd):
        """sort the images """
        sorted_images = []
        for image_file in setd["image_files"]:
            if 'core' in image_file.name:
                sorted_images.append(image_file)
        for image_file in setd["image_files"]:
            if 'rofs2' in image_file.name:
                sorted_images.append(image_file)
        for image_file in setd["image_files"]:
            if 'rofs3' in image_file.name:
                sorted_images.append(image_file)
        for image_file in setd["image_files"]:
            if 'core' not in image_file.name and 'rofs2' not in image_file.name and 'rofs3' not in image_file.name:
                sorted_images.append(image_file)
        if len(sorted_images) > 0 and "rofs" in sorted_images[0]:
            return setd["image_files"]
        return sorted_images
    
    def generate_steps(self, setd, case, test_plan):
        """
        Generate the test plan <step>s.

        Elements are appended to case in this order: flash entries for the
        sorted images, optional custom "prestep" XML, CTC set-up steps (when
        setd["ctc_enabled"] is the string "True"), the log directory and
        install steps, the engine ini install, SIS installation, trace
        start, optional custom "prerun" XML, the test execution steps,
        optional custom "postrun" XML, trace stop, CTC data steps, log
        fetching and finally optional custom "poststep" XML.
        """
        # Flash images.
        images = self.drop_path_root.joinpath("images")
        pmds = self.drop_path_root.joinpath("pmds")

        sorted_images = self.get_sorted_images(setd)
        for image_file in sorted_images:
            flash = SE(case, "flash", images=images.joinpath(os.path.basename(image_file)))
            flash.set("target-alias", "DEFAULT_%s" % setd["test_harness"])

        if setd['custom_dir']:
            insert_custom_file(case, os.path.join(setd['custom_dir'], "prestep_custom.xml"))

        # ctc_enabled is compared against the string "True", not a boolean.
        if setd["ctc_enabled"] == "True":
            step = SE(case, "step", name="Create CTC log dir", harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "makedir"
            params = SE(step, "params")
            SE(params, "param", dir=self.CTC_LOG_DIR)
            step = SE(case, "step", name="CTC start", harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "execute"
            params = SE(step, "params")
            SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))

        # STIF log dir.
        # generate_steps_logdir may hand back a modified copy of setd (with an
        # empty src_dst when SIS files are present); the steps below use it.
        setd = self.generate_steps_logdir(setd, case)

        # Engine ini install ( if one exists )
        if setd["engine_ini_file"] != None:
            self.generate_steps_engineini(setd, case)

        #If sis files
        if setd.has_key("sis_files") and setd["sis_files"]:
            self.generate_steps_sisfiles(setd, case, test_plan)

        # If tracing enabled, Start Tracing:
        if setd.has_key("trace_path") and setd["trace_path"] != "":
            self.generate_steps_tracestart(setd, case, pmds)

        #core steps of a step

        if setd['custom_dir']:
            insert_custom_file(case, os.path.join(setd['custom_dir'], "prerun_custom.xml"))
        self.generate_steps_createstep(setd, case, test_plan)

        if setd['custom_dir']:
            insert_custom_file(case, os.path.join(setd['custom_dir'], "postrun_custom.xml"))

        # If tracing enabled, Stop Tracing
        if setd.has_key("trace_path") and setd["trace_path"] != "":
            self.generate_steps_tracestop(setd, case, pmds)

        #install CTC data
        if setd["ctc_enabled"] == "True":
            self.generate_steps_ctcdata(setd, case, test_plan)

        # Log file fetching.
        self.generate_steps_logfetching(setd, case)

        if setd['custom_dir']:
            insert_custom_file(case, os.path.join(setd['custom_dir'], "poststep_custom.xml"))


    def generate_runsteps_tef(self, setd, case, src_dst, time_out):
        """
        Generate the "execute" steps for the GENERIC harness.

        src_dst is a sequence of entries where index 1 is the path of the
        file on the device and index 2 its type. Entries whose type contains
        'testscript' are run through testexecute.exe (testframework.exe for
        "testscript:mtf"); entries of type 'testmodule:rtest' are executed
        directly. Other entries are ignored.
        """
        for file1 in src_dst:
            dst = file1[1]
            file_type = file1[2]
            filename = dst[dst.rfind(os.sep)+1:]

            if 'testscript' in file_type:
                if file_type == "testscript:mtf":
                    runner = "testframework.exe"
                    log_dir = self.MTF_LOG_DIR
                    parser = "MTFResultParser"
                else:
                    runner = "testexecute.exe"
                    log_dir = self.TEF_LOG_DIR
                    parser = "TEFTestResultParser"
                step = SE(case, "step",
                          name="Execute test: %s" % filename,
                          harness=setd["test_harness"], **self.defaults)
                SE(step, "command").text = "execute"
                params = SE(step, "params")
                SE(params, "param", file=runner)
                SE(params, "param", parameters=dst)
                SE(params, "param",
                   {'result-file': log_dir + os.sep + filename.replace('.script', '.htm')})
                SE(params, "param", timeout=time_out)
                SE(params, "param", parser=parser)

            if file_type == 'testmodule:rtest':
                step = SE(case, "step",
                          name="Execute test: %s" % filename,
                          harness=setd["test_harness"], **self.defaults)
                SE(step, "command").text = "execute"
                params = SE(step, "params")
                SE(params, "param", file=dst)
                #SE(params, "param", {'parameters': '-Dtextshell --'})
                # Swap only the trailing extension. The previous str.replace
                # on the extension text rewrote every occurrence of it, so
                # "t_execute.exe" became "t_htmcute.htm".
                result_name = filename.rsplit(".", 1)[0] + ".htm"
                SE(params, "param",
                   {'result-file': self.TEF_LOG_DIR + os.sep + result_name})
                SE(params, "param", timeout=time_out)
                SE(params, "param", parser="RTestResultParser")
    
    def generate_runsteps_stif(self, setd, case, src_dst, time_out):
        """
        Generate the "run-cases" steps for the STIF and STIFUNIT harnesses.

        src_dst is a sequence of entries where index 0 is the source file,
        index 1 the path on the device and index 2 the file type. Exactly
        one kind of step is produced, by precedence:

        - an engine ini file: testframework.ini if any entry names it,
          otherwise an "engine_ini" file containing "[engine_defaults]";
        - else one step per "conf" file, run through TESTSCRIPTER;
        - else one step per "testmodule" file.

        Conf and test module entries are only collected while no
        higher-precedence file has been seen, so the result depends on the
        order of src_dst.
        """
        ini = cfg = dll = False
        ini_file = None
        # Two separate lists: the former "cfg_files = dll_files = []" bound
        # both names to one list, so test modules leaked into the conf steps.
        cfg_files = []
        dll_files = []

        has_tf_ini = False
        for tf_ini in src_dst:
            if "testframework.ini" in tf_ini[1].lower():
                has_tf_ini = True

        for file1 in src_dst:
            file_type = file1[2]
            if "testframework.ini" in file1[1].lower() and file_type == "engine_ini" and has_tf_ini:
                ini = True
                ini_file = file1
            elif file_type == "engine_ini" and not has_tf_ini:
                # Close the handle explicitly; it used to be left open.
                pipe_ini = open(file1[0], 'r')
                try:
                    ini_content = pipe_ini.read()
                finally:
                    pipe_ini.close()
                if "[engine_defaults]" in ini_content.lower():
                    ini = True
                    ini_file = file1
            elif file_type == "conf":
                if not ini:
                    cfg = True
                    cfg_files.append(file1)
            elif file_type == "testmodule":
                if not cfg and not ini:
                    dll = True
                    dll_files.append(file1)

        if ini:
            filename = ini_file[1]
            filename = filename[filename.rfind(os.sep)+1:]
            step = SE(case, "step",
                      name="Execute test: %s" % filename,
                      harness=setd["test_harness"], **self.defaults)
            SE(step, "command").text = "run-cases"
            params = SE(step, "params")
            SE(params, "param", filter="*")
            SE(params, "param", timeout=time_out)
            SE(params, "param", engineini=ini_file[1])
        elif cfg:
            for conf_file in cfg_files:
                # Kept from the original: conf entries pointing at a dll are
                # not run as test case files.
                if ".dll" in conf_file[1].lower():
                    continue
                filename = conf_file[1]
                filename = filename[filename.rfind(os.sep)+1:]
                step = SE(case, "step",
                          name="Execute test: %s" % filename,
                          harness=setd["test_harness"], **self.defaults)
                SE(step, "command").text = "run-cases"
                params = SE(step, "params")
                SE(params, "param", module="TESTSCRIPTER")
                elem = SE(params, "param")
                elem.set('testcase-file', conf_file[1])
                SE(params, "param", filter="*")
                SE(params, "param", timeout=time_out)
        elif dll:
            for dll_file in dll_files:
                filename = dll_file[1]
                filename = filename[filename.rfind(os.sep)+1:]
                step = SE(case, "step",
                          name="Execute test: %s" % filename,
                          harness=setd["test_harness"], **self.defaults)
                SE(step, "command").text = "run-cases"
                params = SE(step, "params")
                SE(params, "param", module=filename)
                SE(params, "param", filter="*")
                SE(params, "param", timeout=time_out)

    def generate_runsteps_eunit(self, setd, case, src_dst, time_out, eunit_flags):
        """
        Generate the "execute" steps for the EUNIT harness.

        src_dst is a sequence of entries where index 1 is the path of the
        file on the device and index 2 its type. "testmodule" entries that
        are dll or exe files are run through EUNITEXERUNNER.EXE with
        eunit_flags; "testmodule:qt" entries are executed directly and
        parsed with QTestResultParser. Other entries produce no step.
        """
        for sdst in src_dst:
            dst = sdst[1]
            filename = dst[dst.rfind(os.sep)+1:]

            # Derive the name parts afresh for every entry. They used to be
            # set only for some entries, so a file without a dot or with
            # another extension reused the previous entry's values or raised
            # NameError, and rsplit(".")[1] took the second dot-separated
            # part instead of the extension.
            if "." in filename:
                stem, extension = filename.rsplit(".", 1)
            else:
                stem, extension = filename, ""
            is_binary = extension.lower() in ("dll", "exe")
            result_xml = ''.join([stem, u'_log.xml'])

            #for EUnit or other executables
            if sdst[2] == "testmodule":
                eunit_exe = "EUNITEXERUNNER.EXE"
                if is_binary:
                    step = SE(case, "step", name="Execute test: %s" % filename,
                              harness=setd["test_harness"], **self.defaults)
                    SE(step, "command").text = "execute"
                    params = SE(step, "params")
                    SE(params, "param",
                       file=path(r"z:" + os.sep + "sys" + os.sep + "bin") / eunit_exe)
                    elem = SE(params, "param")
                    elem.set('result-file', path(self.EUNIT_LOG_DIR) / result_xml)
                    SE(params, "param",
                       parameters="%s /F %s /l xml %s" % (eunit_flags, stem, filename))
                    SE(params, "param", timeout=time_out)

            #for QtTest.lib executables
            elif sdst[2] == "testmodule:qt":
                step = SE(case, "step", name="Execute Qt-test: %s" % filename,
                          harness=setd["test_harness"], **self.defaults)
                SE(step, "command").text = "execute"
                params = SE(step, "params")
                SE(params, "param", file=path(dst))
                SE(params, "param",
                   parameters=r"-lightxml -o %s\%s" % (path(self.QT_LOG_DIR), result_xml))
                elem = SE(params, "param")
                elem.set('result-file', path(self.QT_LOG_DIR) / result_xml)
                SE(params, "param", parser="QTestResultParser")
                elem = SE(params, "param")
                elem.set('delete-result', "true")
                # Passed as an attribute dict because "async" is a reserved
                # word in newer Python versions; the XML output is the same.
                SE(params, "param", {"async": "false"})
                SE(params, "param", timeout=time_out)

                

    def generate_run_steps(self, case, setd, time_out, eunit_flags):
        """Delegate run-step generation to the generator of the set's harness.

        STIF and STIFUNIT sets go to generate_runsteps_stif, GENERIC sets to
        generate_runsteps_tef and EUNIT sets to generate_runsteps_eunit (the
        only one that receives eunit_flags). Any other harness generates
        nothing.
        """
        files = setd["src_dst"]
        harness = setd["test_harness"]

        if harness in ("STIF", "STIFUNIT"):
            self.generate_runsteps_stif(setd, case, files, time_out)
        elif harness == "GENERIC":
            self.generate_runsteps_tef(setd, case, files, time_out)
        elif harness == "EUNIT":
            self.generate_runsteps_eunit(setd, case, files, time_out, eunit_flags)

    def generate_install_step(self, case, step_type, filename, src_dir, 
                              dst_dir, case_harness, src_dst=None):
        """Append install <step> elements to case.

        Without src_dst (None or an empty list) one step is generated that
        installs filename from src_dir of the drop into dst_dir.

        With src_dst one step is generated per entry, in two passes: first
        every entry whose type contains "testmodule" or whose destination
        contains ".dll", then all remaining entries. In that mode the
        step_type, filename, src_dir and dst_dir arguments are not used; the
        values are derived from each entry's destination ([1]) and type ([2]).
        """
        def open_install_step(kind, name, source_dir):
            """Create the step up to its src parameter; return <params>."""
            install = SE(case, "step", name="Install %s: %s" % (kind, name), 
                         harness=case_harness, **self.defaults)
            SE(install, "command").text = "install"
            install_params = SE(install, "params")
            SE(install_params, "param", src=self.drop_path.joinpath(source_dir, name))
            return install_params

        if src_dst is None or src_dst == []:
            single_params = open_install_step(step_type, filename, src_dir)
            SE(single_params, "param", dst=path(dst_dir).joinpath(filename))
            return

        # Pass 1 handles the test modules / DLLs, pass 2 everything else.
        for modules_pass in (True, False):
            for entry in src_dst:
                target = entry[1]
                kind = entry[2]
                is_module = "testmodule" in kind or ".dll" in target
                if is_module != modules_pass:
                    continue
                # The location inside the drop is the destination path
                # without the colon of the drive letter.
                stripped = target.replace(":", "")
                source_dir = path(stripped[:stripped.rfind(os.sep)])
                base_name = target[target.rfind(os.sep)+1:]
                entry_params = open_install_step(kind, base_name, source_dir)
                SE(entry_params, "param", dst=path(target))

    def drop_files(self, test_plan):
        """Yield (drop_file, source_file) pairs for the files of the drop.

        For every set in test_plan.sets the files are grouped by sub
        directory. Images and PMD files go next to the set directory (its
        parent); everything else goes below the set directory itself:

        * sets with SIS files: images, [pmds], sis, init, [trace_init]
        * sets without src_dst entries: images, [pmds], data, conf,
          testmodules, init, [trace_init]
        * sets with src_dst entries: images, [pmds], [trace_init], followed
          by one file per src_dst entry, placed at its destination path with
          the colon of the drive letter removed

        The bracketed groups are only present when the set has PMD files.
        Each drop path is yielded once; later duplicates are skipped, also
        across sets. None entries in a file list are ignored.
        """
        drop_set = set()
        for setd in test_plan.sets:
            drop_path = self.drop_path_root.joinpath(setd["name"])
            shared_path = drop_path.parent
            has_pmd = bool(setd.get("pmd_files"))
            # (sub directory, source file) of the src_dst entries of THIS
            # set; earlier sets have already been yielded.
            pkg_files = []

            groups = [(shared_path, "images", setd["image_files"])]
            if has_pmd:
                groups.append((shared_path, "pmds", setd["pmd_files"]))

            if setd.get("sis_files"):
                groups.append((drop_path, "sis", setd["sis_files"]))
                groups.append((drop_path, "init", [setd["engine_ini_file"]]))
            elif setd["src_dst"] == []:
                groups.append((drop_path, "data", setd["data_files"]))
                groups.append((drop_path, "conf", setd["config_files"]))
                groups.append((drop_path, "testmodules", setd["testmodule_files"]))
                groups.append((drop_path, "init", [setd["engine_ini_file"]]))
            else:
                for x_temp in setd["src_dst"]:
                    src = x_temp[0]
                    dst = x_temp[1]
                    pkg_files.append((dst.replace(":", ""), src))

            if has_pmd:
                groups.append((drop_path, "trace_init", setd["trace_activation_files"]))

            for drop_dir, sub_dir, files in groups:
                for file_path in files:
                    if file_path is not None:
                        drop_file = drop_dir.joinpath(sub_dir, file_path.name).normpath()
                        if drop_file not in drop_set:
                            drop_set.add(drop_file)
                            yield (drop_file, file_path.normpath())

            for sub_dir, src in pkg_files:
                drop_file = drop_path.joinpath(sub_dir.replace('\\', os.sep)).normpath()
                if drop_file not in drop_set:
                    drop_set.add(drop_file)
                    yield (drop_file, path(src).normpath())

    def generate_files(self, test_plan):
        """Return a <files> element with one <file> child per drop file.

        The text of each <file> is the drop path yielded by drop_files();
        the source path of the pair is not used.
        """
        listing = E("files")
        for drop_file, _source in self.drop_files(test_plan):
            file_elem = SE(listing, "file")
            file_elem.text = drop_file
        return listing
        
    def check_mtf_harness(self, _setd_):
        """Return True when a src_dst entry of the set has type "testscript:mtf"."""
        return any(entry[2] == "testscript:mtf" for entry in _setd_['src_dst'])

    def check_qt_harness(self, _setd_):
        """Return True when the set contains an entry of type "testmodule:qt".

        The entries of _setd_["sis_files"] are inspected when that key is
        present, otherwise the entries of _setd_["src_dst"]. Item [2] of an
        entry is taken as its type.
        """
        # NOTE(review): entries of "sis_files" are indexed with [2] exactly
        # like src_dst entries - confirm that both lists share that shape.
        if "sis_files" in _setd_:
            _dict_key_ = "sis_files"
        else:
            _dict_key_ = "src_dst"

        return any(entry[2] == "testmodule:qt" for entry in _setd_[_dict_key_])

def generate_target(test_plan, root):
    """Append the <target> element matching test_plan["harness"] to root.

    Each known plan harness maps to the list of device harnesses handed to
    input_targets(). An unknown harness leaves root untouched.
    """
    device_harnesses = (
        ("MULTI_HARNESS", ["STIF", "EUNIT"]),
        ("MULTI_HARNESS_GENERIC_STIF", ["STIF", "GENERIC"]),
        ("STIF", ["STIF"]),
        ("EUNIT", ["EUNIT"]),
        ("STIFUNIT", ["STIFUNIT"]),
        ("GENERIC", ["GENERIC"]),
    )
    plan_harness = test_plan["harness"]
    for known_harness, harness_list in device_harnesses:
        if plan_harness == known_harness:
            input_targets(test_plan, root, harness_list)
            break
                
def input_targets(test_plan, root, harness_type):
    """Append one <target> holding a <device> per harness to root.

    Every device gets the HARNESS and TYPE properties, HWID when
    test_plan["device_hwid"] is not empty, and TRACE_ENABLED when
    test_plan["trace_enabled"] equals "true" ignoring case.
    """
    target_elem = E("target")
    for harness_name in harness_type:
        dev = SE(target_elem, "device", rank="none", alias="DEFAULT_%s" % harness_name)
        properties = [("HARNESS", harness_name),
                      ("TYPE", test_plan["device_type"])]
        hwid = test_plan["device_hwid"]
        if hwid != "":
            properties.append(("HWID", hwid))
        trace = test_plan["trace_enabled"]
        if trace != "" and trace.lower() == "true":
            properties.append(("TRACE_ENABLED", trace))
        for prop_name, prop_value in properties:
            SE(dev, "property", name=prop_name, value=prop_value)
    root.append(target_elem)


def insert_custom_file(xmltree, filename):
    """
    Insert the XML elements found in a customization file into xmltree.

    The file is opened as ISO-8859-15 text and consumed one character at a
    time. Whenever the text accumulated so far parses as an XML element,
    that element is appended to xmltree and the accumulation starts over, so
    a file holding several top-level elements yields several appended items.

    xmltree  -- object with an append() method (generate_post_actions in
                this module passes a plain list).
    filename -- path of the customization file.

    An IOError (for example a file that cannot be opened) is ignored
    silently. Any other Exception raised while reading or parsing is logged
    and recorded by appending an XML comment to xmltree. Text that is still
    unparsed at the end of the file is reported with a warning. When no
    IOError occurred the inclusion of the file is logged at info level.
    """
    try:
        custom_action_file = codecs.open(filename, "r", "iso-8859-15")
        # loop holds the text read so far that has not parsed successfully
        loop = ''
        cust = unicode(custom_action_file.read(1))
        try:
            # Add to the candidate text character by character until the
            # ElementTree parser accepts it, then reset the pending text and
            # continue until the file is completely processed.
            # Known issue: a file ending in a comment will cause a warning.
            while cust:
                if loop != '' :
                    # prepend what was left over from the previous attempt
                    cust = loop + cust
                try: 
                    xmltree.append(et.XML(cust.encode("ISO-8859-15")))
                except ExpatError, err:
                    # not (yet) a complete element: keep the text and read on
                    loop = cust
                else:
                    # an element was appended: clear the pending text
                    loop = ''
                cust = unicode(custom_action_file.read(1))
        except Exception, err:
            _logger.error("Error %s in XML when prosessing %s\n" % ( err, filename))
            xmltree.append(et.Comment("Error in XML file when prosessing %s\n" % ( filename)))

        if loop != '' :
            # All input should have been consumed and loop cleared by now.
            # err is the exception of the last failed attempt; this relies on
            # the name staying bound after the except clause (Python 2).
            _logger.warning("Issues in customization file %s in XML when prosessing issue %s \n Line and column refer to section:\n%s\n" % ( filename, err,  loop))

        custom_action_file.close()
    except IOError, err:
        # deliberately ignored: the customization file is treated as optional
        pass
    else: 
        _logger.info("Included file %s" % ( filename))

def generate_post_actions(test_plan):
    """Return the list of <postAction> elements of the test plan.

    One element is built per (type, parameters) pair of
    test_plan.post_actions. When test_plan.custom_dir is set, the content of
    its "prepostaction.xml" is inserted before those elements and the
    content of its "postpostaction.xml" after them.
    """
    collected = []

    if test_plan.custom_dir is not None:
        insert_custom_file(collected, test_plan.custom_dir.joinpath("prepostaction.xml"))

    for kind, pairs in test_plan.post_actions:
        post_action = E("postAction")
        type_elem = SE(post_action, "type")
        type_elem.text = kind
        params_elem = SE(post_action, "params")
        for key, val in pairs:
            SE(params_elem, "param", name=key, value=val)
        collected.append(post_action)

    if test_plan.custom_dir is not None:
        insert_custom_file(collected, test_plan.custom_dir.joinpath("postpostaction.xml"))

    return collected


class Ats3TemplateTestDropGenerator(Ats3TestDropGenerator):
    """ATS3 test drop generator that renders the drop XML from a template."""
    # Log directories, one per harness (Windows-style paths).
    # STIF logs
    STIF_LOG_DIR = r"c:\logs\testframework"
    # TEF logs (GENERIC harness without an MTF script)
    TEF_LOG_DIR = r"c:\logs\testexecute"
    # MTF logs (GENERIC harness with an MTF script)
    MTF_LOG_DIR = r"c:\logs\testresults"
    # STIFUNIT logs; same directory as STIF_LOG_DIR
    STIFUNIT_LOG_DIR = r"c:\logs\testframework"
    # EUnit logs
    EUNIT_LOG_DIR = r"c:\Shared\EUnit\logs"
    # QtTest logs; differs from EUNIT_LOG_DIR only in letter case.
    # Previous value kept for reference:
    #QT_LOG_DIR = r"c:\private\Qt\logs"
    QT_LOG_DIR = r"c:\shared\EUnit\logs"
    # CTC data directory
    CTC_LOG_DIR = r"c:\data\ctc"
    # Used for the STIF harness when test_plan['hti'] is not 'True'
    AtsInterface_LOG_DIR = r"c:\spd_logs\xml"

    def stif_init_file(self, src_dst):
        """Return the src_dst entry that is the STIF engine ini file, or None.

        src_dst -- sequence of entries; [0] is a path opened for reading,
                   [1] the path matched against "testframework.ini" and
                   [2] the type tag.

        Only entries of type "engine_ini" are candidates. When any entry's
        [1] path contains "testframework.ini" (case-insensitive), the
        candidate with such a path is returned. Otherwise each candidate
        file is read and the one containing "[engine_defaults]"
        (case-insensitive) is returned. With several matches the last wins.
        """
        has_tf_ini = any("testframework.ini" in entry[1].lower() for entry in src_dst)

        ini_file = None
        for file1 in src_dst:
            if file1[2] != "engine_ini":
                continue
            if has_tf_ini:
                if "testframework.ini" in file1[1].lower():
                    ini_file = file1
            else:
                pipe_ini = open(file1[0], 'r')
                try:
                    content = pipe_ini.read()
                finally:
                    # always release the handle, also when reading fails
                    pipe_ini.close()
                if "[engine_defaults]" in content.lower():
                    ini_file = file1
        return ini_file

    def ctcnetworkpath(self, setd, test_plan):
        """Return the network directory for the CTC data of a set.

        setd      -- set dictionary; setd["name"] is used.
        test_plan -- supports item access; "diamonds_build_url" and
                     "ctc_run_process_params" are read.

        The result is built with Windows path rules as
        \\\\<host>\\ctc_helium\\<diamonds id>\\<drop id>\\<set name>\\ctcdata
        """
        # Diamonds ID = last path component of the build URL, e.g. '39865'
        # from 'http://diamonds.server.domain.invalid/diamonds/builds/39865/'.
        # Stripping the trailing slash first makes the URL work with or
        # without it.
        build_url = test_plan["diamonds_build_url"]
        diamonds_id = build_url.rstrip("/").rsplit("/", 1)[-1]

        # Split '10.11.3.2\share#ats\drop2.zip#3' at the '#' signs:
        # [0] is the network host/share, [1] the drop file.
        run_params = test_plan["ctc_run_process_params"].rsplit("#", 2)
        ats_network = r"\\" + run_params[0] #network host

        # Drop ID = drop file name without directory and extension ('drop2').
        # The value is a Windows path, so it is normalised with ntpath
        # regardless of the host platform.
        temp_drop_id = atspath.normpath(run_params[1].rsplit(".", 1)[0])
        drop_id = temp_drop_id.rsplit(atssep, 1)[-1]

        return atspath.join(ats_network, "ctc_helium" , diamonds_id, drop_id, setd["name"], "ctcdata")
    
    def stifmodulename(self, ini_file):
        """Return the module name configured in a STIF ini file, or None.

        ini_file -- path of the file to read.

        The value is the text after the first '=' of a line starting with
        'ModuleName', stripped of surrounding whitespace. When several such
        lines exist the last one wins. Lines without '=' are ignored.
        """
        modname = None
        ini = open(ini_file)
        try:
            for line in ini:
                if line.startswith('ModuleName') and '=' in line:
                    modname = line.split('=', 1)[1].strip()
        finally:
            # always release the handle, also when reading fails
            ini.close()
        return modname

    def getlogdir(self, test_plan, setd):
        """ find the logger directory"""
        if setd["test_harness"] == "STIF":
            if test_plan['hti'] == 'True':
                return self.STIF_LOG_DIR
            return self.AtsInterface_LOG_DIR
        elif setd["test_harness"] == "STIFUNIT":
            return self.STIFUNIT_LOG_DIR
        elif setd["test_harness"] == "GENERIC":
            if self.check_mtf_harness(setd):
                return self.MTF_LOG_DIR
            else:
                return self.TEF_LOG_DIR
        elif setd["test_harness"] == "EUNIT":
            if self.check_qt_harness(setd):
                return self.QT_LOG_DIR
            else:
                return self.EUNIT_LOG_DIR

    def generate_xml(self, test_plan):
        """Render the drop XML from a Jinja2 template and return it parsed.

        Templates are looked up in the package's 'templates' directory
        first, then in test_plan.custom_dir and in the custom_dir of every
        set that has one. The main template is the file named by
        test_plan.custom_template when the plan has that attribute,
        otherwise the packaged 'ats4_template.xml'.

        The template is rendered with the names test_plan, os, atspath and
        atsself (this generator). The text is encoded as ISO-8859-1, parsed,
        and returned as an ElementTree.
        """
        customdirs = []
        if test_plan.custom_dir:
            customdirs = [jinja2.FileSystemLoader(test_plan.custom_dir)]
        for setd in test_plan.sets:
            if setd['custom_dir']:
                customdirs.append(jinja2.FileSystemLoader(setd['custom_dir']))

        loader = jinja2.ChoiceLoader([jinja2.PackageLoader(__name__, 'templates')] + customdirs)
        env = jinja2.Environment(loader=loader)

        if hasattr(test_plan, 'custom_template'):
            template_file = open(test_plan.custom_template)
            try:
                template_text = template_file.read()
            finally:
                # do not leave the custom template open after reading it
                template_file.close()
        else:
            template_text = pkg_resources.resource_string(__name__, 'ats4_template.xml')# pylint: disable-msg=E1101
        template = env.from_string(template_text)

        xmltext = template.render(test_plan=test_plan, os=os, atspath=atspath, atsself=self).encode('ISO-8859-1')
        return et.ElementTree(et.XML(xmltext))