diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 97897e8086c941..d6c580e643e9ff 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -468,6 +468,7 @@ jobs:
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_RVCCLEANM_1_2.py" --script-args "--int-arg PIXIT_ENDPOINT:1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_RVCRUNM_1_2.py" --script-args "--int-arg PIXIT_ENDPOINT:1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
+ scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestConformanceSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_12.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"'
diff --git a/src/python_testing/TC_DeviceBasicComposition.py b/src/python_testing/TC_DeviceBasicComposition.py
index b333085fa2eef1..1f5dc38813a975 100644
--- a/src/python_testing/TC_DeviceBasicComposition.py
+++ b/src/python_testing/TC_DeviceBasicComposition.py
@@ -31,8 +31,11 @@
import chip.clusters.ClusterObjects
import chip.tlv
from chip.clusters.Attribute import ValueDecodeFailure
-from matter_testing_support import AttributePathLocation, MatterBaseTest, async_test_body, default_matter_test_main
+from conformance_support import ConformanceDecision, conformance_allowed
+from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest,
+ async_test_body, default_matter_test_main)
from mobly import asserts
+from spec_parsing_support import CommandType, build_xml_clusters
def MatterTlvToJson(tlv_data: dict[int, Any]) -> dict[str, Any]:
@@ -870,6 +873,129 @@ def test_DESC_2_2(self):
if problems or root_problems:
self.fail_current_test("Problems with tags lists")
+ def test_spec_conformance(self):
+ success = True
+ # TODO: provisional needs to be an input parameter
+ allow_provisional = True
+ clusters, problems = build_xml_clusters()
+ self.problems = self.problems + problems
+ for id in sorted(list(clusters.keys())):
+ print(f'{id} 0x{id:02x}: {clusters[id].name}')
+ for endpoint_id, endpoint in self.endpoints_tlv.items():
+ for cluster_id, cluster in endpoint.items():
+ if cluster_id not in clusters.keys():
+ if (cluster_id & 0xFFFF_0000) != 0:
+ # manufacturer cluster
+ continue
+ location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id)
+ # TODO: update this from a warning once we have all the data
+ self.record_warning(self.get_test_name(), location=location,
+ problem='Standard cluster found on device, but is not present in spec data')
+ continue
+
+ # TODO: switch to use global FEATURE_MAP_ID etc. once the IDM-10.1 change is merged.
+ FEATURE_MAP_ID = 0xFFFC
+ ATTRIBUTE_LIST_ID = 0xFFFB
+ ACCEPTED_COMMAND_ID = 0xFFF9
+ GENERATED_COMMAND_ID = 0xFFF8
+
+ feature_map = cluster[FEATURE_MAP_ID]
+ attribute_list = cluster[ATTRIBUTE_LIST_ID]
+ all_command_list = cluster[ACCEPTED_COMMAND_ID] + cluster[GENERATED_COMMAND_ID]
+
+ # Feature conformance checking
+ feature_masks = [1 << i for i in range(32) if feature_map & (1 << i)]
+ for f in feature_masks:
+ location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=FEATURE_MAP_ID)
+ if f not in clusters[cluster_id].features.keys():
+ self.record_error(self.get_test_name(), location=location, problem=f'Unknown feature with mask 0x{f:02x}')
+ success = False
+ continue
+ xml_feature = clusters[cluster_id].features[f]
+ conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list)
+ if not conformance_allowed(conformance_decision, allow_provisional):
+ self.record_error(self.get_test_name(), location=location,
+ problem=f'Disallowed feature with mask 0x{f:02x}')
+ success = False
+ for feature_mask, xml_feature in clusters[cluster_id].features.items():
+ conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list)
+ if conformance_decision == ConformanceDecision.MANDATORY and feature_mask not in feature_masks:
+ self.record_error(self.get_test_name(), location=location,
+ problem=f'Required feature with mask 0x{f:02x} is not present in feature map')
+ success = False
+
+ # Attribute conformance checking
+ for attribute_id, attribute in cluster.items():
+ location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
+ if attribute_id not in clusters[cluster_id].attributes.keys():
+ # TODO: Consolidate the range checks with IDM-10.1 once that lands
+ if attribute_id <= 0x4FFF:
+ # manufacturer attribute
+ self.record_error(self.get_test_name(), location=location,
+ problem='Standard attribute found on device, but not in spec')
+ success = False
+ continue
+ xml_attribute = clusters[cluster_id].attributes[attribute_id]
+ conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list)
+ if not conformance_allowed(conformance_decision, allow_provisional):
+ location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
+ self.record_error(self.get_test_name(), location=location,
+ problem=f'Attribute 0x{attribute_id:02x} is included, but is disallowed by conformance')
+ success = False
+ for attribute_id, xml_attribute in clusters[cluster_id].attributes.items():
+ conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list)
+ if conformance_decision == ConformanceDecision.MANDATORY and attribute_id not in cluster.keys():
+ location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
+ self.record_error(self.get_test_name(), location=location,
+ problem=f'Attribute 0x{attribute_id:02x} is required, but is not present on the DUT')
+ success = False
+
+ def check_spec_conformance_for_commands(command_type: CommandType) -> bool:
+ success = True
+ # TODO: once IDM-10.1 lands, use the globals
+ global_attribute_id = 0xFFF9 if command_type == CommandType.ACCEPTED else 0xFFF8
+ xml_commands_dict = clusters[cluster_id].accepted_commands if command_type == CommandType.ACCEPTED else clusters[cluster_id].generated_commands
+ command_list = cluster[global_attribute_id]
+ for command_id in command_list:
+ location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
+ if command_id not in xml_commands_dict:
+ # TODO: Consolidate range checks with IDM-10.1 once that lands
+ if command_id <= 0xFF:
+ # manufacturer command
+ continue
+ self.record_error(self.get_test_name(), location=location,
+ problem='Standard command found on device, but not in spec')
+ success = False
+ continue
+ xml_command = xml_commands_dict[command_id]
+ conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list)
+ if not conformance_allowed(conformance_decision, allow_provisional):
+ self.record_error(self.get_test_name(), location=location,
+ problem=f'Command 0x{command_id:02x} is included, but disallowed by conformance')
+ success = False
+ for command_id, xml_command in xml_commands_dict.items():
+ conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list)
+ if conformance_decision == ConformanceDecision.MANDATORY and command_id not in command_list:
+ location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
+ self.record_error(self.get_test_name(), location=location,
+ problem=f'Command 0x{command_id:02x} is required, but is not present on the DUT')
+ success = False
+ return success
+
+ # Command conformance checking
+ cmd_success = check_spec_conformance_for_commands(CommandType.ACCEPTED)
+ success = False if not cmd_success else success
+ cmd_success = check_spec_conformance_for_commands(CommandType.GENERATED)
+ success = False if not cmd_success else success
+
+ # TODO: Add choice checkers
+
+ if not success:
+ # TODO: Right now, we have failures in all-cluster, so we can't fail this test and keep it in CI. For now, just log.
+ # Issue tracking: #29812
+ # self.fail_current_test("Problems with conformance")
+ logging.error("Problems found with conformance, this should turn into a test failure once #29812 is resolved")
+
if __name__ == "__main__":
default_matter_test_main()
diff --git a/src/python_testing/TestConformanceSupport.py b/src/python_testing/TestConformanceSupport.py
new file mode 100644
index 00000000000000..53f9e885ff9449
--- /dev/null
+++ b/src/python_testing/TestConformanceSupport.py
@@ -0,0 +1,575 @@
+#
+# Copyright (c) 2023 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import xml.etree.ElementTree as ElementTree
+
+from conformance_support import ConformanceDecision, ConformanceParseParameters, parse_callable_from_xml
+from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
+from mobly import asserts
+
+
+class TestConformanceSupport(MatterBaseTest):
+ @async_test_body
+ async def setup_class(self):
+ super().setup_class()
+ # a small feature map
+ self.feature_names_to_bits = {'AB': 0x01, 'CD': 0x02}
+
+ # none, AB, CD, AB&CD
+ self.feature_maps = [0x00, 0x01, 0x02, 0x03]
+ self.has_ab = [False, True, False, True]
+ self.has_cd = [False, False, True, True]
+
+ self.attribute_names_to_values = {'attr1': 0x00, 'attr2': 0x01}
+ self.attribute_lists = [[], [0x00], [0x01], [0x00, 0x01]]
+ self.has_attr1 = [False, True, False, True]
+ self.has_attr2 = [False, False, True, True]
+
+ self.command_names_to_values = {'cmd1': 0x00, 'cmd2': 0x01}
+ self.cmd_lists = [[], [0x00], [0x01], [0x00, 0x01]]
+ self.has_cmd1 = [False, True, False, True]
+ self.has_cmd2 = [False, False, True, True]
+ self.params = ConformanceParseParameters(
+ feature_map=self.feature_names_to_bits, attribute_map=self.attribute_names_to_values, command_map=self.command_names_to_values)
+
+ @async_test_body
+ async def test_conformance_mandatory(self):
+ xml = ''
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for f in self.feature_maps:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+
+ @async_test_body
+ async def test_conformance_optional(self):
+ xml = ''
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for f in self.feature_maps:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+
+ @async_test_body
+ async def test_conformance_disallowed(self):
+ xml = ''
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for f in self.feature_maps:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.DISALLOWED)
+
+ xml = ''
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for f in self.feature_maps:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.DISALLOWED)
+
+ @async_test_body
+ async def test_conformance_provisional(self):
+ xml = ''
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for f in self.feature_maps:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.PROVISIONAL)
+
+ @async_test_body
+ async def test_conformance_mandatory_on_condition(self):
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ # single attribute mandatory
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if self.has_attr1[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if self.has_attr2[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ # test command in optional and in boolean - this is the same as attribute essentially, so testing every permutation is overkill
+
+ @async_test_body
+ async def test_conformance_optional_on_condition(self):
+ # single feature optional
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ # single attribute optional
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if self.has_attr1[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if self.has_attr2[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ # single command optional
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, c in enumerate(self.cmd_lists):
+ if self.has_cmd1[i]:
+ asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, c in enumerate(self.cmd_lists):
+ if self.has_cmd2[i]:
+ asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_not_term_mandatory(self):
+ # single feature not mandatory
+ xml = (''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if not self.has_ab[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if not self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ # single attribute not mandatory
+ xml = (''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if not self.has_attr1[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if not self.has_attr2[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_not_term_optional(self):
+ # single feature not optional
+ xml = (''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if not self.has_ab[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ xml = (''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if not self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_and_term(self):
+ # and term for features only
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i] and self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ # and term for attributes only
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if self.has_attr1[i] and self.has_attr2[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ # and term for feature and attribute
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ for j, a in enumerate(self.attribute_lists):
+ if self.has_ab[i] and self.has_attr2[j]:
+ asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_or_term(self):
+ # or term feature only
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i] or self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ # or term attribute only
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, a in enumerate(self.attribute_lists):
+ if self.has_attr1[i] or self.has_attr2[i]:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ # or term feature and attribute
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ for j, a in enumerate(self.attribute_lists):
+ if self.has_ab[i] or self.has_attr2[j]:
+ asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_and_term_with_not(self):
+ # and term with not
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if not self.has_ab[i] and self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_or_term_with_not(self):
+ # or term with not on second feature
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i] or not self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ # not around or term with
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if not (self.has_ab[i] or self.has_cd[i]):
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_and_term_with_three_terms(self):
+ # and term with three features
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ self.feature_names_to_bits['EF'] = 0x04
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ # no features
+ asserts.assert_equal(xml_callable(0x00, [], []), ConformanceDecision.NOT_APPLICABLE)
+ # one feature
+ asserts.assert_equal(xml_callable(0x01, [], []), ConformanceDecision.NOT_APPLICABLE)
+ # all features
+ asserts.assert_equal(xml_callable(0x07, [], []), ConformanceDecision.OPTIONAL)
+
+ # and term with one of each
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ for j, a in enumerate(self.attribute_lists):
+ for k, c in enumerate(self.cmd_lists):
+ if self.has_ab[i] and self.has_attr1[j] and self.has_cmd1[k]:
+ asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.NOT_APPLICABLE)
+
+ @async_test_body
+ async def test_conformance_or_term_with_three_terms(self):
+ # or term with three features
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ # no features
+ asserts.assert_equal(xml_callable(0x00, [], []), ConformanceDecision.NOT_APPLICABLE)
+ # one feature
+ asserts.assert_equal(xml_callable(0x01, [], []), ConformanceDecision.OPTIONAL)
+ # all features
+ asserts.assert_equal(xml_callable(0x07, [], []), ConformanceDecision.OPTIONAL)
+
+ # or term with one of each
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ for j, a in enumerate(self.attribute_lists):
+ for k, c in enumerate(self.cmd_lists):
+ if self.has_ab[i] or self.has_attr1[j] or self.has_cmd1[k]:
+ asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.NOT_APPLICABLE)
+
+ def test_conformance_otherwise(self):
+ # AB, O
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+
+ # AB, [CD]
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ elif self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE)
+
+ # AB & !CD, P
+ xml = (''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ '')
+ et = ElementTree.fromstring(xml)
+ xml_callable = parse_callable_from_xml(et, self.params)
+ for i, f in enumerate(self.feature_maps):
+ if self.has_ab[i] and not self.has_cd[i]:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY)
+ else:
+ asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.PROVISIONAL)
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/conformance_support.py b/src/python_testing/conformance_support.py
new file mode 100644
index 00000000000000..2dabb584c9d0f7
--- /dev/null
+++ b/src/python_testing/conformance_support.py
@@ -0,0 +1,263 @@
+#
+# Copyright (c) 2023 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import xml.etree.ElementTree as ElementTree
+from dataclasses import dataclass
+from enum import Enum, auto
+from typing import Callable
+
+from chip.tlv import uint
+
+OTHERWISE_CONFORM = 'otherwiseConform'
+OPTIONAL_CONFORM = 'optionalConform'
+PROVISIONAL_CONFORM = 'provisionalConform'
+MANDATORY_CONFORM = 'mandatoryConform'
+DEPRECATE_CONFORM = 'deprecateConform'
+DISALLOW_CONFORM = 'disallowConform'
+AND_TERM = 'andTerm'
+OR_TERM = 'orTerm'
+NOT_TERM = 'notTerm'
+FEATURE_TAG = 'feature'
+ATTRIBUTE_TAG = 'attribute'
+COMMAND_TAG = 'command'
+
+
+class ConformanceException(Exception):
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __str__(self):
+ return f"ConformanceException({self.msg})"
+
+
+class ConformanceDecision(Enum):
+ MANDATORY = auto()
+ OPTIONAL = auto()
+ NOT_APPLICABLE = auto()
+ DISALLOWED = auto()
+ PROVISIONAL = auto()
+
+
+@dataclass
+class ConformanceParseParameters:
+ feature_map: dict[str, uint]
+ attribute_map: dict[str, uint]
+ command_map: dict[str, uint]
+
+
+def conformance_allowed(conformance_decision: ConformanceDecision, allow_provisional: bool):
+ if conformance_decision == ConformanceDecision.NOT_APPLICABLE or conformance_decision == ConformanceDecision.DISALLOWED:
+ return False
+ if conformance_decision == ConformanceDecision.PROVISIONAL:
+ return allow_provisional
+ return True
+
+
+def mandatory(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ return ConformanceDecision.MANDATORY
+
+
+def optional(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ return ConformanceDecision.OPTIONAL
+
+
+def deprecated(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ return ConformanceDecision.DISALLOWED
+
+
+def disallowed(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ return ConformanceDecision.DISALLOWED
+
+
+def provisional(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ return ConformanceDecision.PROVISIONAL
+
+
+def feature(requiredFeature: uint) -> Callable:
+ def feature_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ if requiredFeature & feature_map != 0:
+ return ConformanceDecision.MANDATORY
+ return ConformanceDecision.NOT_APPLICABLE
+ return feature_inner
+
+
+def attribute(requiredAttribute: uint) -> Callable:
+ def attribute_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ if requiredAttribute in attribute_list:
+ return ConformanceDecision.MANDATORY
+ return ConformanceDecision.NOT_APPLICABLE
+ return attribute_inner
+
+
+def command(requiredCommand: uint) -> Callable:
+ def command_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ if requiredCommand in all_command_list:
+ return ConformanceDecision.MANDATORY
+ return ConformanceDecision.NOT_APPLICABLE
+ return command_inner
+
+
+def optional_wrapper(op: Callable) -> Callable:
+ def optional_wrapper_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ decision = op(feature_map, attribute_list, all_command_list)
+ if decision == ConformanceDecision.MANDATORY or decision == ConformanceDecision.OPTIONAL:
+ return ConformanceDecision.OPTIONAL
+ elif decision == ConformanceDecision.NOT_APPLICABLE:
+ return ConformanceDecision.NOT_APPLICABLE
+ else:
+ raise ConformanceException(f'Optional wrapping invalid op {decision}')
+ return optional_wrapper_inner
+
+
+def mandatory_wrapper(op: Callable) -> Callable:
+ def mandatory_wrapper_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ return op(feature_map, attribute_list, all_command_list)
+ return mandatory_wrapper_inner
+
+
+def not_operation(op: Callable):
+ def not_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ # not operations can't be used with anything that returns DISALLOWED
+ # not operations also can't be used with things that are optional
+ # ie, ![AB] doesn't make sense, nor does !O
+ decision = op(feature_map, attribute_list, all_command_list)
+ if decision == ConformanceDecision.OPTIONAL or decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL:
+ raise ConformanceException('NOT operation on optional or disallowed item')
+ elif decision == ConformanceDecision.NOT_APPLICABLE:
+ return ConformanceDecision.MANDATORY
+ elif decision == ConformanceDecision.MANDATORY:
+ return ConformanceDecision.NOT_APPLICABLE
+ else:
+ raise ConformanceException('NOT called on item with non-conformance value')
+ return not_operation_inner
+
+
+def and_operation(op_list: list[Callable]) -> Callable:
+ def and_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ for op in op_list:
+ decision = op(feature_map, attribute_list, all_command_list)
+ # and operations can't happen on optional or disallowed
+ if decision == ConformanceDecision.OPTIONAL or decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL:
+ raise ConformanceException('AND operation on optional or disallowed item')
+ elif decision == ConformanceDecision.NOT_APPLICABLE:
+ return ConformanceDecision.NOT_APPLICABLE
+ elif decision == ConformanceDecision.MANDATORY:
+ continue
+ else:
+ raise ConformanceException('Oplist item returned non-conformance value')
+ return ConformanceDecision.MANDATORY
+ return and_operation_inner
+
+
+def or_operation(op_list: list[Callable]) -> Callable:
+ def or_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ for op in op_list:
+ decision = op(feature_map, attribute_list, all_command_list)
+ if decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL:
+ raise ConformanceException('OR operation on optional or disallowed item')
+ elif decision == ConformanceDecision.NOT_APPLICABLE:
+ continue
+ elif decision == ConformanceDecision.MANDATORY:
+ return ConformanceDecision.MANDATORY
+ elif decision == ConformanceDecision.OPTIONAL:
+ return ConformanceDecision.OPTIONAL
+ else:
+ raise ConformanceException('Oplist item returned non-conformance value')
+ return ConformanceDecision.NOT_APPLICABLE
+ return or_operation_inner
+
+# TODO: add xor operation once it's required
+# TODO: how would equal and unequal operations work here?
+
+
+def otherwise(op_list: list[Callable]) -> Callable:
+ def otherwise_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision:
+ # Otherwise operations apply from left to right. If any of them
+ # has a definite decision (optional, mandatory or disallowed), that is the one that applies
+ # Provisional items are meant to be marked as the first item in the list
+ # Deprecated items are either on their own, or follow an O as O,D.
+ # For O,D, optional applies (leftmost), but we should consider some way to warn here as well,
+ # possibly in another function
+ for op in op_list:
+ decision = op(feature_map, attribute_list, all_command_list)
+ if decision == ConformanceDecision.NOT_APPLICABLE:
+ continue
+ return decision
+ return ConformanceDecision.NOT_APPLICABLE
+ return otherwise_inner
+
+
+def parse_callable_from_xml(element: ElementTree.Element, params: ConformanceParseParameters) -> Callable:
+ if len(list(element)) == 0:
+ # no subchildren here, so this can only be mandatory, optional, provisional, deprecated, disallowed, feature or attribute
+ if element.tag == MANDATORY_CONFORM:
+ return mandatory
+ elif element.tag == OPTIONAL_CONFORM:
+ return optional
+ elif element.tag == PROVISIONAL_CONFORM:
+ return provisional
+ elif element.tag == DEPRECATE_CONFORM:
+ return deprecated
+ elif element.tag == DISALLOW_CONFORM:
+ return disallowed
+ elif element.tag == FEATURE_TAG:
+ try:
+ return feature(params.feature_map[element.get('name')])
+ except KeyError:
+ raise ConformanceException(f'Conformance specifies feature not in feature table: {element.get("name")}')
+ elif element.tag == ATTRIBUTE_TAG:
+ # Some command conformance tags are marked as attribute, so if this key isn't in attribute, try command
+ name = element.get('name')
+ if name in params.attribute_map:
+ return attribute(params.attribute_map[name])
+ elif name in params.command_map:
+ return command(params.command_map[name])
+ else:
+ raise ConformanceException(f'Conformance specifies attribute or command not in table: {name}')
+ elif element.tag == COMMAND_TAG:
+ return command(params.command_map[element.get('name')])
+ else:
+ raise ConformanceException(
+ f'Unexpected xml conformance element with no children {str(element.tag)} {str(element.attrib)}')
+
+ # First build the list, then create the callable for this element
+ ops = []
+ for sub in element:
+ ops.append(parse_callable_from_xml(sub, params))
+
+ # optional can be a wrapper as well as a standalone
+ # This can be any of the boolean operations, optional or otherwise
+ if element.tag == OPTIONAL_CONFORM:
+ if len(ops) > 1:
+ raise ConformanceException(f'OPTIONAL term found with more than one subelement {list(element)}')
+ return optional_wrapper(ops[0])
+ elif element.tag == MANDATORY_CONFORM:
+ if len(ops) > 1:
+ raise ConformanceException(f'MANDATORY term found with more than one subelement {list(element)}')
+ return mandatory_wrapper(ops[0])
+ elif element.tag == AND_TERM:
+ return and_operation(ops)
+ elif element.tag == OR_TERM:
+ return or_operation(ops)
+ elif element.tag == NOT_TERM:
+ if len(ops) > 1:
+ raise ConformanceException(f'NOT term found with more than one subelement {list(element)}')
+ return not_operation(ops[0])
+ elif element.tag == OTHERWISE_CONFORM:
+ return otherwise(ops)
+ else:
+ raise ConformanceException(f'Unexpected conformance tag with children {element}')
diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py
index a394952445de60..398a01f6cd9d17 100644
--- a/src/python_testing/matter_testing_support.py
+++ b/src/python_testing/matter_testing_support.py
@@ -333,6 +333,19 @@ class CommandPathLocation:
cluster_id: int
command_id: int
+
+@dataclass
+class ClusterPathLocation:
+ endpoint_id: int
+ cluster_id: int
+
+
+@dataclass
+class FeaturePathLocation:
+ endpoint_id: int
+ cluster_id: int
+ feature_code: str
+
# ProblemSeverity is not using StrEnum, but rather Enum, since StrEnum only
# appeared in 3.11. To make it JSON serializable easily, multiple inheritance
# from `str` is used. See https://stackoverflow.com/a/51976841.
@@ -347,7 +360,7 @@ class ProblemSeverity(str, Enum):
@dataclass
class ProblemNotice:
test_name: str
- location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation]
+ location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation]
severity: ProblemSeverity
problem: str
spec_location: str = ""
@@ -551,13 +564,13 @@ async def send_single_cmd(
def print_step(self, stepnum: typing.Union[int, str], title: str) -> None:
logging.info(f'***** Test Step {stepnum} : {title}')
- def record_error(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""):
+ def record_error(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""):
self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.ERROR, problem, spec_location))
- def record_warning(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""):
+ def record_warning(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""):
self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.WARNING, problem, spec_location))
- def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""):
+ def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""):
self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.NOTE, problem, spec_location))
def get_setup_payload_info(self) -> SetupPayloadInfo:
diff --git a/src/python_testing/spec_parsing_support.py b/src/python_testing/spec_parsing_support.py
new file mode 100644
index 00000000000000..9e014aa95dd2aa
--- /dev/null
+++ b/src/python_testing/spec_parsing_support.py
@@ -0,0 +1,343 @@
+#
+# Copyright (c) 2023 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import glob
+import logging
+import os
+import xml.etree.ElementTree as ElementTree
+from copy import deepcopy
+from dataclasses import dataclass
+from enum import Enum, auto
+from typing import Callable
+
+from chip.tlv import uint
+from conformance_support import (DEPRECATE_CONFORM, DISALLOW_CONFORM, MANDATORY_CONFORM, OPTIONAL_CONFORM, OTHERWISE_CONFORM,
+ PROVISIONAL_CONFORM, ConformanceDecision, ConformanceException, ConformanceParseParameters,
+ or_operation, parse_callable_from_xml)
+from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, EventPathLocation,
+ FeaturePathLocation, ProblemNotice, ProblemSeverity)
+
+
+@dataclass
+class XmlFeature:
+ code: str
+ name: str
+ conformance: Callable[[uint], ConformanceDecision]
+
+
+@dataclass
+class XmlAttribute:
+ name: str
+ datatype: str
+ conformance: Callable[[uint], ConformanceDecision]
+
+
+@dataclass
+class XmlCommand:
+ name: str
+ conformance: Callable[[uint], ConformanceDecision]
+
+
+@dataclass
+class XmlEvent:
+ name: str
+ conformance: Callable[[uint], ConformanceDecision]
+
+
+@dataclass
+class XmlCluster:
+ name: str
+ revision: int
+ derived: str
+ feature_map: dict[str, uint]
+ attribute_map: dict[str, uint]
+ command_map: dict[str, uint]
+ features: dict[str, XmlFeature]
+ attributes: dict[uint, XmlAttribute]
+ accepted_commands: dict[uint, XmlCommand]
+ generated_commands: dict[uint, XmlCommand]
+ events: dict[uint, XmlEvent]
+
+
+class CommandType(Enum):
+ ACCEPTED = auto()
+ GENERATED = auto()
+
+
+def has_zigbee_conformance(conformance: ElementTree.Element) -> bool:
+ # For clusters, things with zigbee conformance can share IDs with the matter elements, so we don't want them
+
+ # TODO: it's actually possible for a thing to have a zigbee conformance AND to have other conformances, and we should check
+ # for that, but for now, this is fine because that hasn't happened in the cluster conformances YET.
+ # It does happen for device types, so we need to be careful there.
+ condition = conformance.iter('condition')
+ for c in condition:
+ try:
+ c.attrib['name'].lower() == "zigbee"
+ return True
+ except KeyError:
+ continue
+ return False
+
+
+class ClusterParser:
+ def __init__(self, cluster, cluster_id, name):
+ self._problems: list[ProblemNotice] = []
+ self._cluster = cluster
+ self._cluster_id = cluster_id
+ self._name = name
+
+ self._derived = None
+ try:
+ classification = next(cluster.iter('classification'))
+ hierarchy = classification.attrib['hierarchy']
+ if hierarchy.lower() == 'derived':
+ self._derived = classification.attrib['baseCluster']
+ except (KeyError, StopIteration):
+ self._derived = None
+
+ self.feature_elements = self.get_all_feature_elements()
+ self.attribute_elements = self.get_all_attribute_elements()
+ self.command_elements = self.get_all_command_elements()
+ self.event_elements = self.get_all_event_elements()
+ self.params = ConformanceParseParameters(feature_map=self.create_feature_map(), attribute_map=self.create_attribute_map(),
+ command_map=self.create_command_map())
+
+ def get_conformance(self, element: ElementTree.Element) -> ElementTree.Element:
+ for sub in element:
+ if sub.tag == OTHERWISE_CONFORM or sub.tag == MANDATORY_CONFORM or sub.tag == OPTIONAL_CONFORM or sub.tag == PROVISIONAL_CONFORM or sub.tag == DEPRECATE_CONFORM or sub.tag == DISALLOW_CONFORM:
+ return sub
+
+ # Conformance is missing, so let's record the problem and treat it as optional for lack of a better choice
+ if element.tag == 'feature':
+ location = FeaturePathLocation(endpoint_id=0, cluster_id=self._cluster_id, feature_code=element.attrib['code'])
+ elif element.tag == 'command':
+ location = CommandPathLocation(endpoint_id=0, cluster_id=self._cluster_id, command_id=element.attrib['id'])
+ elif element.tag == 'attribute':
+ location = AttributePathLocation(endpoint_id=0, cluster_id=self._cluster_id, attribute_id=element.attrib['id'])
+ elif element.tag == 'event':
+ location = EventPathLocation(endpoint_id=0, cluster_id=self._cluster_id, event_id=element.attrib['id'])
+ else:
+ location = ClusterPathLocation(endpoing_id=0, cluster_id=self._cluster_id)
+ self._problems.append(ProblemNotice(test_name='Spec XML parsing', location=location,
+ severity=ProblemSeverity.WARNING, problem='Unable to find conformance element'))
+
+ return ElementTree.Element(OPTIONAL_CONFORM)
+
+ def get_all_type(self, type_container: str, type_name: str, key_name: str) -> list[tuple[ElementTree.Element, ElementTree.Element]]:
+ ret = []
+ container_tags = self._cluster.iter(type_container)
+ for container in container_tags:
+ elements = container.iter(type_name)
+ for element in elements:
+ try:
+ element.attrib[key_name]
+ except KeyError:
+ # This is a conformance tag, which uses the same name
+ continue
+ conformance = self.get_conformance(element)
+ if has_zigbee_conformance(conformance):
+ continue
+ ret.append((element, conformance))
+ return ret
+
+ def get_all_feature_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]:
+ ''' Returns a list of features and their conformances'''
+ return self.get_all_type('features', 'feature', 'code')
+
+ def get_all_attribute_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]:
+ ''' Returns a list of attributes and their conformances'''
+ return self.get_all_type('attributes', 'attribute', 'id')
+
+ def get_all_command_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]:
+ ''' Returns a list of commands and their conformances '''
+ return self.get_all_type('commands', 'command', 'id')
+
+ def get_all_event_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]:
+ ''' Returns a list of events and their conformances'''
+ return self.get_all_type('events', 'event', 'id')
+
+ def create_feature_map(self) -> dict[str, uint]:
+ features = {}
+ for element, conformance in self.feature_elements:
+ features[element.attrib['code']] = 1 << int(element.attrib['bit'], 0)
+ return features
+
+ def create_attribute_map(self) -> dict[str, uint]:
+ attributes = {}
+ for element, conformance in self.attribute_elements:
+ attributes[element.attrib['name']] = int(element.attrib['id'], 0)
+ return attributes
+
+ def create_command_map(self) -> dict[str, uint]:
+ commands = {}
+ for element, conformance in self.command_elements:
+ commands[element.attrib['name']] = int(element.attrib['id'], 0)
+ return commands
+
+ def parse_conformance(self, conformance_xml: ElementTree.Element) -> Callable:
+ try:
+ return parse_callable_from_xml(conformance_xml, self.params)
+ except ConformanceException as ex:
+ # Just point to the general cluster, because something is mismatched, but it's not clear what
+ location = ClusterPathLocation(endpoint_id=0, cluster_id=self._cluster_id)
+ self._problems.append(ProblemNotice(test_name='Spec XML parsing', location=location,
+ severity=ProblemSeverity.WARNING, problem=str(ex)))
+ return None
+
+ def parse_features(self) -> dict[uint, XmlFeature]:
+ features = {}
+ for element, conformance_xml in self.feature_elements:
+ mask = 1 << int(element.attrib['bit'], 0)
+ conformance = self.parse_conformance(conformance_xml)
+ if conformance is None:
+ continue
+ features[mask] = XmlFeature(code=element.attrib['code'], name=element.attrib['name'],
+ conformance=conformance)
+ return features
+
+ def parse_attributes(self) -> dict[uint, XmlAttribute]:
+ attributes = {}
+ for element, conformance_xml in self.attribute_elements:
+ code = int(element.attrib['id'], 0)
+ # Some deprecated attributes don't have their types included, for now, lets just fallback to UNKNOWN
+ try:
+ datatype = element.attrib['type']
+ except KeyError:
+ datatype = 'UNKNOWN'
+ conformance = self.parse_conformance(conformance_xml)
+ if conformance is None:
+ continue
+ if code in attributes:
+ # This is one of those fun ones where two different rows have the same id and name, but differ in conformance and ranges
+ # I don't have a good way to relate the ranges to the conformance, but they're both acceptable, so let's just or them.
+ conformance = or_operation([conformance, attributes[code].conformance])
+ attributes[code] = XmlAttribute(name=element.attrib['name'], datatype=datatype,
+ conformance=conformance)
+ return attributes
+
+ def parse_commands(self, command_type: CommandType) -> dict[uint, XmlAttribute]:
+ commands = {}
+ for element, conformance_xml in self.command_elements:
+ code = int(element.attrib['id'], 0)
+ dir = CommandType.ACCEPTED
+ try:
+ if element.attrib['direction'].lower() == 'responsefromserver':
+ dir = CommandType.GENERATED
+ except KeyError:
+ pass
+ if dir != command_type:
+ continue
+ code = int(element.attrib['id'], 0)
+ conformance = self.parse_conformance(conformance_xml)
+ if conformance is None:
+ continue
+ if code in commands:
+ conformance = or_operation([conformance, commands[code].conformance])
+ commands[code] = XmlCommand(name=element.attrib['name'], conformance=conformance)
+ return commands
+
+ def parse_events(self) -> dict[uint, XmlAttribute]:
+ events = {}
+ for element, conformance_xml in self.event_elements:
+ code = int(element.attrib['id'], 0)
+ conformance = self.parse_conformance(conformance_xml)
+ if conformance is None:
+ continue
+ if code in events:
+ conformance = or_operation([conformance, events[code].conformance])
+ events[code] = XmlEvent(name=element.attrib['name'], conformance=conformance)
+ return events
+
+ def create_cluster(self) -> XmlCluster:
+ return XmlCluster(revision=self._cluster.attrib['revision'], derived=self._derived,
+ name=self._name, feature_map=self.params.feature_map,
+ attribute_map=self.params.attribute_map, command_map=self.params.command_map,
+ features=self.parse_features(),
+ attributes=self.parse_attributes(),
+ accepted_commands=self.parse_commands(CommandType.ACCEPTED),
+ generated_commands=self.parse_commands(CommandType.GENERATED),
+ events=self.parse_events())
+
+ def get_problems(self) -> list[ProblemNotice]:
+ return self._problems
+
+
+def build_xml_clusters() -> tuple[list[XmlCluster], list[ProblemNotice]]:
+ dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', 'data_model', 'clusters')
+ clusters: dict[int, XmlCluster] = {}
+ derived_clusters: dict[str, XmlCluster] = {}
+ ids_by_name = {}
+ problems = []
+ for xml in glob.glob(f"{dir}/*.xml"):
+ logging.info(f'Parsing file {xml}')
+ tree = ElementTree.parse(f'{xml}')
+ root = tree.getroot()
+ cluster = root.iter('cluster')
+ for c in cluster:
+ name = c.attrib['name']
+ if not c.attrib['id']:
+ # Fully derived clusters have no id, but also shouldn't appear on a device.
+ # We do need to keep them, though, because we need to update the derived
+ # clusters. We keep them in a special dict by name, so they can be thrown
+ # away later.
+ cluster_id = None
+ else:
+ cluster_id = int(c.attrib['id'], 0)
+ ids_by_name[name] = cluster_id
+
+ parser = ClusterParser(c, cluster_id, name)
+ new = parser.create_cluster()
+ problems = problems + parser.get_problems()
+
+ if cluster_id:
+ clusters[cluster_id] = new
+ else:
+ derived_clusters[name] = new
+
+ # We have the information now about which clusters are derived, so we need to fix them up. Apply first the base cluster,
+ # then add the specific cluster overtop
+ for id, c in clusters.items():
+ if c.derived:
+ base_name = c.derived
+ if base_name in ids_by_name:
+ base = clusters[ids_by_name[c.derived]]
+ else:
+ base = derived_clusters[base_name]
+
+ feature_map = deepcopy(base.feature_map)
+ feature_map.update(c.feature_map)
+ attribute_map = deepcopy(base.attribute_map)
+ attribute_map.update(c.attribute_map)
+ command_map = deepcopy(base.command_map)
+ command_map.update(c.command_map)
+ features = deepcopy(base.features)
+ features.update(c.features)
+ attributes = deepcopy(base.attributes)
+ attributes.update(c.attributes)
+ accepted_commands = deepcopy(base.accepted_commands)
+ accepted_commands.update(c.accepted_commands)
+ generated_commands = deepcopy(base.generated_commands)
+ generated_commands.update(c.generated_commands)
+ events = deepcopy(base.events)
+ events.update(c.events)
+ new = XmlCluster(revision=c.revision, derived=c.derived, name=c.name,
+ feature_map=feature_map, attribute_map=attribute_map, command_map=command_map,
+ features=features, attributes=attributes, accepted_commands=accepted_commands,
+ generated_commands=generated_commands, events=events)
+ clusters[id] = new
+ return clusters, problems