From 551651e9a03d652fc1cad18bd5126055a717c14d Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Thu, 8 Jun 2023 00:05:30 -0400 Subject: [PATCH] Improvements in Python testing support (#27116) * Improvements in Python testing support Problem: - It was no longer possible to run `matter_testing_support.py`-based tests on already commissioned devices due to changes in command line logic that impacted fabric ID selection and broke stack init. - Some variables that contained multiple entries in `matter_test_support.py` were singular which was somewhat confusing and a change from the initial semantics where unique values only were allowed. Changes in this PR: - Pluralize required fields - Fix running tests without commissioning - Improve setup_payload decoder module API - Give access to raw TLV results for reads, not just subs Testing done: - Re-ran `TC_DA_1_7.py` with updates (thanks cecille@) - Re-ran `TC_CGEN_2_4.py` with updates. - Used updated APIs in a seperate branch (upcoming) * Restyled by isort * Fix TC_RR_1_1.py * Fix indent * Restyled by autopep8 --------- Co-authored-by: Restyled.io Co-authored-by: tennessee.carmelveilleux@gmail.com --- .../python/chip/clusters/Attribute.py | 9 +- .../chip/setup_payload/setup_payload.py | 55 ++++++- src/python_testing/TC_CGEN_2_4.py | 6 +- src/python_testing/TC_DA_1_7.py | 8 +- src/python_testing/TC_RR_1_1.py | 2 + src/python_testing/matter_testing_support.py | 142 ++++++++++-------- 6 files changed, 146 insertions(+), 76 deletions(-) diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 5967cbc386de8f..13a529d14316ad 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -505,7 +505,7 @@ def OverrideLivenessTimeoutMs(self, timeoutMs: int): def GetReportingIntervalsSeconds(self) -> Tuple[int, int]: ''' - Retrieve the reporting intervals associated with an active subscription. + Retrieve the reporting intervals associated with an active subscription. This should only be called if we're of subscription interaction type and after a subscription has been established. ''' handle = chip.native.GetLibraryHandle() @@ -650,8 +650,9 @@ def _BuildEventIndex(): class AsyncReadTransaction: @dataclass class ReadResponse: - attributes: AttributeCache = None - events: List[ClusterEvent] = None + attributes: dict[Any, Any] + events: list[ClusterEvent] + tlvAttributes: dict[int, Any] def __init__(self, future: Future, eventLoop, devCtrl, returnClusterObject: bool): self._event_loop = eventLoop @@ -796,7 +797,7 @@ def _handleDone(self): self._future.set_exception(chip.exceptions.ChipStackError(self._resultError)) else: self._future.set_result(AsyncReadTransaction.ReadResponse( - attributes=self._cache.attributeCache, events=self._events)) + attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache)) # # Decrement the ref on ourselves to match the increment that happened at allocation. diff --git a/src/controller/python/chip/setup_payload/setup_payload.py b/src/controller/python/chip/setup_payload/setup_payload.py index fd3d8e1db036b7..ea5f786fa6140d 100644 --- a/src/controller/python/chip/setup_payload/setup_payload.py +++ b/src/controller/python/chip/setup_payload/setup_payload.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Project CHIP Authors +# Copyright (c) 2021-2023 Project CHIP Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ # from ctypes import CFUNCTYPE, c_char_p, c_int32, c_uint8 +from typing import Optional from chip.exceptions import ChipStackError from chip.native import GetLibraryHandle, NativeLibraryHandleMethodArguments @@ -65,6 +66,7 @@ def ParseManualPairingCode(self, manualPairingCode: str): return self + # DEPRECATED def PrintOnboardingCodes(self, passcode, vendorId, productId, discriminator, customFlow, capabilities, version): self.Clear() err = self.chipLib.pychip_SetupPayload_PrintOnboardingCodes( @@ -73,6 +75,7 @@ def PrintOnboardingCodes(self, passcode, vendorId, productId, discriminator, cus if err != 0: raise ChipStackError(err) + # DEPRECATED def Print(self): for name, value in self.attributes.items(): decorated_value = self.__DecorateValue(name, value) @@ -83,10 +86,12 @@ def Print(self): print( f"Vendor attribute '{tag:>3}': {self.vendor_attributes[tag]}") + # DEPRECATED def Clear(self): self.attributes.clear() self.vendor_attributes.clear() + # DEPRECATED def __DecorateValue(self, name, value): if name == "RendezvousInformation": rendezvous_methods = [] @@ -113,3 +118,51 @@ def __InitNativeFunctions(self, chipLib): setter.Set("pychip_SetupPayload_PrintOnboardingCodes", c_int32, [c_uint32, c_uint16, c_uint16, c_uint16, uint8_t, uint8_t, uint8_t]) + + # Getters from parsed contents. + # Prefer using the methods below to access setup payload information once parse. + + @property + def vendor_id(self) -> int: + return int(self.attributes.get("VendorID", "0")) + + @property + def product_id(self) -> int: + return int(self.attributes.get("ProductID", "0")) + + @property + def setup_passcode(self) -> int: + if "SetUpPINCode" not in self.attributes: + raise KeyError("Missing setup passcode in setup payload: parsing likely not yet done") + + return int(self.attributes["SetUpPINCode"]) + + @property + def long_discriminator(self) -> Optional[int]: + if "Long discriminator" not in self.attributes: + return None + + return int(self.attributes["Long discriminator"]) + + @property + def short_discriminator(self) -> Optional[int]: + if "Short discriminator" not in self.attributes: + return None + + return int(self.attributes["Short discriminator"]) + + @property + def commissioning_flow(self) -> int: + return int(self.attributes.get("CommissioningFlow", "0")) + + @property + def rendezvous_information(self) -> int: + return int(self.attributes.get("RendezvousInformation", "0")) + + @property + def supports_ble_commissioning(self) -> bool: + return (self.rendezvous_information & 0b010) != 0 + + @property + def supports_on_network_commissioning(self) -> bool: + return (self.rendezvous_information & 0b100) != 0 diff --git a/src/python_testing/TC_CGEN_2_4.py b/src/python_testing/TC_CGEN_2_4.py index 6c8e36b907bb00..682e4b02f927d8 100644 --- a/src/python_testing/TC_CGEN_2_4.py +++ b/src/python_testing/TC_CGEN_2_4.py @@ -32,7 +32,7 @@ class TC_CGEN_2_4(MatterBaseTest): def OpenCommissioningWindow(self) -> int: try: pin, code = self.th1.OpenCommissioningWindow( - nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=self.matter_test_config.discriminator[0], option=1) + nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=self.matter_test_config.discriminators[0], option=1) time.sleep(5) return pin, code @@ -51,7 +51,7 @@ async def CommissionToStageSendCompleteAndCleanup( self.th2.SetTestCommissionerPrematureCompleteAfter(stage) success, errcode = self.th2.CommissionOnNetwork( nodeId=self.dut_node_id, setupPinCode=pin, - filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.matter_test_config.discriminator[0]) + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.matter_test_config.discriminators[0]) logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(success, errcode)) asserts.assert_false(success, 'Commissioning complete did not error as expected') asserts.assert_true(errcode.sdk_part == expectedErrorPart, 'Unexpected error type returned from CommissioningComplete') @@ -91,7 +91,7 @@ async def test_TC_CGEN_2_4(self): self.th2.ResetTestCommissioner() success, errcode = self.th2.CommissionOnNetwork( nodeId=self.dut_node_id, setupPinCode=pin, - filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.matter_test_config.discriminator[0]) + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.matter_test_config.discriminators[0]) logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(success, errcode)) logging.info('Step 17 - TH1 sends an arm failsafe') diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index dafa50f0e21add..634dbb8885da2c 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -113,11 +113,13 @@ async def test_TC_DA_1_7(self): # On the CI, this doesn't make sense to do since all the examples use the same DAC # To specify more than 1 DUT, use a list of discriminators and passcodes allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) + if allow_sdk_dac: + asserts.assert_equal(len(self.matter_test_config.discriminators), 1, "Only one device can be tested with SDK DAC") if not allow_sdk_dac: - asserts.assert_equal(len(self.matter_test_config.discriminator), 2, "This test requires 2 DUTs") + asserts.assert_equal(len(self.matter_test_config.discriminators), 2, "This test requires 2 DUTs") pk = [] - for i in range(len(self.matter_test_config.dut_node_id)): - pk.append(await self.single_DUT(i, self.matter_test_config.dut_node_id[i])) + for i in range(len(self.matter_test_config.dut_node_ids)): + pk.append(await self.single_DUT(i, self.matter_test_config.dut_node_ids[i])) asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs") diff --git a/src/python_testing/TC_RR_1_1.py b/src/python_testing/TC_RR_1_1.py index 895de247c22cb9..98db3c3aae5fbd 100644 --- a/src/python_testing/TC_RR_1_1.py +++ b/src/python_testing/TC_RR_1_1.py @@ -434,6 +434,8 @@ async def test_TC_RR_1_1(self): node_id=self.dut_node_id, endpoint=0, attribute=Clusters.GroupKeyManagement.Attributes.MaxGroupsPerFabric) + logging.info( + f"MaxGroupsPerFabric value: {indicated_max_groups_per_fabric}, number of endpoints with Groups clusters cluster: {counted_groups_clusters}, which are: {list(groups_cluster_endpoints.keys())}") if indicated_max_groups_per_fabric < 4 * counted_groups_clusters: asserts.fail("Failed Step 11: MaxGroupsPerFabric < 4 * counted_groups_clusters") diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index fd8bece2a64dd4..4c1e24e8eb8a21 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -195,45 +195,48 @@ def compare_time(received: int, offset: timedelta = timedelta(), utc: int = None @dataclass class MatterTestConfig: - storage_path: pathlib.Path = None - logs_path: pathlib.Path = None - paa_trust_store_path: pathlib.Path = None - ble_interface_id: int = None + storage_path: pathlib.Path = pathlib.Path(".") + logs_path: pathlib.Path = pathlib.Path(".") + paa_trust_store_path: Optional[pathlib.Path] = None + ble_interface_id: Optional[int] = None commission_only: bool = False admin_vendor_id: int = _DEFAULT_ADMIN_VENDOR_ID - case_admin_subject: int = None + case_admin_subject: Optional[int] = None global_test_params: dict = field(default_factory=dict) # List of explicit tests to run by name. If empty, all tests will run tests: List[str] = field(default_factory=list) - commissioning_method: str = None - discriminator: List[int] = None - setup_passcode: List[int] = None - commissionee_ip_address_just_for_testing: str = None + commissioning_method: Optional[str] = None + discriminators: Optional[List[int]] = None + setup_passcodes: Optional[List[int]] = None + commissionee_ip_address_just_for_testing: Optional[str] = None maximize_cert_chains: bool = False - qr_code_content: str = None - manual_code: str = None + qr_code_content: Optional[str] = None + manual_code: Optional[str] = None - wifi_ssid: str = None - wifi_passphrase: str = None - thread_operational_dataset: str = None + wifi_ssid: Optional[str] = None + wifi_passphrase: Optional[str] = None + thread_operational_dataset: Optional[str] = None + + pics: dict[bool, str] = field(default_factory=dict) # Node ID for basic DUT - dut_node_id: List[int] = None + dut_node_ids: Optional[List[int]] = None # Node ID to use for controller/commissioner controller_node_id: int = _DEFAULT_CONTROLLER_NODE_ID # CAT Tags for default controller/commissioner controller_cat_tags: List[int] = field(default_factory=list) # Fabric ID which to use - fabric_id: int = None + fabric_id: int = 1 + # "Alpha" by default root_of_trust_index: int = _DEFAULT_TRUST_ROOT_INDEX # If this is set, we will reuse root of trust keys at that location - chip_tool_credentials_path: pathlib.Path = None + chip_tool_credentials_path: Optional[pathlib.Path] = None class MatterStackState: @@ -340,7 +343,7 @@ def certificate_authority_manager(self) -> chip.CertificateAuthority.Certificate @property def dut_node_id(self) -> int: - return self.matter_test_config.dut_node_id[0] + return self.matter_test_config.dut_node_ids[0] def check_pics(self, pics_key: str) -> bool: picsd = self.matter_test_config.pics @@ -400,8 +403,8 @@ async def send_single_cmd( result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs) return result - def print_step(self, stepnum: typing.Union[int, str], title: str) -> None: - logging.info('***** Test Step {} : {}'.format(stepnum, title)) + def print_step(self, stepnum: int, title: str) -> None: + logging.info('***** Test Step %d : %s', stepnum, title) def generate_mobly_test_config(matter_test_config: MatterTestConfig): @@ -456,6 +459,7 @@ def byte_string_from_hex(s: str) -> bytes: def int_from_manual_code(s: str) -> int: + s = s.replace('-', '') regex = r"^([0-9]{11}|[0-9]{21})$" match = re.match(regex, s) if not match: @@ -560,67 +564,71 @@ def root_index(s: str) -> int: def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConfig) -> bool: - config.dut_node_id = args.dut_node_id + config.root_of_trust_index = args.root_index + # Follow root of trust index if ID not provided to have same behavior as legacy + # chip-tool that fabricID == commissioner_name == root of trust index + config.fabric_id = args.fabric_id if args.fabric_id is not None else config.root_of_trust_index - if args.commissioning_method is None: - return True + if args.chip_tool_credentials_path is not None and not args.chip_tool_credentials_path.exists(): + print("error: chip-tool credentials path %s doesn't exist!" % args.chip_tool_credentials_path) + return False + config.chip_tool_credentials_path = args.chip_tool_credentials_path + + if args.dut_node_ids is None: + print("error: --dut-node-id is mandatory!") + return False + config.dut_node_ids = args.dut_node_ids config.commissioning_method = args.commissioning_method config.commission_only = args.commission_only - if args.dut_node_id is None: - print("error: When --commissioning-method present, --dut-node-id is mandatory!") - return False + # TODO: this should also allow multiple once QR and manual codes are supported. + config.qr_code_content = args.qr_code + if args.manual_code: + config.manual_code = "%d" % args.manual_code + else: + config.manual_code = None - if args.discriminator is None and (args.qr_code is None and args.manual_code is None): + if args.commissioning_method is None: + return True + + if args.discriminators is None and (args.qr_code is None and args.manual_code is None): print("error: Missing --discriminator when no --qr-code/--manual-code present!") return False - config.discriminator = args.discriminator + config.discriminators = args.discriminators - if args.passcode is None and (args.qr_code is None and args.manual_code is None): + if args.passcodes is None and (args.qr_code is None and args.manual_code is None): print("error: Missing --passcode when no --qr-code/--manual-code present!") return False - config.setup_passcode = args.passcode + config.setup_passcodes = args.passcodes if args.qr_code is not None and args.manual_code is not None: print("error: Cannot have both --qr-code and --manual-code present!") return False - if len(config.discriminator) != len(config.setup_passcode): + if len(config.discriminators) != len(config.setup_passcodes): print("error: supplied number of discriminators does not match number of passcodes") return False - if len(config.dut_node_id) > len(config.discriminator): + if len(config.dut_node_ids) > len(config.discriminators): print("error: More node IDs provided than discriminators") return False - if len(config.dut_node_id) < len(config.discriminator): - missing = len(config.discriminator) - len(config.dut_node_id) + if len(config.dut_node_ids) < len(config.discriminators): + missing = len(config.discriminators) - len(config.dut_node_ids) + # We generate new node IDs sequentially from the last one seen for all + # missing NodeIDs when commissioning many nodes at once. for i in range(missing): - config.dut_node_id.append(config.dut_node_id[-1] + 1) + config.dut_node_ids.append(config.dut_node_ids[-1] + 1) - if len(config.dut_node_id) != len(set(config.dut_node_id)): + if len(config.dut_node_ids) != len(set(config.dut_node_ids)): print("error: Duplicate values in node id list") return False - if len(config.discriminator) != len(set(config.discriminator)): + if len(config.discriminators) != len(set(config.discriminators)): print("error: Duplicate value in discriminator list") return False - # TODO: this should also allow multiple once QR and manual codes are supported. - config.qr_code_content = args.qr_code - config.manual_code = args.manual_code - - config.root_of_trust_index = args.root_index - # Follow root of trust index if ID not provided to have same behavior as legacy - # chip-tool that fabricID == commissioner_name == root of trust index - config.fabric_id = args.fabric_id if args.fabric_id is not None else config.root_of_trust_index - - if args.chip_tool_credentials_path is not None and not args.chip_tool_credentials_path.exists(): - print("error: chip-tool credentials path %s doesn't exist!" % args.chip_tool_credentials_path) - return False - config.chip_tool_credentials_path = args.chip_tool_credentials_path - if config.commissioning_method == "ble-wifi": if args.wifi_ssid is None: print("error: missing --wifi-ssid for --commissioning-method ble-wifi!") @@ -711,7 +719,7 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: default=_DEFAULT_CONTROLLER_NODE_ID, help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID) basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex, - metavar='NODE_ID', default=[_DEFAULT_DUT_NODE_ID], + metavar='NODE_ID', dest='dut_node_ids', default=[_DEFAULT_DUT_NODE_ID], help='Node ID for primary DUT communication, ' 'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+") basic_group.add_argument("--PICS", help="PICS file path", type=str) @@ -724,9 +732,13 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: help='Name of commissioning method to use') commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex, metavar='LONG_DISCRIMINATOR', + dest='discriminators', + default=[], help='Discriminator to use for commissioning', nargs="+") commission_group.add_argument('-p', '--passcode', type=int_decimal_or_hex, metavar='PASSCODE', + dest='passcodes', + default=[], help='PAKE passcode to use', nargs="+") commission_group.add_argument('-i', '--ip-addr', type=str, metavar='RAW_IP_ADDRESS', @@ -813,12 +825,12 @@ class CommissionDeviceTest(MatterBaseTest): def test_run_commissioning(self): conf = self.matter_test_config - for i in range(len(conf.dut_node_id)): + for commission_idx, node_id in enumerate(conf.dut_node_ids): logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % - (conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id[i])) + (conf.root_of_trust_index, conf.fabric_id, node_id)) logging.info("Commissioning method: %s" % conf.commissioning_method) - if not self._commission_device(i): + if not self._commission_device(commission_idx): raise signals.TestAbortAll("Failed to commission node") def _commission_device(self, i) -> bool: @@ -829,31 +841,31 @@ def _commission_device(self, i) -> bool: if conf.commissioning_method == "on-network": return dev_ctrl.CommissionOnNetwork( - nodeId=conf.dut_node_id[i], - setupPinCode=conf.setup_passcode[i], + nodeId=conf.dut_node_ids[i], + setupPinCode=conf.setup_passcodes[i], filterType=DiscoveryFilterType.LONG_DISCRIMINATOR, - filter=conf.discriminator[i] + filter=conf.discriminators[i] ) elif conf.commissioning_method == "ble-wifi": return dev_ctrl.CommissionWiFi( - conf.discriminator[i], - conf.setup_passcode[i], - conf.dut_node_id[i], + conf.discriminators[i], + conf.setup_passcodes[i], + conf.dut_node_ids[i], conf.wifi_ssid, conf.wifi_passphrase ) elif conf.commissioning_method == "ble-thread": return dev_ctrl.CommissionThread( - conf.discriminator[i], - conf.setup_passcode[i], - conf.dut_node_id[i], + conf.discriminators[i], + conf.setup_passcodes[i], + conf.dut_node_ids[i], conf.thread_operational_dataset ) elif conf.commissioning_method == "on-network-ip": logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====") return dev_ctrl.CommissionIP( ipaddr=conf.commissionee_ip_address_just_for_testing, - setupPinCode=conf.setup_passcode[i], nodeid=conf.dut_node_id[i] + setupPinCode=conf.setup_passcodes[i], nodeid=conf.dut_node_ids[i] ) else: raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)