buildframework/helium/tools/common/python/lib/ats3/dropgenerator.py
changeset 179 d8ac696cc51f
parent 1 be27ed110b50
equal deleted inserted replaced
1:be27ed110b50 179:d8ac696cc51f
    38 import atsconfigparser
    38 import atsconfigparser
    39 
    39 
    40 # pylint: disable-msg=W0404
    40 # pylint: disable-msg=W0404
    41 from ntpath import sep as atssep
    41 from ntpath import sep as atssep
    42 import ntpath as atspath
    42 import ntpath as atspath
    43 from ntpath import sep as sossep
    43 
       
    44 import jinja2
    44 
    45 
    45 _logger = logging.getLogger('ats')
    46 _logger = logging.getLogger('ats')
    46 
    47 
    47 # Shortcuts
    48 # Shortcuts
    48 E = et.Element
    49 E = et.Element
   170         return plan
   171         return plan
   171 
   172 
   172     def generate_steps_logdir(self, setd, case):
   173     def generate_steps_logdir(self, setd, case):
   173         """generates STIF log dir."""
   174         """generates STIF log dir."""
   174         
   175         
   175         _qt_test_ = check_qt_harness(setd)
   176         _qt_test_ = self.check_qt_harness(setd)
   176         if _qt_test_:
   177         if _qt_test_:
   177             step = SE(case, "step", name="Create QT log dir", harness=setd["test_harness"], **self.defaults)
   178             step = SE(case, "step", name="Create QT log dir", harness=setd["test_harness"], **self.defaults)
   178         else:
   179         else:
   179             step = SE(case, "step", name="Create %s log dir" % setd["test_harness"], harness=setd["test_harness"], **self.defaults)
   180             step = SE(case, "step", name="Create %s log dir" % setd["test_harness"], harness=setd["test_harness"], **self.defaults)
   180         SE(step, "command").text = "makedir"
   181         SE(step, "command").text = "makedir"
   181         if setd["test_harness"] == "STIF":
   182         if setd["test_harness"] == "STIF":
   182             SE(SE(step, "params"), "param", dir=self.STIF_LOG_DIR)
   183             SE(SE(step, "params"), "param", dir=self.STIF_LOG_DIR)
   183         if setd["test_harness"] == "GENERIC":
   184         if setd["test_harness"] == "GENERIC":
   184             if check_mtf_harness(setd):
   185             if self.check_mtf_harness(setd):
   185                 SE(SE(step, "params"), "param", dir=self.MTF_LOG_DIR)
   186                 SE(SE(step, "params"), "param", dir=self.MTF_LOG_DIR)
   186             else:
   187             else:
   187                 SE(SE(step, "params"), "param", dir=self.TEF_LOG_DIR)
   188                 SE(SE(step, "params"), "param", dir=self.TEF_LOG_DIR)
   188         elif setd["test_harness"] == "EUNIT":
   189         elif setd["test_harness"] == "EUNIT":
   189             if _qt_test_:
   190             if _qt_test_:
   360         SE(step, "command").text = "execute"
   361         SE(step, "command").text = "execute"
   361         params = SE(step, "params")
   362         params = SE(step, "params")
   362         SE(params, "param", parameters="writefile")
   363         SE(params, "param", parameters="writefile")
   363         SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))
   364         SE(params, "param", file=path(r"z:\sys\bin\ctcman.exe"))
   364             
   365             
   365         if test_plan["ats_network_drive"].strip() != "":
   366         if test_plan["ctc_run_process_params"].strip() != "":
   366             #preparing local-path for CTC step
   367             #preparing local-path for CTC step
   367             #getting '39865' as diamonds ID out of 'http://diamonds.nmp.nokia.com/diamonds/builds/39865/'
   368             #getting '39865' as diamonds ID out of 'http://diamonds.nmp.nokia.com/diamonds/builds/39865/'
   368             if test_plan["diamonds_build_url"].rfind("/", 0):
   369             if test_plan["diamonds_build_url"].rfind("/", 0):
   369                 diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 2)[1]
   370                 diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 2)[1]
   370             else:
   371             else:
   371                 diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 1)[1]
   372                 diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 1)[1]
   372             
   373             
   373             #separating network id and drop number from 10.11.3.2\share#ats\drop2.zip
   374             #separating network id and drop number from 10.11.3.2\share#ats\drop2.zip#3
   374             #'drop2' from the other part of the string conjuncted with a # sign
   375             #'drop2' from the other part of the string conjuncted with a # sign
   375             ats_network = r"\\" + test_plan["ats_network_drive"].rsplit("#", 1)[0] #network host
   376             ats_network = r"\\" + test_plan["ctc_run_process_params"].rsplit("#", 2)[0] #network host
   376             temp_drop_id = path(test_plan["ats_network_drive"].rsplit("#", 1)[1].rsplit(".", 1)[0]).normpath() #drop ID
   377             temp_drop_id = path(test_plan["ctc_run_process_params"].rsplit("#", 2)[1].rsplit(".", 1)[0]).normpath() #drop ID
   377             drop_id = temp_drop_id.rsplit(atssep, 1)[1]
   378             if atssep in temp_drop_id:
   378             
   379                 drop_id = temp_drop_id.rsplit(atssep, 1)[1]
       
   380             else:
       
   381                 drop_id = temp_drop_id
       
   382 
   379             ats_network_path = atspath.join(ats_network, "ctc_helium" , diamonds_id, drop_id, setd["name"], "ctcdata")
   383             ats_network_path = atspath.join(ats_network, "ctc_helium" , diamonds_id, drop_id, setd["name"], "ctcdata")
   380             ctc_helium_path_list.append(ats_network_path)
   384             ctc_helium_path_list.append(ats_network_path)
   381             
   385             
   382             step = SE(case, "step", name="Fetch CTC data for post commands execution", harness=setd["test_harness"], **self.defaults)
   386             step = SE(case, "step", name="Fetch CTC data for post commands execution", harness=setd["test_harness"], **self.defaults)
   383             SE(step, "command").text = "fetch-log"
   387             SE(step, "command").text = "fetch-log"
   403         SE(params, "param", type="text")
   407         SE(params, "param", type="text")
   404         SE(params, "param", delete="true")
   408         SE(params, "param", delete="true")
   405         if setd["test_harness"] == "STIF":
   409         if setd["test_harness"] == "STIF":
   406             SE(params, "param", path=path(self.STIF_LOG_DIR).joinpath(r"*"))
   410             SE(params, "param", path=path(self.STIF_LOG_DIR).joinpath(r"*"))
   407         if setd["test_harness"] == "GENERIC":
   411         if setd["test_harness"] == "GENERIC":
   408             if check_mtf_harness(setd):
   412             if self.check_mtf_harness(setd):
   409                 SE(params, "param", path=path(self.MTF_LOG_DIR).joinpath(r"*"))
   413                 SE(params, "param", path=path(self.MTF_LOG_DIR).joinpath(r"*"))
   410             else:
   414             else:
   411                 SE(params, "param", path=path(self.TEF_LOG_DIR).joinpath(r"*"))
   415                 SE(params, "param", path=path(self.TEF_LOG_DIR).joinpath(r"*"))
   412         elif setd["test_harness"] == "STIFUNIT":
   416         elif setd["test_harness"] == "STIFUNIT":
   413             SE(params, "param", path=path(self.STIFUNIT_LOG_DIR).joinpath(r"*"))
   417             SE(params, "param", path=path(self.STIFUNIT_LOG_DIR).joinpath(r"*"))
   414         elif setd["test_harness"] == "EUNIT":
   418         elif setd["test_harness"] == "EUNIT":
   415             if check_qt_harness(setd):
   419             if self.check_qt_harness(setd):
   416                 SE(params, "param", path=path(self.QT_LOG_DIR).joinpath(r"*"))
   420                 SE(params, "param", path=path(self.QT_LOG_DIR).joinpath(r"*"))
   417             else:
   421             else:
   418                 SE(params, "param", path=path(self.EUNIT_LOG_DIR).joinpath(r"*"))
   422                 SE(params, "param", path=path(self.EUNIT_LOG_DIR).joinpath(r"*"))
   419 
   423 
   420     def generate_steps(self, setd, case, test_plan):
   424     
   421         """Generate the test plan <step>s."""
   425     def get_sorted_images(self, setd):
   422         # Flash images.
       
   423         images = self.drop_path_root.joinpath("images")
       
   424         pmds = self.drop_path_root.joinpath("pmds")
       
   425         
       
   426         sorted_images = []
   426         sorted_images = []
   427         for image_file in setd["image_files"]:
   427         for image_file in setd["image_files"]:
   428             if 'core' in image_file.name:
   428             if 'core' in image_file.name:
   429                 sorted_images.append(image_file.name)
   429                 sorted_images.append(image_file.name)
   430         for image_file in setd["image_files"]:
   430         for image_file in setd["image_files"]:
   434             if 'rofs3' in image_file.name:
   434             if 'rofs3' in image_file.name:
   435                 sorted_images.append(image_file.name)
   435                 sorted_images.append(image_file.name)
   436         for image_file in setd["image_files"]:
   436         for image_file in setd["image_files"]:
   437             if 'core' not in image_file.name and 'rofs2' not in image_file.name and 'rofs3' not in image_file.name:
   437             if 'core' not in image_file.name and 'rofs2' not in image_file.name and 'rofs3' not in image_file.name:
   438                 sorted_images.append(image_file.name)
   438                 sorted_images.append(image_file.name)
   439                 
   439         if "rofs" in sorted_images[0]:
       
   440             return setd["image_files"]
       
   441         return sorted_images
       
   442     
       
   443     def generate_steps(self, setd, case, test_plan):
       
   444         """Generate the test plan <step>s."""
       
   445         # Flash images.
       
   446         images = self.drop_path_root.joinpath("images")
       
   447         pmds = self.drop_path_root.joinpath("pmds")
       
   448         
       
   449         sorted_images = self.get_sorted_images(setd)
   440         for image_file in sorted_images:
   450         for image_file in sorted_images:
   441             flash = SE(case, "flash", images=images.joinpath(image_file))
   451             flash = SE(case, "flash", images=images.joinpath(image_file))
   442             flash.set("target-alias", "DEFAULT_%s" % setd["test_harness"])
   452             flash.set("target-alias", "DEFAULT_%s" % setd["test_harness"])
   443 
   453             
   444         if not test_plan.custom_dir is None:
   454         if not test_plan.custom_dir is None:
   445             insert_custom_file(case, test_plan.custom_dir.joinpath("prestep_custom.xml"))
   455             insert_custom_file(case, test_plan.custom_dir.joinpath("prestep_custom.xml"))
   446 
   456 
   447         if setd["ctc_enabled"] == "True":
   457         if setd["ctc_enabled"] == "True":
   448             step = SE(case, "step", name="Create CTC log dir", harness=setd["test_harness"], **self.defaults)
   458             step = SE(case, "step", name="Create CTC log dir", harness=setd["test_harness"], **self.defaults)
   513                 if file1[2] == "testscript:mtf":
   523                 if file1[2] == "testscript:mtf":
   514                     SE(params, "param", {'result-file': self.MTF_LOG_DIR + os.sep + filename.replace('.script', '.htm')})
   524                     SE(params, "param", {'result-file': self.MTF_LOG_DIR + os.sep + filename.replace('.script', '.htm')})
   515                 else:
   525                 else:
   516                     SE(params, "param", {'result-file': self.TEF_LOG_DIR + os.sep + filename.replace('.script', '.htm')})
   526                     SE(params, "param", {'result-file': self.TEF_LOG_DIR + os.sep + filename.replace('.script', '.htm')})
   517                 SE(params, "param", timeout=time_out)
   527                 SE(params, "param", timeout=time_out)
   518                 SE(params, "param", parser="TEFTestResultParser")
   528                 if file1[2] == "testscript:mtf":
   519             if 'testmodule' in file1[2]:
   529                     SE(params, "param", parser="MTFResultParser")
       
   530                 else:
       
   531                     SE(params, "param", parser="TEFTestResultParser")
       
   532             if file1[2] == 'testmodule:rtest':
   520                 filename = file1[1]
   533                 filename = file1[1]
   521                 filename = filename[file1[1].rfind(os.sep)+1:]
   534                 filename = filename[file1[1].rfind(os.sep)+1:]
   522                 step = SE(case, "step", 
   535                 step = SE(case, "step", 
   523                               name="Execute test: %s" %  filename, harness=setd["test_harness"], 
   536                               name="Execute test: %s" %  filename, harness=setd["test_harness"], 
   524                               **self.defaults)
   537                               **self.defaults)
   752                         drop_file = drop_file.normpath()
   765                         drop_file = drop_file.normpath()
   753                         if drop_file not in drop_set:
   766                         if drop_file not in drop_set:
   754                             drop_set.add(drop_file)
   767                             drop_set.add(drop_file)
   755                             yield (drop_file, file_path.normpath())
   768                             yield (drop_file, file_path.normpath())
   756             for drop_dir, sub_dir, files in pkg_files:
   769             for drop_dir, sub_dir, files in pkg_files:
   757                 drop_file = drop_dir.joinpath(sub_dir)
   770                 drop_file = drop_dir.joinpath(sub_dir.replace('\\', os.sep))
   758                 drop_file = drop_file.normpath()
   771                 drop_file = drop_file.normpath()
   759                 file_path = path(files)
   772                 file_path = path(files)
   760                 if drop_file not in drop_set:
   773                 if drop_file not in drop_set:
   761                     drop_set.add(drop_file)
   774                     drop_set.add(drop_file)
   762                     yield (drop_file, file_path.normpath())
   775                     yield (drop_file, file_path.normpath())
   765         """Generate the <files> section."""
   778         """Generate the <files> section."""
   766         files_elem = E("files")
   779         files_elem = E("files")
   767         for drop_file, _ in self.drop_files(test_plan):
   780         for drop_file, _ in self.drop_files(test_plan):
   768             SE(files_elem, "file").text = drop_file
   781             SE(files_elem, "file").text = drop_file
   769         return files_elem
   782         return files_elem
       
   783         
       
   784     def check_mtf_harness(self, _setd_):
       
   785         for _srcdst_ in _setd_['src_dst']:
       
   786             if _srcdst_[2] == "testscript:mtf":
       
   787                 return True
       
   788         return False
       
   789 
       
   790     def check_qt_harness(self, _setd_):
       
   791         _setd_ = _setd_
       
   792         is_qt_test = False
       
   793         if _setd_.has_key("sis_files"):
       
   794             _dict_key_ = "sis_files"
       
   795         else:
       
   796             _dict_key_ = "src_dst"
       
   797             
       
   798         for _srcdst_ in _setd_[_dict_key_]:
       
   799             if "testmodule:qt" == _srcdst_[2]:
       
   800                 is_qt_test = True
       
   801         return is_qt_test 
   770 
   802 
   771 def generate_target(test_plan, root):
   803 def generate_target(test_plan, root):
   772     """Generate targets"""
   804     """Generate targets"""
   773     harness = test_plan["harness"]
   805     harness = test_plan["harness"]
   774     if harness == "MULTI_HARNESS":
   806     if harness == "MULTI_HARNESS":
   837 #        _logger.debug("This is for debugging only. Do not treat this as anything else. Anything is OK... The data: %s when prosessing %s\n" % (err, filename))
   869 #        _logger.debug("This is for debugging only. Do not treat this as anything else. Anything is OK... The data: %s when prosessing %s\n" % (err, filename))
   838         pass
   870         pass
   839     else: 
   871     else: 
   840         _logger.info("Included file %s" % ( filename))
   872         _logger.info("Included file %s" % ( filename))
   841 
   873 
   842 def check_qt_harness(_setd_):
       
   843     _setd_ = _setd_
       
   844     is_qt_test = False
       
   845     if _setd_.has_key("sis_files"):
       
   846         _dict_key_ = "sis_files"
       
   847     else:
       
   848         _dict_key_ = "src_dst"
       
   849         
       
   850     for _srcdst_ in _setd_[_dict_key_]:
       
   851         if "testmodule:qt" == _srcdst_[2]:
       
   852             is_qt_test = True
       
   853     return is_qt_test 
       
   854 
       
   855 def check_mtf_harness(_setd_):
       
   856     for _srcdst_ in _setd_['src_dst']:
       
   857         if _srcdst_[2] == "testscript:mtf":
       
   858             return True
       
   859     return False
       
   860 
       
   861 def generate_post_actions(test_plan):
   874 def generate_post_actions(test_plan):
   862     """Generate post actions."""
   875     """Generate post actions."""
   863     import string
       
   864     actions = []
   876     actions = []
   865     
   877     
   866     if not test_plan.custom_dir is None:
   878     if not test_plan.custom_dir is None:
   867         insert_custom_file(actions, test_plan.custom_dir.joinpath("prepostaction.xml"))
   879         insert_custom_file(actions, test_plan.custom_dir.joinpath("prepostaction.xml"))
   868     
   880     
   876 
   888 
   877     if not test_plan.custom_dir is None:
   889     if not test_plan.custom_dir is None:
   878         insert_custom_file(actions, test_plan.custom_dir.joinpath("postpostaction.xml"))
   890         insert_custom_file(actions, test_plan.custom_dir.joinpath("postpostaction.xml"))
   879 
   891 
   880     return actions
   892     return actions
       
   893     
       
   894     
       
   895 class Ats3TemplateTestDropGenerator(Ats3TestDropGenerator):
       
   896 
       
   897     STIF_LOG_DIR = r"c:\logs\testframework"
       
   898     TEF_LOG_DIR = r"c:\logs\testexecute"
       
   899     MTF_LOG_DIR = r"c:\logs\testresults"
       
   900     STIFUNIT_LOG_DIR = r"c:\logs\testframework"
       
   901     EUNIT_LOG_DIR = r"c:\Shared\EUnit\logs"
       
   902     #QT_LOG_DIR = r"c:\private\Qt\logs"
       
   903     QT_LOG_DIR = r"c:\shared\EUnit\logs"
       
   904     CTC_LOG_DIR = r"c:\data\ctc"
       
   905 
       
   906     def stif_init_file(self, src_dst):
       
   907         ini = cfg = dll = has_tf_ini = False
       
   908         ini_file = None
       
   909         cfg_files = dll_files = []
       
   910 
       
   911         for tf_ini in src_dst:
       
   912             if "testframework.ini" in tf_ini[1].lower():
       
   913                 has_tf_ini = True
       
   914         
       
   915         for file1 in src_dst:
       
   916             if "testframework.ini" in file1[1].lower() and file1[2] == "engine_ini" and has_tf_ini:
       
   917                 ini_file = file1
       
   918             elif file1[2] == "engine_ini" and not has_tf_ini:
       
   919                 pipe_ini = open(file1[0], 'r')
       
   920                 if "[engine_defaults]" in str(pipe_ini.readlines()).lower():
       
   921                     ini_file = file1
       
   922         return ini_file
       
   923 
       
   924     def ctcnetworkpath(self, setd, test_plan):
       
   925         #preparing local-path for CTC step
       
   926         #getting '39865' as diamonds ID out of 'http://diamonds.nmp.nokia.com/diamonds/builds/39865/'
       
   927         if test_plan["diamonds_build_url"].rfind("/", 0):
       
   928             diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 2)[1]
       
   929         else:
       
   930             diamonds_id = test_plan["diamonds_build_url"].rsplit(r"/", 1)[1]
       
   931         
       
   932         #separating network id and drop number from 10.11.3.2\share#ats\drop2.zip#3
       
   933         #'drop2' from the other part of the string conjuncted with a # sign
       
   934         ats_network = r"\\" + test_plan["ctc_run_process_params"].rsplit("#", 2)[0] #network host
       
   935         temp_drop_id = path(test_plan["ctc_run_process_params"].rsplit("#", 2)[1].rsplit(".", 1)[0]).normpath() #drop ID
       
   936         if atssep in temp_drop_id:
       
   937             drop_id = temp_drop_id.rsplit(atssep, 1)[1]
       
   938         else:
       
   939             drop_id = temp_drop_id
       
   940 
       
   941         return atspath.join(ats_network, "ctc_helium" , diamonds_id, drop_id, setd["name"], "ctcdata")
       
   942 
       
   943     def getlogdir(self, setd):
       
   944         if setd["test_harness"] == "STIF":
       
   945             return self.STIF_LOG_DIR
       
   946         elif setd["test_harness"] == "STIFUNIT":
       
   947             return self.STIFUNIT_LOG_DIR
       
   948         elif setd["test_harness"] == "GENERIC":
       
   949             if self.check_mtf_harness(setd):
       
   950                 return self.MTF_LOG_DIR
       
   951             else:
       
   952                 return self.TEF_LOG_DIR
       
   953         elif setd["test_harness"] == "EUNIT":
       
   954             if self.check_qt_harness(setd):
       
   955                 return self.QT_LOG_DIR
       
   956             else:
       
   957                 return self.EUNIT_LOG_DIR
       
   958 
       
   959     def generate_xml(self, test_plan):
       
   960         loader = jinja2.ChoiceLoader([jinja2.FileSystemLoader(os.path.join(os.environ['HELIUM_HOME'], 'tools/common/python/lib/ats3/templates')), jinja2.FileSystemLoader(test_plan.custom_dir)])
       
   961         env = jinja2.Environment(loader=loader)
       
   962         template = env.from_string(open(os.path.join(os.environ['HELIUM_HOME'], 'tools/common/python/lib/ats3/ats4_template.xml')).read())
       
   963         
       
   964         xmltext = template.render(test_plan=test_plan, os=os, atspath=atspath, atsself=self).encode('ISO-8859-1')
       
   965         return et.ElementTree(et.XML(xmltext))