diff --git a/scripts/py_matter_yamltests/matter_yamltests/hooks.py b/scripts/py_matter_yamltests/matter_yamltests/hooks.py index ca739b8ea27633..d14e90a7aabe90 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/hooks.py +++ b/scripts/py_matter_yamltests/matter_yamltests/hooks.py @@ -152,7 +152,7 @@ def step_skipped(self, name: str, expression: str): """ pass - def step_start(self, request: TestStep): + def step_start(self, request: TestStep, endpoint: Optional[int] = None): """ This method is called when the runner starts running a step from the test. @@ -160,6 +160,8 @@ def step_start(self, request: TestStep): ---------- request: TestStep The original request as defined by the test step. + endpoint: int + An optional device endpoint the step will target. """ pass diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py index 3c64171571eb95..3869ed23f28cff 100644 --- a/src/python_testing/TC_ACE_1_3.py +++ b/src/python_testing/TC_ACE_1_3.py @@ -47,16 +47,16 @@ async def write_acl(self, acl): asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed") print(result) - async def read_descriptor_expect_success(self, th): + async def read_descriptor_expect_success(self, th, endpoint: int): cluster = Clusters.Objects.Descriptor attribute = Clusters.Descriptor.Attributes.DeviceTypeList - await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute) + await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=endpoint, cluster=cluster, attribute=attribute) - async def read_descriptor_expect_unsupported_access(self, th): + async def read_descriptor_expect_unsupported_access(self, th, endpoint: int): cluster = Clusters.Objects.Descriptor attribute = Clusters.Descriptor.Attributes.DeviceTypeList await self.read_single_attribute_expect_error( - dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) + dev_ctrl=th, endpoint=endpoint, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) def desc_TC_ACE_1_3(self) -> str: return "[TC-ACE-1.3] Subjects" @@ -137,6 +137,8 @@ async def test_TC_ACE_1_3(self): cat2v3 = cat2_id | 0x0003 logging.info('cat1v1 0x%x', cat1v1) + endpoint = 0 + self.step(1) fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0] @@ -156,266 +158,266 @@ async def test_TC_ACE_1_3(self): paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), catTags=[cat1v1, cat2v2]) - self.step(2) + self.step(2, endpoint) TH0_admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH0_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=0x001f)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint, cluster=0x001f)]) all_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, all_view] await self.write_acl(acl) - self.step(3) - await self.read_descriptor_expect_success(TH1) + self.step(3, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(4) - await self.read_descriptor_expect_success(TH2) + self.step(4, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(5) - await self.read_descriptor_expect_success(TH3) + self.step(5, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(6) + self.step(6, endpoint) th1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th1_view] await self.write_acl(acl) - self.step(7) - await self.read_descriptor_expect_success(TH1) + self.step(7, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(8) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(8, endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint) - self.step(9) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(9, endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint) - self.step(10) + self.step(10, endpoint) th2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH2_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th2_view] await self.write_acl(acl) - self.step(11) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(11, endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint) - self.step(12) - await self.read_descriptor_expect_success(TH2) + self.step(12, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(13) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(13, endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint) - self.step(14) + self.step(14, endpoint) th3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th3_view] await self.write_acl(acl) - self.step(15) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(15, endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint) - self.step(16) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(16, endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint) - self.step(17) - await self.read_descriptor_expect_success(TH3) + self.step(17, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(18) + self.step(18, endpoint) th12_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH2_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th12_view] await self.write_acl(acl) - self.step(19) - await self.read_descriptor_expect_success(TH1) + self.step(19, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(20) - await self.read_descriptor_expect_success(TH2) + self.step(20, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(21) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(21, endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint) - self.step(22) + self.step(22, endpoint) th13_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th13_view] await self.write_acl(acl) - self.step(23) - await self.read_descriptor_expect_success(TH1) + self.step(23, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(24) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(24, endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint) - self.step(25) - await self.read_descriptor_expect_success(TH3) + self.step(25, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(26) + self.step(26, endpoint) th23_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH2_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th23_view] await self.write_acl(acl) - self.step(27) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(27, endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint) - self.step(28) - await self.read_descriptor_expect_success(TH2) + self.step(28, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(29) - await self.read_descriptor_expect_success(TH3) + self.step(29, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(30) + self.step(30, endpoint) th123_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH2_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th123_view] await self.write_acl(acl) - self.step(31) - await self.read_descriptor_expect_success(TH1) + self.step(31, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(32) - await self.read_descriptor_expect_success(TH2) + self.step(32, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(33) - await self.read_descriptor_expect_success(TH3) + self.step(33, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(34) + self.step(34, endpoint) cat1v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v1)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat1v1_view] await self.write_acl(acl) - self.step(35) - await self.read_descriptor_expect_success(TH1) + self.step(35, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(36) - await self.read_descriptor_expect_success(TH2) + self.step(36, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(37) - await self.read_descriptor_expect_success(TH3) + self.step(37, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(38) + self.step(38, endpoint) cat1v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v2)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat1v2_view] await self.write_acl(acl) - self.step(39) - await self.read_descriptor_expect_success(TH1) + self.step(39, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(40) - await self.read_descriptor_expect_success(TH2) + self.step(40, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(41) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(41, endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint) - self.step(42) + self.step(42, endpoint) cat1v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v3)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat1v3_view] await self.write_acl(acl) - self.step(43) - await self.read_descriptor_expect_success(TH1) + self.step(43, endpoint) + await self.read_descriptor_expect_success(TH1, endpoint) - self.step(44) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(44, endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint) - self.step(45) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(45, endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint) - self.step(46) + self.step(46, endpoint) cat2v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v1)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat2v1_view] await self.write_acl(acl) - self.step(47) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(47, endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint) - self.step(48) - await self.read_descriptor_expect_success(TH2) + self.step(48, endpoint) + await self.read_descriptor_expect_success(TH2, endpoint) - self.step(49) - await self.read_descriptor_expect_success(TH3) + self.step(49, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(50) + self.step(50, endpoint) cat2v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v2)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat2v2_view] await self.write_acl(acl) - self.step(51) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(51, endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint) - self.step(52) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(52, endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint) - self.step(53) - await self.read_descriptor_expect_success(TH3) + self.step(53, endpoint) + await self.read_descriptor_expect_success(TH3, endpoint) - self.step(54) + self.step(54, endpoint) cat2v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v3)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat2v3_view] await self.write_acl(acl) - self.step(55) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(55, endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint) - self.step(56) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(56, endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint) - self.step(57) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(57, endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint) self.step(58) diff --git a/src/python_testing/TC_BOOLCFG_2_1.py b/src/python_testing/TC_BOOLCFG_2_1.py index 484d590b258b6d..1eae9fc5dedbcc 100644 --- a/src/python_testing/TC_BOOLCFG_2_1.py +++ b/src/python_testing/TC_BOOLCFG_2_1.py @@ -74,12 +74,12 @@ async def test_TC_BOOLCFG_2_1(self): self.step(1) attributes = Clusters.BooleanStateConfiguration.Attributes - self.step(2) + self.step(2, endpoint) attribute_list = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) number_of_supported_levels = 0 - self.step(3) + self.step(3, endpoint) if attributes.SupportedSensitivityLevels.attribute_id in attribute_list: number_of_supported_levels = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SupportedSensitivityLevels) asserts.assert_less_equal(number_of_supported_levels, 10, "SupportedSensitivityLevels attribute is out of range") @@ -87,7 +87,7 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(4) + self.step(4, endpoint) if attributes.CurrentSensitivityLevel.attribute_id in attribute_list: current_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.CurrentSensitivityLevel) asserts.assert_less_equal(current_sensitivity_level_dut, number_of_supported_levels, @@ -95,7 +95,7 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(5) + self.step(5, endpoint) if attributes.DefaultSensitivityLevel.attribute_id in attribute_list: default_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.DefaultSensitivityLevel) asserts.assert_less_equal(default_sensitivity_level_dut, number_of_supported_levels, @@ -103,35 +103,35 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(6) + self.step(6, endpoint) if attributes.AlarmsActive.attribute_id in attribute_list: alarms_active_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsActive) asserts.assert_equal(alarms_active_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsActive is not in valid range") else: logging.info("Test step skipped") - self.step(7) + self.step(7, endpoint) if attributes.AlarmsSuppressed.attribute_id in attribute_list: alarms_suppressed_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSuppressed) asserts.assert_equal(alarms_suppressed_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSuppressed is not in valid range") else: logging.info("Test step skipped") - self.step(8) + self.step(8, endpoint) if attributes.AlarmsEnabled.attribute_id in attribute_list: alarms_enabled_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsEnabled) asserts.assert_equal(alarms_enabled_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsEnabled is not in valid range") else: logging.info("Test step skipped") - self.step(9) + self.step(9, endpoint) if attributes.AlarmsSupported.attribute_id in attribute_list: alarms_supported_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSupported) asserts.assert_equal(alarms_supported_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSupported is not in valid range") else: logging.info("Test step skipped") - self.step(10) + self.step(10, endpoint) if attributes.SensorFault.attribute_id in attribute_list: sensor_fault_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SensorFault) asserts.assert_equal(sensor_fault_dut & ~all_sensor_fault_bitmap_bits, 0, "SensorFault is not in valid range") diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index c5d46e2306dae2..dc6a40caa4c5da 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -584,9 +584,9 @@ def step_skipped(self, name: str, expression: str): # TODO: Do we really need the expression as a string? We can evaluate this in code very easily logging.info(f'\t\t**** Skipping: {name}') - def step_start(self, name: str): - # The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these - logging.info(f'\t\t***** Test Step {name}') + def step_start(self, name: str, endpoint: int | None = None): + # TODO: The way I'm calling this, the name already includes the step number, but it seems like it might be good to separate these + logging.info(f'\t\t***** Test Step {name} started with endpoint {endpoint} ') def step_success(self, logger, logs, duration: int, request): pass @@ -915,6 +915,7 @@ def hex_from_bytes(b: bytes) -> str: class TestStep: test_plan_number: typing.Union[int, str] description: str + endpoint: int | None = None expectation: str = "" is_commissioning: bool = False @@ -1280,8 +1281,9 @@ async def check_test_event_triggers_enabled(self): test_event_enabled = await self.read_single_attribute_check_success(endpoint=0, cluster=cluster, attribute=full_attr) asserts.assert_equal(test_event_enabled, True, "TestEventTriggersEnabled is False") - def print_step(self, stepnum: typing.Union[int, str], title: str) -> None: - logging.info(f'***** Test Step {stepnum} : {title}') + def print_step(self, stepnum: typing.Union[int, str], title: str, endpoint: int | None = None) -> None: + endpoint_info = f" with endpoint {endpoint}" if endpoint is not None else "" + logging.info(f'***** Test Step {stepnum} : {title}{endpoint_info}') def record_error(self, test_name: str, location: ProblemLocation, problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.ERROR, problem, spec_location)) @@ -1451,7 +1453,7 @@ def skip_all_remaining_steps(self, starting_step_number): for step in remaining: self.skip_step(step.test_plan_number) - def step(self, step: typing.Union[int, str]): + def step(self, step: typing.Union[int, str], endpoint: Optional[int] = None): test_name = self.current_test_info.name steps = self.get_test_steps(test_name) @@ -1459,6 +1461,7 @@ def step(self, step: typing.Union[int, str]): if len(steps) <= self.current_step_index or steps[self.current_step_index].test_plan_number != step: asserts.fail(f'Unexpected test step: {step} - steps not called in order, or step does not exist') + current_step = steps[self.current_step_index] if self.runner_hook: # If we've reached the next step with no assertion and the step wasn't skipped, it passed if not self.step_skipped and self.current_step_index != 0: @@ -1466,14 +1469,18 @@ def step(self, step: typing.Union[int, str]): step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None) + current_step.endpoint = endpoint + # TODO: it seems like the step start should take a number and a name - name = f'{step} : {steps[self.current_step_index].description}' - self.runner_hook.step_start(name=name) + name = f'{step} : {current_step.description}' + + self.print_step(step, current_step.description, endpoint) + self.runner_hook.step_start(name=name, endpoint=current_step.endpoint) else: - self.print_step(step, steps[self.current_step_index].description) + self.print_step(step, current_step.description) self.step_start_time = datetime.now(tz=timezone.utc) - self.current_step_index = self.current_step_index + 1 + self.current_step_index += 1 self.step_skipped = False def get_setup_payload_info(self) -> List[SetupPayloadInfo]: diff --git a/src/python_testing/test_testing/TestDecorators.py b/src/python_testing/test_testing/TestDecorators.py index 1ad5bc550bc237..82b49eddd86953 100644 --- a/src/python_testing/test_testing/TestDecorators.py +++ b/src/python_testing/test_testing/TestDecorators.py @@ -86,7 +86,7 @@ def test_stop(self, exception: Exception, duration: int): def step_skipped(self, name: str, expression: str): pass - def step_start(self, name: str): + def step_start(self, name: str, endpoint: Optional[int] = None): pass def step_success(self, logger, logs, duration: int, request):