From 1ae80039af08409262e0bb0be34b8d69347aec9e Mon Sep 17 00:00:00 2001 From: Andrei Litvin Date: Fri, 23 Sep 2022 16:50:55 -0400 Subject: [PATCH] Looking to split out logic for processing for zapxml --- scripts/idl/xml_parser.py | 761 +----------------------- scripts/idl/zapxml/__init__.py | 14 + scripts/idl/zapxml/handlers/__init__.py | 16 + scripts/idl/zapxml/handlers/context.py | 94 +++ scripts/idl/zapxml/handlers/handlers.py | 698 ++++++++++++++++++++++ 5 files changed, 827 insertions(+), 756 deletions(-) create mode 100644 scripts/idl/zapxml/__init__.py create mode 100644 scripts/idl/zapxml/handlers/__init__.py create mode 100644 scripts/idl/zapxml/handlers/context.py create mode 100644 scripts/idl/zapxml/handlers/handlers.py diff --git a/scripts/idl/xml_parser.py b/scripts/idl/xml_parser.py index 8fc85ca64a2d32..405a4344da5082 100755 --- a/scripts/idl/xml_parser.py +++ b/scripts/idl/xml_parser.py @@ -25,766 +25,15 @@ from typing import Optional, Union, List try: - from idl.matter_idl_types import * + from idl.zapxml.handlers import Context, ZapXmlHandler except: import sys sys.path.append(os.path.abspath( os.path.join(os.path.dirname(__file__), '..'))) - from idl.matter_idl_types import * - - -class HandledDepth: - """Defines how deep a XML element has been handled.""" - NOT_HANDLED = enum.auto() # Unknown/parsed element - ENTIRE_TREE = enum.auto() # Entire tree can be ignored - SINGLE_TAG = enum.auto() # Single tag processed, but not sub-items - - -def ParseInt(value: str) -> int: - if value.startswith('0x'): - return int(value[2:], 16) - else: - return int(value) - - -def ParseAclRole(attrs) -> AccessPrivilege: - # XML seems to use both role and privilege to mean the same thing - # they are used interchangeably - if 'role' in attrs: - role = attrs['role'] - else: - role = attrs['privilege'] - - if role.lower() == 'view': - return AccessPrivilege.VIEW - elif role.lower() == 'operate': - return AccessPrivilege.OPERATE - elif role.lower() == 'manage': - return AccessPrivilege.MANAGE - elif role.lower() == 'administer': - return AccessPrivilege.ADMINISTER - else: - raise Exception('Unknown ACL role: %r' % role) - - -def AttrsToAttribute(attrs) -> Attribute: - if attrs['type'].lower() == 'array': - data_type = DataType(name=attrs['entryType']) - else: - data_type = DataType(name=attrs['type']) - - if 'length' in attrs: - data_type.max_length = ParseInt(attrs['length']) - - field = Field( - data_type=data_type, - code=ParseInt(attrs['code']), - name=None, - is_list=(attrs['type'].lower() == 'array') - ) - - attribute = Attribute(definition=field) - - if attrs.get('optional', "false").lower() == 'true': - attribute.definition.attributes.add(FieldAttribute.OPTIONAL) - - if attrs.get('isNullable', "false").lower() == 'true': - attribute.definition.attributes.add(FieldAttribute.NULLABLE) - - if attrs.get('readable', "true").lower() == 'true': - attribute.tags.add(AttributeTag.READABLE) - - if attrs.get('writable', "false").lower() == 'true': - attribute.tags.add(AttributeTag.WRITABLE) - - # TODO: XML does not seem to contain information about - # - NOSUBSCRIBE - - # TODO: do we care about default value at all? - # General storage of default only applies to instantiation - - return attribute - - -class ProcessingPath: - def __init__(self, paths: List[str] = None): - if paths is None: - paths = [] - self.paths = paths - - def push(self, name: str): - self.paths.append(name) - - def pop(self): - self.paths.pop() - - def __str__(self): - return '::'.join(self.paths) - - def __repr__(self): - return 'ProcessingPath(%r)' % self.paths - - -class IdlPostProcessor: - """Defines a callback that will apply after an entire parsing - is complete. - """ - - def FinalizeProcessing(self, idl: Idl): - """Update idl with any post-processing directives.""" - pass - - -class ProcessingContext: - def __init__(self, locator: Optional[xml.sax.xmlreader.Locator] = None): - self.path = ProcessingPath() - self.locator = locator - self._not_handled = set() - self._idl_post_processors = [] - - # Map of code -> attribute - self._global_attributes = {} - - def GetCurrentLocationMeta(self) -> ParseMetaData: - if not self.locator: - return None - - return ParseMetaData(line=self.locator.getLineNumber(), column=self.locator.getColumnNumber()) - - def GetGlobalAttribute(self, code): - if code in self._global_attributes: - return self._global_attributes[code] - - raise Exception( - 'Global attribute 0x%X (%d) not found. You probably need to load global-attributes.xml' % (code, code)) - - def AddGlobalAttribute(self, attribute: Attribute): - # NOTE: this may get added several times as both 'client' and 'server' - # however matter should not differentiate between the two - code = attribute.definition.code - logging.info('Adding global attribute 0x%X (%d): %s' % (code, code, attribute.definition.name)) - - self._global_attributes[code] = attribute - - def MarkTagNotHandled(self): - path = str(self.path) - if path not in self._not_handled: - logging.warning("TAG %s was not handled/recognized" % path) - self._not_handled.add(path) - - def AddIdlPostProcessor(self, processor: IdlPostProcessor): - self._idl_post_processors.append(processor) - - def PostProcess(self, idl: Idl): - for p in self._idl_post_processors: - p.FinalizeProcessing(idl) - - self._idl_post_processors = [] - - -class ElementProcessor: - """A generic element processor. - - XML processing will be done in the form of a stack. whenever - a new element comes in, its processor is moved to the top of - the stack and poped once the element ends. - """ - - def __init__(self, context: ProcessingContext, handled=HandledDepth.NOT_HANDLED): - self.context = context - self._handled = handled - - def GetNextProcessor(self, name, attrs): - """Get the next processor to use for the given name""" - - if self._handled == HandledDepth.SINGLE_TAG: - handled = HandledDepth.NOT_HANDLED - else: - handled = self._handled - - return ElementProcessor(context=self.context, handled=handled) - - def HandleContent(self, content): - """Processes some content""" - pass - - def EndProcessing(self): - """Finalizes the processing of the current element""" - if self._handled == HandledDepth.NOT_HANDLED: - self.context.MarkTagNotHandled() - - -class ClusterNameProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, cluster): - super().__init__(context, handled=HandledDepth.SINGLE_TAG) - self._cluster = cluster - - def HandleContent(self, content): - self._cluster.name = content.replace(' ', '') - - -class AttributeDescriptionProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, attribute): - super().__init__(context, handled=HandledDepth.SINGLE_TAG) - self._attribute = attribute - - def HandleContent(self, content): - self._attribute.definition.name = content.replace(' ', '') - - -class ClusterCodeProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, cluster): - super().__init__(context, handled=HandledDepth.SINGLE_TAG) - self._cluster = cluster - - def HandleContent(self, content): - self._cluster.code = ParseInt(content) - - -class EventProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, cluster, attrs): - super().__init__(context) - self._cluster = cluster - - if attrs['priority'] == 'debug': - priority = EventPriority.DEBUG - elif attrs['priority'] == 'info': - priority = EventPriority.INFO - elif attrs['priority'] == 'critical': - priority = EventPriority.CRITICAL - else: - raise Exception("Unknown event priority: %s" % attrs['priority']) - - self._event = Event( - priority=priority, - code=ParseInt(attrs['code']), - name=attrs['name'], - fields=[], - ) - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'field': - data_type = DataType(name=attrs['type']) - if 'length' in attrs: - data_type.max_length = ParseInt(attrs['length']) - - field = Field( - data_type=data_type, - code=ParseInt(attrs['id']), - name=attrs['name'], - is_list=(attrs.get('array', 'false').lower() == 'true'), - ) - - if attrs.get('optional', "false").lower() == 'true': - field.attributes.add(FieldAttribute.OPTIONAL) - - if attrs.get('isNullable', "false").lower() == 'true': - field.attributes.add(FieldAttribute.NULLABLE) - - self._event.fields.append(field) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'access': - self._event.readacl = ParseAclRole(attrs) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'description': - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - else: - return ElementProcessor(self.context) - - def EndProcessing(self): - self._cluster.events.append(self._event) - - -class AttributeProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, cluster, attrs): - super().__init__(context) - self._cluster = cluster - self._attribute = AttrsToAttribute(attrs) - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'access': - if 'modifier' in attrs: - if attrs['modifier'] != 'fabric-scoped': - raise Exception("UNKNOWN MODIFIER: %s" % attrs['modifier']) - self._attribute.tags.add(AttributeTag.FABRIC_SCOPED) - else: - role = ParseAclRole(attrs) - - if attrs['op'] == 'read': - self._attribute.readacl = role - elif attrs['op'] == 'write': - self._attribute.writeacl = role - else: - logging.error("Unknown access: %r" % attrs['op']) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'description': - return AttributeDescriptionProcessor(self.context, self._attribute) - else: - return ElementProcessor(self.context) - - def HandleContent(self, content): - # Content generally is the name EXCEPT if access controls - # exist, in which case `description` contains the name - content = content.strip() - if content and not self._attribute.definition.name: - self._attribute.definition.name = content - - def EndProcessing(self): - if self._attribute.definition.name is None: - raise Exception("Name for attribute was not parsed.") - - self._cluster.attributes.append(self._attribute) - - -class StructProcessor(ElementProcessor, IdlPostProcessor): - def __init__(self, context: ProcessingContext, attrs): - super().__init__(context) - - # if set, struct belongs to a specific cluster - self._cluster_codes = set() - self._struct = Struct(name=attrs['name'], fields=[]) - self._field_index = 0 - # The following are not set: - # - tag not set because not a request/response - # - code not set because not a response - - # TODO: handle this isFabricScoped attribute - self._is_fabric_scoped = (attrs.get('isFabricScoped', "false").lower() == 'true') - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'item': - data_type = DataType( - name=attrs['type'] - ) - - # TODO: handle isFabricSensitive - - if 'fieldId' in attrs: - self._field_index = ParseInt(attrs['fieldId']) - else: - # NOTE: code does NOT exist, so the number is incremental here - # this seems a defficiency in XML format. - self._field_index += 1 - - if 'length' in attrs: - data_type.max_length = ParseInt(attrs['length']) - - field = Field( - data_type=data_type, - code=self._field_index, - name=attrs['name'], - is_list=(attrs.get('array', 'false').lower() == 'true'), - ) - - if attrs.get('optional', "false").lower() == 'true': - field.attributes.add(FieldAttribute.OPTIONAL) - - if attrs.get('isNullable', "false").lower() == 'true': - field.attributes.add(FieldAttribute.NULLABLE) - - self._struct.fields.append(field) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'cluster': - self._cluster_codes.add(ParseInt(attrs['code'])) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - else: - return ElementProcessor(self.context) - - def FinalizeProcessing(self, idl: Idl): - # We have two choices of adding an enum: - # - inside a cluster if a code exists - # - inside top level if no codes were associated - - if self._cluster_codes: - for code in self._cluster_codes: - found = False - for c in idl.clusters: - if c.code == code: - c.structs.append(self._struct) - found = True - - if not found: - logging.error('Enum %s could not find cluster (code %d/0x%X)' % - (self._struct.name, code, code)) - else: - idl.structs.append(self._struct) - - def EndProcessing(self): - self.context.AddIdlPostProcessor(self) - - -class EnumProcessor(ElementProcessor, IdlPostProcessor): - def __init__(self, context: ProcessingContext, attrs): - super().__init__(context) - self._cluster_code = None # if set, enum belongs to a specific cluster - self._enum = Enum(name=attrs['name'], base_type=attrs['type'], entries=[]) - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'item': - self._enum.entries.append(ConstantEntry( - name=attrs['name'], - code=ParseInt(attrs['value']) - )) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'cluster': - if self._cluster_code is not None: - raise Exception('Multiple cluster codes for enum %s' % self._enum.name) - self._cluster_code = ParseInt(attrs['code']) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - else: - return ElementProcessor(self.context) - - def FinalizeProcessing(self, idl: Idl): - # We have two choices of adding an enum: - # - inside a cluster if a code exists - # - inside top level if a code does not exist - - if self._cluster_code is None: - idl.enums.append(self._enum) - else: - found = False - for c in idl.clusters: - if c.code == self._cluster_code: - c.enums.append(self._enum) - found = True - - if not found: - logging.error('Enum %s could not find its cluster (code %d/0x%X)' % - (self._enum.name, self._cluster_code, self._cluster_code)) - - def EndProcessing(self): - self.context.AddIdlPostProcessor(self) - - -class BitmapProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, attrs): - super().__init__(context) - self._cluster_code = None - self._bitmap = Bitmap(name=attrs['name'], base_type=attrs['type'], entries=[]) - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'cluster': - if self._cluster_code is not None: - raise Exception('Multiple cluster codes for structr %s' % self._struct.name) - self._cluster_code = ParseInt(attrs['code']) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'field': - self._bitmap.entries.append(ConstantEntry( - name=attrs['name'], - code=ParseInt(attrs['mask']) - )) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'description': - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - else: - return ElementProcessor(self.context) - - def FinalizeProcessing(self, idl: Idl): - # We have two choices of adding an enum: - # - inside a cluster if a code exists - # - inside top level if a code does not exist - if self._cluster_code is None: - # Log only instead of critical, as not our XML is well formed. - # For example at the time of writing this, SwitchFeature in switch-cluster.xml - # did not have a code associated with it. - logging.error("Bitmap %r has no cluster code" % self._bitmap) - return - - found = False - for c in idl.clusters: - if c.code == self._cluster_code: - c.bitmaps.append(self._bitmap) - found = True - - if not found: - logging.error('Enum %s could not find its cluster (code %d/0x%X)' % - (self._bitmap.name, self._cluster_code, self._cluster_code)) - - def EndProcessing(self): - self.context.AddIdlPostProcessor(self) - - -class CommandProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, cluster, attrs): - super().__init__(context) - self._cluster = cluster - self._command = None - self._struct = Struct(name=attrs['name'], fields=[]) - self._field_index = 0 # commands DO NOT support field index it seems - - if attrs['source'].lower() == 'client': - self._struct.tag = StructTag.REQUEST - - name = attrs['name'] - - if name.endswith('Request'): - request_name = name - command_name = name[:-7] - else: - request_name = name+'Request' - command_name = name - - if 'response' in attrs: - response_name = attrs['response'] - else: - response_name = 'DefaultResponse' - - # TODO: what if no response? DefaultResponse? - self._command = Command( - name=name, - code=ParseInt(attrs['code']), - input_param=request_name, - output_param=response_name, - ) - - # TODO: command attributes: - # - timed invoke - # - fabric scoped - else: - self._struct.tag = StructTag.RESPONSE - self._struct.code = ParseInt(attrs['code']) - - def GetArgumentField(self, attrs): - data_type = DataType(name=attrs['type']) - - if 'length' in attrs: - data_type.max_length = ParseInt(attrs['length']) - - self._field_index += 1 - - field = Field( - data_type=data_type, - code=self._field_index, - name=attrs['name'], - is_list=(attrs.get('array', 'false') == 'true') - ) - - if attrs.get('optional', "false").lower() == 'true': - field.attributes.add(FieldAttribute.OPTIONAL) - - if attrs.get('isNullable', "false").lower() == 'true': - field.attributes.add(FieldAttribute.NULLABLE) - - return field - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'access': - if attrs['op'] != 'invoke': - raise Exception('Unknown access for %r' % self._struct) - - if self._command: - self._command.invokeacl = ParseAclRole(attrs) - else: - logging.warning("Ignored access role for reply %r" % self._struct) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'arg': - self._struct.fields.append(self.GetArgumentField(attrs)) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - elif name.lower() == 'description': - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - else: - # TODO: implement - return ElementProcessor(self.context) - - def EndProcessing(self): - - if self._struct.fields: - self._cluster.structs.append(self._struct) - else: - # no input - self._command.input_param = None - - if self._command: - self._cluster.commands.append(self._command) - - -class ClusterGlobalAttributeProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, cluster: Cluster, code: int): - super().__init__(context) - self._cluster = cluster - self._code = code - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'featurebit': - # It is uncler what featurebits mean. likely a bitmap should be created - # here, however only one such example exists currently: door-lock-cluster.xml - logging.info('Ignoring featurebit tag for global attribute 0x%X (%d)' % (self._code, self._code)) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - else: - return ElementProcessor(self.context) - - def EndProcessing(self): - self._cluster.attributes.append(self.context.GetGlobalAttribute(self._code)) - - -class ClusterProcessor(ElementProcessor): - """Handles configurator/cluster processing""" - - def __init__(self, context: ProcessingContext, idl: Idl): - super().__init__(context) - self._cluster = Cluster( - side=ClusterSide.CLIENT, - name=None, - code=None, - parse_meta=context.GetCurrentLocationMeta() - ) - self._idl = idl - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'code': - return ClusterCodeProcessor(self.context, self._cluster) - elif name.lower() == 'name': - return ClusterNameProcessor(self.context, self._cluster) - elif name.lower() == 'attribute': - return AttributeProcessor(self.context, self._cluster, attrs) - elif name.lower() == 'event': - return EventProcessor(self.context, self._cluster, attrs) - elif name.lower() == 'globalattribute': - # We ignore 'side' and 'value' since they do not seem useful - return ClusterGlobalAttributeProcessor(self.context, self._cluster, ParseInt(attrs['code'])) - elif name.lower() == 'command': - return CommandProcessor(self.context, self._cluster, attrs) - elif name.lower() in ['define', 'description', 'domain', 'tag', 'client', 'server']: - # NOTE: we COULD use client and server to create separate definitions - # of each, but the usefulness of this is unclear as the definitions are - # likely identical and matter has no concept of differences between the two - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - else: - return ElementProcessor(self.context) - - def EndProcessing(self): - if self._cluster.name is None: - raise Exception("Missing cluster name") - elif self._cluster.code is None: - raise Exception("Missing cluster code") - - self._idl.clusters.append(self._cluster) - -# Cluster extensions have extra bits for existing clusters. Can only be loaded -# IF the underlying cluster exits -class ClusterExtensionProcessor(ClusterProcessor, IdlPostProcessor): - def __init__(self, context: ProcessingContext, code: int): - # NOTE: IDL is set to NONE so that ClusterProcessor cannot - # inadvertently change it (it will be invalid anyway) - super().__init__(context, None) - self._cluster_code = code - - def EndProcessing(self): - self.context.AddIdlPostProcessor(self) - - def FinalizeProcessing(self, idl: Idl): - found = False - for c in idl.clusters: - if c.code == self._cluster_code: - found = True - - # Append everything that can be appended - c.enums.extend(self._cluster.enums) - c.bitmaps.extend(self._cluster.bitmaps) - c.events.extend(self._cluster.events) - c.attributes.extend(self._cluster.attributes) - c.structs.extend(self._cluster.structs) - c.commands.extend(self._cluster.commands) - - if not found: - logging.error('Could not extend cluster 0x%X (%d): cluster not found' % - (self._cluster_code, self._cluster_code)) - - - - -class GlobalAttributeProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, attribute: Attribute): - super().__init__(context, handled=HandledDepth.SINGLE_TAG) - self._attribute = attribute - - def HandleContent(self, content): - # Content generally is the name EXCEPT if access controls - # exist, in which case `description` contains the name - # - # Global attributes do not currently have access controls, so this - # case is not handled here - content = content.strip() - if content and not self._attribute.definition.name: - self._attribute.definition.name = content - - def EndProcessing(self): - if self._attribute.definition.name is None: - raise Exception("Name for attribute was not parsed.") - - self.context.AddGlobalAttribute(self._attribute) - - -class GlobalProcessor(ElementProcessor): - """Processes configurator/global """ - - def __init__(self, context: ProcessingContext): - super().__init__(context, handled=HandledDepth.SINGLE_TAG) - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'attribute': - if attrs['side'].lower() == 'client': - # We expect to also have 'server' equivalent, so ignore client - # side attributes - logging.debug('Ignoring global client-side attribute %s' % (attrs['code'])) - return ElementProcessor(self.context, handled=HandledDepth.SINGLE_TAG) - - return GlobalAttributeProcessor(self.context, AttrsToAttribute(attrs)) - else: - return ElementProcessor(self.context) - - -class ConfiguratorProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, idl: Idl): - super().__init__(context, handled=HandledDepth.SINGLE_TAG) - self._idl = idl - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'cluster': - return ClusterProcessor(self.context, self._idl) - elif name.lower() == 'enum': - return EnumProcessor(self.context, attrs) - elif name.lower() == 'struct': - return StructProcessor(self.context, attrs) - elif name.lower() == 'bitmap': - return BitmapProcessor(self.context, attrs) - elif name.lower() == 'domain': - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - elif name.lower() == 'clusterextension': - return ClusterExtensionProcessor(self.context, ParseInt(attrs['code'])) - elif name.lower() == 'accesscontrol': - # These contain operation/role/modifier and generally only contain a - # description. These do not seem as useful to parse. - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - elif name.lower() == 'atomic': - # A list of types in 'chip-types' - # Generally does not seem useful - matches a type id to a description, size and some discrete/analog flags - # - # Could be eventually used as a preload of types into base types, however matter idl - # generator logic has hardcoded sizing as well. - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - elif name.lower() == 'devicetype': - # A list of device types in 'matter-devices.xml' - # Useful for conformance tests, but does not seem usable for serialization logic - return ElementProcessor(self.context, handled=HandledDepth.ENTIRE_TREE) - elif name.lower() == 'global': - return GlobalProcessor(self.context) - else: - return ElementProcessor(self.context) - - -class ZapXmlProcessor(ElementProcessor): - def __init__(self, context: ProcessingContext, idl: Idl): - super().__init__(context) - self._idl = idl - - def GetNextProcessor(self, name, attrs): - if name.lower() == 'configurator': - return ConfiguratorProcessor(self.context, self._idl) - else: - return ElementProcessor(self.context) + from idl.zapxml.handlers import Context, ZapXmlHandler +from idl.matter_idl_types import Idl class ParseHandler(xml.sax.handler.ContentHandler): def __init__(self): @@ -792,7 +41,7 @@ def __init__(self): self._idl = Idl() self._processing_stack = [] # Context persists across all - self._context = ProcessingContext() + self._context = Context() def PrepareParsing(self, filename): # This is a bit ugly: filename keeps changing during parse @@ -805,7 +54,7 @@ def ProcessResults(self) -> Idl: def startDocument(self): self._context.locator = self._locator - self._processing_stack = [ZapXmlProcessor(self._context, self._idl)] + self._processing_stack = [ZapXmlHandler(self._context, self._idl)] def endDocument(self): if len(self._processing_stack) != 1: diff --git a/scripts/idl/zapxml/__init__.py b/scripts/idl/zapxml/__init__.py new file mode 100644 index 00000000000000..a234340e82116d --- /dev/null +++ b/scripts/idl/zapxml/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/scripts/idl/zapxml/handlers/__init__.py b/scripts/idl/zapxml/handlers/__init__.py new file mode 100644 index 00000000000000..8529ed814d9c38 --- /dev/null +++ b/scripts/idl/zapxml/handlers/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .handlers import ZapXmlHandler +from .context import Context \ No newline at end of file diff --git a/scripts/idl/zapxml/handlers/context.py b/scripts/idl/zapxml/handlers/context.py new file mode 100644 index 00000000000000..46ea8bbbc16c17 --- /dev/null +++ b/scripts/idl/zapxml/handlers/context.py @@ -0,0 +1,94 @@ +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import xml.sax.xmlreader + +from idl.matter_idl_types import Idl, ParseMetaData, Attribute +from typing import Optional, List + +class IdlPostProcessor: + """Defines a callback that will apply after an entire parsing + is complete. + """ + + def FinalizeProcessing(self, idl: Idl): + """Update idl with any post-processing directives.""" + pass + +class ProcessingPath: + def __init__(self, paths: List[str] = None): + if paths is None: + paths = [] + self.paths = paths + + def push(self, name: str): + self.paths.append(name) + + def pop(self): + self.paths.pop() + + def __str__(self): + return '::'.join(self.paths) + + def __repr__(self): + return 'ProcessingPath(%r)' % self.paths + + + +class Context: + def __init__(self, locator: Optional[xml.sax.xmlreader.Locator] = None): + self.path = ProcessingPath() + self.locator = locator + self._not_handled = set() + self._idl_post_processors = [] + + # Map of code -> attribute + self._global_attributes = {} + + def GetCurrentLocationMeta(self) -> ParseMetaData: + if not self.locator: + return None + + return ParseMetaData(line=self.locator.getLineNumber(), column=self.locator.getColumnNumber()) + + def GetGlobalAttribute(self, code): + if code in self._global_attributes: + return self._global_attributes[code] + + raise Exception( + 'Global attribute 0x%X (%d) not found. You probably need to load global-attributes.xml' % (code, code)) + + def AddGlobalAttribute(self, attribute: Attribute): + # NOTE: this may get added several times as both 'client' and 'server' + # however matter should not differentiate between the two + code = attribute.definition.code + logging.info('Adding global attribute 0x%X (%d): %s' % (code, code, attribute.definition.name)) + + self._global_attributes[code] = attribute + + def MarkTagNotHandled(self): + path = str(self.path) + if path not in self._not_handled: + logging.warning("TAG %s was not handled/recognized" % path) + self._not_handled.add(path) + + def AddIdlPostProcessor(self, processor: IdlPostProcessor): + self._idl_post_processors.append(processor) + + def PostProcess(self, idl: Idl): + for p in self._idl_post_processors: + p.FinalizeProcessing(idl) + + self._idl_post_processors = [] diff --git a/scripts/idl/zapxml/handlers/handlers.py b/scripts/idl/zapxml/handlers/handlers.py new file mode 100644 index 00000000000000..3dfd267f7dfd6d --- /dev/null +++ b/scripts/idl/zapxml/handlers/handlers.py @@ -0,0 +1,698 @@ +# Copyright (c) 2022 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum +import logging + +from idl.matter_idl_types import * +from typing import Optional, Union, List + +from .context import Context, IdlPostProcessor + +class HandledDepth: + """Defines how deep a XML element has been handled.""" + NOT_HANDLED = enum.auto() # Unknown/parsed element + ENTIRE_TREE = enum.auto() # Entire tree can be ignored + SINGLE_TAG = enum.auto() # Single tag processed, but not sub-items + + +def ParseInt(value: str) -> int: + if value.startswith('0x'): + return int(value[2:], 16) + else: + return int(value) + + +def ParseAclRole(attrs) -> AccessPrivilege: + # XML seems to use both role and privilege to mean the same thing + # they are used interchangeably + if 'role' in attrs: + role = attrs['role'] + else: + role = attrs['privilege'] + + if role.lower() == 'view': + return AccessPrivilege.VIEW + elif role.lower() == 'operate': + return AccessPrivilege.OPERATE + elif role.lower() == 'manage': + return AccessPrivilege.MANAGE + elif role.lower() == 'administer': + return AccessPrivilege.ADMINISTER + else: + raise Exception('Unknown ACL role: %r' % role) + + +def AttrsToAttribute(attrs) -> Attribute: + if attrs['type'].lower() == 'array': + data_type = DataType(name=attrs['entryType']) + else: + data_type = DataType(name=attrs['type']) + + if 'length' in attrs: + data_type.max_length = ParseInt(attrs['length']) + + field = Field( + data_type=data_type, + code=ParseInt(attrs['code']), + name=None, + is_list=(attrs['type'].lower() == 'array') + ) + + attribute = Attribute(definition=field) + + if attrs.get('optional', "false").lower() == 'true': + attribute.definition.attributes.add(FieldAttribute.OPTIONAL) + + if attrs.get('isNullable', "false").lower() == 'true': + attribute.definition.attributes.add(FieldAttribute.NULLABLE) + + if attrs.get('readable', "true").lower() == 'true': + attribute.tags.add(AttributeTag.READABLE) + + if attrs.get('writable', "false").lower() == 'true': + attribute.tags.add(AttributeTag.WRITABLE) + + # TODO: XML does not seem to contain information about + # - NOSUBSCRIBE + + # TODO: do we care about default value at all? + # General storage of default only applies to instantiation + + return attribute + + + +class ElementHandler: + """A generic element processor. + + XML processing will be done in the form of a stack. whenever + a new element comes in, its processor is moved to the top of + the stack and poped once the element ends. + """ + + def __init__(self, context: Context, handled=HandledDepth.NOT_HANDLED): + self.context = context + self._handled = handled + + def GetNextProcessor(self, name, attrs): + """Get the next processor to use for the given name""" + + if self._handled == HandledDepth.SINGLE_TAG: + handled = HandledDepth.NOT_HANDLED + else: + handled = self._handled + + return ElementHandler(context=self.context, handled=handled) + + def HandleContent(self, content): + """Processes some content""" + pass + + def EndProcessing(self): + """Finalizes the processing of the current element""" + if self._handled == HandledDepth.NOT_HANDLED: + self.context.MarkTagNotHandled() + + +class ClusterNameHandler(ElementHandler): + def __init__(self, context: Context, cluster): + super().__init__(context, handled=HandledDepth.SINGLE_TAG) + self._cluster = cluster + + def HandleContent(self, content): + self._cluster.name = content.replace(' ', '') + + +class AttributeDescriptionHandler(ElementHandler): + def __init__(self, context: Context, attribute): + super().__init__(context, handled=HandledDepth.SINGLE_TAG) + self._attribute = attribute + + def HandleContent(self, content): + self._attribute.definition.name = content.replace(' ', '') + + +class ClusterCodeHandler(ElementHandler): + def __init__(self, context: Context, cluster): + super().__init__(context, handled=HandledDepth.SINGLE_TAG) + self._cluster = cluster + + def HandleContent(self, content): + self._cluster.code = ParseInt(content) + + +class EventHandler(ElementHandler): + def __init__(self, context: Context, cluster, attrs): + super().__init__(context) + self._cluster = cluster + + if attrs['priority'] == 'debug': + priority = EventPriority.DEBUG + elif attrs['priority'] == 'info': + priority = EventPriority.INFO + elif attrs['priority'] == 'critical': + priority = EventPriority.CRITICAL + else: + raise Exception("Unknown event priority: %s" % attrs['priority']) + + self._event = Event( + priority=priority, + code=ParseInt(attrs['code']), + name=attrs['name'], + fields=[], + ) + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'field': + data_type = DataType(name=attrs['type']) + if 'length' in attrs: + data_type.max_length = ParseInt(attrs['length']) + + field = Field( + data_type=data_type, + code=ParseInt(attrs['id']), + name=attrs['name'], + is_list=(attrs.get('array', 'false').lower() == 'true'), + ) + + if attrs.get('optional', "false").lower() == 'true': + field.attributes.add(FieldAttribute.OPTIONAL) + + if attrs.get('isNullable', "false").lower() == 'true': + field.attributes.add(FieldAttribute.NULLABLE) + + self._event.fields.append(field) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'access': + self._event.readacl = ParseAclRole(attrs) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'description': + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + else: + return ElementHandler(self.context) + + def EndProcessing(self): + self._cluster.events.append(self._event) + + +class AttributeHandler(ElementHandler): + def __init__(self, context: Context, cluster, attrs): + super().__init__(context) + self._cluster = cluster + self._attribute = AttrsToAttribute(attrs) + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'access': + if 'modifier' in attrs: + if attrs['modifier'] != 'fabric-scoped': + raise Exception("UNKNOWN MODIFIER: %s" % attrs['modifier']) + self._attribute.tags.add(AttributeTag.FABRIC_SCOPED) + else: + role = ParseAclRole(attrs) + + if attrs['op'] == 'read': + self._attribute.readacl = role + elif attrs['op'] == 'write': + self._attribute.writeacl = role + else: + logging.error("Unknown access: %r" % attrs['op']) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'description': + return AttributeDescriptionHandler(self.context, self._attribute) + else: + return ElementHandler(self.context) + + def HandleContent(self, content): + # Content generally is the name EXCEPT if access controls + # exist, in which case `description` contains the name + content = content.strip() + if content and not self._attribute.definition.name: + self._attribute.definition.name = content + + def EndProcessing(self): + if self._attribute.definition.name is None: + raise Exception("Name for attribute was not parsed.") + + self._cluster.attributes.append(self._attribute) + + +class StructHandler(ElementHandler, IdlPostProcessor): + def __init__(self, context: Context, attrs): + super().__init__(context) + + # if set, struct belongs to a specific cluster + self._cluster_codes = set() + self._struct = Struct(name=attrs['name'], fields=[]) + self._field_index = 0 + # The following are not set: + # - tag not set because not a request/response + # - code not set because not a response + + # TODO: handle this isFabricScoped attribute + self._is_fabric_scoped = (attrs.get('isFabricScoped', "false").lower() == 'true') + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'item': + data_type = DataType( + name=attrs['type'] + ) + + # TODO: handle isFabricSensitive + + if 'fieldId' in attrs: + self._field_index = ParseInt(attrs['fieldId']) + else: + # NOTE: code does NOT exist, so the number is incremental here + # this seems a defficiency in XML format. + self._field_index += 1 + + if 'length' in attrs: + data_type.max_length = ParseInt(attrs['length']) + + field = Field( + data_type=data_type, + code=self._field_index, + name=attrs['name'], + is_list=(attrs.get('array', 'false').lower() == 'true'), + ) + + if attrs.get('optional', "false").lower() == 'true': + field.attributes.add(FieldAttribute.OPTIONAL) + + if attrs.get('isNullable', "false").lower() == 'true': + field.attributes.add(FieldAttribute.NULLABLE) + + self._struct.fields.append(field) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'cluster': + self._cluster_codes.add(ParseInt(attrs['code'])) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + else: + return ElementHandler(self.context) + + def FinalizeProcessing(self, idl: Idl): + # We have two choices of adding an enum: + # - inside a cluster if a code exists + # - inside top level if no codes were associated + + if self._cluster_codes: + for code in self._cluster_codes: + found = False + for c in idl.clusters: + if c.code == code: + c.structs.append(self._struct) + found = True + + if not found: + logging.error('Enum %s could not find cluster (code %d/0x%X)' % + (self._struct.name, code, code)) + else: + idl.structs.append(self._struct) + + def EndProcessing(self): + self.context.AddIdlPostProcessor(self) + + +class EnumHandler(ElementHandler, IdlPostProcessor): + def __init__(self, context: Context, attrs): + super().__init__(context) + self._cluster_code = None # if set, enum belongs to a specific cluster + self._enum = Enum(name=attrs['name'], base_type=attrs['type'], entries=[]) + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'item': + self._enum.entries.append(ConstantEntry( + name=attrs['name'], + code=ParseInt(attrs['value']) + )) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'cluster': + if self._cluster_code is not None: + raise Exception('Multiple cluster codes for enum %s' % self._enum.name) + self._cluster_code = ParseInt(attrs['code']) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + else: + return ElementHandler(self.context) + + def FinalizeProcessing(self, idl: Idl): + # We have two choices of adding an enum: + # - inside a cluster if a code exists + # - inside top level if a code does not exist + + if self._cluster_code is None: + idl.enums.append(self._enum) + else: + found = False + for c in idl.clusters: + if c.code == self._cluster_code: + c.enums.append(self._enum) + found = True + + if not found: + logging.error('Enum %s could not find its cluster (code %d/0x%X)' % + (self._enum.name, self._cluster_code, self._cluster_code)) + + def EndProcessing(self): + self.context.AddIdlPostProcessor(self) + + +class BitmapHandler(ElementHandler): + def __init__(self, context: Context, attrs): + super().__init__(context) + self._cluster_code = None + self._bitmap = Bitmap(name=attrs['name'], base_type=attrs['type'], entries=[]) + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'cluster': + if self._cluster_code is not None: + raise Exception('Multiple cluster codes for structr %s' % self._struct.name) + self._cluster_code = ParseInt(attrs['code']) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'field': + self._bitmap.entries.append(ConstantEntry( + name=attrs['name'], + code=ParseInt(attrs['mask']) + )) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'description': + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + else: + return ElementHandler(self.context) + + def FinalizeProcessing(self, idl: Idl): + # We have two choices of adding an enum: + # - inside a cluster if a code exists + # - inside top level if a code does not exist + if self._cluster_code is None: + # Log only instead of critical, as not our XML is well formed. + # For example at the time of writing this, SwitchFeature in switch-cluster.xml + # did not have a code associated with it. + logging.error("Bitmap %r has no cluster code" % self._bitmap) + return + + found = False + for c in idl.clusters: + if c.code == self._cluster_code: + c.bitmaps.append(self._bitmap) + found = True + + if not found: + logging.error('Enum %s could not find its cluster (code %d/0x%X)' % + (self._bitmap.name, self._cluster_code, self._cluster_code)) + + def EndProcessing(self): + self.context.AddIdlPostProcessor(self) + + +class CommandHandler(ElementHandler): + def __init__(self, context: Context, cluster, attrs): + super().__init__(context) + self._cluster = cluster + self._command = None + self._struct = Struct(name=attrs['name'], fields=[]) + self._field_index = 0 # commands DO NOT support field index it seems + + if attrs['source'].lower() == 'client': + self._struct.tag = StructTag.REQUEST + + name = attrs['name'] + + if name.endswith('Request'): + request_name = name + command_name = name[:-7] + else: + request_name = name+'Request' + command_name = name + + if 'response' in attrs: + response_name = attrs['response'] + else: + response_name = 'DefaultResponse' + + # TODO: what if no response? DefaultResponse? + self._command = Command( + name=name, + code=ParseInt(attrs['code']), + input_param=request_name, + output_param=response_name, + ) + + # TODO: command attributes: + # - timed invoke + # - fabric scoped + else: + self._struct.tag = StructTag.RESPONSE + self._struct.code = ParseInt(attrs['code']) + + def GetArgumentField(self, attrs): + data_type = DataType(name=attrs['type']) + + if 'length' in attrs: + data_type.max_length = ParseInt(attrs['length']) + + self._field_index += 1 + + field = Field( + data_type=data_type, + code=self._field_index, + name=attrs['name'], + is_list=(attrs.get('array', 'false') == 'true') + ) + + if attrs.get('optional', "false").lower() == 'true': + field.attributes.add(FieldAttribute.OPTIONAL) + + if attrs.get('isNullable', "false").lower() == 'true': + field.attributes.add(FieldAttribute.NULLABLE) + + return field + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'access': + if attrs['op'] != 'invoke': + raise Exception('Unknown access for %r' % self._struct) + + if self._command: + self._command.invokeacl = ParseAclRole(attrs) + else: + logging.warning("Ignored access role for reply %r" % self._struct) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'arg': + self._struct.fields.append(self.GetArgumentField(attrs)) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + elif name.lower() == 'description': + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + else: + # TODO: implement + return ElementHandler(self.context) + + def EndProcessing(self): + + if self._struct.fields: + self._cluster.structs.append(self._struct) + else: + # no input + self._command.input_param = None + + if self._command: + self._cluster.commands.append(self._command) + + +class ClusterGlobalAttributeHandler(ElementHandler): + def __init__(self, context: Context, cluster: Cluster, code: int): + super().__init__(context) + self._cluster = cluster + self._code = code + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'featurebit': + # It is uncler what featurebits mean. likely a bitmap should be created + # here, however only one such example exists currently: door-lock-cluster.xml + logging.info('Ignoring featurebit tag for global attribute 0x%X (%d)' % (self._code, self._code)) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + else: + return ElementHandler(self.context) + + def EndProcessing(self): + self._cluster.attributes.append(self.context.GetGlobalAttribute(self._code)) + + +class ClusterHandler(ElementHandler): + """Handles configurator/cluster processing""" + + def __init__(self, context: Context, idl: Idl): + super().__init__(context) + self._cluster = Cluster( + side=ClusterSide.CLIENT, + name=None, + code=None, + parse_meta=context.GetCurrentLocationMeta() + ) + self._idl = idl + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'code': + return ClusterCodeHandler(self.context, self._cluster) + elif name.lower() == 'name': + return ClusterNameHandler(self.context, self._cluster) + elif name.lower() == 'attribute': + return AttributeHandler(self.context, self._cluster, attrs) + elif name.lower() == 'event': + return EventHandler(self.context, self._cluster, attrs) + elif name.lower() == 'globalattribute': + # We ignore 'side' and 'value' since they do not seem useful + return ClusterGlobalAttributeHandler(self.context, self._cluster, ParseInt(attrs['code'])) + elif name.lower() == 'command': + return CommandHandler(self.context, self._cluster, attrs) + elif name.lower() in ['define', 'description', 'domain', 'tag', 'client', 'server']: + # NOTE: we COULD use client and server to create separate definitions + # of each, but the usefulness of this is unclear as the definitions are + # likely identical and matter has no concept of differences between the two + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + else: + return ElementHandler(self.context) + + def EndProcessing(self): + if self._cluster.name is None: + raise Exception("Missing cluster name") + elif self._cluster.code is None: + raise Exception("Missing cluster code") + + self._idl.clusters.append(self._cluster) + +# Cluster extensions have extra bits for existing clusters. Can only be loaded +# IF the underlying cluster exits +class ClusterExtensionHandler(ClusterHandler, IdlPostProcessor): + def __init__(self, context: Context, code: int): + # NOTE: IDL is set to NONE so that ClusterHandler cannot + # inadvertently change it (it will be invalid anyway) + super().__init__(context, None) + self._cluster_code = code + + def EndProcessing(self): + self.context.AddIdlPostProcessor(self) + + def FinalizeProcessing(self, idl: Idl): + found = False + for c in idl.clusters: + if c.code == self._cluster_code: + found = True + + # Append everything that can be appended + c.enums.extend(self._cluster.enums) + c.bitmaps.extend(self._cluster.bitmaps) + c.events.extend(self._cluster.events) + c.attributes.extend(self._cluster.attributes) + c.structs.extend(self._cluster.structs) + c.commands.extend(self._cluster.commands) + + if not found: + logging.error('Could not extend cluster 0x%X (%d): cluster not found' % + (self._cluster_code, self._cluster_code)) + + + + +class GlobalAttributeHandler(ElementHandler): + def __init__(self, context: Context, attribute: Attribute): + super().__init__(context, handled=HandledDepth.SINGLE_TAG) + self._attribute = attribute + + def HandleContent(self, content): + # Content generally is the name EXCEPT if access controls + # exist, in which case `description` contains the name + # + # Global attributes do not currently have access controls, so this + # case is not handled here + content = content.strip() + if content and not self._attribute.definition.name: + self._attribute.definition.name = content + + def EndProcessing(self): + if self._attribute.definition.name is None: + raise Exception("Name for attribute was not parsed.") + + self.context.AddGlobalAttribute(self._attribute) + + +class GlobalHandler(ElementHandler): + """Processes configurator/global """ + + def __init__(self, context: Context): + super().__init__(context, handled=HandledDepth.SINGLE_TAG) + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'attribute': + if attrs['side'].lower() == 'client': + # We expect to also have 'server' equivalent, so ignore client + # side attributes + logging.debug('Ignoring global client-side attribute %s' % (attrs['code'])) + return ElementHandler(self.context, handled=HandledDepth.SINGLE_TAG) + + return GlobalAttributeHandler(self.context, AttrsToAttribute(attrs)) + else: + return ElementHandler(self.context) + + +class ConfigurationHandler(ElementHandler): + def __init__(self, context: Context, idl: Idl): + super().__init__(context, handled=HandledDepth.SINGLE_TAG) + self._idl = idl + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'cluster': + return ClusterHandler(self.context, self._idl) + elif name.lower() == 'enum': + return EnumHandler(self.context, attrs) + elif name.lower() == 'struct': + return StructHandler(self.context, attrs) + elif name.lower() == 'bitmap': + return BitmapHandler(self.context, attrs) + elif name.lower() == 'domain': + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + elif name.lower() == 'clusterextension': + return ClusterExtensionHandler(self.context, ParseInt(attrs['code'])) + elif name.lower() == 'accesscontrol': + # These contain operation/role/modifier and generally only contain a + # description. These do not seem as useful to parse. + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + elif name.lower() == 'atomic': + # A list of types in 'chip-types' + # Generally does not seem useful - matches a type id to a description, size and some discrete/analog flags + # + # Could be eventually used as a preload of types into base types, however matter idl + # generator logic has hardcoded sizing as well. + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + elif name.lower() == 'devicetype': + # A list of device types in 'matter-devices.xml' + # Useful for conformance tests, but does not seem usable for serialization logic + return ElementHandler(self.context, handled=HandledDepth.ENTIRE_TREE) + elif name.lower() == 'global': + return GlobalHandler(self.context) + else: + return ElementHandler(self.context) + + +class ZapXmlHandler(ElementHandler): + def __init__(self, context: Context, idl: Idl): + super().__init__(context) + self._idl = idl + + def GetNextProcessor(self, name, attrs): + if name.lower() == 'configurator': + return ConfigurationHandler(self.context, self._idl) + else: + return ElementHandler(self.context) +