Skip to content

Commit

Permalink
Merge branch 'master' into feature/add-validation-logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lazarkov authored Jul 26, 2024
2 parents a3ffe06 + 95bf668 commit a5fdd43
Show file tree
Hide file tree
Showing 14 changed files with 253 additions and 16 deletions.
4 changes: 3 additions & 1 deletion BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ if (current_toolchain != "${dir_pw_toolchain}/default:default") {
"//examples/common/pigweed/rpc_console/py:chip_rpc",
"//integrations/mobly:chip_mobly",
"//scripts/py_matter_yamltests:matter_yamltests",
"//src/python_testing/matter_testing_infrastructure:metadata_parser",
]

pw_python_venv("matter_build_venv") {
Expand Down Expand Up @@ -107,6 +108,7 @@ if (current_toolchain != "${dir_pw_toolchain}/default:default") {
deps = [
"${chip_root}/scripts:matter_yamltests_distribution.wheel",
"${chip_root}/src/controller/python:chip-repl",
"${chip_root}/src/python_testing/matter_testing_infrastructure:metadata_parser.wheel",
]
if (enable_pylib) {
deps += [ "${chip_root}/src/pybindings/pycontroller" ]
Expand Down Expand Up @@ -234,8 +236,8 @@ if (current_toolchain != "${dir_pw_toolchain}/default:default") {
"//scripts/build:build_examples.tests",
"//scripts/py_matter_idl:matter_idl.tests",
"//scripts/py_matter_yamltests:matter_yamltests.tests",
"//scripts/tests/py:metadata_parser.tests",
"//src:tests_run",
"//src/python_testing/matter_testing_infrastructure:metadata_parser.tests",
]

if (current_os == "linux" || current_os == "mac") {
Expand Down
3 changes: 3 additions & 0 deletions scripts/build_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ else
WHEEL=("$OUTPUT_ROOT"/controller/python/chip*.whl)
fi

# Add the matter_testing_infrastructure wheel
WHEEL+=("$OUTPUT_ROOT"/python/obj/src/python_testing/matter_testing_infrastructure/metadata_parser._build_wheel/metadata_parser-*.whl)

if [ -n "$extra_packages" ]; then
WHEEL+=("$extra_packages")
fi
Expand Down
2 changes: 1 addition & 1 deletion scripts/tests/run_python_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import click
import coloredlogs
from colorama import Fore, Style
from py.metadata import Metadata, MetadataReader
from metadata_parser.metadata import Metadata, MetadataReader

DEFAULT_CHIP_ROOT = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..'))
Expand Down
99 changes: 95 additions & 4 deletions src/python_testing/TC_DeviceConformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# test-runner-run/run1/script-args: --storage-path admin_storage.json --manual-code 10054912339 --bool-arg ignore_in_progress:True allow_provisional:True --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --tests test_TC_IDM_10_2
# === END CI TEST ARGUMENTS ===

# TODO: Enable 10.5 in CI once the door lock OTA requestor problem is sorted.
from typing import Callable

import chip.clusters as Clusters
Expand All @@ -35,16 +36,19 @@
from choice_conformance_support import (evaluate_attribute_choice_conformance, evaluate_command_choice_conformance,
evaluate_feature_choice_conformance)
from conformance_support import ConformanceDecision, conformance_allowed
from global_attribute_ids import GlobalAttributeIds
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, ProblemNotice,
ProblemSeverity, async_test_body, default_matter_test_main)
from spec_parsing_support import CommandType, build_xml_clusters
from global_attribute_ids import (ClusterIdType, DeviceTypeIdType, GlobalAttributeIds, cluster_id_type, device_type_id_type,
is_valid_device_type_id)
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, DeviceTypePathLocation,
MatterBaseTest, ProblemNotice, ProblemSeverity, async_test_body, default_matter_test_main)
from spec_parsing_support import CommandType, build_xml_clusters, build_xml_device_types


class DeviceConformanceTests(BasicCompositionTests):
async def setup_class_helper(self):
await super().setup_class_helper()
self.xml_clusters, self.problems = build_xml_clusters()
self.xml_device_types, problems = build_xml_device_types()
self.problems.extend(problems)

def check_conformance(self, ignore_in_progress: bool, is_ci: bool):
problems = []
Expand Down Expand Up @@ -245,6 +249,86 @@ def record_warning(location, problem):

return success, problems

def check_device_type(self, fail_on_extra_clusters: bool = True, allow_provisional: bool = False) -> tuple[bool, list[ProblemNotice]]:
success = True
problems = []

def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.5", location, severity, problem, ""))

def record_error(location, problem):
nonlocal success
record_problem(location, problem, ProblemSeverity.ERROR)
success = False

def record_warning(location, problem):
record_problem(location, problem, ProblemSeverity.WARNING)

for endpoint_id, endpoint in self.endpoints.items():
if Clusters.Descriptor not in endpoint:
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=Clusters.Descriptor.id)
record_error(location=location, problem='No descriptor cluster found on endpoint')
continue

device_type_list = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]
invalid_device_types = [x for x in device_type_list if not is_valid_device_type_id(device_type_id_type(x.deviceType))]
standard_device_types = [x for x in endpoint[Clusters.Descriptor]
[Clusters.Descriptor.Attributes.DeviceTypeList] if device_type_id_type(x.deviceType) == DeviceTypeIdType.kStandard]
endpoint_clusters = []
server_clusters = []
for device_type in invalid_device_types:
location = DeviceTypePathLocation(device_type_id=device_type.deviceType)
record_error(location=location, problem='Invalid device type ID (out of valid range)')

for device_type in standard_device_types:
device_type_id = device_type.deviceType
location = DeviceTypePathLocation(device_type_id=device_type_id)
if device_type_id not in self.xml_device_types.keys():
record_error(location=location, problem='Unknown device type ID in standard range')
continue

if device_type_id not in self.xml_device_types.keys():
location = DeviceTypePathLocation(device_type_id=device_type_id)
record_error(location=location, problem='Unknown device type')
continue

# TODO: check revision. Possibly in another test?

xml_device = self.xml_device_types[device_type_id]
# IDM 10.1 checks individual clusters for validity,
# so here we can ignore checks for invalid and manufacturer clusters.
server_clusters = [x for x in endpoint[Clusters.Descriptor]
[Clusters.Descriptor.Attributes.ServerList] if cluster_id_type(x) == ClusterIdType.kStandard]

# As a start, we are only checking server clusters
# TODO: check client clusters too?
for cluster_id, cluster_requirement in xml_device.server_clusters.items():
# Device type cluster conformances do not include any conformances based on cluster elements
conformance_decision_with_choice = cluster_requirement.conformance(0, [], [])
location = DeviceTypePathLocation(device_type_id=device_type_id, cluster_id=cluster_id)
if conformance_decision_with_choice.decision == ConformanceDecision.MANDATORY and cluster_id not in server_clusters:
record_error(location=location,
problem=f"Mandatory cluster {cluster_requirement.name} for device type {xml_device.name} is not present in the server list")
success = False

if cluster_id in server_clusters and not conformance_allowed(conformance_decision_with_choice, allow_provisional):
record_error(location=location,
problem=f"Disallowed cluster {cluster_requirement.name} found in server list for device type {xml_device.name}")
success = False
# If we want to check for extra clusters on the endpoint, we need to know the entire set of clusters in all the device type
# lists across all the device types on the endpoint.
endpoint_clusters += xml_device.server_clusters.keys()
if fail_on_extra_clusters:
fn = record_error
else:
fn = record_warning
extra_clusters = set(server_clusters) - set(endpoint_clusters)
for extra in extra_clusters:
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=extra)
fn(location=location, problem=f"Extra cluster found on endpoint with device types {device_type_list}")

return success, problems


class TC_DeviceConformance(MatterBaseTest, DeviceConformanceTests):
@async_test_body
Expand All @@ -267,6 +351,13 @@ def test_TC_IDM_10_3(self):
if not success:
self.fail_current_test("Problems with cluster revision on at least one cluster")

def test_TC_IDM_10_5(self):
fail_on_extra_clusters = self.user_params.get("fail_on_extra_clusters", True)
success, problems = self.check_device_type(fail_on_extra_clusters)
self.problems.extend(problems)
if not success:
self.fail_current_test("Problems with Device type conformance on one or more endpoints")


if __name__ == "__main__":
default_matter_test_main()
149 changes: 144 additions & 5 deletions src/python_testing/TestSpecParsingDeviceType.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@
#
import xml.etree.ElementTree as ElementTree

import chip.clusters as Clusters
from chip.clusters import Attribute
from chip.tlv import uint
from conformance_support import conformance_allowed
from jinja2 import Template
from matter_testing_support import MatterBaseTest, default_matter_test_main
from mobly import asserts
from spec_parsing_support import build_xml_device_types, parse_single_device_type
from spec_parsing_support import build_xml_clusters, build_xml_device_types, parse_single_device_type
from TC_DeviceConformance import DeviceConformanceTests


class TestSpecParsingDeviceType(MatterBaseTest):

# This just tests that the current spec can be parsed without failures
def test_spec_device_parsing(self):
device_types, problems = build_xml_device_types()
self.problems += problems
for id, d in device_types.items():
for id, d in self.xml_device_types.items():
print(str(d))

def setup_class(self):
self.xml_clusters, self.xml_cluster_problems = build_xml_clusters()
self.xml_device_types, self.xml_device_types_problems = build_xml_device_types()

self.device_type_id = 0xBBEF
self.revision = 2
self.classification_class = "simple"
Expand Down Expand Up @@ -106,6 +111,140 @@ def test_bad_scope(self):
device_type, problems = parse_single_device_type(et)
asserts.assert_equal(len(problems), 1, "Device with no scope did not generate a problem notice")

# All these tests are based on the temp sensor device type because it is very simple
# it requires temperature measurement, identify and the base devices.
# Right now I'm not testing for binding condition.
# The test is entirely based on the descriptor cluster so that's all I'm populating here
# because it makes the test less complex to write.
def create_test(self, server_list: list[uint], no_descriptor: bool = False, bad_device_id: bool = False) -> DeviceConformanceTests:
self.test = DeviceConformanceTests()
self.test.xml_device_types = self.xml_device_types
self.test.xml_clusters = self.xml_clusters

if bad_device_id:
known_ids = list(self.test.xml_device_types.keys())
device_type_id = [a for a in range(min(known_ids), max(known_ids)) if a not in known_ids][0]
else:
device_type_id = 0x0302

resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {})
if no_descriptor:
resp.attributes = {1: {}}
else:
desc = Clusters.Descriptor
server_list_attr = Clusters.Descriptor.Attributes.ServerList
device_type_list_attr = Clusters.Descriptor.Attributes.DeviceTypeList
device_type_list = [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=device_type_id, revision=2)]
resp.attributes = {1: {desc: {device_type_list_attr: device_type_list, server_list_attr: server_list}}}
self.test.endpoints = resp.attributes

def create_good_device(self, device_type_id: int) -> DeviceConformanceTests:
self.test = DeviceConformanceTests()
self.test.xml_device_types = self.xml_device_types
self.test.xml_clusters = self.xml_clusters

resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {})
desc = Clusters.Descriptor
server_list_attr = Clusters.Descriptor.Attributes.ServerList
device_type_list_attr = Clusters.Descriptor.Attributes.DeviceTypeList
device_type_list = [Clusters.Descriptor.Structs.DeviceTypeStruct(
deviceType=device_type_id, revision=self.xml_device_types[device_type_id].revision)]
server_list = [k for k, v in self.xml_device_types[device_type_id].server_clusters.items(
) if conformance_allowed(v.conformance(0, [], []), False)]
resp.attributes = {1: {desc: {device_type_list_attr: device_type_list, server_list_attr: server_list}}}

self.test.endpoints = resp.attributes

# Test with temp sensor with temp sensor, identify and descriptor
def test_ts_minimal_clusters(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_true(success, "Failure on Temperature Sensor device type test")

# Temp sensor with temp sensor, identify, descriptor, binding
def test_ts_minimal_with_binding(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Binding.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_true(success, "Failure on Temperature Sensor device type test")
asserts.assert_false(problems, "Found problems on Temperature sensor device type test")

# Temp sensor with temp sensor, identify, descriptor, fixed label
def test_ts_minimal_with_label(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.FixedLabel.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_true(success, "Failure on Temperature Sensor device type test")
asserts.assert_false(problems, "Found problems on Temperature sensor device type test")

# Temp sensor with temp sensor, descriptor
def test_ts_missing_identify(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Descriptor.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

# endpoint 1 empty
def test_endpoint_missing_descriptor(self):
self.create_test([], no_descriptor=True)
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

# Temp sensor with temp sensor, descriptor, identify, onoff
def test_ts_extra_cluster(self):
self.create_test([Clusters.TemperatureMeasurement.id, Clusters.Identify.id, Clusters.Descriptor.id, Clusters.OnOff.id])
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

success, problems = self.test.check_device_type(fail_on_extra_clusters=False)
asserts.assert_equal(len(problems), 1, "Did not receive expected warning for extra clusters")
asserts.assert_true(success, "Unexpected failure")

def test_bad_device_type_id_device_type_test(self):
self.create_test([], bad_device_id=True)
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), 1, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")

def test_all_device_types(self):
for id in self.xml_device_types.keys():
self.create_good_device(id)
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_false(problems, f"Unexpected problems on device type {id}")
asserts.assert_true(success, f"Unexpected failure on device type {id}")

def test_disallowed_cluster(self):
for id, dt in self.xml_device_types.items():
expected_problems = 0
self.create_good_device(id)
for cluster_id, cluster in dt.server_clusters.items():
if not conformance_allowed(cluster.conformance(0, [], []), False):
self.test.endpoints[1][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList].append(cluster_id)
expected_problems += 1
if expected_problems == 0:
continue
success, problems = self.test.check_device_type(fail_on_extra_clusters=True)
if problems:
print(problems)
asserts.assert_equal(len(problems), expected_problems, "Unexpected number of problems")
asserts.assert_false(success, "Unexpected success running test that should fail")


if __name__ == "__main__":
default_matter_test_main()
4 changes: 3 additions & 1 deletion src/python_testing/conformance_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ def __call__(self, feature_map: uint, attribute_list: list[uint], all_command_li
for op in self.op_list:
decision_with_choice = op(feature_map, attribute_list, all_command_list)
# and operations can't happen on optional or disallowed
if decision_with_choice.decision in [ConformanceDecision.OPTIONAL, ConformanceDecision.DISALLOWED, ConformanceDecision.PROVISIONAL]:
if decision_with_choice.decision == ConformanceDecision.OPTIONAL and all([type(op) == device_feature for op in self.op_list]):
return decision_with_choice
elif decision_with_choice.decision in [ConformanceDecision.OPTIONAL, ConformanceDecision.DISALLOWED, ConformanceDecision.PROVISIONAL]:
raise ConformanceException('AND operation on optional or disallowed item')
elif decision_with_choice.decision == ConformanceDecision.NOT_APPLICABLE:
return decision_with_choice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pw_python_package("metadata_parser") {
inputs = [ "env_test.yaml" ]

sources = [
"__init__.py",
"metadata.py",
"metadata_parser/__init__.py",
"metadata_parser/metadata.py",
]

tests = [ "test_metadata.py" ]
tests = [ "metadata_parser/test_metadata.py" ]
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit a5fdd43

Please sign in to comment.