diff --git a/cimgen/cimgen.py b/cimgen/cimgen.py index 73632da..abc12dc 100644 --- a/cimgen/cimgen.py +++ b/cimgen/cimgen.py @@ -21,6 +21,8 @@ def as_json(self) -> dict[str, str]: json_object = {} if self.about(): json_object["about"] = self.about() + if self.namespace(): + json_object["namespace"] = self.namespace() if self.comment(): json_object["comment"] = self.comment() if self.datatype(): @@ -52,6 +54,13 @@ def about(self) -> str: else: return "" + def namespace(self) -> str: + if "$rdf:about" in self.json_definition: + about = RDFSEntry._get_about_or_resource(self.json_definition["$rdf:about"]) + return about[: -len(self.about())] + else: + return "" + # Capitalized True/False is valid in python but not in json. # Do not use this function in combination with json.load() def is_used(self) -> bool: @@ -194,6 +203,7 @@ def __init__(self, rdfs_entry: RDFSEntry): self.superclass: str = rdfs_entry.subclass_of() self.subclass_list: list[str] = [] self.stereotype: str = rdfs_entry.stereotype() + self.namespace = rdfs_entry.namespace() def attributes(self) -> list[dict]: return self.attribute_list @@ -391,6 +401,13 @@ def _parse_rdf(input_dic: dict, version: str) -> dict[str, dict[str, CIMComponen for instance in enum_instances: clarse = _get_rid_of_hash(instance["type"]) if clarse and clarse in classes_map: + if instance.get("comment"): + instance["wrapped_comment"] = _wrap_and_clean( + instance["comment"], + width=100, + initial_indent="# ", + subsequent_indent=(" # "), + ) classes_map[clarse].add_enum_instance(instance) else: logger.info("Class {} for enum instance {} not found.".format(clarse, instance)) @@ -417,6 +434,7 @@ def _write_all_files( "class_location": lang_pack.get_class_location(class_name, elem_dict, version), "class_name": class_name, "class_origin": elem_dict[class_name].origins(), + "class_namespace": _get_namespace(elem_dict[class_name].namespace), "enum_instances": elem_dict[class_name].enum_instances(), "is_an_enum_class": elem_dict[class_name].is_an_enum_class(), "is_a_primitive_class": elem_dict[class_name].is_a_primitive_class(), @@ -455,6 +473,7 @@ def _write_all_files( attribute["is_primitive_attribute"] = _get_bool_string(attribute_type == "primitive") attribute["is_datatype_attribute"] = _get_bool_string(attribute_type == "datatype") attribute["attribute_class"] = attribute_class + attribute["attribute_namespace"] = _get_namespace(attribute["namespace"]) class_details["attributes"].sort(key=lambda d: d["label"]) _write_files(class_details, output_path) @@ -738,6 +757,14 @@ def _get_attribute_type(attribute: dict, class_infos: CIMComponentDefinition) -> return attribute_type +def _get_namespace(parsed_namespace: str) -> str: + if parsed_namespace == "#": + namespace = cim_namespace + else: + namespace = parsed_namespace + return namespace + + def _get_bool_string(bool_value: bool) -> str: """Convert boolean value into a string which is usable in both Python and Json. diff --git a/cimgen/languages/modernpython/lang_pack.py b/cimgen/languages/modernpython/lang_pack.py index 89c89ba..8fca48d 100644 --- a/cimgen/languages/modernpython/lang_pack.py +++ b/cimgen/languages/modernpython/lang_pack.py @@ -52,7 +52,7 @@ def _get_type_and_default(text: str, render: Callable[[str], str]) -> tuple[str, attribute_txt = render(text) attribute_json = eval(attribute_txt) if attribute_json["is_class_attribute"]: - return ("Optional[str]", "default=None") + return ("str | None", "default=None") elif attribute_json["is_list_attribute"]: return ("list", "default_factory=list") elif attribute_json["is_datatype_attribute"]: @@ -88,14 +88,6 @@ def _get_python_type(datatype: str) -> str: return "float" -def _set_imports(attributes: list[dict]) -> list[str]: - import_set = set() - for attribute in attributes: - if attribute["is_datatype_attribute"] or attribute["is_primitive_attribute"]: - import_set.add(attribute["attribute_class"]) - return sorted(import_set) - - def _set_datatype_attributes(attributes: list[dict]) -> dict: datatype_attributes = {} datatype_attributes["python_type"] = "None" @@ -137,13 +129,20 @@ def run_template(output_path: str, class_details: dict) -> None: template = class_template_file class_details["set_default"] = _set_default class_details["set_type"] = _set_type - class_details["imports"] = _set_imports(class_details["attributes"]) resource_file = _create_file(output_path, class_details, template) _write_templated_file(resource_file, class_details, template["filename"]) def _create_file(output_path: str, class_details: dict, template: dict[str, str]) -> Path: - resource_file = Path(output_path) / "resources" / (class_details["class_name"] + template["ext"]) + if ( + class_details["is_a_primitive_class"] + or class_details["is_a_datatype_class"] + or class_details["is_an_enum_class"] + ): + class_category = "types" + else: + class_category = "" + resource_file = Path(output_path) / "resources" / class_category / (class_details["class_name"] + template["ext"]) resource_file.parent.mkdir(exist_ok=True) return resource_file diff --git a/cimgen/languages/modernpython/templates/class_template.mustache b/cimgen/languages/modernpython/templates/class_template.mustache index e733da3..d3af0aa 100644 --- a/cimgen/languages/modernpython/templates/class_template.mustache +++ b/cimgen/languages/modernpython/templates/class_template.mustache @@ -3,7 +3,6 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cim """ from functools import cached_property -from typing import Optional from pydantic import Field from pydantic.dataclasses import dataclass @@ -35,16 +34,17 @@ class {{class_name}}({{subclass_of}}): {{/attr_origin}} ], "is_used": {{#is_used}}True{{/is_used}}{{^is_used}}False{{/is_used}}, + "namespace": "{{attribute_namespace}}", # NOSONAR "is_class_attribute": {{#is_class_attribute}}True{{/is_class_attribute}}{{^is_class_attribute}}False{{/is_class_attribute}}, "is_datatype_attribute": {{#is_datatype_attribute}}True{{/is_datatype_attribute}}{{^is_datatype_attribute}}False{{/is_datatype_attribute}}, "is_enum_attribute": {{#is_enum_attribute}}True{{/is_enum_attribute}}{{^is_enum_attribute}}False{{/is_enum_attribute}}, "is_list_attribute": {{#is_list_attribute}}True{{/is_list_attribute}}{{^is_list_attribute}}False{{/is_list_attribute}}, "is_primitive_attribute": {{#is_primitive_attribute}}True{{/is_primitive_attribute}}{{^is_primitive_attribute}}False{{/is_primitive_attribute}}, {{#is_datatype_attribute}} - "attribute_class": {{attribute_class}}, + "attribute_class": "{{attribute_class}}", {{/is_datatype_attribute}} {{#is_primitive_attribute}} - "attribute_class": {{attribute_class}}, + "attribute_class": "{{attribute_class}}", {{/is_primitive_attribute}} }, ) diff --git a/cimgen/languages/modernpython/templates/datatype_template.mustache b/cimgen/languages/modernpython/templates/datatype_template.mustache index 5bf8881..e81ce7f 100644 --- a/cimgen/languages/modernpython/templates/datatype_template.mustache +++ b/cimgen/languages/modernpython/templates/datatype_template.mustache @@ -2,8 +2,8 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cimgen """ -from ..utils.datatypes import CIMDatatype -from ..utils.profile import Profile +from ...utils.datatypes import CIMDatatype +from ...utils.profile import Profile {{#is_fixed_imports}} from .{{.}} import {{.}} {{/is_fixed_imports}} diff --git a/cimgen/languages/modernpython/templates/primitive_template.mustache b/cimgen/languages/modernpython/templates/primitive_template.mustache index be44ab0..2a1a758 100644 --- a/cimgen/languages/modernpython/templates/primitive_template.mustache +++ b/cimgen/languages/modernpython/templates/primitive_template.mustache @@ -3,8 +3,8 @@ Generated from the CGMES files via cimgen: https://github.com/sogno-platform/cim """ from datetime import date, datetime, time -from ..utils.datatypes import Primitive -from ..utils.profile import Profile +from ...utils.datatypes import Primitive +from ...utils.profile import Profile {{class_name}} = Primitive( name="{{class_name}}", diff --git a/cimgen/languages/modernpython/utils/base.py b/cimgen/languages/modernpython/utils/base.py index 8acd55f..75ad7a7 100644 --- a/cimgen/languages/modernpython/utils/base.py +++ b/cimgen/languages/modernpython/utils/base.py @@ -4,6 +4,7 @@ from functools import cached_property from typing import Any, TypeAlias, TypedDict +from lxml import etree from pydantic.dataclasses import dataclass from .config import cgmes_resource_config @@ -24,7 +25,7 @@ def possible_profiles(self) -> set[BaseProfile]: A resource can be used by multiple profiles. This is the set of profiles where this element can be found. """ - raise NotImplementedError("Method not implemented because not relevant in Base.") + return {self.recommended_profile} @cached_property def recommended_profile(self) -> BaseProfile: @@ -88,6 +89,24 @@ def apparent_name(cls) -> str: """ return cls.__name__ + def get_attribute_main_profile(self, attr: str) -> BaseProfile | None: + """Get the profile for this attribute of the CIM object. + + This function searches for the profile of an attribute for the CIM type of an object. + If the main profile of the type is a possible profile of the attribute it should be choosen. + Otherwise, the first profile in the list of possible profiles ordered by profile number. + + :param attr: Attribute to check + :return: Attribute profile. + """ + attr_profiles_map = self.possible_attribute_profiles + profiles = attr_profiles_map.get(attr, []) + if self.recommended_profile in profiles: + return self.recommended_profile + if profiles: + return sorted(profiles)[0] + return None + def cgmes_attribute_names_in_profile(self, profile: BaseProfile | None) -> set[Field]: """ Returns all fields accross the parent tree which are in the profile in parameter. @@ -131,39 +150,221 @@ def cgmes_attributes_in_profile(self, profile: BaseProfile | None) -> dict[str, for parent in reversed(self.__class__.__mro__[:-1]): for f in fields(parent): shortname = f.name - qualname = f"{parent.apparent_name()}.{shortname}" # type: ignore + qualname = f"{parent.apparent_name()}.{shortname}" + infos = dict() + if f not in self.cgmes_attribute_names_in_profile(profile) or shortname in seen_attrs: - # Wrong profile or already found from a parent. continue - else: - # Namespace finding - # "class namespace" means the first namespace defined in the inheritance tree. - # This can go up to Base, which will give the default cim NS. - if (extra := getattr(f.default, "json_schema_extra", None)) is None: - # The attribute does not have extra metadata. It might be a custom atttribute - # without it, or a base type (int...). - # Use the class namespace. - namespace = self.namespace - elif (attr_ns := extra.get("namespace", None)) is None: - # The attribute has some extras, but not namespace. - # Use the class namespace. - namespace = self.namespace - else: + + # Namespace finding + # "class namespace" means the first namespace defined in the inheritance tree. + # This can go up to Base, which will give the default cim NS. + infos["namespace"] = self.namespace + extra = getattr(f.default, "json_schema_extra", None) + if extra and extra.get("is_used"): + # adding the extras, used for xml generation + extra_info = { + "attr_name": qualname, + "is_class_attribute": extra.get("is_class_attribute"), + "is_enum_attribute": extra.get("is_enum_attribute"), + "is_list_attribute": extra.get("is_list_attribute"), + "is_primitive_attribute": extra.get("is_primitive_attribute"), + "is_datatype_attribute": extra.get("is_datatype_attribute"), + "attribute_class": extra.get("attribute_class"), + "attribute_main_profile": self.get_attribute_main_profile(shortname), + } + if extra.get("namespace"): # The attribute has an explicit namesapce - namespace = attr_ns + extra_info["namespace"] = extra.get("namespace", self.namespace) + infos.update(extra_info) + + infos["value"] = getattr(self, shortname) - qual_attrs[qualname] = CgmesAttribute( - value=getattr(self, shortname), - namespace=namespace, - ) - seen_attrs.add(shortname) + qual_attrs[qualname] = CgmesAttribute(infos) # type: ignore + seen_attrs.add(shortname) return qual_attrs + def to_xml(self, profile_to_export: BaseProfile, id: str | None = None) -> etree._Element | None: + """Creates an etree element of self with all non-empty attributes of the profile_to_export + that are not already defined in the recommanded profile + This can then be used to generate the xml file of the profile_to_export + :param profile_to_export: Profile for which we want to obtain the xml tree (eg. Profile.EQ) + :param id: "mRID" some objects don't have mRID attribute. Defaults to None. + :return: etree describing self for the profile_to_export, None if nothing to export + """ + profile_attributes = self._get_attributes_to_export(profile_to_export) + + if "mRID" in self.to_dict(): + obj_id = self.mRID # type: ignore + else: + obj_id = id + + # if no attribute to export or no mRID, return None + if profile_attributes == {} or obj_id is None: + root = None + else: + obj_id = "_" + obj_id + # Create root element + nsmap = NAMESPACES + root = etree.Element("{" + self.namespace + "}" + self.resource_name, nsmap=nsmap) + + # Add the ID as attribute to the root + if self.recommended_profile.value == profile_to_export.value: + root.set(f"""{{{nsmap["rdf"]}}}""" + "ID", obj_id) + else: + root.set(f"""{{{nsmap["rdf"]}}}""" + "about", "#" + obj_id) + + root = self._add_attribute_to_etree(attributes=profile_attributes, root=root, nsmap=nsmap) + return root + + def _get_attributes_to_export(self, profile_to_export: BaseProfile) -> dict: + attributes_to_export = {} + attributes_in_profile = self.cgmes_attributes_in_profile(profile_to_export) + for key, attribute in attributes_in_profile.items(): + if attribute["attribute_main_profile"] == profile_to_export: + attributes_to_export[key] = attribute + attributes_to_export = self._remove_empty_attributes(attributes_to_export) + return attributes_to_export + + @staticmethod + def _remove_empty_attributes(attributes: dict) -> dict: + for key, attribute in list(attributes.items()): + # Remove empty attributes + if attribute["value"] in [None, "", []]: + del attributes[key] + elif attribute.get("attribute_class") and attribute["attribute_class"] == "Boolean": + attribute["value"] = str(attribute["value"]).lower() + return attributes + + @staticmethod + def _add_attribute_to_etree(attributes: dict, root: etree._Element, nsmap: dict) -> etree._Element: + rdf_namespace = f"""{{{nsmap["rdf"]}}}""" + for field_name, attribute in attributes.items(): + # add all attributes relevant to the profile as SubElements + attr_namespace = attribute["namespace"] + element_name = f"{{{attr_namespace}}}{field_name}" + + if attribute["is_class_attribute"]: + # class_attributes are exported as rdf: resource #mRID_of_target + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", "#_" + attribute["value"]) + elif attribute["is_enum_attribute"]: + element = etree.SubElement(root, element_name) + element.set(rdf_namespace + "resource", nsmap["cim"] + attribute["value"]) + elif attribute["is_list_attribute"]: + for item in attribute["value"]: + element = etree.SubElement(root, element_name) + element.text = str(item) + else: + element = etree.SubElement(root, element_name) + element.text = str(attribute["value"]) + return root + def __str__(self) -> str: """Returns the string representation of this resource.""" return "\n".join([f"{k}={v}" for k, v in sorted(self.to_dict().items())]) + def _parse_xml_fragment(self, xml_fragment: str) -> dict: + """parses an xml fragment into a dict defining the class attributes + + :param xml_fragment: xml string defining an instance of the current class + :return: a dictionnary of attributes to create/update the class instance + """ + attribute_dict = {} + xml_tree = etree.fromstring(xml_fragment) + + attribute_dict.update(self._extract_mrid_from_etree(xml_tree=xml_tree)) + + # parsing attributes defined in class + class_attributes = self.cgmes_attributes_in_profile(None) + for key, class_attribute in class_attributes.items(): + xml_attributes = xml_tree.findall(".//{*}" + key) + if not xml_attributes: + continue + + attr = key.rsplit(".")[-1] + + attr_value = self._extract_attr_value_from_etree(attr, class_attribute, xml_attributes) + + if hasattr(self, attr) and attr_value not in [None, []]: + attribute_dict[attr] = attr_value + + return attribute_dict + + def _extract_mrid_from_etree(self, xml_tree: etree._Element) -> dict: + """Parsing the mRID from etree attributes""" + mrid_dict = {} + for key, value in xml_tree.attrib.items(): + if key.endswith("ID") or key.endswith("about"): + if value.startswith("#"): + value = value[1:] + if value.startswith("_"): + value = value[1:] + if hasattr(self, "mRID") and value is not None: + mrid_dict = {"mRID": value} + return mrid_dict + + def _extract_attr_value_from_etree( + self, attr_name: str, class_attribute: "CgmesAttribute", xml_attributes: list[etree._Element] + ): + """Parsing the attribute value from etree attributes""" + attr_value = None + # class attributes are pointing to another class/instance defined in .attrib + if class_attribute["is_class_attribute"]: + attr_value = xml_attributes[0].values()[0] + attr_value = attr_value.lstrip("#_") + + # enum attributes are defined in .attrib and has a prefix ending in "#" + elif class_attribute["is_enum_attribute"]: + attr_value = xml_attributes[0].values()[0] + attr_value = attr_value.split("#")[-1] + + elif class_attribute["is_list_attribute"]: + attr_value = [] + for item in xml_attributes: + item_value = item.values()[0] + attr_value.append(item_value.split("#")[-1]) + + elif class_attribute["is_primitive_attribute"] or class_attribute["is_datatype_attribute"]: + attr_value = xml_attributes[0].text + if self.__dataclass_fields__[attr_name].type == bool: + attr_value = {"true": True, "false": False}.get(attr_value, None) + else: + # types are int, float or str (date, time and datetime treated as str) + attr_value = self.__dataclass_fields__[attr_name].type(attr_value) + else: + # other attributes types are defined in .text + attr_value = xml_attributes[0].text + return attr_value + + def update_from_xml(self, xml_fragment: str): + """ + Updates the instance by parsing an xml fragment defining the attributes of this instance + example: updating the instance by parsing the corresponding fragment from the SSH profile + """ + attribute_dict = self._parse_xml_fragment(xml_fragment) + + if attribute_dict["mRID"] == self.mRID: # type: ignore + for key, value in attribute_dict.items(): + attr = getattr(self, key) + if isinstance(attr, list): + getattr(self, key).extend(value) + else: + setattr(self, key, value) + + @classmethod + def from_xml(cls, xml_fragment: str): + """ + Returns an instance of the class from an xml fragment defining the attributes written in the form: + <cim:IdentifiedObject.name>...</cim:IdentifiedObject.name> + example: creating an instance by parsing a fragment from the EQ profile + """ + attribute_dict = cls()._parse_xml_fragment(xml_fragment) + + # Instantiate the class with the dictionary + return cls(**attribute_dict) + @staticmethod def get_extra_prop(field: Field, prop: str) -> Any: # The field is defined as a pydantic field, not a dataclass field, @@ -184,3 +385,11 @@ class CgmesAttribute(TypedDict): # Custom attributes might have something different, given as metadata. # See readme for more information. namespace: str + attr_name: str + is_class_attribute: bool + is_enum_attribute: bool + is_list_attribute: bool + is_primitive_attribute: bool + is_datatype_attribute: bool + attribute_class: str + attribute_main_profile: BaseProfile diff --git a/cimgen/languages/modernpython/utils/datatypes.py b/cimgen/languages/modernpython/utils/datatypes.py index 04f3442..d9a93aa 100644 --- a/cimgen/languages/modernpython/utils/datatypes.py +++ b/cimgen/languages/modernpython/utils/datatypes.py @@ -1,27 +1,25 @@ -from typing import List, Optional, Union - from pydantic import Field from pydantic.dataclasses import dataclass +from .constants import NAMESPACES -from ..resources.Currency import Currency -from ..resources.UnitMultiplier import UnitMultiplier -from ..resources.UnitSymbol import UnitSymbol from .config import cgmes_resource_config -from .constants import NAMESPACES from .profile import BaseProfile +from ..resources.types.UnitMultiplier import UnitMultiplier +from ..resources.types.UnitSymbol import UnitSymbol +from ..resources.types.Currency import Currency @dataclass(config=cgmes_resource_config) class Primitive: name: str = Field(frozen=True) type: object = Field(frozen=True) + profiles: list[BaseProfile] = Field(frozen=True) namespace: str = Field(frozen=True, default=NAMESPACES["cim"]) - profiles: List[BaseProfile] = Field(frozen=True) @dataclass(config=cgmes_resource_config) class CIMDatatype(Primitive): - multiplier: Optional[UnitMultiplier] = Field(default=None, frozen=True) - unit: Optional[Union[UnitSymbol, Currency]] = Field(default=None, frozen=True) - denominatorMultiplier: Optional[UnitMultiplier] = Field(default=None, frozen=True) - denominatorUnit: Optional[UnitSymbol] = Field(default=None, frozen=True) + multiplier: UnitMultiplier | None = Field(default=None, frozen=True) + unit: UnitSymbol | Currency | None = Field(default=None, frozen=True) + denominatorMultiplier: UnitMultiplier | None = Field(default=None, frozen=True) + denominatorUnit: UnitSymbol | None = Field(default=None, frozen=True) diff --git a/cimgen/languages/modernpython/utils/reader.py b/cimgen/languages/modernpython/utils/reader.py new file mode 100644 index 0000000..2619b65 --- /dev/null +++ b/cimgen/languages/modernpython/utils/reader.py @@ -0,0 +1,175 @@ +import logging +from importlib import import_module +from importlib.util import find_spec +from typing import Literal + +from lxml import etree +from pydantic import BaseModel, Field + +from .profile import Profile + +logger = logging.getLogger(__name__) + + +class Reader(BaseModel): + """Parses profiles to create the corresponding python objects + + :param cgmes_version_path: Path to the cgmes resources folder containing the class definition + :param custom_namespaces: {"namespace_prefix": "namespace_uri"} + :param custom_folder: "path_to_custom_resources_folder" + """ + + cgmes_version_path: str = "pycgmes.resources" + custom_namespaces: dict[str, str] | None = None + custom_folder: str | None = None + logger_grouped: dict[str, dict[str, int]] = Field(default_factory=lambda: {"errors": {}, "info": {}}) + import_result: dict = Field(default_factory=lambda: {"meta_info": {}, "topology": {}}) + + def parse_profiles(self, xml_files: list[str], start_dict: dict | None = None): + """Parses all profiles contained in xml_files and returns a list containing + all the objects defined in the profiles "mRID": Object\n + Errors encounterd in the parsing can be recovered in Reader.logger_grouped + + :param xml_files: List of the path to all the profiles to parse + :param start_dict: To parse profiles on top of an existing list dict(meta_info, topology) + + :return: ["topology": dict of all the objects defined in the profiles {"mRID": Object}, "meta_info"] + """ + if start_dict: + self.import_result = start_dict + self.import_result["meta_info"] = dict(namespaces=self._get_namespaces(xml_files[0]), urls={}) + namespace_rdf = self._get_rdf_namespace() + + bases = ["{" + self.import_result["meta_info"]["namespaces"]["cim"] + "}"] + if self.custom_namespaces: + for custom_namespace in self.custom_namespaces.values(): + bases.append("{" + custom_namespace + "}") + bases = tuple(bases) + + for xml_file in xml_files: + self._instantiate_classes(xml_file=xml_file, bases=bases, namespace_rdf=namespace_rdf) + return self.import_result + + def _instantiate_classes(self, xml_file: str, bases: tuple, namespace_rdf: str): + """creates/updates the python objects with the information of xml_file + + :param xml_file: Path to the profile + :param bases: Contains the possible namespaces uris defining the classes, can be custom + :param namespace_rdf: rdf namespace uri + """ + context = etree.iterparse(xml_file, ("start", "end")) + level = 0 + + for event, elem in context: + if event == "end": + level -= 1 + if event == "start": + level += 1 + + class_namespace = next((namespace for namespace in bases if str(elem.tag).startswith(namespace)), None) + if event == "start" and class_namespace is not None and level == 2: + class_name, uuid = self._extract_classname_uuid(elem, class_namespace, namespace_rdf) + if uuid is not None: + self._process_element(class_name, uuid, elem) + # Check which package is read + elif event == "end": + self._check_metadata(elem) + + @staticmethod + def _extract_classname_uuid(elem, class_namespace: str, namespace_rdf: str) -> tuple: + """Extracts class name and instance uuid ("mRID") + + :param elem: description of the instance for the given profile + :param class_namespace: namespace uri defining the class + :param namespace_rdf: rdf namespace uri + + :return: class_name, uuid (example "ACLineSgement", instance_uuid: "mRID") + """ + class_name = elem.tag[len(class_namespace) :] + uuid = elem.get("{%s}ID" % namespace_rdf) + if uuid is None: + uuid = elem.get("{%s}about" % namespace_rdf) + if uuid and uuid.startswith("#"): + uuid = uuid[1:] + if uuid and uuid.startswith("_"): + uuid = uuid[1:] + return class_name, uuid + + def _process_element(self, class_name: str, uuid: str, elem): + """Creates or updates (if an object with the same uuid exists) + an instance of the class based on the fragment of the profile + + :param class_name: Name of the class of the instance to create/update (example: ACLineSegment) + :param uuid: mRID + :param elem: description of the instance for the given profile + """ + topology = self.import_result["topology"] + elem_str = etree.tostring(elem, encoding="utf-8") + try: + # Import the module for the CGMES object. + module_name = self._get_path_to_module(class_name) + module = import_module(module_name) + + klass = getattr(module, class_name) + if uuid not in topology: + topology[uuid] = klass().from_xml(elem_str) + info_msg = "CIM object {} created".format(module_name.split(".")[-1]) + else: + obj = topology[uuid] + obj.update_from_xml(elem_str) + info_msg = "CIM object {} updated".format(module_name.split(".")[-1]) + self._log_message("info", info_msg) + + except ModuleNotFoundError: + error_msg = "Module {} not implemented".format(class_name) + self._log_message("errors", error_msg) + except Exception as e: + error_msg = "Could not create/update {}, {}".format(uuid, e) + self._log_message("errors", error_msg) + + def _check_metadata(self, elem): + if "Model.profile" in elem.tag: + for package_key in [e.value for e in Profile]: + if package_key in elem.text: + break + # the author of all imported files should be the same, avoid multiple entries + elif "author" not in self.import_result["meta_info"].keys(): + if any(author_field in elem.tag for author_field in ("Model.createdBy", "Model.modelingAuthoritySet")): + self.import_result["meta_info"]["author"] = elem.text + + # Returns a map of class_namespace to namespace for the given XML file. + @staticmethod + def _get_namespaces(source) -> dict: + """Recovers the namespaces defined at the beginning of the xml file without reading the complete file.""" + namespaces = {} + events = ("end", "start-ns") + for event, elem in etree.iterparse(source, events): + if event == "start-ns" and elem is not None: + # Corresponds to the attributes defined in <rdf:RDF ...> + class_namespace, ns = elem + namespaces[class_namespace] = ns + elif event == "end": + # Stops once the first attribute that is not a namespace is read (</...>) + break + + return namespaces + + # Returns the RDF Namespace from the namespaces dictionary + def _get_rdf_namespace(self) -> str: + try: + namespace = self.import_result["meta_info"]["namespaces"]["rdf"] + except KeyError: + namespace = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" # NOSONAR + logger.warning("No rdf namespace found. Using %s" % namespace) + return namespace + + def _get_path_to_module(self, class_name: str) -> str: + if self.custom_folder and find_spec(self.custom_folder + "." + class_name): + path_to_module = self.custom_folder + "." + class_name + else: + path_to_module = self.cgmes_version_path + "." + class_name + return path_to_module + + def _log_message(self, log_type: Literal["errors", "info"], message: str): + self.logger_grouped[log_type].setdefault(message, 0) + self.logger_grouped[log_type][message] += 1 diff --git a/cimgen/languages/modernpython/utils/writer.py b/cimgen/languages/modernpython/utils/writer.py new file mode 100644 index 0000000..c4545ec --- /dev/null +++ b/cimgen/languages/modernpython/utils/writer.py @@ -0,0 +1,99 @@ +from lxml import etree +from pydantic import BaseModel + +from .constants import NAMESPACES +from .profile import BaseProfile, Profile + + +class Writer(BaseModel): + """Class for writing CIM RDF/XML files + + :param objects: Mapping {mRID: CIM object} + :param writer_metadata: Any additional data in header (default: {"modelingAuthoritySet": "www.sogno.energy"}) + """ + + objects: dict + writer_metadata: dict[str, str] = {"modelingAuthoritySet": "www.sogno.energy"} + + def write( + self, + outputfile: str, + model_id: str, + custom_profiles: list[BaseProfile] = [], + custom_namespaces: dict[str, str] = {}, + ) -> dict[BaseProfile, str]: + """Write CIM RDF/XML files. + This function writes CIM objects into one or more RDF/XML files separated by profiles. + Each CIM object will be written to its corresponding profile file depending on class_profile_map. + But some objects to more than one file if some attribute profiles are not the same as the class profile. + + :param outputfile: Stem of the output file, resulting files: <outputfile>_<profile.long_name>.xml. + :param model_id: Stem of the model IDs, resulting IDs: <model_id>_<profile.long_name>. + :param custom_profiles: List of custom profiles to export. + :param custom_namespaces: {"namespace_prefix": "namespace_uri"} + + :return: Mapping of profile to outputfile. + """ + profile_list: list[BaseProfile] = list(Profile) + profile_list += {p for p in custom_profiles if p not in profile_list} + profile_file_map: dict[BaseProfile, str] = {} + for profile in profile_list: + profile_name = profile.long_name + full_file_name = outputfile + "_" + profile.long_name + ".xml" + output = self._generate(profile, model_id + "_" + profile_name, custom_namespaces) + if output: + output.write(full_file_name, pretty_print=True, xml_declaration=True, encoding="utf-8") + profile_file_map[profile] = full_file_name + return profile_file_map + + def _generate( + self, profile: BaseProfile, model_id: str, custom_namespaces: dict[str, str] = {} + ) -> etree._ElementTree | None: + """Write CIM objects as RDF/XML data to a string. + + This function creates RDF/XML tree corresponding to one profile. + + :param profile: Only data for this profile should be written. + :param model_id: Stem of the model IDs, resulting IDs: <modelID>_<profileName>. + :param custom_namespaces: {"namespace_prefix": "namespace_uri"} + + :return: etree of the profile + """ + fullmodel = { + "id": model_id, + "Model": self.writer_metadata, + } + fullmodel["Model"].update({"profile": profile.uris}) + + nsmap = NAMESPACES + nsmap.update(custom_namespaces) + + rdf_namespace = f"""{{{nsmap["rdf"]}}}""" + md_namespace = f"""{{{nsmap["md"]}}}""" + + root = etree.Element(rdf_namespace + "RDF", nsmap=nsmap) + + # FullModel header + model = etree.Element(md_namespace + "FullModel", nsmap=nsmap) + model.set(rdf_namespace + "about", fullmodel["id"]) + for key, value in fullmodel["Model"].items(): + if isinstance(value, list): + for item in value: + element = etree.SubElement(model, md_namespace + "Model." + key) + element.text = item + else: + element = etree.SubElement(model, md_namespace + "Model." + key) + element.text = value + root.append(model) + + count = 0 + for id, obj in self.objects.items(): + obj_etree = obj.to_xml(profile_to_export=profile, id=id) + if obj_etree is not None: + root.append(obj_etree) + count += 1 + if count > 0: + output = etree.ElementTree(root) + else: + output = None + return output