From 628faa251fb1795a2a5e18da59d1cc8a1f6e2a28 Mon Sep 17 00:00:00 2001 From: Cecille Freeman Date: Tue, 30 May 2023 18:25:43 -0400 Subject: [PATCH 1/7] Python script: start of TH integration code - adds required command line functionality (endpoint, nodeID, timeout) - adds the ability to pass in a hooks object for use by the TH - adds an internal hooks object when scripts are being used on the command line - adds hook calls as appropriate at test start / end / steps To setup a test for use in the TH, define a steps_ and desc_ function with the same name as the test, change print_step() calls to step() calls with the appropriate number, add pics_guard calls around pics checks. Changes the hello_test to demonstrate how to do this and also how the test works if the steps_ function is not defined. Added a demonstration of how the TH could call these to get the hooks back out. --- src/python_testing/TC_ACE_1_3.py | 184 ++++++++---- src/python_testing/hello_external_runner.py | 142 +++++++++ src/python_testing/hello_test.py | 45 ++- src/python_testing/matter_testing_support.py | 295 ++++++++++++++++++- 4 files changed, 593 insertions(+), 73 deletions(-) create mode 100755 src/python_testing/hello_external_runner.py diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py index a24db8aa18de67..bb6e6f7be707eb 100644 --- a/src/python_testing/TC_ACE_1_3.py +++ b/src/python_testing/TC_ACE_1_3.py @@ -19,7 +19,7 @@ import chip.clusters as Clusters from chip.interaction_model import Status -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, TestStep from mobly import asserts @@ -46,6 +46,72 @@ async def read_descriptor_expect_unsupported_access(self, th): await self.read_single_attribute_expect_error( dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) + def desc_TC_ACE_1_3(self) -> str: + return "" + + def steps_TC_ACE_1_3(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "TH0 writes ACL all view on PIXIT.ACE.TESTENDPOINT"), + TestStep(3, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(4, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(5, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(6, "TH0 writes ACL TH1 view on EP0"), + TestStep(7, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(8, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(9, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(10, "TH0 writes ACL TH2 view on EP0"), + TestStep(11, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(12, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(13, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(14, "TH0 writes ACL TH3 view on EP0"), + TestStep(15, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(16, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(17, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(18, "TH0 writes ACL TH1 TH2 view on EP0"), + TestStep(19, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(20, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(21, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(22, "TH0 writes ACL TH1 TH3 view on EP0"), + TestStep(23, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(24, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(25, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(26, "TH0 writes ACL TH2 TH3 view on EP0"), + TestStep(27, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(28, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(29, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(30, "TH0 writes ACL TH1 TH2 TH3 view on EP0"), + TestStep(31, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(32, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(33, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(34, "TH0 writes ACL cat1v1 view on EP0"), + TestStep(35, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(36, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(37, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(38, "TH0 writes ACL cat1v2 view on EP0"), + TestStep(39, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(40, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(41, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(42, "TH0 writes ACL cat1v3 view on EP0"), + TestStep(43, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(44, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(45, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(46, "TH0 writes ACL cat2v1 view on EP0"), + TestStep(47, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(48, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(49, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(50, "TH0 writes ACL cat2v2 view on EP0"), + TestStep(51, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(52, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(53, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(54, "TH0 writes ACL cat2v3 view on EP0"), + TestStep(55, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(56, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(57, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(58, "TH0 writes ACL back to default") + ] + return steps + @async_test_body async def test_TC_ACE_1_3(self): cat1_id = 0x11110000 @@ -59,7 +125,7 @@ async def test_TC_ACE_1_3(self): cat2v3 = cat2_id | 0x0003 logging.info('cat1v1 0x%x', cat1v1) - self.print_step(1, "Commissioning, already done") + self.step(1) fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0] @@ -78,7 +144,7 @@ async def test_TC_ACE_1_3(self): paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), catTags=[cat1v1, cat2v2]) - self.print_step(2, "TH0 writes ACL all view on PIXIT.ACE.TESTENDPOINT") + self.step(2) TH0_admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -92,16 +158,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, all_view] await self.write_acl(acl) - self.print_step(3, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(3) await self.read_descriptor_expect_success(TH1) - self.print_step(4, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(4) await self.read_descriptor_expect_success(TH2) - self.print_step(5, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(5) await self.read_descriptor_expect_success(TH3) - self.print_step(6, "TH0 writes ACL TH1 view on EP0") + self.step(6) th1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -109,16 +175,16 @@ async def test_TC_ACE_1_3(self): targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th1_view] await self.write_acl(acl) - self.print_step(7, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(7) await self.read_descriptor_expect_success(TH1) - self.print_step(8, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(8) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(9, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(9) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(10, "TH0 writes ACL TH2 view on EP0") + self.step(10) th2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -127,16 +193,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th2_view] await self.write_acl(acl) - self.print_step(11, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(11) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(12, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(12) await self.read_descriptor_expect_success(TH2) - self.print_step(13, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(13) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(14, "TH0 writes ACL TH3 view on EP0") + self.step(14) th3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -145,16 +211,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th3_view] await self.write_acl(acl) - self.print_step(15, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(15) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(16, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(16) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(17, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(17) await self.read_descriptor_expect_success(TH3) - self.print_step(18, "TH0 writes ACL TH1 TH2 view on EP0") + self.step(18) th12_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -163,16 +229,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th12_view] await self.write_acl(acl) - self.print_step(19, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(19) await self.read_descriptor_expect_success(TH1) - self.print_step(20, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(20) await self.read_descriptor_expect_success(TH2) - self.print_step(21, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(21) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(22, "TH0 writes ACL TH1 TH3 view on EP0") + self.step(22) th13_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -181,16 +247,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th13_view] await self.write_acl(acl) - self.print_step(23, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(23) await self.read_descriptor_expect_success(TH1) - self.print_step(24, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(24) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(25, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(25) await self.read_descriptor_expect_success(TH3) - self.print_step(26, "TH0 writes ACL TH2 TH3 view on EP0") + self.step(26) th23_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -199,16 +265,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th23_view] await self.write_acl(acl) - self.print_step(27, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(27) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(28, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(28) await self.read_descriptor_expect_success(TH2) - self.print_step(29, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(29) await self.read_descriptor_expect_success(TH3) - self.print_step(30, "TH0 writes ACL TH1 TH2 TH3 view on EP0") + self.step(30) th123_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -217,16 +283,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th123_view] await self.write_acl(acl) - self.print_step(31, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(31) await self.read_descriptor_expect_success(TH1) - self.print_step(32, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(32) await self.read_descriptor_expect_success(TH2) - self.print_step(33, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(33) await self.read_descriptor_expect_success(TH3) - self.print_step(34, "TH0 writes ACL cat1v1 view on EP0") + self.step(34) cat1v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -235,16 +301,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat1v1_view] await self.write_acl(acl) - self.print_step(35, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(35) await self.read_descriptor_expect_success(TH1) - self.print_step(36, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(36) await self.read_descriptor_expect_success(TH2) - self.print_step(37, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(37) await self.read_descriptor_expect_success(TH3) - self.print_step(38, "TH0 writes ACL cat1v2 view on EP0") + self.step(38) cat1v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -254,16 +320,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat1v2_view] await self.write_acl(acl) - self.print_step(39, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(39) await self.read_descriptor_expect_success(TH1) - self.print_step(40, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(40) await self.read_descriptor_expect_success(TH2) - self.print_step(41, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(41) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(42, "TH0 writes ACL cat1v3 view on EP0") + self.step(42) cat1v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -273,16 +339,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat1v3_view] await self.write_acl(acl) - self.print_step(43, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(43) await self.read_descriptor_expect_success(TH1) - self.print_step(44, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(44) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(45, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(45) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(46, "TH0 writes ACL cat2v1 view on EP0") + self.step(46) cat2v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -292,16 +358,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat2v1_view] await self.write_acl(acl) - self.print_step(47, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(47) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(48, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(48) await self.read_descriptor_expect_success(TH2) - self.print_step(49, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(49) await self.read_descriptor_expect_success(TH3) - self.print_step(50, "TH0 writes ACL cat2v2 view on EP0") + self.step(50) cat2v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -311,16 +377,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat2v2_view] await self.write_acl(acl) - self.print_step(51, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(51) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(52, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(52) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(53, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(53) await self.read_descriptor_expect_success(TH3) - self.print_step(54, "TH0 writes ACL cat2v3 view on EP0") + self.step(54) cat2v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -330,16 +396,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat2v3_view] await self.write_acl(acl) - self.print_step(55, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(55) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(56, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(56) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(57, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(57) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(58, "TH0 writes ACL back to default") + self.step(58) full_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, diff --git a/src/python_testing/hello_external_runner.py b/src/python_testing/hello_external_runner.py new file mode 100755 index 00000000000000..764ac95de4c50b --- /dev/null +++ b/src/python_testing/hello_external_runner.py @@ -0,0 +1,142 @@ +#!/usr/bin/env -S python3 -B +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import signal +import subprocess +import sys + +from hello_test import HelloTest +from matter_testing_support import run_tests, MatterBaseTest, MatterTestConfig, TestInfo, get_test_info +from multiprocessing import Process, Manager, Value +from multiprocessing.managers import BaseManager + +try: + from matter_yamltests.hooks import TestRunnerHooks +except: + class TestRunnerHooks: + pass + +DEFAULT_CHIP_ROOT = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..')) + +MATTER_DEVELOPMENT_PAA_ROOT_CERTS = "credentials/development/paa-root-certs" + +class TestTestRunnerHooks(TestRunnerHooks): + def reset(self): + self.start_called = False + self.stop_called = False + self.test_start_called = False + self.test_stop_called = False + self.step_skipped_list = [] + self.step_start_list = [] + self.step_success_count = 0 + self.step_failure_count = 0 + self.step_unknown_count = 0 + def __init__(self): + self.reset() + + def start(self, count: int): + self.start_called = True + + def stop(self, duration: int): + self.stop_called = True + + def test_start(self, filename: str, name: str, count: int): + self.test_start_called = True + + def test_stop(self, exception: Exception, duration: int): + self.test_stop_called = True + + def step_skipped(self, name: str, expression: str): + self.step_skipped_list.append(name) + + def step_start(self, name: str): + self.step_start_list.append(name) + + def step_success(self, logger, logs, duration: int, request): + self.step_success_count = self.step_success_count + 1 + + def step_failure(self, logger, logs, duration: int, request, received): + self.step_failure_count = self.step_failure_count + 1 + + def step_unknown(self): + self.step_unknown_count = self.step_unknown_count + 1 + + def summary(self): + print(f'start_called = {self.start_called}') + print(f'stop_called = {self.stop_called}') + print(f'test_start_called = {self.test_start_called}') + print(f'test_stop_called = {self.test_stop_called}') + print(f'step_skipped_list = {self.step_skipped_list}') + print(f'step_start_list = {self.step_start_list}') + print(f'step_success_count = {self.step_success_count}') + print(f'step_failure_count = {self.step_failure_count}') + print(f'step_unknown_count = {self.step_unknown_count}') + +def run_in_process(test_name:str, config: MatterTestConfig) -> None: + BaseManager.register('TestTestRunnerHooks', TestTestRunnerHooks) + manager = BaseManager() + manager.start() + my_hooks = manager.TestTestRunnerHooks() + p = Process(target=run_tests, args=(HelloTest, config, my_hooks)) + p.start() + p.join() + print(f'Results from test {test_name}:') + print(my_hooks.summary()) + +def commission() -> None: + paa_path = os.path.join(DEFAULT_CHIP_ROOT, MATTER_DEVELOPMENT_PAA_ROOT_CERTS) + config = MatterTestConfig(commissioning_method="on-network", commission_only=True, discriminators=[3840], setup_passcodes=[20202021], dut_node_ids=[0x12344321], paa_trust_store_path=paa_path, storage_path='admin_storage.json') + run_in_process("commission", config) + +def one_test(test_name): + # Run a test NOT using the default main. Pass in our own runner and make sure it + # gets called back. + + # This config would be generated by the TH. PIXITs can be passed using the global_test_params["meta_config"] section. + config = MatterTestConfig(tests=[test_name], dut_node_ids=[0x12344321], storage_path='admin_storage.json') + + # TH can use get_test_info to get a list of steps and a description + list = get_test_info(HelloTest, config) + print(f'Test info for test {test_name}') + print(list) + + run_in_process(test_name, config) + +def main(): + + # Fire up an example app to test against + # TODO: make factory reset and app path configurable, maybe the storage location too. + subprocess.call("rm -rf /tmp/chip_* /tmp/repl* admin_storage.json", shell=True) + app_path = os.path.abspath(os.path.join(DEFAULT_CHIP_ROOT, 'out', + 'linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test', 'chip-all-clusters-app')) + app_cmd = str(app_path) + app_process = subprocess.Popen([app_cmd], stdout=sys.stdout, stderr=sys.stderr, bufsize=0) + + commission() + one_test('test_failure_on_wrong_endpoint') + one_test('test_names_as_expected') + one_test('test_pics') + + app_process.send_signal(signal.SIGINT.value) + app_process.wait() + print("app stopped") + +if __name__ == "__main__": + main() diff --git a/src/python_testing/hello_test.py b/src/python_testing/hello_test.py index d7bad4c2dc193d..6756655a27c8d4 100644 --- a/src/python_testing/hello_test.py +++ b/src/python_testing/hello_test.py @@ -19,11 +19,14 @@ import chip.clusters as Clusters from chip.interaction_model import Status -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, TestStep from mobly import asserts class HelloTest(MatterBaseTest): + # This example test does not include the step_ and desc_ markers + # The MatterBaseTest will assume a single step and create a description + # based on the test name @async_test_body async def test_names_as_expected(self): dev_ctrl = self.default_controller @@ -37,8 +40,25 @@ async def test_names_as_expected(self): logging.info("Found VendorName: %s" % (vendor_name)) asserts.assert_equal(vendor_name, "TEST_VENDOR", "VendorName must be TEST_VENDOR!") + + # To include individual steps and description for the TH, define a steps_ and desc_ function + # for the test, then use self.step(#) to indicate how the test proceeds through the test plan. + # Support for keeping the TH up to date is built into MatterBaseTest when you use the step() + # function. + def steps_failure_on_wrong_endpoint(self) -> list[TestStep]: + steps = [TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "Read ProductName on endpoint 9999"), + ] + return steps + def desc_failure_on_wrong_endpoint(self) -> str: + return '#.#.#. [TC-HELLO-x.x] Test Failure On Wrong Endpoint' + + @async_test_body async def test_failure_on_wrong_endpoint(self): + self.step(1) # Commissioning + + self.step(2) dev_ctrl = self.default_controller result = await self.read_single_attribute( dev_ctrl, @@ -49,6 +69,29 @@ async def test_failure_on_wrong_endpoint(self): asserts.assert_true(isinstance(result, Clusters.Attribute.ValueDecodeFailure), "Should fail to read on endpoint 9999") asserts.assert_equal(result.Reason.status, Status.UnsupportedEndpoint, "Failure reason should be UnsupportedEndpoint") + def steps_pics(self) -> list[TestStep]: + steps = [TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "Skip this step"), + TestStep(3, "Run this step") + ] + return steps + + def desc_pics(self) -> str: + return "#.#.#. [TC-HELLO-x.x] Test pics" + + @async_test_body + async def test_pics(self): + self.step(1) # commissioning + print('This should be run') + + self.step(2) + if self.pics_guard(self.check_pics('NON-EXISTANT_PICS')): + asserts.fail('This should not be run') + + self.step(3) + print('This should also be run') + + if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index a394952445de60..eb77ba6190841a 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -18,6 +18,7 @@ import argparse import asyncio import builtins +import inspect import json import logging import math @@ -28,6 +29,7 @@ import sys import typing import uuid +from aenum import Enum from binascii import hexlify, unhexlify from dataclasses import asdict as dataclass_asdict from dataclasses import dataclass, field @@ -59,6 +61,13 @@ from mobly.config_parser import ENV_MOBLY_LOGPATH, TestRunConfig from mobly.test_runner import TestRunner +try: + from matter_yamltests.hooks import TestRunnerHooks +except: + class TestRunnerHooks: + pass + + # TODO: Add utility to commission a device if needed # TODO: Add utilities to keep track of controllers/fabrics @@ -224,6 +233,42 @@ def name(self) -> str: return self._name +class InternalTestRunnerHooks(TestRunnerHooks): + + def start(self, count: int): + logging.info(f'Starting test set, running {count} tests') + + def stop(self, duration: int): + logging.info(f'Finished test set, ran for {duration}ms') + + def test_start(self, filename: str, name: str, count: int): + logging.info(f'Starting test from {filename}: {name} - {count} steps') + + def test_stop(self, exception: Exception, duration: int): + logging.info(f'Finished test in {duration}ms') + + def step_skipped(self, name: str, expression: str): + # TODO: Do we really need the expression as a string? We can evaluate this in code very easily + logging.info(f'\t\t**** Skipping: {name}') + + def step_start(self, name: str): + # The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these + logging.info(f'\t\t***** Test Step {name}') + + def step_success(self, logger, logs, duration: int, request): + pass + + def step_failure(self, logger, logs, duration: int, request, received): + # TODO: there's supposed to be some kind of error message here, but I have no idea where it's meant to come from in this API + logging.info(f'\t\t***** Test Failure : ') + + def step_unknown(self): + """ + This method is called when the result of running a step is unknown. For example during a dry-run. + """ + pass + + @dataclass class MatterTestConfig: storage_path: pathlib.Path = pathlib.Path(".") @@ -237,6 +282,8 @@ class MatterTestConfig: global_test_params: dict = field(default_factory=dict) # List of explicit tests to run by name. If empty, all tests will run tests: List[str] = field(default_factory=list) + timeout: int = 90 + endpoint: int = 0 commissioning_method: Optional[str] = None discriminators: Optional[List[int]] = None @@ -442,12 +489,74 @@ def hex_from_bytes(b: bytes) -> str: return hexlify(b).decode("utf-8") +@dataclass +class TestStep: + test_plan_number: typing.Union[int, str] + description: str + is_commissioning: bool = False + + +@dataclass +class TestInfo: + function: str + desc: str + steps: typing.List[TestStep] + + class MatterBaseTest(base_test.BaseTestClass): def __init__(self, *args): super().__init__(*args) # List of accumulated problems across all tests self.problems = [] + self.is_commissioning = False + + def get_test_steps(self, test:str) -> list[TestStep]: + ''' Retrieves the test step list for the given test + + Test steps are defined in the function called steps_. + ex for test test_TC_TEST_1_1, the steps are in a function called + steps_TC_TEST_1_1. + + Test that implement a steps_ function should call each step + in order using self.step(number), where number is the test_plan_number + from each TestStep. + ''' + steps = self._get_defined_test_steps(test) + return [TestStep(1, "Run entire test")] if steps is None else steps + + + def _get_defined_test_steps(self, test: str) -> list[TestStep]: + steps_name = 'steps_' + test[5:] + try: + fn = getattr(self, steps_name) + return fn() + except AttributeError: + return None + + def get_test_desc(self, test: str) -> str: + ''' Returns a description of this test + + Test description is defined in the function called desc_. + ex for test test_TC_TEST_1_1, the steps are in a function called + steps_TC_TEST_1_1. + + Format: + [] + + ex: + 133.1.1. [TC-ACL-1.1] Global attributes + ''' + desc_name = 'desc_' + test[5:] + try: + fn = getattr(self, desc_name) + return fn() + except AttributeError: + return test + + @property + def runner_hook(self) -> TestRunnerHooks: + return unstash_globally(self.user_params.get("hooks")) @property def matter_test_config(self) -> MatterTestConfig: @@ -476,6 +585,23 @@ def setup_class(self): # TODO: Move to using non-generated code and rather use data model description (.matter or .xml) self.cluster_mapper = ClusterMapper(self.default_controller._Cluster) + def setup_test(self): + self.current_step_index = 0 + self.step_start_time = datetime.now(timezone.utc) + self.step_skipped = False + if self.runner_hook and not self.is_commissioning: + test_name = self.current_test_info.name + steps = self._get_defined_test_steps(test_name) + num_steps = 1 if steps is None else len(steps) + filename = inspect.getfile(self.__class__) + desc = self.get_test_desc(test_name) + self.runner_hook.test_start(filename=filename, name=desc, count=num_steps) + # If we don't have defined steps, we're going to start the one and only step now + # if there are steps defined by the test, rely on the test calling the step() function + # to indicates how it is proceeding + if steps is None: + self.step(1) + def teardown_class(self): """Final teardown after all tests: log all problems""" if len(self.problems) == 0: @@ -503,11 +629,13 @@ async def read_single_attribute( async def read_single_attribute_check_success( self, cluster: Clusters.ClusterObjects.ClusterCommand, attribute: Clusters.ClusterObjects.ClusterAttributeDescriptor, - dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object: + dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = None) -> object: if dev_ctrl is None: dev_ctrl = self.default_controller if node_id is None: node_id = self.dut_node_id + if endpoint is None: + endpoint = self.matter_test_config.endpoint result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)]) attr_ret = result[endpoint][cluster][attribute] @@ -521,11 +649,13 @@ async def read_single_attribute_check_success( async def read_single_attribute_expect_error( self, cluster: object, attribute: object, - error: Status, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object: + error: Status, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = None) -> object: if dev_ctrl is None: dev_ctrl = self.default_controller if node_id is None: node_id = self.dut_node_id + if endpoint is None: + endpoint = self.matter_test_config.endpoint result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)]) attr_ret = result[endpoint][cluster][attribute] @@ -538,12 +668,14 @@ async def read_single_attribute_expect_error( async def send_single_cmd( self, cmd: Clusters.ClusterObjects.ClusterCommand, - dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0, + dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = None, timedRequestTimeoutMs: typing.Union[None, int] = None) -> object: if dev_ctrl is None: dev_ctrl = self.default_controller if node_id is None: node_id = self.dut_node_id + if endpoint is None: + endpoint = self.matter_test_config.endpoint result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs) return result @@ -560,6 +692,90 @@ def record_warning(self, test_name: str, location: Union[AttributePathLocation, def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.NOTE, problem, spec_location)) + def on_fail(self, record): + ''' Called by Mobly on test failure + + record is of type TestResultRecord + ''' + if self.runner_hook and not self.is_commissioning: + exception = record.termination_signal.exception + step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) + # This isn't QUITE the test duration because the commissioning is handled separately, but it's clsoe enough for now + # This is already given in milliseconds + test_duration = record.end_time - record.begin_time + # TODO: I have no idea what logger, logs, request or received are. Hope None works because I have nothing to give + self.runner_hook.step_failure(logger=None, logs=None, duration=step_duration, request=None, received=None) + self.runner_hook.test_stop(exception=exception, duration=test_duration) + + def on_pass(self, record): + ''' Called by Mobly on test pass + + record is of type TestResultRecord + ''' + if self.runner_hook and not self.is_commissioning: + # What is request? This seems like an implementation detail for the runner + # TODO: As with failure, I have no idea what logger, logs or request are meant to be + step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) + test_duration = record.end_time - record.begin_time + self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None) + + # TODO: this check could easily be annoying when doing dev. flag it somehow? Ditto with the in-order check + steps = self._get_defined_test_steps(record.test_name) + if steps is None: + # if we don't have a list of steps, assume they were all run + all_steps_run = True + else: + all_steps_run = len(steps) == self.current_step_index + + if not all_steps_run: + # The test is done, but we didn't execute all the steps + asserts.fail("Test script error: Not all required steps were run") + + if self.runner_hook and not self.is_commissioning: + self.runner_hook.test_stop(exception=None, duration=test_duration) + + def pics_guard(self, pics_condition: bool): + if not pics_condition: + try: + steps = self.get_test_steps() + num = steps[self.current_step_index].test_plan_number + except: + num = self.current_step_index + + if self.runner_hook: + # TODO: what does name represent here? The wordy test name? The test plan number? The number and name? + # TODO: I very much do not want to have people passing in strings here. Do we really need the expression + # as a string? Does it get used by the TH? + self.runner_hook.step_skipped(name=str(num), expression="") + else: + logging.info(f'**** Skipping: {num}') + self.step_skipped = True + + def step(self, step: typing.Union[int, str]): + test_name = sys._getframe().f_back.f_code.co_name + steps = self.get_test_steps(test_name) + + # TODO: this might be annoying during dev. Remove? Flag? + if len(steps) <= self.current_step_index or steps[self.current_step_index].test_plan_number != step: + asserts.fail(f'Unexpected test step: {step} - steps not called in order, or step does not exist') + + if self.runner_hook: + # If we've reached the next step with no assertion and the step wasn't skipped, it passed + if not self.step_skipped and self.current_step_index != 0: + # TODO: As with failure, I have no idea what loger, logs or request are meant to be + step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) + self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None) + + # TODO: it seems like the step start should take a number and a name + name = f'{step} : {steps[self.current_step_index].description}' + self.runner_hook.step_start(name=name) + else: + self.print_step(step, steps[self.current_step_index].description) + + self.step_start_time = utc_native = datetime.now(tz=timezone.utc) + self.current_step_index = self.current_step_index + 1 + self.step_skipped = False + def get_setup_payload_info(self) -> SetupPayloadInfo: if self.matter_test_config.qr_code_content is not None: qr_code = self.matter_test_config.qr_code_content @@ -861,6 +1077,8 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: config.ble_interface_id = args.ble_interface_id config.pics = {} if args.PICS is None else read_pics_from_file(args.PICS) config.tests = [] if args.tests is None else args.tests + config.timeout = 90 if args.timeout is None else args.timeout + config.endpoint = 0 if args.endpoint is None else args.endpoint config.controller_node_id = args.controller_node_id config.trace_to = args.trace_to @@ -908,10 +1126,12 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: metavar='NODE_ID', default=_DEFAULT_CONTROLLER_NODE_ID, help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID) - basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex, + basic_group.add_argument('-n', '--dut-node-id', '--nodeId', type=int_decimal_or_hex, metavar='NODE_ID', dest='dut_node_ids', default=[_DEFAULT_DUT_NODE_ID], help='Node ID for primary DUT communication, ' 'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+") + basic_group.add_argument('--endpoint', type=int, default=0, help="Endpoint under test") + basic_group.add_argument('--timeout', type=int, default=90, help="Test timeout in seconds") basic_group.add_argument("--PICS", help="PICS file path", type=str) commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node") @@ -1004,14 +1224,19 @@ def async_test_body(body): synchronously, we need a mechanism to allow an `async def` to be converted to a asyncio-run synchronous method. This decorator does the wrapping. """ - def async_runner(*args, **kwargs): - return asyncio.run(body(*args, **kwargs)) + + def async_runner(self: MatterBaseTest, *args, **kwargs): + runner_with_timeout = asyncio.wait_for(body(self, *args, **kwargs), timeout=self.matter_test_config.timeout) + return asyncio.run(runner_with_timeout) return async_runner class CommissionDeviceTest(MatterBaseTest): """Test class auto-injected at the start of test list to commission a device when requested""" + def __init__(self, *args): + super().__init__(*args) + self.is_commissioning = True def test_run_commissioning(self): conf = self.matter_test_config @@ -1082,6 +1307,7 @@ def default_matter_test_main(argv=None, **kwargs): Args: argv: A list that is then parsed as command line args. If None, defaults to sys.argv """ + matter_test_config = parse_matter_test_args(argv) # Allow override of command line from optional arguments @@ -1091,6 +1317,38 @@ def default_matter_test_main(argv=None, **kwargs): # Find the test class in the test script. test_class = _find_test_class() + # This is required in case we need any testing with maximized certificate chains. + # We need *all* issuers from the start, even for default controller, to use + # maximized chains, before MatterStackState init, others some stale certs + # may not chain properly. + if "maximize_cert_chains" in kwargs: + matter_test_config.maximize_cert_chains = kwargs["maximize_cert_chains"] + + hooks = InternalTestRunnerHooks() + + run_tests(test_class, matter_test_config, hooks) + + +def get_test_info(test_class: MatterBaseTest, matter_test_config: MatterTestConfig) -> list[TestInfo]: + test_config = generate_mobly_test_config(matter_test_config) + base = test_class(test_config) + + if len(matter_test_config.tests) > 0: + tests = matter_test_config.tests + else: + tests = base.get_existing_test_names() + + info = [] + for t in tests: + info.append(TestInfo(t, steps=base.get_test_steps(t), desc=base.get_test_desc(t))) + + return info + + +def run_tests(test_class: MatterBaseTest, matter_test_config: MatterTestConfig, hooks: TestRunnerHooks) -> None: + + get_test_info(test_class, matter_test_config) + # Load test config file. test_config = generate_mobly_test_config(matter_test_config) @@ -1099,13 +1357,6 @@ def default_matter_test_main(argv=None, **kwargs): if len(matter_test_config.tests) > 0: tests = matter_test_config.tests - # This is required in case we need any testing with maximized certificate chains. - # We need *all* issuers from the start, even for default controller, to use - # maximized chains, before MatterStackState init, others some stale certs - # may not chain properly. - if "maximize_cert_chains" in kwargs: - matter_test_config.maximize_cert_chains = kwargs["maximize_cert_chains"] - stack = MatterStackState(matter_test_config) with TracingContext() as tracing_ctx: @@ -1125,6 +1376,10 @@ def default_matter_test_main(argv=None, **kwargs): test_config.user_params["default_controller"] = stash_globally(default_controller) test_config.user_params["matter_test_config"] = stash_globally(matter_test_config) + test_config.user_params["hooks"] = stash_globally(hooks) + + # Execute the test class with the config + ok = True test_config.user_params["certificate_authority_manager"] = stash_globally(stack.certificate_authority_manager) @@ -1142,15 +1397,29 @@ def default_matter_test_main(argv=None, **kwargs): if not matter_test_config.commission_only: runner.add_test_class(test_config, test_class, tests) + if hooks: + # Right now, we only support running a single test class at once, + # but it's relatively easy to exapand that to make the test process faster + # TODO: support a list of tests + hooks.start(count=1) + # Mobly gives the test run time in seconds, lets be a bit more precise + runner_start_time = datetime.now(timezone.utc) + try: runner.run() ok = runner.results.is_all_pass and ok + except TimeoutError: + ok = False except signals.TestAbortAll: ok = False except Exception: logging.exception('Exception when executing %s.', test_config.testbed_name) ok = False + if hooks: + duration = (datetime.now(timezone.utc) - runner_start_time) / timedelta(microseconds=1) + hooks.stop(duration=duration) + # Shutdown the stack when all done stack.Shutdown() From 92085b3ab4d012f75ea04566ecb76009b89dec38 Mon Sep 17 00:00:00 2001 From: cecille Date: Tue, 5 Dec 2023 13:38:19 -0500 Subject: [PATCH 2/7] Add pics_ handler, address review comments --- src/python_testing/TC_ACE_1_3.py | 2 +- src/python_testing/hello_external_runner.py | 14 ++++++++-- src/python_testing/matter_testing_support.py | 29 +++++++++++++++++--- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py index bb6e6f7be707eb..87b57f0c9ce3eb 100644 --- a/src/python_testing/TC_ACE_1_3.py +++ b/src/python_testing/TC_ACE_1_3.py @@ -47,7 +47,7 @@ async def read_descriptor_expect_unsupported_access(self, th): dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) def desc_TC_ACE_1_3(self) -> str: - return "" + return "[TC-ACE-1.3] Subjects" def steps_TC_ACE_1_3(self) -> list[TestStep]: steps = [ diff --git a/src/python_testing/hello_external_runner.py b/src/python_testing/hello_external_runner.py index 764ac95de4c50b..84da1c49bc7a3d 100755 --- a/src/python_testing/hello_external_runner.py +++ b/src/python_testing/hello_external_runner.py @@ -37,6 +37,7 @@ class TestRunnerHooks: MATTER_DEVELOPMENT_PAA_ROOT_CERTS = "credentials/development/paa-root-certs" + class TestTestRunnerHooks(TestRunnerHooks): def reset(self): self.start_called = False @@ -48,6 +49,7 @@ def reset(self): self.step_success_count = 0 self.step_failure_count = 0 self.step_unknown_count = 0 + def __init__(self): self.reset() @@ -89,7 +91,8 @@ def summary(self): print(f'step_failure_count = {self.step_failure_count}') print(f'step_unknown_count = {self.step_unknown_count}') -def run_in_process(test_name:str, config: MatterTestConfig) -> None: + +def run_in_process(test_name: str, config: MatterTestConfig) -> None: BaseManager.register('TestTestRunnerHooks', TestTestRunnerHooks) manager = BaseManager() manager.start() @@ -100,11 +103,14 @@ def run_in_process(test_name:str, config: MatterTestConfig) -> None: print(f'Results from test {test_name}:') print(my_hooks.summary()) + def commission() -> None: paa_path = os.path.join(DEFAULT_CHIP_ROOT, MATTER_DEVELOPMENT_PAA_ROOT_CERTS) - config = MatterTestConfig(commissioning_method="on-network", commission_only=True, discriminators=[3840], setup_passcodes=[20202021], dut_node_ids=[0x12344321], paa_trust_store_path=paa_path, storage_path='admin_storage.json') + config = MatterTestConfig(commissioning_method="on-network", commission_only=True, discriminators=[3840], setup_passcodes=[ + 20202021], dut_node_ids=[0x12344321], paa_trust_store_path=paa_path, storage_path='admin_storage.json') run_in_process("commission", config) + def one_test(test_name): # Run a test NOT using the default main. Pass in our own runner and make sure it # gets called back. @@ -119,13 +125,14 @@ def one_test(test_name): run_in_process(test_name, config) + def main(): # Fire up an example app to test against # TODO: make factory reset and app path configurable, maybe the storage location too. subprocess.call("rm -rf /tmp/chip_* /tmp/repl* admin_storage.json", shell=True) app_path = os.path.abspath(os.path.join(DEFAULT_CHIP_ROOT, 'out', - 'linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test', 'chip-all-clusters-app')) + 'linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test', 'chip-all-clusters-app')) app_cmd = str(app_path) app_process = subprocess.Popen([app_cmd], stdout=sys.stdout, stderr=sys.stderr, bufsize=0) @@ -138,5 +145,6 @@ def main(): app_process.wait() print("app stopped") + if __name__ == "__main__": main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 94074b3954846a..1de33f3321989d 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -558,7 +558,8 @@ class TestStep: class TestInfo: function: str desc: str - steps: typing.List[TestStep] + steps: list[TestStep] + pics: list[str] class MatterBaseTest(base_test.BaseTestClass): @@ -569,7 +570,7 @@ def __init__(self, *args): self.problems = [] self.is_commissioning = False - def get_test_steps(self, test:str) -> list[TestStep]: + def get_test_steps(self, test: str) -> list[TestStep]: ''' Retrieves the test step list for the given test Test steps are defined in the function called steps_. @@ -583,7 +584,6 @@ def get_test_steps(self, test:str) -> list[TestStep]: steps = self._get_defined_test_steps(test) return [TestStep(1, "Run entire test")] if steps is None else steps - def _get_defined_test_steps(self, test: str) -> list[TestStep]: steps_name = 'steps_' + test[5:] try: @@ -592,6 +592,26 @@ def _get_defined_test_steps(self, test: str) -> list[TestStep]: except AttributeError: return None + def get_test_pics(self, test: str) -> list[str]: + ''' Retrieves a list of top-level PICS that should be checked before running this test + + An empty list means the test will always be run. + + PICS are defined in a function called pics_. + ex. for test test_TC_TEST_1_1, the pics are in a function called + pics_TC_TEST_1_1. + ''' + pics = self._get_defined_pics(test) + return [] if pics is None else pics + + def _get_defined_pics(self, test: str) -> list[TestStep]: + steps_name = 'pics_' + test[5:] + try: + fn = getattr(self, steps_name) + return fn() + except AttributeError: + return None + def get_test_desc(self, test: str) -> str: ''' Returns a description of this test @@ -1292,6 +1312,7 @@ def async_runner(self: MatterBaseTest, *args, **kwargs): class CommissionDeviceTest(MatterBaseTest): """Test class auto-injected at the start of test list to commission a device when requested""" + def __init__(self, *args): super().__init__(*args) self.is_commissioning = True @@ -1398,7 +1419,7 @@ def get_test_info(test_class: MatterBaseTest, matter_test_config: MatterTestConf info = [] for t in tests: - info.append(TestInfo(t, steps=base.get_test_steps(t), desc=base.get_test_desc(t))) + info.append(TestInfo(t, steps=base.get_test_steps(t), desc=base.get_test_desc(t), pics=base.get_test_pics(t))) return info From 2c578f2a9fdde51e165c28f1851708177271b316 Mon Sep 17 00:00:00 2001 From: cecille Date: Tue, 5 Dec 2023 16:05:23 -0500 Subject: [PATCH 3/7] lint --- src/python_testing/hello_external_runner.py | 6 +++--- src/python_testing/hello_test.py | 8 +++----- src/python_testing/matter_testing_support.py | 9 ++++----- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/python_testing/hello_external_runner.py b/src/python_testing/hello_external_runner.py index 84da1c49bc7a3d..81ac3d06551bb8 100755 --- a/src/python_testing/hello_external_runner.py +++ b/src/python_testing/hello_external_runner.py @@ -22,13 +22,13 @@ import sys from hello_test import HelloTest -from matter_testing_support import run_tests, MatterBaseTest, MatterTestConfig, TestInfo, get_test_info -from multiprocessing import Process, Manager, Value +from matter_testing_support import run_tests, MatterTestConfig, get_test_info +from multiprocessing import Process from multiprocessing.managers import BaseManager try: from matter_yamltests.hooks import TestRunnerHooks -except: +except ImportError: class TestRunnerHooks: pass diff --git a/src/python_testing/hello_test.py b/src/python_testing/hello_test.py index 6756655a27c8d4..1a740c94768b46 100644 --- a/src/python_testing/hello_test.py +++ b/src/python_testing/hello_test.py @@ -40,7 +40,6 @@ async def test_names_as_expected(self): logging.info("Found VendorName: %s" % (vendor_name)) asserts.assert_equal(vendor_name, "TEST_VENDOR", "VendorName must be TEST_VENDOR!") - # To include individual steps and description for the TH, define a steps_ and desc_ function # for the test, then use self.step(#) to indicate how the test proceeds through the test plan. # Support for keeping the TH up to date is built into MatterBaseTest when you use the step() @@ -48,12 +47,12 @@ async def test_names_as_expected(self): def steps_failure_on_wrong_endpoint(self) -> list[TestStep]: steps = [TestStep(1, "Commissioning, already done", is_commissioning=True), TestStep(2, "Read ProductName on endpoint 9999"), - ] + ] return steps + def desc_failure_on_wrong_endpoint(self) -> str: return '#.#.#. [TC-HELLO-x.x] Test Failure On Wrong Endpoint' - @async_test_body async def test_failure_on_wrong_endpoint(self): self.step(1) # Commissioning @@ -81,7 +80,7 @@ def desc_pics(self) -> str: @async_test_body async def test_pics(self): - self.step(1) # commissioning + self.step(1) # commissioning print('This should be run') self.step(2) @@ -92,6 +91,5 @@ async def test_pics(self): print('This should also be run') - if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 1de33f3321989d..5e3de69a2d1d38 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -29,7 +29,6 @@ import sys import typing import uuid -from aenum import Enum from binascii import hexlify, unhexlify from dataclasses import asdict as dataclass_asdict from dataclasses import dataclass, field @@ -63,7 +62,7 @@ try: from matter_yamltests.hooks import TestRunnerHooks -except: +except ImportError: class TestRunnerHooks: pass @@ -260,7 +259,7 @@ def step_success(self, logger, logs, duration: int, request): def step_failure(self, logger, logs, duration: int, request, received): # TODO: there's supposed to be some kind of error message here, but I have no idea where it's meant to come from in this API - logging.info(f'\t\t***** Test Failure : ') + logging.info('\t\t***** Test Failure : ') def step_unknown(self): """ @@ -817,7 +816,7 @@ def pics_guard(self, pics_condition: bool): try: steps = self.get_test_steps() num = steps[self.current_step_index].test_plan_number - except: + except KeyError: num = self.current_step_index if self.runner_hook: @@ -850,7 +849,7 @@ def step(self, step: typing.Union[int, str]): else: self.print_step(step, steps[self.current_step_index].description) - self.step_start_time = utc_native = datetime.now(tz=timezone.utc) + self.step_start_time = datetime.now(tz=timezone.utc) self.current_step_index = self.current_step_index + 1 self.step_skipped = False From bc0bacd179729a4b0732d4e55614d5981c7081bd Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Tue, 5 Dec 2023 21:07:46 +0000 Subject: [PATCH 4/7] Restyled by isort --- src/python_testing/TC_ACE_1_3.py | 2 +- src/python_testing/hello_external_runner.py | 6 +++--- src/python_testing/hello_test.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py index 87b57f0c9ce3eb..ec34b50bf1a9e5 100644 --- a/src/python_testing/TC_ACE_1_3.py +++ b/src/python_testing/TC_ACE_1_3.py @@ -19,7 +19,7 @@ import chip.clusters as Clusters from chip.interaction_model import Status -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, TestStep +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts diff --git a/src/python_testing/hello_external_runner.py b/src/python_testing/hello_external_runner.py index 81ac3d06551bb8..5ae98636e4cfdf 100755 --- a/src/python_testing/hello_external_runner.py +++ b/src/python_testing/hello_external_runner.py @@ -20,12 +20,12 @@ import signal import subprocess import sys - -from hello_test import HelloTest -from matter_testing_support import run_tests, MatterTestConfig, get_test_info from multiprocessing import Process from multiprocessing.managers import BaseManager +from hello_test import HelloTest +from matter_testing_support import MatterTestConfig, get_test_info, run_tests + try: from matter_yamltests.hooks import TestRunnerHooks except ImportError: diff --git a/src/python_testing/hello_test.py b/src/python_testing/hello_test.py index 1a740c94768b46..8d92feb7ce3d80 100644 --- a/src/python_testing/hello_test.py +++ b/src/python_testing/hello_test.py @@ -19,7 +19,7 @@ import chip.clusters as Clusters from chip.interaction_model import Status -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, TestStep +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts From 89b210f22a21fdb104fec14a7da8a24b6cc7bd53 Mon Sep 17 00:00:00 2001 From: C Freeman Date: Thu, 7 Dec 2023 10:55:14 -0500 Subject: [PATCH 5/7] Update src/python_testing/matter_testing_support.py Co-authored-by: Carolina Lopes <116589288+ccruzagralopes@users.noreply.github.com> --- src/python_testing/matter_testing_support.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 5e3de69a2d1d38..fda47f74279ecd 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -616,7 +616,7 @@ def get_test_desc(self, test: str) -> str: Test description is defined in the function called desc_. ex for test test_TC_TEST_1_1, the steps are in a function called - steps_TC_TEST_1_1. + desc_TC_TEST_1_1. Format: [] From 9e2ca4802429b9fa7acdc0dd3282b880d1ba59c1 Mon Sep 17 00:00:00 2001 From: cecille Date: Thu, 7 Dec 2023 13:38:36 -0500 Subject: [PATCH 6/7] Init stuff in setup_class in case setup_class fails --- src/python_testing/matter_testing_support.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index fda47f74279ecd..57d5f8bf4a60fb 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -661,6 +661,9 @@ def setup_class(self): # Mappings of cluster IDs to names and metadata. # TODO: Move to using non-generated code and rather use data model description (.matter or .xml) self.cluster_mapper = ClusterMapper(self.default_controller._Cluster) + self.current_step_index = 0 + self.step_start_time = datetime.now(timezone.utc) + self.step_skipped = False def setup_test(self): self.current_step_index = 0 From f4af059b8d91d6ecf5eb2fd5de7a6aa81842c9fa Mon Sep 17 00:00:00 2001 From: cecille Date: Thu, 7 Dec 2023 15:28:56 -0500 Subject: [PATCH 7/7] Make default timeout configurable --- src/python_testing/TC_TIMESYNC_2_8.py | 4 ++++ src/python_testing/matter_testing_support.py | 15 +++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/python_testing/TC_TIMESYNC_2_8.py b/src/python_testing/TC_TIMESYNC_2_8.py index f324a845ba22db..828b962ea5fef8 100644 --- a/src/python_testing/TC_TIMESYNC_2_8.py +++ b/src/python_testing/TC_TIMESYNC_2_8.py @@ -29,6 +29,10 @@ class TC_TIMESYNC_2_8(MatterBaseTest): + @property + def default_timeout(self) -> int: + # This test has potentially 6 15 s waits, so set timeout to 100 + return 100 async def read_ts_attribute_expect_success(self, attribute): cluster = Clusters.Objects.TimeSynchronization diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 57d5f8bf4a60fb..e486fa0304ab58 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -281,7 +281,7 @@ class MatterTestConfig: global_test_params: dict = field(default_factory=dict) # List of explicit tests to run by name. If empty, all tests will run tests: List[str] = field(default_factory=list) - timeout: int = 90 + timeout: typing.Union[int, None] = None endpoint: int = 0 commissioning_method: Optional[str] = None @@ -631,6 +631,12 @@ def get_test_desc(self, test: str) -> str: except AttributeError: return test + # Override this if the test requires a different default timeout. + # This value will be overridden if a timeout is supplied on the command line. + @property + def default_timeout(self) -> int: + return 90 + @property def runner_hook(self) -> TestRunnerHooks: return unstash_globally(self.user_params.get("hooks")) @@ -1157,7 +1163,7 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: config.ble_interface_id = args.ble_interface_id config.pics = {} if args.PICS is None else read_pics_from_file(args.PICS) config.tests = [] if args.tests is None else args.tests - config.timeout = 90 if args.timeout is None else args.timeout + config.timeout = args.timeout # This can be none, we pull the default from the test if it's unspecified config.endpoint = 0 if args.endpoint is None else args.endpoint config.controller_node_id = args.controller_node_id @@ -1211,7 +1217,7 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: help='Node ID for primary DUT communication, ' 'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+") basic_group.add_argument('--endpoint', type=int, default=0, help="Endpoint under test") - basic_group.add_argument('--timeout', type=int, default=90, help="Test timeout in seconds") + basic_group.add_argument('--timeout', type=int, help="Test timeout in seconds") basic_group.add_argument("--PICS", help="PICS file path", type=str) commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node") @@ -1306,7 +1312,8 @@ def async_test_body(body): """ def async_runner(self: MatterBaseTest, *args, **kwargs): - runner_with_timeout = asyncio.wait_for(body(self, *args, **kwargs), timeout=self.matter_test_config.timeout) + timeout = self.matter_test_config.timeout if self.matter_test_config.timeout is not None else self.default_timeout + runner_with_timeout = asyncio.wait_for(body(self, *args, **kwargs), timeout=timeout) return asyncio.run(runner_with_timeout) return async_runner