Skip to content

Commit

Permalink
Add writer to modernpython (#36)
Browse files Browse the repository at this point in the history
- Add generation of CGMESProfile class (including profile URIs) for
modernpython
- Improve class properties and attribute properties
- Improve cpp/java using the new attributes, remove unused functions
- Improve base.py and templates for modernpython using the new
attributes
- Add CIM writer to modernpython (to export CIM data into several files
based on CGMES profiles)
  • Loading branch information
m-mirz authored Oct 30, 2024
2 parents 0ef6919 + 6e17f10 commit f041302
Show file tree
Hide file tree
Showing 23 changed files with 834 additions and 764 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ your publications:
Dinkelbach, J., Razik, L., Mirz, M., Benigni, A., Monti, A.: Template-based
generation of programming language specific code for smart grid modelling
compliant with CIM and CGMES.
J. Eng. 2023, 113 (2022). [https://doi.org/10.1049/tje2.12208](https://doi.org/10.1049/tje2.12208)
J. Eng. 2023, 1-13 (2022). [https://doi.org/10.1049/tje2.12208](https://doi.org/10.1049/tje2.12208)
181 changes: 97 additions & 84 deletions cimgen/cimgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self, jsonObject):
self.jsonDefinition = jsonObject
return

def asJson(self, lang_pack):
def asJson(self):
jsonObject = {}
if self.about() is not None:
jsonObject["about"] = self.about()
Expand All @@ -42,27 +42,18 @@ def asJson(self, lang_pack):
jsonObject["subClassOf"] = self.subClassOf()
if self.inverseRole() is not None:
jsonObject["inverseRole"] = self.inverseRole()
if self.associationUsed() is not None:
jsonObject["associationUsed"] = self.associationUsed()
if "modernpython" in lang_pack.__name__:
jsonObject["isAssociationUsed"] = self.isAssociationUsed()
jsonObject["is_used"] = _get_bool_string(self.is_used())
return jsonObject

def about(self):
if "$rdf:about" in self.jsonDefinition:
return RDFSEntry._get_rid_of_hash(RDFSEntry._get_about_or_resource(self.jsonDefinition["$rdf:about"]))
else:
return None

def associationUsed(self):
if "cims:AssociationUsed" in self.jsonDefinition:
return RDFSEntry._extract_string(self.jsonDefinition["cims:AssociationUsed"])
return _get_rid_of_hash(RDFSEntry._get_about_or_resource(self.jsonDefinition["$rdf:about"]))
else:
return None

# Capitalized True/False is valid in python but not in json.
# Do not use this function in combination with json.load()
def isAssociationUsed(self) -> bool:
def is_used(self) -> bool:
if "cims:AssociationUsed" in self.jsonDefinition:
return "yes" == RDFSEntry._extract_string(self.jsonDefinition["cims:AssociationUsed"]).lower()
else:
Expand Down Expand Up @@ -91,7 +82,7 @@ def dataType(self):

def domain(self):
if "rdfs:domain" in self.jsonDefinition:
return RDFSEntry._get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["rdfs:domain"]))
return _get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["rdfs:domain"]))
else:
return None

Expand All @@ -115,7 +106,7 @@ def title(self):

def inverseRole(self):
if "cims:inverseRoleName" in self.jsonDefinition:
return RDFSEntry._get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["cims:inverseRoleName"]))
return _get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["cims:inverseRoleName"]))
else:
return None

Expand All @@ -135,7 +126,7 @@ def label(self):

def multiplicity(self):
if "cims:multiplicity" in self.jsonDefinition:
return RDFSEntry._get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["cims:multiplicity"]))
return _get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["cims:multiplicity"]))
else:
return None

Expand Down Expand Up @@ -165,7 +156,7 @@ def version_iri(self):

def subClassOf(self):
if "rdfs:subClassOf" in self.jsonDefinition:
return RDFSEntry._get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["rdfs:subClassOf"]))
return _get_rid_of_hash(RDFSEntry._extract_string(self.jsonDefinition["rdfs:subClassOf"]))
else:
return None

Expand Down Expand Up @@ -209,23 +200,13 @@ def _get_about_or_resource(object_dic):
return object_dic["$rdfs:Literal"]
return object_dic

# Some names are encoded as #name or http://some-url#name
# This function returns the name
def _get_rid_of_hash(name):
tokens = name.split("#")
if len(tokens) == 1:
return tokens[0]
if len(tokens) > 1:
return tokens[1]
return name


class CIMComponentDefinition:
def __init__(self, rdfsEntry):
self.about = rdfsEntry.about()
self.attribute_list = []
self.comment = rdfsEntry.comment()
self.instance_list = []
self.enum_instance_list = []
self.origin_list = []
self.super = rdfsEntry.subClassOf()
self.subclasses = []
Expand All @@ -236,15 +217,15 @@ def attributes(self):
def addAttribute(self, attribute):
self.attribute_list.append(attribute)

def has_instances(self):
return len(self.instance_list) > 0
def is_an_enum_class(self):
return len(self.enum_instance_list) > 0

def instances(self):
return self.instance_list
def enum_instances(self):
return self.enum_instance_list

def addInstance(self, instance):
instance["index"] = len(self.instance_list)
self.instance_list.append(instance)
def add_enum_instance(self, instance):
instance["index"] = len(self.enum_instance_list)
self.enum_instance_list.append(instance)

def addAttributes(self, attributes):
for attribute in attributes:
Expand Down Expand Up @@ -273,7 +254,7 @@ def _simple_float_attribute(attr):
return attr["label"] == "value" and attr["dataType"] == "#Float"
return False

def is_a_float(self):
def is_a_float_class(self):
if self.about == "Float":
return True
simple_float = False
Expand Down Expand Up @@ -417,12 +398,12 @@ def _add_profile_to_packages(profile_name, short_profile_name, profile_uri_list)
package_listed_by_short_name[short_profile_name].extend(profile_uri_list)


def _parse_rdf(input_dic, version, lang_pack):
def _parse_rdf(input_dic, version): # NOSONAR
classes_map = {}
profile_name = ""
profile_uri_list = []
attributes = []
instances = []
enum_instances = []

global cim_namespace
if not cim_namespace:
Expand All @@ -434,15 +415,15 @@ def _parse_rdf(input_dic, version, lang_pack):
# Iterate over list elements
for list_elem in descriptions:
rdfsEntry = RDFSEntry(list_elem)
object_dic = rdfsEntry.asJson(lang_pack)
object_dic = rdfsEntry.asJson()
rdfs_entry_types = _rdfs_entry_types(rdfsEntry, version)

if "class" in rdfs_entry_types:
_add_class(classes_map, rdfsEntry)
if "property" in rdfs_entry_types:
attributes.append(object_dic)
if "rest_non_class_category" in rdfs_entry_types:
instances.append(object_dic)
enum_instances.append(object_dic)
if "profile_name_v2_4" in rdfs_entry_types:
profile_name = rdfsEntry.about()
if "profile_name_v3" in rdfs_entry_types:
Expand All @@ -467,13 +448,13 @@ def _parse_rdf(input_dic, version, lang_pack):
else:
logger.info("Class {} for attribute {} not found.".format(clarse, attribute))

# Add instances to corresponding class
for instance in instances:
clarse = RDFSEntry._get_rid_of_hash(instance["type"])
# Add enum instances to corresponding class
for instance in enum_instances:
clarse = _get_rid_of_hash(instance["type"])
if clarse and clarse in classes_map:
classes_map[clarse].addInstance(instance)
classes_map[clarse].add_enum_instance(instance)
else:
logger.info("Class {} for instance {} not found.".format(clarse, instance))
logger.info("Class {} for enum instance {} not found.".format(clarse, instance))

return {short_profile_name: classes_map}

Expand All @@ -486,31 +467,18 @@ def _write_python_files(elem_dict, lang_pack, output_path, version):
# Setup called only once: make output directory, create base class, create profile class, etc.
lang_pack.setup(output_path, _get_profile_details(package_listed_by_short_name), cim_namespace)

float_classes = {}
enum_classes = {}

# Iterate over Classes
for class_definition in elem_dict:
if elem_dict[class_definition].is_a_float():
float_classes[class_definition] = True
if elem_dict[class_definition].has_instances():
enum_classes[class_definition] = True

lang_pack.set_float_classes(float_classes)
lang_pack.set_enum_classes(enum_classes)

recommended_class_profiles = _get_recommended_class_profiles(elem_dict)

for class_name in elem_dict.keys():

class_details = {
"attributes": _find_multiple_attributes(elem_dict[class_name].attributes()),
"class_location": lang_pack.get_class_location(class_name, elem_dict, output_path),
"class_location": lang_pack.get_class_location(class_name, elem_dict, version),
"class_name": class_name,
"class_origin": elem_dict[class_name].origins(),
"instances": elem_dict[class_name].instances(),
"has_instances": elem_dict[class_name].has_instances(),
"is_a_float": elem_dict[class_name].is_a_float(),
"enum_instances": elem_dict[class_name].enum_instances(),
"is_an_enum_class": elem_dict[class_name].is_an_enum_class(),
"is_a_float_class": elem_dict[class_name].is_a_float_class(),
"langPack": lang_pack,
"sub_class_of": elem_dict[class_name].superClass(),
"sub_classes": elem_dict[class_name].subClasses(),
Expand All @@ -537,11 +505,22 @@ def _write_python_files(elem_dict, lang_pack, output_path, version):
initial_indent="",
subsequent_indent=" " * 6,
)
attribute_class = _get_attribute_class(attribute)
is_an_enum_class = attribute_class in elem_dict and elem_dict[attribute_class].is_an_enum_class()
attribute_type = _get_attribute_type(attribute, is_an_enum_class)
attribute["is_class_attribute"] = _get_bool_string(attribute_type == "class")
attribute["is_enum_attribute"] = _get_bool_string(attribute_type == "enum")
attribute["is_list_attribute"] = _get_bool_string(attribute_type == "list")
attribute["is_primitive_attribute"] = _get_bool_string(attribute_type == "primitive")
attribute["attribute_class"] = attribute_class

class_details["attributes"].sort(key=lambda d: d["label"])
_write_files(class_details, output_path, version)


def get_rid_of_hash(name):
# Some names are encoded as #name or http://some-url#name
# This function returns the name
def _get_rid_of_hash(name):
tokens = name.split("#")
if len(tokens) == 1:
return tokens[0]
Expand All @@ -550,13 +529,6 @@ def get_rid_of_hash(name):
return name


def format_class(_range, _dataType):
if _range == "":
return get_rid_of_hash(_dataType)
else:
return get_rid_of_hash(_range)


def _write_files(class_details, output_path, version):
if class_details["sub_class_of"] is None:
# If class has no subClassOf key it is a subclass of the Base class
Expand All @@ -576,15 +548,6 @@ def _write_files(class_details, output_path, version):
):
class_details["attributes"][i]["dataType"] = class_details["attributes"][i]["multiplicity"]

for attr in class_details["attributes"]:
_range = ""
_dataType = ""
if "range" in attr:
_range = attr["range"]
if "dataType" in attr:
_dataType = attr["dataType"]
attr["class_name"] = format_class(_range, _dataType)

class_details["langPack"].run_template(output_path, class_details)


Expand Down Expand Up @@ -740,7 +703,7 @@ def cim_generate(directory, output_path, version, lang_pack):

# parse RDF files and create a dictionary from the RDF file
parse_result = xmltodict.parse(xmlstring, attr_prefix="$", cdata_key="_", dict_constructor=dict)
parsed = _parse_rdf(parse_result, version, lang_pack)
parsed = _parse_rdf(parse_result, version)
profiles_array.append(parsed)

# merge multiple profile definitions into one profile
Expand All @@ -765,10 +728,7 @@ def cim_generate(directory, output_path, version, lang_pack):
# get information for writing python files and write python files
_write_python_files(class_dict_with_origins, lang_pack, output_path, version)

if "modernpython" in lang_pack.__name__:
lang_pack.resolve_headers(output_path, version)
else:
lang_pack.resolve_headers(output_path)
lang_pack.resolve_headers(output_path, version)

logger.info("Elapsed Time: {}s\n\n".format(time() - t0))

Expand Down Expand Up @@ -850,6 +810,7 @@ def _get_recommended_class_profiles(elem_dict):
profiles = [origin["origin"] for origin in attribute["attr_origin"]]
ambiguous_profile = len(profiles) > 1
for profile in profiles:
# Use condition attribute["is_used"]? For CGMES 2.4.13/2.4.15/3.0.0 the results wouldn't change!
if ambiguous_profile and profile in class_profiles:
profile_count_map.setdefault(profile, []).append(attribute["label"])
name = elem_dict[name].superClass()
Expand All @@ -862,3 +823,55 @@ def _get_recommended_class_profiles(elem_dict):
else:
recommended_class_profiles[class_name] = _get_sorted_profile_keys(class_profiles)[0]
return recommended_class_profiles


def _get_attribute_class(attribute: dict) -> str:
"""Get the class name of an attribute.
:param attribute: Dictionary with information about an attribute of a class.
:return: Class name of the attribute.
"""
name = attribute.get("range") or attribute.get("dataType")
return _get_rid_of_hash(name)


def _get_attribute_type(attribute: dict, is_an_enum_class: bool) -> str:
"""Get the type of an attribute: "class", "enum", "list", or "primitive".
:param attribute: Dictionary with information about an attribute of a class.
:param is_an_enum_class: Is this attribute an enumation?
:return: Type of the attribute.
"""
so_far_not_primitive = _get_attribute_class(attribute) in (
"Date",
"DateTime",
"MonthDay",
"Status",
"StreetAddress",
"StreetDetail",
"TownDetail",
)
attribute_type = "class"
if "dataType" in attribute and not so_far_not_primitive:
attribute_type = "primitive"
elif is_an_enum_class:
attribute_type = "enum"
elif attribute.get("multiplicity") in ("M:0..n", "M:1..n"):
attribute_type = "list"
return attribute_type


def _get_bool_string(bool_value: bool) -> str:
"""Convert boolean value into a string which is usable in both Python and Json.
Valid boolean values in Python are capitalized True/False.
But these values are not valid in Json.
Strings with value "true" and "" are recognized as True/False in both languages.
:param bool_value: Valid boolean value.
:return: String "true" for True and "" for False.
"""
if bool_value:
return "true"
else:
return ""
Loading

0 comments on commit f041302

Please sign in to comment.