diff -r 87cfa131b535 -r e7e0ae78773e configurationengine/source/cone/validation/schemavalidation.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/configurationengine/source/cone/validation/schemavalidation.py Tue Aug 10 14:29:28 2010 +0300 @@ -0,0 +1,495 @@ +# +# Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). +# All rights reserved. +# This component and the accompanying materials are made available +# under the terms of "Eclipse Public License v1.0" +# which accompanies this distribution, and is available +# at the URL "http://www.eclipse.org/legal/epl-v10.html". +# +# Initial Contributors: +# Nokia Corporation - initial contribution. +# +# Contributors: +# +# Description: +# + +import sys, os +import logging +import StringIO +import pkg_resources +import jinja2 +from cone.public import api, utils, exceptions +import cone.public.plugin + +log = logging.getLogger('cone.schemavalidation') + +ROOT_PATH = os.path.abspath(os.path.dirname(__file__)) + +CONFML_SCHEMA_DIR = 'confml_xsd' + +SCHEMA_FILES_BY_NAMESPACE = {'http://www.s60.com/xml/confml/1': 'confml.xsd', + 'http://www.s60.com/xml/confml/2': 'confml2.xsd'} + +_schema_cache = {} + +# ============================================================================ + +def validate_confml_file(config, ref): + """ + Schema-validate the given ConfML file in a configuration. + @param config: The configuration. + @param ref: The resource reference of the file to validate. + @return: A list of api.Problem objects. + """ + return _validate_file(config, ref, validate_confml_data) + +def validate_implml_file(config, ref): + """ + Schema-validate the given ImplML file in a configuration. + @param config: The configuration. + @param ref: The resource reference of the file to validate. + @return: A list of api.Problem objects. + """ + return _validate_file(config, ref, validate_implml_data) + +def validate_confml_data(data): + """ + Schema-validate the given ConfML data. + + @raise ParseError: Something is wrong with the data (invalid XML, + unsupported ConfML namespace or not conforming to the schema) + """ + _validate_data(data, _get_confml_schema_for_namespace, 'xml.confml') + +def validate_implml_data(data): + """ + Schema-validate the given ImplML data. + + @raise ParseError: Something is wrong with the data (invalid XML, + unsupported ImplML namespace or not conforming to the schema) + """ + _validate_data(data, _get_implml_schema_for_namespace, 'xml.implml') + +# ============================================================================ + +def _validate_file(config, ref, data_validator_func): + res = config.get_resource(ref) + try: data = res.read() + finally: res.close() + + problem = None + try: + data_validator_func(data) + except exceptions.ParseError, e: + problem = api.Problem.from_exception(e) + + if problem: + problem.file = ref + return [problem] + else: + return [] + +def _parse_schema(filename, file_data_dict): + """ + Parse a schema using a filename-data dictionary as the source. + @param filename: Name of the schema file to parse. + @param file_data_dict: Dictionary mapping file names to file data. + @return: The parsed schema object. + """ + if filename not in file_data_dict: + raise RuntimeError("Could not parse XML schema file '%s', no such file" % filename) + + schema_data = file_data_dict[filename] + + import lxml.etree + + parser = lxml.etree.XMLParser() + class Resolver(lxml.etree.Resolver): + def resolve(self, url, id, context): + if url not in file_data_dict: + log.error("Could not resolve schema file '%s', no such file" % url) + raise RuntimeError("No file named '%s'" % url) + data = file_data_dict[url] + return self.resolve_string(data, context) + parser.resolvers.add(Resolver()) + + try: + schema_doc = lxml.etree.fromstring(schema_data, parser=parser) + schema = lxml.etree.XMLSchema(schema_doc) + except lxml.etree.LxmlError, e: + raise RuntimeError( + "Error parsing schema file '%s': %s: %s" \ + % (filename, e.__class__.__name__, str(e))) + return schema + +def _validate_data(data, schema_resolver_func, xml_parse_problem_type): + """ + Validate the given XML data. + @param data: The raw binary data to validate. + @param schema_resolver_func: The function used to resolve the + schema used for validation. The function is given the namespace + of the root element and is supposed to return the schema object + and problem type to use, or raise a ParseError. + @param xml_parse_problem_type: Problem type to use if XML parsing + fails of the data fails. + + @raise ParseError: Something is wrong with the data (invalid XML + or not conforming to the schema) + """ + # Find out the XML namespace in the root element + try: + namespace, _ = utils.xml.get_xml_root(StringIO.StringIO(data)) + except exceptions.XmlParseError, e: + e.problem_type = xml_parse_problem_type + raise e + + schema, problem_type = schema_resolver_func(namespace) + + # Parse the XML document + import lxml.etree + try: + doc = lxml.etree.fromstring(data) + except lxml.etree.XMLSyntaxError, e: + raise exceptions.XmlParseError( + "XML parse error on line %d: %s" % (e.position[0], e), + problem_lineno = e.position[0], + problem_msg = str(e), + problem_type = xml_parse_problem_type) + + # Validate the document against the schema + if not schema.validate(doc): + error = schema.error_log.last_error + raise exceptions.XmlSchemaValidationError( + "Line %d: %s" % (error.line, error.message), + problem_lineno = error.line, + problem_msg = error.message, + problem_type = problem_type) + + +class UnsupportedNamespaceError(exceptions.ParseError): + pass + +_confml_schema_file_cache = None +def get_confml_schema_files(): + global _confml_schema_file_cache + if _confml_schema_file_cache is None: + _confml_schema_file_cache = _load_confml_schema_files() + return _confml_schema_file_cache + +def get_schema_file_data(file): + """ + Return the data of the given XML schema file. + + @raise ValueError: No such schema file exists. + """ + resource_path = CONFML_SCHEMA_DIR + '/' + file + if pkg_resources.resource_exists('cone.validation', resource_path): + data = pkg_resources.resource_string('cone.validation', resource_path) + return data + else: + msg = "Could not get schema file '%s': Package resource '%s' does not exist" \ + % (file, resource_path) + raise ValueError(msg) + +def get_schema_file_for_namespace(namespace): + """ + Return the correct schema file name for the given namespace. + + @param namespace: The namespace for which to get the schema file. + @return: The name of the schema file (suitable for calling + get_schema_file_data() with), or None if no schema is associated + with the namespace. + """ + return SCHEMA_FILES_BY_NAMESPACE.get(namespace, None) + + +def _get_confml_schema_for_namespace(namespace): + """ + Return the correct XML schema and problem type ID for + the given ConfML namespace. + @return: Tuple (schema, problem_type). + """ + PROBLEM_TYPE = 'schema.confml' + + # Return a cached schema if possible + if namespace in _schema_cache: + return _schema_cache[namespace], PROBLEM_TYPE + + # Get the schema file and its raw byte data + schema_file = get_schema_file_for_namespace(namespace) + if schema_file is None: + raise exceptions.ConfmlParseError( + "Unsupported ConfML namespace '%s'" % namespace) + schema_data = get_schema_file_data(schema_file) + + # Parse the schema + import lxml.etree + parser = lxml.etree.XMLParser() + class PackageDataResolver(lxml.etree.Resolver): + def resolve(self, url, id, context): + data = get_schema_file_data(url) + return self.resolve_string(data, context) + parser.resolvers.add(PackageDataResolver()) + schema_doc = lxml.etree.fromstring(schema_data, parser=parser) + schema = lxml.etree.XMLSchema(schema_doc) + + _schema_cache[namespace] = schema + return schema, PROBLEM_TYPE + +def _load_confml_schema_files(): + files = {} + for name in pkg_resources.resource_listdir('cone.validation', CONFML_SCHEMA_DIR): + path = CONFML_SCHEMA_DIR + '/' + name + if path.lower().endswith('.xsd'): + files[name] = pkg_resources.resource_string('cone.validation', path) + return files + +# ============================================================================ +# +# +# ============================================================================ + +# Reader class list stored here so that it can be used to check if the reader +# class list changes, and reload the schema files in that case +_implml_reader_class_list = None + +_implml_schema_file_cache = None +_implml_schema_cache = {} + +def _check_reader_class_list(): + """ + Check if the reader class list has changed, and clear all caches if so. + """ + global _implml_reader_class_list + global _implml_schema_file_cache + global _implml_schema_cache + + rc_list = cone.public.plugin.ImplFactory.get_reader_classes() + if _implml_reader_class_list is not rc_list: + _implml_reader_class_list = rc_list + _implml_schema_file_cache = None + _implml_schema_cache = {} + +def dump_schema_files(dump_dir): + CONFML_SCHEMA_DIR = os.path.join(dump_dir, 'confml') + IMPLML_SCHEMA_DIR = os.path.join(dump_dir, 'implml') + if not os.path.exists(CONFML_SCHEMA_DIR): + os.makedirs(CONFML_SCHEMA_DIR) + if not os.path.exists(IMPLML_SCHEMA_DIR): + os.makedirs(IMPLML_SCHEMA_DIR) + + def dump_files(files, dir): + for name, data in files.iteritems(): + path = os.path.join(dir, name) + f = open(path, 'wb') + try: f.write(data) + finally: f.close() + + dump_files(get_confml_schema_files(), CONFML_SCHEMA_DIR) + dump_files(get_implml_schema_files(), IMPLML_SCHEMA_DIR) + +class _ImplmlReaderEntry(object): + def __init__(self, id, namespace, data, root_elem_name, schema_problem_sub_id): + self.id = id + self.filename = id + '.xsd' + self.namespace = namespace + self.data = data + self.root_elem_name = root_elem_name + self.schema_problem_sub_id = schema_problem_sub_id + +def get_implml_schema_files(): + """ + Return a dictionary of ImplML schema file data by file name. + """ + global _implml_schema_file_cache + + _check_reader_class_list() + if _implml_schema_file_cache is None: + _implml_schema_file_cache = _load_implml_schema_files() + return _implml_schema_file_cache + +def _load_implml_schema_files(): + result = {} + result['implml.xsd'] = _generate_implml_schema_data() + + result['XInclude.xsd'] = pkg_resources.resource_string( + 'cone.validation', CONFML_SCHEMA_DIR + '/XInclude.xsd') + + for entry in _get_implml_reader_entries(): + if entry.data is not None: + result[entry.filename] = entry.data + else: + result[entry.filename] = _generate_default_schema_data(entry) + return result + +def _get_implml_reader_entries(): + entries = [] + for rc in cone.public.plugin.ImplFactory.get_reader_classes(): + # Skip ImplContainerReader + if rc is cone.public.plugin.ImplContainerReader: + continue + + entry = _ImplmlReaderEntry(rc.NAMESPACE_ID, + rc.NAMESPACE, + rc.get_schema_data(), + rc.ROOT_ELEMENT_NAME, + rc.SCHEMA_PROBLEM_SUB_ID) + entries.append(entry) + return entries + +def _generate_implml_schema_data(): + template_data = pkg_resources.resource_string('cone.validation', 'implml_xsd/implml-template.xsd') + template = jinja2.Template(template_data) + data = template.render(data=_get_implml_reader_entries()).encode('utf-8') + return data + +def _generate_default_schema_data(entry): + template_data = pkg_resources.resource_string('cone.validation', 'implml_xsd/default-impl-schema-template.xsd') + template = jinja2.Template(template_data) + data = template.render(entry=entry).encode('utf-8') + return data + +def _get_implml_schema_for_namespace(namespace): + """ + Return the correct XML schema and problem type ID for + the given ImplML namespace. + @return: Tuple (schema, problem_type). + """ + global _implml_schema_cache + + problem_type_sub_id = None + filename = None + if namespace == 'http://www.symbianfoundation.org/xml/implml/1': + filename = 'implml.xsd' + problem_type_sub_id = 'implml' + else: + for entry in _get_implml_reader_entries(): + if entry.namespace == namespace: + filename = entry.filename + problem_type_sub_id = entry.schema_problem_sub_id + break + if filename is None: + raise exceptions.ImplmlParseError( + "Unsupported ImplML namespace: %s" % namespace) + + # Check reader classes before trying to use the schema cache + _check_reader_class_list() + + # Get the schema from cache if possible + if filename in _implml_schema_cache: + return _implml_schema_cache[filename] + + file_data_dict = get_implml_schema_files() + if filename not in file_data_dict: + raise exceptions.ImplmlParseError( + "ImplML schema file '%s' does not exist!" % filename) + + schema = _parse_schema(filename, file_data_dict) + problem_type = 'schema.implml' + if problem_type_sub_id: + problem_type += '.' + problem_type_sub_id + return schema, problem_type + +# ============================================================================ +# +# +# ============================================================================ + +class SchemaValidationTestMixin(object): + """ + Mix-in class for providing assertion methods for unittest.TestCase sub-classes + testing schema validation. + """ + + def assert_schemavalidation_succeeds(self, type, dir, namespace=None): + """ + Assert that schema validation succeeds for all the files in the given directory. + @param type: Type of the schema validation to perform, can be 'confml' or 'implml'. + @param dir: The directory containing the files to validate + @param namespace: If not None, specifies the namespace that the root element + in all the must have. If any of the files has a different namespace, the + assertion fails. + """ + errors = [] + for file in self._get_files(dir): + f = open(file, 'rb') + try: data = f.read() + finally: f.close() + + if namespace is not None: + self._check_root_element_namespace(file, data, namespace) + + validate_data = self._get_validator_function_for_type(type) + try: + validate_data(data) + except Exception, e: + errors.append(file) + errors.append("Raised: %r" % e) + + if errors: + self.fail('\n'.join(errors)) + + def assert_schemavalidation_fails(self, type, dir, namespace=None, problem_type=None): + """ + Assert that schema validation fails for all the files in the given directory. + @param type: Type of the schema validation to perform, can be 'confml' or 'implml'. + @param dir: The directory containing the files to validate + @param namespace: If not None, specifies the namespace that the root element + in all the must have. If any of the files has a different namespace, the + assertion fails. + @param: problem_type: If not None, specifies the problem type that the + SchemaValidationError raised from validation must contain. + """ + errors = [] + for file in self._get_files(dir): + f = open(file, 'rb') + try: data = f.read() + finally: f.close() + + if namespace is not None: + self._check_root_element_namespace(file, data, namespace) + + validate_data = self._get_validator_function_for_type(type) + try: + validate_data(data) + errors.append(file) + except exceptions.XmlSchemaValidationError, e: + if problem_type is not None: + if e.problem_type != problem_type: + errors.append(file) + errors.append("Problem type was '%s', expected '%s'" % (e.problem_type, problem_type)) + + if errors: + self.fail('The following files were reported as valid when they should not have been:\n%s' % '\n'.join(errors)) + + + def _get_files(self, dir): + """ + Return a list of all files in the given directory. + @param dir: The directory. + @return: List of all files in the dir. Each entry has the + also the directory joined to it. + """ + files = [] + for name in os.listdir(dir): + path = os.path.join(dir, name) + if os.path.isfile(path): + files.append(path) + return files + + def _check_root_element_namespace(self, file_path, data, expected_namespace): + file_namespace, _ = utils.xml.get_xml_root(StringIO.StringIO(data)) + if file_namespace != expected_namespace: + msg = "Error testing schema validation with file '%s': "\ + "Root element namespace is not what was expected (expected '%s', got '%s')"\ + % (file_path, expected_namespace, file_namespace) + self.fail(msg) + + def _get_validator_function_for_type(self, type): + if type == 'implml': + return validate_implml_data + elif type == 'confml': + return validate_confml_data + else: + raise ValueError("Invalid schema validation type '%s', should be 'implml' or 'confml'" % type)