Skip to content

Commit

Permalink
Add python CSA DM XML parsing support for derived clusters (#30036)
Browse files Browse the repository at this point in the history
* Start preparing to store derived clusters

* reformat, make sure parsing works

* More leniency to allow ModeBase parsing

* More leniency and documentation, be ready to attach base clusters

* Refactor to add some separate derivation logic

* Restyle

* More work on actually handling inheritance

* Restyle

* Implement actual base class derivation

* Add unit test for derived, make diffs a LOT better

* Restyle

* Switch to unified diff for a nicer diff view

* Return after the first assert

* Make type checker happy at places

* Make mypy happy even on an edge case

* Fix linter errors

* Restyle

* more typing for attrs

* Some name changes for base clusters: use abstract to make it clear that other clusters could be base too

* Also change variable name

---------

Co-authored-by: Andrei Litvin <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Jan 11, 2024
1 parent b317ce0 commit 1912837
Show file tree
Hide file tree
Showing 9 changed files with 510 additions and 64 deletions.
1 change: 1 addition & 0 deletions scripts/py_matter_idl/files.gni
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ matter_idl_generator_sources = [
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/__init__.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/base.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/context.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/derivation.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/handlers.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/data_model_xml/handlers/parsing.py",
"${chip_root}/scripts/py_matter_idl/matter_idl/generators/__init__.py",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from xml.sax.xmlreader import AttributesImpl

from matter_idl.matter_idl_types import Idl

from .base import BaseHandler
from .context import Context
from .handlers import ClusterHandler
from .parsing import NormalizeName

LOGGER = logging.getLogger('data-model-xml-data-parsing')


def contains_valid_cluster_id(attrs: AttributesImpl) -> bool:
# Does not check numeric format ... assuming scraper is smart enough for that
return 'id' in attrs and len(attrs['id']) > 0


class DataModelXmlHandler(BaseHandler):
Expand All @@ -27,8 +38,14 @@ def __init__(self, context: Context, idl: Idl):
super().__init__(context)
self._idl = idl

def GetNextProcessor(self, name, attrs):
def GetNextProcessor(self, name, attrs: AttributesImpl):
if name.lower() == 'cluster':
return ClusterHandler(self.context, self._idl, attrs)
if contains_valid_cluster_id(attrs):
return ClusterHandler.ForAttributes(self.context, self._idl, attrs)

LOGGER.info(
"Found an abstract base cluster (no id): '%s'", attrs['name'])

return ClusterHandler.IntoCluster(self.context, self._idl, self.context.AddAbstractBaseCluster(NormalizeName(attrs['name']), self.context.GetCurrentLocationMeta()))
else:
return BaseHandler(self.context)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import xml.sax.xmlreader
from typing import List, Optional

from matter_idl.matter_idl_types import Idl, ParseMetaData
from matter_idl.matter_idl_types import Cluster, ClusterSide, Idl, ParseMetaData


class IdlPostProcessor:
Expand Down Expand Up @@ -82,6 +82,19 @@ def __init__(self, locator: Optional[xml.sax.xmlreader.Locator] = None):
self.file_name = None
self._not_handled: set[str] = set()
self._idl_post_processors: list[IdlPostProcessor] = []
self.abstract_base_clusters: dict[str, Cluster] = {}

def AddAbstractBaseCluster(self, name: str, parse_meta: Optional[ParseMetaData] = None) -> Cluster:
"""Creates a new cluster entry for the given name in the list of known
base clusters.
"""
assert name not in self.abstract_base_clusters # be unique

cluster = Cluster(side=ClusterSide.CLIENT, name=name,
code=-1, parse_meta=parse_meta)
self.abstract_base_clusters[name] = cluster

return cluster

def GetCurrentLocationMeta(self) -> Optional[ParseMetaData]:
if not self.locator:
Expand Down
173 changes: 173 additions & 0 deletions scripts/py_matter_idl/matter_idl/data_model_xml/handlers/derivation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#
# Copyright (c) 2023 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from typing import Iterable, Optional, Protocol, TypeVar

from matter_idl.matter_idl_types import Attribute, Bitmap, Cluster, Command, Enum, Event, Idl, Struct

from .context import Context, IdlPostProcessor
from .parsing import NormalizeName

LOGGER = logging.getLogger('data-model-xml-data-parsing')

T = TypeVar("T")


class HasName(Protocol):
name: str


NAMED = TypeVar('NAMED', bound=HasName)


def get_item_with_name(items: Iterable[NAMED], name: str) -> Optional[NAMED]:
"""Find an item with the given name.
Returns none if that item does not exist
"""
for item in items:
if item.name == name:
return item
return None


def merge_enum_into(e: Enum, cluster: Cluster):
existing = get_item_with_name(cluster.enums, e.name)

if existing:
# Remove existing but merge constants into e
cluster.enums.remove(existing)
for value in existing.entries:
if not get_item_with_name(e.entries, value.name):
e.entries.append(value)

cluster.enums.append(e)


def merge_bitmap_into(b: Bitmap, cluster: Cluster):
existing = get_item_with_name(cluster.bitmaps, b.name)

if existing:
# Remove existing but merge constants into e
cluster.bitmaps.remove(existing)
for value in existing.entries:
if not get_item_with_name(b.entries, value.name):
b.entries.append(value)

cluster.bitmaps.append(b)


def merge_event_into(e: Event, cluster: Cluster):
existing = get_item_with_name(cluster.events, e.name)
if existing:
LOGGER.error("TODO: Do not know how to merge event for %s::%s",
cluster.name, existing.name)
cluster.events.remove(existing)

cluster.events.append(e)


def merge_attribute_into(a: Attribute, cluster: Cluster):
existing: Optional[Attribute] = None
for existing_a in cluster.attributes:
if existing_a.definition.name == a.definition.name:
existing = existing_a
break

if existing:
# Do not provide merging as it seems only conformance is changed from
# the base cluster
#
# This should fix the correct types
#
# LOGGER.error("TODO: Do not know how to merge attribute for %s::%s", cluster.name, existing.definition.name)
cluster.attributes.remove(existing)

cluster.attributes.append(a)


def merge_struct_into(s: Struct, cluster: Cluster):
existing = get_item_with_name(cluster.structs, s.name)
if existing:
# Do not provide merging as it seems XML only adds
# constraints and conformance to struct elements
#
# TODO: at some point we may be able to merge some things,
# if we find that derived clusters actually add useful things here
#
# LOGGER.error("TODO: Do not know how to merge structs for %s::%s", cluster.name, existing.name)
cluster.structs.remove(existing)

cluster.structs.append(s)


def merge_command_into(c: Command, cluster: Cluster):
existing = get_item_with_name(cluster.commands, c.name)

if existing:
LOGGER.error("TODO: Do not know how to merge command for %s::%s",
cluster.name, existing.name)
cluster.commands.remove(existing)

cluster.commands.append(c)


def inherit_cluster_data(from_cluster: Cluster, into_cluster: Cluster):
for e in from_cluster.enums:
merge_enum_into(e, into_cluster)

for b in from_cluster.bitmaps:
merge_bitmap_into(b, into_cluster)

for ev in from_cluster.events:
merge_event_into(ev, into_cluster)

for a in from_cluster.attributes:
merge_attribute_into(a, into_cluster)

for s in from_cluster.structs:
merge_struct_into(s, into_cluster)

for c in from_cluster.commands:
merge_command_into(c, into_cluster)


class AddBaseInfoPostProcessor(IdlPostProcessor):
def __init__(self, destination_cluster: Cluster, source_cluster_name: str, context: Context):
self.destination = destination_cluster
self.source_name = NormalizeName(source_cluster_name)
self.context = context

def FinalizeProcessing(self, idl: Idl):
# attempt to find the base. It may be in the "names without ID" however it may also be inside
# existing clusters (e.g. Basic Information)
base: Optional[Cluster] = None
if self.source_name in self.context.abstract_base_clusters:
base = self.context.abstract_base_clusters[self.source_name]
else:
for c in idl.clusters:
if c.name == self.source_name:
base = c
break

if not base:
LOGGER.error(
"Could not find the base cluster named '%s'", self.source_name)
return

LOGGER.info("Copying base data from '%s' into '%s'",
base.name, self.destination.name)
inherit_cluster_data(from_cluster=base, into_cluster=self.destination)
Loading

0 comments on commit 1912837

Please sign in to comment.