diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 64b33b0a025982..b882cce8330dff 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -500,6 +500,7 @@ jobs: scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --int-arg PIXIT.ACE.APPENDPOINT:1 PIXIT.ACE.APPDEVTYPEID:0x0100 --string-arg PIXIT.ACE.APPCLUSTER:OnOff PIXIT.ACE.APPATTRIBUTE:OnOff"' scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"' scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_CGEN_2_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"' + scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py"' - name: Uploading core files uses: actions/upload-artifact@v3 if: ${{ failure() && !env.ACT }} diff --git a/src/python_testing/TestMatterTestingSupport.py b/src/python_testing/TestMatterTestingSupport.py new file mode 100644 index 00000000000000..59476e033d4a52 --- /dev/null +++ b/src/python_testing/TestMatterTestingSupport.py @@ -0,0 +1,162 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import typing +from datetime import datetime, timedelta, timezone + +import chip.clusters as Clusters +from chip.clusters.Types import Nullable, NullValue +from chip.tlv import uint +from matter_testing_support import (MatterBaseTest, async_test_body, default_matter_test_main, parse_pics, type_matches, + utc_time_in_matter_epoch) +from mobly import asserts + + +def get_raw_type_list(): + test = Clusters.UnitTesting + struct = test.Structs.SimpleStruct() + struct_type = test.Structs.SimpleStruct + null_opt_struct = test.Structs.NullablesAndOptionalsStruct() + null_opt_struct_type = test.Structs.NullablesAndOptionalsStruct + double_nested_struct_list = test.Structs.DoubleNestedStructList() + double_nested_struct_list_type = test.Structs.DoubleNestedStructList + list_of_uints = [0, 1] + list_of_uints_type = typing.List[uint] + list_of_structs = [struct, struct] + list_of_structs_type = typing.List[struct_type] + list_of_double_nested_struct_list = [double_nested_struct_list, double_nested_struct_list] + list_of_double_nested_struct_list_type = typing.List[double_nested_struct_list_type] + + # Create a list with all the types and a list of the values that should match for that type + vals = {uint: [1], + str: ["str"], + struct_type: [struct], + null_opt_struct_type: [null_opt_struct], + double_nested_struct_list_type: [double_nested_struct_list], + list_of_uints_type: [list_of_uints], + list_of_structs_type: [list_of_structs], + list_of_double_nested_struct_list_type: [list_of_double_nested_struct_list]} + return vals + + +def test_type_matching_for_type(test_type, test_nullable: bool = False, test_optional: bool = False): + vals = get_raw_type_list() + + if test_nullable and test_optional: + match_type = typing.Union[Nullable, None, test_type] + elif test_nullable: + match_type = typing.Union[Nullable, test_type] + elif test_optional: + match_type = typing.Optional[test_type] + else: + match_type = test_type + + true_list = vals[test_type] + if test_nullable: + true_list.append(NullValue) + if test_optional: + true_list.append(None) + + del vals[test_type] + + # true_list is all the values that should match with the test type + for i in true_list: + asserts.assert_true(type_matches(i, match_type), "{} type checking failure".format(test_type)) + + # try every value in every type in the remaining dict - they should all fail + for v in vals.values(): + for i in v: + asserts.assert_false(type_matches(i, match_type), "{} falsely matched to type {}".format(i, match_type)) + + # Test the nullables or optionals that aren't supposed to work + if not test_nullable: + asserts.assert_false(type_matches(NullValue, match_type), "NullValue falsely matched to {}".format(match_type)) + + if not test_optional: + asserts.assert_false(type_matches(None, match_type), "None falsely matched to {}".format(match_type)) + + +def run_all_match_tests_for_type(test_type): + test_type_matching_for_type(test_type=test_type) + test_type_matching_for_type(test_type=test_type, test_nullable=True) + test_type_matching_for_type(test_type=test_type, test_optional=True) + test_type_matching_for_type(test_type=test_type, test_nullable=True, test_optional=True) + + +class TestMatterTestingSupport(MatterBaseTest): + @async_test_body + async def test_matter_epoch_time(self): + # Matter epoch should return zero + ret = utc_time_in_matter_epoch(datetime(2000, 1, 1, 0, 0, 0, 0, timezone.utc)) + asserts.assert_equal(ret, 0, "UTC epoch returned non-zero value") + + # Jan 2 is exactly 1 day after Jan 1 + ret = utc_time_in_matter_epoch(datetime(2000, 1, 2, 0, 0, 0, 0, timezone.utc)) + expected_delay = timedelta(days=1) + actual_delay = timedelta(microseconds=ret) + asserts.assert_equal(expected_delay, actual_delay, "Calculation for Jan 2 date is incorrect") + + # There's a catch 22 for knowing the current time, but we can check that it's + # going up, and that it's larger than when I wrote the test + # Check that the returned value is larger than the test writing date + writing_date = utc_time_in_matter_epoch(datetime(2023, 5, 5, 0, 0, 0, 0, timezone.utc)) + current_date = utc_time_in_matter_epoch() + asserts.assert_greater(current_date, writing_date, "Calculation for current date is smaller than writing date") + + # Check that the time is going up + last_date = current_date + current_date = utc_time_in_matter_epoch() + asserts.assert_greater(current_date, last_date, "Time does not appear to be incrementing") + + @async_test_body + async def test_type_checking(self): + vals = get_raw_type_list() + for k in vals.keys(): + run_all_match_tests_for_type(k) + + @async_test_body + async def test_pics_support(self): + pics_list = ['TEST.S.A0000=1', + 'TEST.S.A0001=0', + 'lower.s.a0000=1', + '', + ' ', + '# comment', + ' # comment', + ' SPACE.S.A0000 = 1'] + pics = parse_pics(pics_list) + # force the parsed pics here to be in the config so we can check the check_pics function + self.matter_test_config.pics = pics + + asserts.assert_true(self.check_pics("TEST.S.A0000"), "PICS parsed incorrectly for TEST.S.A0000") + asserts.assert_false(self.check_pics("TEST.S.A0001"), "PICS parsed incorrectly for TEST.S.A0001") + asserts.assert_true(self.check_pics("LOWER.S.A0000"), "PICS pased incorrectly for LOWER.S.A0000") + asserts.assert_true(self.check_pics("SPACE.S.A0000"), "PICS parsed incorrectly for SPACE.S.A0000") + asserts.assert_false(self.check_pics("NOT.S.A0000"), "PICS parsed incorrectly for NOT.S.A0000") + asserts.assert_true(self.check_pics(" test.s.a0000"), "PICS checker lowercase handled incorrectly") + + # invalid pics file should throw a value error + pics_list.append("BAD.S.A000=5") + try: + pics = parse_pics(pics_list) + asserts.assert_false(True, "PICS parser did not throw an error as expected") + except ValueError: + pass + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index c4a73c0179d33b..1e2e40a6b4ab3d 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -24,12 +24,16 @@ import pathlib import re import sys +import typing import uuid from binascii import hexlify, unhexlify from dataclasses import asdict as dataclass_asdict from dataclasses import dataclass, field +from datetime import datetime, timezone from typing import List, Optional, Tuple +from chip.tlv import float32, uint + # isort: off from chip import ChipDeviceCtrl # Needed before chip.FabricAdmin @@ -117,6 +121,68 @@ def get_default_paa_trust_store(root_path: pathlib.Path) -> pathlib.Path: return pathlib.Path.cwd() +def parse_pics(lines=typing.List[str]) -> dict[str, bool]: + pics = {} + for raw in lines: + line, _, _ = raw.partition("#") + line = line.strip() + + if not line: + continue + + key, _, val = line.partition("=") + val = val.strip() + if val not in ["1", "0"]: + raise ValueError('PICS {} must have a value of 0 or 1'.format(key)) + + pics[key.strip().upper()] = (val == "1") + return pics + + +def read_pics_from_file(filename: str) -> dict[str, bool]: + """ Reads a dictionary of PICS from a file. """ + with open(filename, 'r') as f: + lines = f.readlines() + return parse_pics(lines) + + +def type_matches(received_value, desired_type): + """ Checks if the value received matches the expected type. + + Handles unpacking Nullable and Optional types and + compares list value types for non-empty lists. + """ + if typing.get_origin(desired_type) == typing.Union: + return any(type_matches(received_value, t) for t in typing.get_args(desired_type)) + elif typing.get_origin(desired_type) == list: + if isinstance(received_value, list): + # Assume an empty list is of the correct type + return True if received_value == [] else any(type_matches(received_value[0], t) for t in typing.get_args(desired_type)) + else: + return False + elif desired_type == uint: + return isinstance(received_value, int) and received_value >= 0 + elif desired_type == float32: + return isinstance(received_value, float) + else: + return isinstance(received_value, desired_type) + + +def utc_time_in_matter_epoch(desired_datetime: datetime = None): + """ Returns the time in matter epoch in us. + + If desired_datetime is None, it will return the current time. + """ + if desired_datetime is None: + utc_native = datetime.now(tz=timezone.utc) + else: + utc_native = desired_datetime + # Matter epoch is 0 hours, 0 minutes, 0 seconds on Jan 1, 2000 UTC + utc_th_delta = utc_native - datetime(2000, 1, 1, 0, 0, 0, 0, timezone.utc) + utc_th_us = int(utc_th_delta.total_seconds() * 1000000) + return utc_th_us + + @dataclass class MatterTestConfig: storage_path: pathlib.Path = None @@ -266,6 +332,11 @@ def certificate_authority_manager(self) -> chip.CertificateAuthority.Certificate def dut_node_id(self) -> int: return self.matter_test_config.dut_node_id[0] + def check_pics(self, pics_key: str) -> bool: + picsd = self.matter_test_config.pics + pics_key = pics_key.strip().upper() + return pics_key in picsd and picsd[pics_key] + async def read_single_attribute( self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object, fabricFiltered: bool = True) -> object: result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)], fabricFiltered=fabricFiltered) @@ -273,7 +344,7 @@ async def read_single_attribute( return list(data.values())[0][attribute] async def read_single_attribute_check_success( - self, cluster: object, attribute: object, + self, cluster: Clusters.ClusterObjects.ClusterCommand, attribute: Clusters.ClusterObjects.ClusterAttributeDescriptor, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object: if dev_ctrl is None: dev_ctrl = self.default_controller @@ -285,6 +356,9 @@ async def read_single_attribute_check_success( err_msg = "Error reading {}:{}".format(str(cluster), str(attribute)) asserts.assert_true(attr_ret is not None, err_msg) asserts.assert_false(isinstance(attr_ret, Clusters.Attribute.ValueDecodeFailure), err_msg) + desired_type = attribute.attribute_type.Type + asserts.assert_true(type_matches(attr_ret, desired_type), + 'Returned attribute {} is wrong type expected {}, got {}'.format(attribute, desired_type, type(attr_ret))) return attr_ret async def read_single_attribute_expect_error( @@ -304,6 +378,18 @@ async def read_single_attribute_expect_error( asserts.assert_equal(attr_ret.Reason.status, error, err_msg) return attr_ret + async def send_single_cmd( + self, cmd: Clusters.ClusterObjects.ClusterCommand, + dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0, + timedRequestTimeoutMs: typing.Union[None, int] = None) -> object: + if dev_ctrl is None: + dev_ctrl = self.default_controller + if node_id is None: + node_id = self.dut_node_id + + result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs) + return result + def print_step(self, stepnum: int, title: str) -> None: logging.info('***** Test Step %d : %s', stepnum, title) @@ -567,6 +653,7 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: config.logs_path = pathlib.Path(_DEFAULT_LOG_PATH) if args.logs_path is None else args.logs_path config.paa_trust_store_path = args.paa_trust_store_path config.ble_interface_id = args.ble_interface_id + config.pics = {} if args.PICS is None else read_pics_from_file(args.PICS) config.controller_node_id = args.controller_node_id @@ -616,6 +703,7 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: metavar='NODE_ID', default=[_DEFAULT_DUT_NODE_ID], help='Node ID for primary DUT communication, ' 'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+") + basic_group.add_argument("--PICS", help="PICS file path", type=str) commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node")