From c2d6fced62172d91c8822e46bd9f365de7861732 Mon Sep 17 00:00:00 2001 From: C Freeman Date: Mon, 11 Dec 2023 10:58:08 -0500 Subject: [PATCH] Python script: start of TH integration code (#28708) * Python script: start of TH integration code - adds required command line functionality (endpoint, nodeID, timeout) - adds the ability to pass in a hooks object for use by the TH - adds an internal hooks object when scripts are being used on the command line - adds hook calls as appropriate at test start / end / steps To setup a test for use in the TH, define a steps_ and desc_ function with the same name as the test, change print_step() calls to step() calls with the appropriate number, add pics_guard calls around pics checks. Changes the hello_test to demonstrate how to do this and also how the test works if the steps_ function is not defined. Added a demonstration of how the TH could call these to get the hooks back out. * Add pics_ handler, address review comments * lint * Restyled by isort * Update src/python_testing/matter_testing_support.py Co-authored-by: Carolina Lopes <116589288+ccruzagralopes@users.noreply.github.com> * Init stuff in setup_class in case setup_class fails * Make default timeout configurable --------- Co-authored-by: Restyled.io Co-authored-by: Carolina Lopes <116589288+ccruzagralopes@users.noreply.github.com> --- src/python_testing/TC_ACE_1_3.py | 184 +++++++---- src/python_testing/TC_TIMESYNC_2_8.py | 4 + src/python_testing/hello_external_runner.py | 150 +++++++++ src/python_testing/hello_test.py | 43 ++- src/python_testing/matter_testing_support.py | 325 ++++++++++++++++++- 5 files changed, 633 insertions(+), 73 deletions(-) create mode 100755 src/python_testing/hello_external_runner.py diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py index a24db8aa18de67..ec34b50bf1a9e5 100644 --- a/src/python_testing/TC_ACE_1_3.py +++ b/src/python_testing/TC_ACE_1_3.py @@ -19,7 +19,7 @@ import chip.clusters as Clusters from chip.interaction_model import Status -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts @@ -46,6 +46,72 @@ async def read_descriptor_expect_unsupported_access(self, th): await self.read_single_attribute_expect_error( dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) + def desc_TC_ACE_1_3(self) -> str: + return "[TC-ACE-1.3] Subjects" + + def steps_TC_ACE_1_3(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "TH0 writes ACL all view on PIXIT.ACE.TESTENDPOINT"), + TestStep(3, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(4, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(5, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(6, "TH0 writes ACL TH1 view on EP0"), + TestStep(7, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(8, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(9, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(10, "TH0 writes ACL TH2 view on EP0"), + TestStep(11, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(12, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(13, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(14, "TH0 writes ACL TH3 view on EP0"), + TestStep(15, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(16, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(17, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(18, "TH0 writes ACL TH1 TH2 view on EP0"), + TestStep(19, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(20, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(21, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(22, "TH0 writes ACL TH1 TH3 view on EP0"), + TestStep(23, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(24, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(25, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(26, "TH0 writes ACL TH2 TH3 view on EP0"), + TestStep(27, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(28, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(29, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(30, "TH0 writes ACL TH1 TH2 TH3 view on EP0"), + TestStep(31, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(32, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(33, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(34, "TH0 writes ACL cat1v1 view on EP0"), + TestStep(35, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(36, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(37, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(38, "TH0 writes ACL cat1v2 view on EP0"), + TestStep(39, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(40, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(41, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(42, "TH0 writes ACL cat1v3 view on EP0"), + TestStep(43, "TH1 reads EP0 descriptor - expect SUCCESS"), + TestStep(44, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(45, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(46, "TH0 writes ACL cat2v1 view on EP0"), + TestStep(47, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(48, "TH2 reads EP0 descriptor - expect SUCCESS"), + TestStep(49, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(50, "TH0 writes ACL cat2v2 view on EP0"), + TestStep(51, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(52, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(53, "TH3 reads EP0 descriptor - expect SUCCESS"), + TestStep(54, "TH0 writes ACL cat2v3 view on EP0"), + TestStep(55, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(56, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(57, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS"), + TestStep(58, "TH0 writes ACL back to default") + ] + return steps + @async_test_body async def test_TC_ACE_1_3(self): cat1_id = 0x11110000 @@ -59,7 +125,7 @@ async def test_TC_ACE_1_3(self): cat2v3 = cat2_id | 0x0003 logging.info('cat1v1 0x%x', cat1v1) - self.print_step(1, "Commissioning, already done") + self.step(1) fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0] @@ -78,7 +144,7 @@ async def test_TC_ACE_1_3(self): paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), catTags=[cat1v1, cat2v2]) - self.print_step(2, "TH0 writes ACL all view on PIXIT.ACE.TESTENDPOINT") + self.step(2) TH0_admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -92,16 +158,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, all_view] await self.write_acl(acl) - self.print_step(3, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(3) await self.read_descriptor_expect_success(TH1) - self.print_step(4, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(4) await self.read_descriptor_expect_success(TH2) - self.print_step(5, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(5) await self.read_descriptor_expect_success(TH3) - self.print_step(6, "TH0 writes ACL TH1 view on EP0") + self.step(6) th1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -109,16 +175,16 @@ async def test_TC_ACE_1_3(self): targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th1_view] await self.write_acl(acl) - self.print_step(7, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(7) await self.read_descriptor_expect_success(TH1) - self.print_step(8, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(8) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(9, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(9) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(10, "TH0 writes ACL TH2 view on EP0") + self.step(10) th2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -127,16 +193,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th2_view] await self.write_acl(acl) - self.print_step(11, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(11) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(12, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(12) await self.read_descriptor_expect_success(TH2) - self.print_step(13, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(13) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(14, "TH0 writes ACL TH3 view on EP0") + self.step(14) th3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -145,16 +211,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th3_view] await self.write_acl(acl) - self.print_step(15, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(15) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(16, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(16) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(17, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(17) await self.read_descriptor_expect_success(TH3) - self.print_step(18, "TH0 writes ACL TH1 TH2 view on EP0") + self.step(18) th12_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -163,16 +229,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th12_view] await self.write_acl(acl) - self.print_step(19, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(19) await self.read_descriptor_expect_success(TH1) - self.print_step(20, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(20) await self.read_descriptor_expect_success(TH2) - self.print_step(21, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(21) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(22, "TH0 writes ACL TH1 TH3 view on EP0") + self.step(22) th13_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -181,16 +247,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th13_view] await self.write_acl(acl) - self.print_step(23, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(23) await self.read_descriptor_expect_success(TH1) - self.print_step(24, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(24) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(25, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(25) await self.read_descriptor_expect_success(TH3) - self.print_step(26, "TH0 writes ACL TH2 TH3 view on EP0") + self.step(26) th23_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -199,16 +265,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th23_view] await self.write_acl(acl) - self.print_step(27, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(27) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(28, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(28) await self.read_descriptor_expect_success(TH2) - self.print_step(29, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(29) await self.read_descriptor_expect_success(TH3) - self.print_step(30, "TH0 writes ACL TH1 TH2 TH3 view on EP0") + self.step(30) th123_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -217,16 +283,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, th123_view] await self.write_acl(acl) - self.print_step(31, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(31) await self.read_descriptor_expect_success(TH1) - self.print_step(32, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(32) await self.read_descriptor_expect_success(TH2) - self.print_step(33, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(33) await self.read_descriptor_expect_success(TH3) - self.print_step(34, "TH0 writes ACL cat1v1 view on EP0") + self.step(34) cat1v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -235,16 +301,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat1v1_view] await self.write_acl(acl) - self.print_step(35, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(35) await self.read_descriptor_expect_success(TH1) - self.print_step(36, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(36) await self.read_descriptor_expect_success(TH2) - self.print_step(37, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(37) await self.read_descriptor_expect_success(TH3) - self.print_step(38, "TH0 writes ACL cat1v2 view on EP0") + self.step(38) cat1v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -254,16 +320,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat1v2_view] await self.write_acl(acl) - self.print_step(39, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(39) await self.read_descriptor_expect_success(TH1) - self.print_step(40, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(40) await self.read_descriptor_expect_success(TH2) - self.print_step(41, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(41) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(42, "TH0 writes ACL cat1v3 view on EP0") + self.step(42) cat1v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -273,16 +339,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat1v3_view] await self.write_acl(acl) - self.print_step(43, "TH1 reads EP0 descriptor - expect SUCCESS") + self.step(43) await self.read_descriptor_expect_success(TH1) - self.print_step(44, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(44) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(45, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(45) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(46, "TH0 writes ACL cat2v1 view on EP0") + self.step(46) cat2v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -292,16 +358,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat2v1_view] await self.write_acl(acl) - self.print_step(47, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(47) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(48, "TH2 reads EP0 descriptor - expect SUCCESS") + self.step(48) await self.read_descriptor_expect_success(TH2) - self.print_step(49, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(49) await self.read_descriptor_expect_success(TH3) - self.print_step(50, "TH0 writes ACL cat2v2 view on EP0") + self.step(50) cat2v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -311,16 +377,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat2v2_view] await self.write_acl(acl) - self.print_step(51, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(51) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(52, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(52) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(53, "TH3 reads EP0 descriptor - expect SUCCESS") + self.step(53) await self.read_descriptor_expect_success(TH3) - self.print_step(54, "TH0 writes ACL cat2v3 view on EP0") + self.step(54) cat2v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, @@ -330,16 +396,16 @@ async def test_TC_ACE_1_3(self): acl = [TH0_admin_acl, cat2v3_view] await self.write_acl(acl) - self.print_step(55, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(55) await self.read_descriptor_expect_unsupported_access(TH1) - self.print_step(56, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(56) await self.read_descriptor_expect_unsupported_access(TH2) - self.print_step(57, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + self.step(57) await self.read_descriptor_expect_unsupported_access(TH3) - self.print_step(58, "TH0 writes ACL back to default") + self.step(58) full_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, diff --git a/src/python_testing/TC_TIMESYNC_2_8.py b/src/python_testing/TC_TIMESYNC_2_8.py index f324a845ba22db..828b962ea5fef8 100644 --- a/src/python_testing/TC_TIMESYNC_2_8.py +++ b/src/python_testing/TC_TIMESYNC_2_8.py @@ -29,6 +29,10 @@ class TC_TIMESYNC_2_8(MatterBaseTest): + @property + def default_timeout(self) -> int: + # This test has potentially 6 15 s waits, so set timeout to 100 + return 100 async def read_ts_attribute_expect_success(self, attribute): cluster = Clusters.Objects.TimeSynchronization diff --git a/src/python_testing/hello_external_runner.py b/src/python_testing/hello_external_runner.py new file mode 100755 index 00000000000000..5ae98636e4cfdf --- /dev/null +++ b/src/python_testing/hello_external_runner.py @@ -0,0 +1,150 @@ +#!/usr/bin/env -S python3 -B +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import signal +import subprocess +import sys +from multiprocessing import Process +from multiprocessing.managers import BaseManager + +from hello_test import HelloTest +from matter_testing_support import MatterTestConfig, get_test_info, run_tests + +try: + from matter_yamltests.hooks import TestRunnerHooks +except ImportError: + class TestRunnerHooks: + pass + +DEFAULT_CHIP_ROOT = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..')) + +MATTER_DEVELOPMENT_PAA_ROOT_CERTS = "credentials/development/paa-root-certs" + + +class TestTestRunnerHooks(TestRunnerHooks): + def reset(self): + self.start_called = False + self.stop_called = False + self.test_start_called = False + self.test_stop_called = False + self.step_skipped_list = [] + self.step_start_list = [] + self.step_success_count = 0 + self.step_failure_count = 0 + self.step_unknown_count = 0 + + def __init__(self): + self.reset() + + def start(self, count: int): + self.start_called = True + + def stop(self, duration: int): + self.stop_called = True + + def test_start(self, filename: str, name: str, count: int): + self.test_start_called = True + + def test_stop(self, exception: Exception, duration: int): + self.test_stop_called = True + + def step_skipped(self, name: str, expression: str): + self.step_skipped_list.append(name) + + def step_start(self, name: str): + self.step_start_list.append(name) + + def step_success(self, logger, logs, duration: int, request): + self.step_success_count = self.step_success_count + 1 + + def step_failure(self, logger, logs, duration: int, request, received): + self.step_failure_count = self.step_failure_count + 1 + + def step_unknown(self): + self.step_unknown_count = self.step_unknown_count + 1 + + def summary(self): + print(f'start_called = {self.start_called}') + print(f'stop_called = {self.stop_called}') + print(f'test_start_called = {self.test_start_called}') + print(f'test_stop_called = {self.test_stop_called}') + print(f'step_skipped_list = {self.step_skipped_list}') + print(f'step_start_list = {self.step_start_list}') + print(f'step_success_count = {self.step_success_count}') + print(f'step_failure_count = {self.step_failure_count}') + print(f'step_unknown_count = {self.step_unknown_count}') + + +def run_in_process(test_name: str, config: MatterTestConfig) -> None: + BaseManager.register('TestTestRunnerHooks', TestTestRunnerHooks) + manager = BaseManager() + manager.start() + my_hooks = manager.TestTestRunnerHooks() + p = Process(target=run_tests, args=(HelloTest, config, my_hooks)) + p.start() + p.join() + print(f'Results from test {test_name}:') + print(my_hooks.summary()) + + +def commission() -> None: + paa_path = os.path.join(DEFAULT_CHIP_ROOT, MATTER_DEVELOPMENT_PAA_ROOT_CERTS) + config = MatterTestConfig(commissioning_method="on-network", commission_only=True, discriminators=[3840], setup_passcodes=[ + 20202021], dut_node_ids=[0x12344321], paa_trust_store_path=paa_path, storage_path='admin_storage.json') + run_in_process("commission", config) + + +def one_test(test_name): + # Run a test NOT using the default main. Pass in our own runner and make sure it + # gets called back. + + # This config would be generated by the TH. PIXITs can be passed using the global_test_params["meta_config"] section. + config = MatterTestConfig(tests=[test_name], dut_node_ids=[0x12344321], storage_path='admin_storage.json') + + # TH can use get_test_info to get a list of steps and a description + list = get_test_info(HelloTest, config) + print(f'Test info for test {test_name}') + print(list) + + run_in_process(test_name, config) + + +def main(): + + # Fire up an example app to test against + # TODO: make factory reset and app path configurable, maybe the storage location too. + subprocess.call("rm -rf /tmp/chip_* /tmp/repl* admin_storage.json", shell=True) + app_path = os.path.abspath(os.path.join(DEFAULT_CHIP_ROOT, 'out', + 'linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test', 'chip-all-clusters-app')) + app_cmd = str(app_path) + app_process = subprocess.Popen([app_cmd], stdout=sys.stdout, stderr=sys.stderr, bufsize=0) + + commission() + one_test('test_failure_on_wrong_endpoint') + one_test('test_names_as_expected') + one_test('test_pics') + + app_process.send_signal(signal.SIGINT.value) + app_process.wait() + print("app stopped") + + +if __name__ == "__main__": + main() diff --git a/src/python_testing/hello_test.py b/src/python_testing/hello_test.py index d7bad4c2dc193d..8d92feb7ce3d80 100644 --- a/src/python_testing/hello_test.py +++ b/src/python_testing/hello_test.py @@ -19,11 +19,14 @@ import chip.clusters as Clusters from chip.interaction_model import Status -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts class HelloTest(MatterBaseTest): + # This example test does not include the step_ and desc_ markers + # The MatterBaseTest will assume a single step and create a description + # based on the test name @async_test_body async def test_names_as_expected(self): dev_ctrl = self.default_controller @@ -37,8 +40,24 @@ async def test_names_as_expected(self): logging.info("Found VendorName: %s" % (vendor_name)) asserts.assert_equal(vendor_name, "TEST_VENDOR", "VendorName must be TEST_VENDOR!") + # To include individual steps and description for the TH, define a steps_ and desc_ function + # for the test, then use self.step(#) to indicate how the test proceeds through the test plan. + # Support for keeping the TH up to date is built into MatterBaseTest when you use the step() + # function. + def steps_failure_on_wrong_endpoint(self) -> list[TestStep]: + steps = [TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "Read ProductName on endpoint 9999"), + ] + return steps + + def desc_failure_on_wrong_endpoint(self) -> str: + return '#.#.#. [TC-HELLO-x.x] Test Failure On Wrong Endpoint' + @async_test_body async def test_failure_on_wrong_endpoint(self): + self.step(1) # Commissioning + + self.step(2) dev_ctrl = self.default_controller result = await self.read_single_attribute( dev_ctrl, @@ -49,6 +68,28 @@ async def test_failure_on_wrong_endpoint(self): asserts.assert_true(isinstance(result, Clusters.Attribute.ValueDecodeFailure), "Should fail to read on endpoint 9999") asserts.assert_equal(result.Reason.status, Status.UnsupportedEndpoint, "Failure reason should be UnsupportedEndpoint") + def steps_pics(self) -> list[TestStep]: + steps = [TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "Skip this step"), + TestStep(3, "Run this step") + ] + return steps + + def desc_pics(self) -> str: + return "#.#.#. [TC-HELLO-x.x] Test pics" + + @async_test_body + async def test_pics(self): + self.step(1) # commissioning + print('This should be run') + + self.step(2) + if self.pics_guard(self.check_pics('NON-EXISTANT_PICS')): + asserts.fail('This should not be run') + + self.step(3) + print('This should also be run') + if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 1b3fab9b8c8a3e..e486fa0304ab58 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -18,6 +18,7 @@ import argparse import asyncio import builtins +import inspect import json import logging import math @@ -59,6 +60,13 @@ from mobly.config_parser import ENV_MOBLY_LOGPATH, TestRunConfig from mobly.test_runner import TestRunner +try: + from matter_yamltests.hooks import TestRunnerHooks +except ImportError: + class TestRunnerHooks: + pass + + # TODO: Add utility to commission a device if needed # TODO: Add utilities to keep track of controllers/fabrics @@ -224,6 +232,42 @@ def name(self) -> str: return self._name +class InternalTestRunnerHooks(TestRunnerHooks): + + def start(self, count: int): + logging.info(f'Starting test set, running {count} tests') + + def stop(self, duration: int): + logging.info(f'Finished test set, ran for {duration}ms') + + def test_start(self, filename: str, name: str, count: int): + logging.info(f'Starting test from {filename}: {name} - {count} steps') + + def test_stop(self, exception: Exception, duration: int): + logging.info(f'Finished test in {duration}ms') + + def step_skipped(self, name: str, expression: str): + # TODO: Do we really need the expression as a string? We can evaluate this in code very easily + logging.info(f'\t\t**** Skipping: {name}') + + def step_start(self, name: str): + # The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these + logging.info(f'\t\t***** Test Step {name}') + + def step_success(self, logger, logs, duration: int, request): + pass + + def step_failure(self, logger, logs, duration: int, request, received): + # TODO: there's supposed to be some kind of error message here, but I have no idea where it's meant to come from in this API + logging.info('\t\t***** Test Failure : ') + + def step_unknown(self): + """ + This method is called when the result of running a step is unknown. For example during a dry-run. + """ + pass + + @dataclass class MatterTestConfig: storage_path: pathlib.Path = pathlib.Path(".") @@ -237,6 +281,8 @@ class MatterTestConfig: global_test_params: dict = field(default_factory=dict) # List of explicit tests to run by name. If empty, all tests will run tests: List[str] = field(default_factory=list) + timeout: typing.Union[int, None] = None + endpoint: int = 0 commissioning_method: Optional[str] = None discriminators: Optional[List[int]] = None @@ -500,12 +546,100 @@ def hex_from_bytes(b: bytes) -> str: return hexlify(b).decode("utf-8") +@dataclass +class TestStep: + test_plan_number: typing.Union[int, str] + description: str + is_commissioning: bool = False + + +@dataclass +class TestInfo: + function: str + desc: str + steps: list[TestStep] + pics: list[str] + + class MatterBaseTest(base_test.BaseTestClass): def __init__(self, *args): super().__init__(*args) # List of accumulated problems across all tests self.problems = [] + self.is_commissioning = False + + def get_test_steps(self, test: str) -> list[TestStep]: + ''' Retrieves the test step list for the given test + + Test steps are defined in the function called steps_. + ex for test test_TC_TEST_1_1, the steps are in a function called + steps_TC_TEST_1_1. + + Test that implement a steps_ function should call each step + in order using self.step(number), where number is the test_plan_number + from each TestStep. + ''' + steps = self._get_defined_test_steps(test) + return [TestStep(1, "Run entire test")] if steps is None else steps + + def _get_defined_test_steps(self, test: str) -> list[TestStep]: + steps_name = 'steps_' + test[5:] + try: + fn = getattr(self, steps_name) + return fn() + except AttributeError: + return None + + def get_test_pics(self, test: str) -> list[str]: + ''' Retrieves a list of top-level PICS that should be checked before running this test + + An empty list means the test will always be run. + + PICS are defined in a function called pics_. + ex. for test test_TC_TEST_1_1, the pics are in a function called + pics_TC_TEST_1_1. + ''' + pics = self._get_defined_pics(test) + return [] if pics is None else pics + + def _get_defined_pics(self, test: str) -> list[TestStep]: + steps_name = 'pics_' + test[5:] + try: + fn = getattr(self, steps_name) + return fn() + except AttributeError: + return None + + def get_test_desc(self, test: str) -> str: + ''' Returns a description of this test + + Test description is defined in the function called desc_. + ex for test test_TC_TEST_1_1, the steps are in a function called + desc_TC_TEST_1_1. + + Format: + [] + + ex: + 133.1.1. [TC-ACL-1.1] Global attributes + ''' + desc_name = 'desc_' + test[5:] + try: + fn = getattr(self, desc_name) + return fn() + except AttributeError: + return test + + # Override this if the test requires a different default timeout. + # This value will be overridden if a timeout is supplied on the command line. + @property + def default_timeout(self) -> int: + return 90 + + @property + def runner_hook(self) -> TestRunnerHooks: + return unstash_globally(self.user_params.get("hooks")) @property def matter_test_config(self) -> MatterTestConfig: @@ -533,6 +667,26 @@ def setup_class(self): # Mappings of cluster IDs to names and metadata. # TODO: Move to using non-generated code and rather use data model description (.matter or .xml) self.cluster_mapper = ClusterMapper(self.default_controller._Cluster) + self.current_step_index = 0 + self.step_start_time = datetime.now(timezone.utc) + self.step_skipped = False + + def setup_test(self): + self.current_step_index = 0 + self.step_start_time = datetime.now(timezone.utc) + self.step_skipped = False + if self.runner_hook and not self.is_commissioning: + test_name = self.current_test_info.name + steps = self._get_defined_test_steps(test_name) + num_steps = 1 if steps is None else len(steps) + filename = inspect.getfile(self.__class__) + desc = self.get_test_desc(test_name) + self.runner_hook.test_start(filename=filename, name=desc, count=num_steps) + # If we don't have defined steps, we're going to start the one and only step now + # if there are steps defined by the test, rely on the test calling the step() function + # to indicates how it is proceeding + if steps is None: + self.step(1) def teardown_class(self): """Final teardown after all tests: log all problems""" @@ -561,11 +715,13 @@ async def read_single_attribute( async def read_single_attribute_check_success( self, cluster: Clusters.ClusterObjects.ClusterCommand, attribute: Clusters.ClusterObjects.ClusterAttributeDescriptor, - dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object: + dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = None) -> object: if dev_ctrl is None: dev_ctrl = self.default_controller if node_id is None: node_id = self.dut_node_id + if endpoint is None: + endpoint = self.matter_test_config.endpoint result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)]) attr_ret = result[endpoint][cluster][attribute] @@ -579,11 +735,13 @@ async def read_single_attribute_check_success( async def read_single_attribute_expect_error( self, cluster: object, attribute: object, - error: Status, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object: + error: Status, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = None) -> object: if dev_ctrl is None: dev_ctrl = self.default_controller if node_id is None: node_id = self.dut_node_id + if endpoint is None: + endpoint = self.matter_test_config.endpoint result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)]) attr_ret = result[endpoint][cluster][attribute] @@ -596,12 +754,14 @@ async def read_single_attribute_expect_error( async def send_single_cmd( self, cmd: Clusters.ClusterObjects.ClusterCommand, - dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0, + dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = None, timedRequestTimeoutMs: typing.Union[None, int] = None) -> object: if dev_ctrl is None: dev_ctrl = self.default_controller if node_id is None: node_id = self.dut_node_id + if endpoint is None: + endpoint = self.matter_test_config.endpoint result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs) return result @@ -618,6 +778,90 @@ def record_warning(self, test_name: str, location: Union[AttributePathLocation, def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.NOTE, problem, spec_location)) + def on_fail(self, record): + ''' Called by Mobly on test failure + + record is of type TestResultRecord + ''' + if self.runner_hook and not self.is_commissioning: + exception = record.termination_signal.exception + step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) + # This isn't QUITE the test duration because the commissioning is handled separately, but it's clsoe enough for now + # This is already given in milliseconds + test_duration = record.end_time - record.begin_time + # TODO: I have no idea what logger, logs, request or received are. Hope None works because I have nothing to give + self.runner_hook.step_failure(logger=None, logs=None, duration=step_duration, request=None, received=None) + self.runner_hook.test_stop(exception=exception, duration=test_duration) + + def on_pass(self, record): + ''' Called by Mobly on test pass + + record is of type TestResultRecord + ''' + if self.runner_hook and not self.is_commissioning: + # What is request? This seems like an implementation detail for the runner + # TODO: As with failure, I have no idea what logger, logs or request are meant to be + step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) + test_duration = record.end_time - record.begin_time + self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None) + + # TODO: this check could easily be annoying when doing dev. flag it somehow? Ditto with the in-order check + steps = self._get_defined_test_steps(record.test_name) + if steps is None: + # if we don't have a list of steps, assume they were all run + all_steps_run = True + else: + all_steps_run = len(steps) == self.current_step_index + + if not all_steps_run: + # The test is done, but we didn't execute all the steps + asserts.fail("Test script error: Not all required steps were run") + + if self.runner_hook and not self.is_commissioning: + self.runner_hook.test_stop(exception=None, duration=test_duration) + + def pics_guard(self, pics_condition: bool): + if not pics_condition: + try: + steps = self.get_test_steps() + num = steps[self.current_step_index].test_plan_number + except KeyError: + num = self.current_step_index + + if self.runner_hook: + # TODO: what does name represent here? The wordy test name? The test plan number? The number and name? + # TODO: I very much do not want to have people passing in strings here. Do we really need the expression + # as a string? Does it get used by the TH? + self.runner_hook.step_skipped(name=str(num), expression="") + else: + logging.info(f'**** Skipping: {num}') + self.step_skipped = True + + def step(self, step: typing.Union[int, str]): + test_name = sys._getframe().f_back.f_code.co_name + steps = self.get_test_steps(test_name) + + # TODO: this might be annoying during dev. Remove? Flag? + if len(steps) <= self.current_step_index or steps[self.current_step_index].test_plan_number != step: + asserts.fail(f'Unexpected test step: {step} - steps not called in order, or step does not exist') + + if self.runner_hook: + # If we've reached the next step with no assertion and the step wasn't skipped, it passed + if not self.step_skipped and self.current_step_index != 0: + # TODO: As with failure, I have no idea what loger, logs or request are meant to be + step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) + self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None) + + # TODO: it seems like the step start should take a number and a name + name = f'{step} : {steps[self.current_step_index].description}' + self.runner_hook.step_start(name=name) + else: + self.print_step(step, steps[self.current_step_index].description) + + self.step_start_time = datetime.now(tz=timezone.utc) + self.current_step_index = self.current_step_index + 1 + self.step_skipped = False + def get_setup_payload_info(self) -> SetupPayloadInfo: if self.matter_test_config.qr_code_content is not None: qr_code = self.matter_test_config.qr_code_content @@ -919,6 +1163,8 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: config.ble_interface_id = args.ble_interface_id config.pics = {} if args.PICS is None else read_pics_from_file(args.PICS) config.tests = [] if args.tests is None else args.tests + config.timeout = args.timeout # This can be none, we pull the default from the test if it's unspecified + config.endpoint = 0 if args.endpoint is None else args.endpoint config.controller_node_id = args.controller_node_id config.trace_to = args.trace_to @@ -966,10 +1212,12 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: metavar='NODE_ID', default=_DEFAULT_CONTROLLER_NODE_ID, help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID) - basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex, + basic_group.add_argument('-n', '--dut-node-id', '--nodeId', type=int_decimal_or_hex, metavar='NODE_ID', dest='dut_node_ids', default=[_DEFAULT_DUT_NODE_ID], help='Node ID for primary DUT communication, ' 'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+") + basic_group.add_argument('--endpoint', type=int, default=0, help="Endpoint under test") + basic_group.add_argument('--timeout', type=int, help="Test timeout in seconds") basic_group.add_argument("--PICS", help="PICS file path", type=str) commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node") @@ -1062,8 +1310,11 @@ def async_test_body(body): synchronously, we need a mechanism to allow an `async def` to be converted to a asyncio-run synchronous method. This decorator does the wrapping. """ - def async_runner(*args, **kwargs): - return asyncio.run(body(*args, **kwargs)) + + def async_runner(self: MatterBaseTest, *args, **kwargs): + timeout = self.matter_test_config.timeout if self.matter_test_config.timeout is not None else self.default_timeout + runner_with_timeout = asyncio.wait_for(body(self, *args, **kwargs), timeout=timeout) + return asyncio.run(runner_with_timeout) return async_runner @@ -1071,6 +1322,10 @@ def async_runner(*args, **kwargs): class CommissionDeviceTest(MatterBaseTest): """Test class auto-injected at the start of test list to commission a device when requested""" + def __init__(self, *args): + super().__init__(*args) + self.is_commissioning = True + def test_run_commissioning(self): conf = self.matter_test_config for commission_idx, node_id in enumerate(conf.dut_node_ids): @@ -1140,6 +1395,7 @@ def default_matter_test_main(argv=None, **kwargs): Args: argv: A list that is then parsed as command line args. If None, defaults to sys.argv """ + matter_test_config = parse_matter_test_args(argv) # Allow override of command line from optional arguments @@ -1149,6 +1405,38 @@ def default_matter_test_main(argv=None, **kwargs): # Find the test class in the test script. test_class = _find_test_class() + # This is required in case we need any testing with maximized certificate chains. + # We need *all* issuers from the start, even for default controller, to use + # maximized chains, before MatterStackState init, others some stale certs + # may not chain properly. + if "maximize_cert_chains" in kwargs: + matter_test_config.maximize_cert_chains = kwargs["maximize_cert_chains"] + + hooks = InternalTestRunnerHooks() + + run_tests(test_class, matter_test_config, hooks) + + +def get_test_info(test_class: MatterBaseTest, matter_test_config: MatterTestConfig) -> list[TestInfo]: + test_config = generate_mobly_test_config(matter_test_config) + base = test_class(test_config) + + if len(matter_test_config.tests) > 0: + tests = matter_test_config.tests + else: + tests = base.get_existing_test_names() + + info = [] + for t in tests: + info.append(TestInfo(t, steps=base.get_test_steps(t), desc=base.get_test_desc(t), pics=base.get_test_pics(t))) + + return info + + +def run_tests(test_class: MatterBaseTest, matter_test_config: MatterTestConfig, hooks: TestRunnerHooks) -> None: + + get_test_info(test_class, matter_test_config) + # Load test config file. test_config = generate_mobly_test_config(matter_test_config) @@ -1157,13 +1445,6 @@ def default_matter_test_main(argv=None, **kwargs): if len(matter_test_config.tests) > 0: tests = matter_test_config.tests - # This is required in case we need any testing with maximized certificate chains. - # We need *all* issuers from the start, even for default controller, to use - # maximized chains, before MatterStackState init, others some stale certs - # may not chain properly. - if "maximize_cert_chains" in kwargs: - matter_test_config.maximize_cert_chains = kwargs["maximize_cert_chains"] - stack = MatterStackState(matter_test_config) with TracingContext() as tracing_ctx: @@ -1183,6 +1464,10 @@ def default_matter_test_main(argv=None, **kwargs): test_config.user_params["default_controller"] = stash_globally(default_controller) test_config.user_params["matter_test_config"] = stash_globally(matter_test_config) + test_config.user_params["hooks"] = stash_globally(hooks) + + # Execute the test class with the config + ok = True test_config.user_params["certificate_authority_manager"] = stash_globally(stack.certificate_authority_manager) @@ -1200,15 +1485,29 @@ def default_matter_test_main(argv=None, **kwargs): if not matter_test_config.commission_only: runner.add_test_class(test_config, test_class, tests) + if hooks: + # Right now, we only support running a single test class at once, + # but it's relatively easy to exapand that to make the test process faster + # TODO: support a list of tests + hooks.start(count=1) + # Mobly gives the test run time in seconds, lets be a bit more precise + runner_start_time = datetime.now(timezone.utc) + try: runner.run() ok = runner.results.is_all_pass and ok + except TimeoutError: + ok = False except signals.TestAbortAll: ok = False except Exception: logging.exception('Exception when executing %s.', test_config.testbed_name) ok = False + if hooks: + duration = (datetime.now(timezone.utc) - runner_start_time) / timedelta(microseconds=1) + hooks.stop(duration=duration) + # Shutdown the stack when all done stack.Shutdown()