diff --git a/src/python_testing/TC_OpstateCommon.py b/src/python_testing/TC_OpstateCommon.py index 557b7606ccbea3..1a4cfbe7150e30 100644 --- a/src/python_testing/TC_OpstateCommon.py +++ b/src/python_testing/TC_OpstateCommon.py @@ -210,8 +210,8 @@ def STEPS_TC_OPSTATE_BASE_1_1(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature_map=0): cluster = self.test_info.cluster attributes = cluster.Attributes - events = cluster.Events commands = cluster.Commands + events = cluster.Events self.init_test() @@ -245,7 +245,7 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature attributes.ClusterRevision.attribute_id ] - if self.check_pics(f"{self.test_info.pics_code}.S.A0002"): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): expected_value.append(attributes.CountdownTime.attribute_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -259,7 +259,7 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature events.OperationalError.event_id, ] - if self.check_pics(f"{self.test_info.pics_code}.S.E01"): + if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.E01")): expected_value.append(events.OperationCompletion.event_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -270,19 +270,19 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature self.step(6) expected_value = [] - if (self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) or + (await self.command_guard(endpoint=endpoint, command=commands.Resume))): expected_value.append(commands.Pause.command_id) - if (self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Stop)) or + (await self.command_guard(endpoint=endpoint, command=commands.Start))): expected_value.append(commands.Stop.command_id) - if self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp"): + if await self.command_guard(endpoint=endpoint, command=commands.Start): expected_value.append(commands.Start.command_id) - if (self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) or + (await self.command_guard(endpoint=endpoint, command=commands.Resume))): expected_value.append(commands.Resume.command_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -293,10 +293,10 @@ async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature self.step(7) expected_value = [] - if (self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") or - self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) or + (await self.command_guard(endpoint=endpoint, command=commands.Resume)) or + (await self.command_guard(endpoint=endpoint, command=commands.Stop)) or + (await self.command_guard(endpoint=endpoint, command=commands.Start))): expected_value.append(commands.OperationalCommandResponse.command_id) await self.read_and_expect_array_contains(endpoint=endpoint, @@ -344,7 +344,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 2: TH reads from the DUT the PhaseList attribute self.step(2) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0000")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.PhaseList): phase_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.PhaseList) if phase_list is not NullValue: @@ -354,7 +354,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 3: TH reads from the DUT the CurrentPhase attribute self.step(3) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0001")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CurrentPhase): current_phase = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CurrentPhase) if (phase_list == NullValue) or (not phase_list): @@ -366,7 +366,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 4: TH reads from the DUT the CountdownTime attribute self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if countdown_time is not NullValue: @@ -375,7 +375,7 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 5: TH reads from the DUT the OperationalStateList attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0003")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.OperationalStateList): operational_state_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalStateList) defined_states = [state.value for state in cluster.Enums.OperationalStateEnum @@ -396,73 +396,72 @@ async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1): # STEP 6: TH reads from the DUT the OperationalState attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - operational_state = await self.read_expect_success(endpoint=endpoint, - attribute=attributes.OperationalState) - in_range = (0x80 <= operational_state <= 0xBF) - asserts.assert_true(operational_state in defined_states or in_range, - "OperationalState has an invalid ID value!") - - # STEP 6a: Manually put the device in the Stopped(0x00) operational state - self.step("6a") - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_STOPPED")): - self.send_manual_or_pipe_command(name="OperationalStateChange", - device=self.device, - operation="Stop") - # STEP 6b: TH reads from the DUT the OperationalState attribute - self.step("6b") - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kStopped) - else: - self.skip_step("6b") + operational_state = await self.read_expect_success(endpoint=endpoint, + attribute=attributes.OperationalState) + in_range = (0x80 <= operational_state <= 0xBF) + asserts.assert_true(operational_state in defined_states or in_range, + "OperationalState has an invalid ID value!") + + # STEP 6a: Manually put the device in the Stopped(0x00) operational state + self.step("6a") + if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_STOPPED")): + self.send_manual_or_pipe_command(name="OperationalStateChange", + device=self.device, + operation="Stop") + # STEP 6b: TH reads from the DUT the OperationalState attribute + self.step("6b") + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kStopped) + else: + self.skip_step("6b") - # STEP 6c: Manually put the device in the Running(0x01) operational state - self.step("6c") - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")): - self.send_manual_or_pipe_command(name="OperationalStateChange", - device=self.device, - operation="Start") - # STEP 6d: TH reads from the DUT the OperationalState attribute - self.step("6d") - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kRunning) - else: - self.skip_step("6d") + # STEP 6c: Manually put the device in the Running(0x01) operational state + self.step("6c") + if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")): + self.send_manual_or_pipe_command(name="OperationalStateChange", + device=self.device, + operation="Start") + # STEP 6d: TH reads from the DUT the OperationalState attribute + self.step("6d") + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kRunning) + else: + self.skip_step("6d") - # STEP 6e: Manually put the device in the Paused(0x02) operational state - self.step("6e") - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")): - self.send_manual_or_pipe_command(name="OperationalStateChange", - device=self.device, - operation="Pause") - # STEP 6f: TH reads from the DUT the OperationalState attribute - self.step("6f") - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kPaused) - else: - self.skip_step("6f") + # STEP 6e: Manually put the device in the Paused(0x02) operational state + self.step("6e") + if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")): + self.send_manual_or_pipe_command(name="OperationalStateChange", + device=self.device, + operation="Pause") + # STEP 6f: TH reads from the DUT the OperationalState attribute + self.step("6f") + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kPaused) + else: + self.skip_step("6f") - # STEP 6g: Manually put the device in the Error(0x03) operational state - self.step("6g") - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_ERROR")): - self.send_manual_or_pipe_command(name="OperationalStateChange", - device=self.device, - operation="OnFault", - param=cluster.Enums.ErrorStateEnum.kUnableToStartOrResume) - # STEP 6h: TH reads from the DUT the OperationalState attribute - self.step("6h") - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kError) - else: - self.skip_step("6h") + # STEP 6g: Manually put the device in the Error(0x03) operational state + self.step("6g") + if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_ERROR")): + self.send_manual_or_pipe_command(name="OperationalStateChange", + device=self.device, + operation="OnFault", + param=cluster.Enums.ErrorStateEnum.kUnableToStartOrResume) + # STEP 6h: TH reads from the DUT the OperationalState attribute + self.step("6h") + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kError) + else: + self.skip_step("6h") # STEP 7: TH reads from the DUT the OperationalError attribute self.step(7) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0005")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.OperationalError): operational_error = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalError) # Defined Errors @@ -566,7 +565,9 @@ def STEPS_TC_OPSTATE_BASE_2_2(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): cluster = self.test_info.cluster attributes = cluster.Attributes + commands = cluster.Commands + generated_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attributes.GeneratedCommandList) self.init_test() @@ -595,7 +596,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 3: TH reads from the DUT the OperationalStateList attribute self.step(3) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0003")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.OperationalStateList): operational_state_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.OperationalStateList) @@ -610,22 +611,20 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 4: TH sends Start command to the DUT self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Start)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 5: TH reads from the DUT the OperationalState attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kRunning) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 6: TH reads from the DUT the OperationalError attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0005")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.OperationalError): await self.read_and_expect_property_value(endpoint=endpoint, attribute=attributes.OperationalError, attr_property="errorStateID", @@ -633,7 +632,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 7: TH reads from the DUT the CountdownTime attribute self.step(7) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): initial_countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if initial_countdown_time is not NullValue: @@ -642,7 +641,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 8: TH reads from the DUT the PhaseList attribute self.step(8) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0000")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.PhaseList): phase_list = await self.read_expect_success(endpoint=endpoint, attribute=attributes.PhaseList) phase_list_len = 0 @@ -653,7 +652,7 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 9: TH reads from the DUT the CurrentPhase attribute self.step(9) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0001")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CurrentPhase): current_phase = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CurrentPhase) if (phase_list == NullValue) or (not phase_list): @@ -666,12 +665,12 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 10: TH waits for {PIXIT.WAITTIME.COUNTDOWN} self.step(10) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): time.sleep(wait_time) # STEP 11: TH reads from the DUT the CountdownTime attribute self.step(11) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) @@ -683,31 +682,27 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 12: TH sends Start command to the DUT self.step(12) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Start)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 13: TH sends Stop command to the DUT self.step(13) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Stop)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 14: TH reads from the DUT the OperationalState attribute self.step(14) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kStopped) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kStopped) # STEP 15: TH sends Stop command to the DUT self.step(15) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Stop)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) @@ -722,9 +717,9 @@ async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1): # STEP 17: TH sends Start command to the DUT self.step(17) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ERR_UNABLE_TO_START_OR_RESUME") and - self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if self.pics_guard((self.check_pics(f"{self.test_info.pics_code}.S.M.ERR_UNABLE_TO_START_OR_RESUME")) and + ((await self.command_guard(endpoint=endpoint, command=commands.Start)) and + (commands.OperationalCommandResponse.command_id in generated_cmd_list))): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kUnableToStartOrResume) @@ -757,7 +752,9 @@ def STEPS_TC_OPSTATE_BASE_2_3(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): cluster = self.test_info.cluster attributes = cluster.Attributes + commands = cluster.Commands + generated_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attributes.GeneratedCommandList) self.init_test() @@ -786,37 +783,34 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 3: TH reads from the DUT the OperationalStateList attribute self.step(3) - if self.pics_guard(self.check_pics((f"{self.test_info.pics_code}.S.A0003"))): - operational_state_list = await self.read_expect_success(endpoint=endpoint, - attribute=attributes.OperationalStateList) + operational_state_list = await self.read_expect_success(endpoint=endpoint, + attribute=attributes.OperationalStateList) - operational_state_list_ids = [op_state.operationalStateID for op_state in operational_state_list] + operational_state_list_ids = [op_state.operationalStateID for op_state in operational_state_list] - defined_states = [state.value for state in cluster.Enums.OperationalStateEnum - if state != cluster.Enums.OperationalStateEnum.kUnknownEnumValue] + defined_states = [state.value for state in cluster.Enums.OperationalStateEnum + if state != cluster.Enums.OperationalStateEnum.kUnknownEnumValue] - for state in defined_states: - if state not in operational_state_list_ids: - asserts.fail(f"The list shall include structs with the following OperationalStateIds: {defined_states}") + for state in defined_states: + if state not in operational_state_list_ids: + asserts.fail(f"The list shall include structs with the following OperationalStateIds: {defined_states}") # STEP 4: TH sends Pause command to the DUT self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 5: TH reads from the DUT the OperationalState attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kPaused) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kPaused) # STEP 6: TH reads from the DUT the CountdownTime attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): initial_countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if initial_countdown_time is not NullValue: @@ -830,7 +824,7 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 8: TH reads from the DUT the CountdownTime attribute self.step(8) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) @@ -842,31 +836,27 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 9: TH sends Pause command to the DUT self.step(9) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 10: TH sends Resume command to the DUT self.step(10) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Resume)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 11: TH reads from the DUT the OperationalState attribute self.step(11) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kRunning) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 12: TH sends Resume command to the DUT self.step(12) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Resume)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) @@ -880,16 +870,14 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 14: TH sends Pause command to the DUT self.step(14) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) # STEP 15: TH sends Resume command to the DUT self.step(15) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Resume)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) @@ -904,16 +892,14 @@ async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1): # STEP 17: TH sends Pause command to the DUT self.step(17) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) # STEP 18: TH sends Resume command to the DUT self.step(18) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Resume)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kCommandInvalidInState) @@ -946,7 +932,7 @@ async def TEST_TC_OPSTATE_BASE_2_4(self, endpoint=1): # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test) self.step(1) - if self.pics_guard(error_event_gen): + if error_event_gen: # STEP 2: Set up a subscription to the OperationalError event self.step(2) # Subscribe to Events and when they are sent push them to a queue for checking later @@ -976,10 +962,11 @@ async def TEST_TC_OPSTATE_BASE_2_4(self, endpoint=1): # STEP 4: TH reads from the DUT the OperationalState attribute self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kError) + + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kError) + else: self.skip_step(2) self.skip_step(3) @@ -1017,7 +1004,10 @@ def STEPS_TC_OPSTATE_BASE_2_5(self) -> list[TestStep]: async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): cluster = self.test_info.cluster attributes = cluster.Attributes + commands = cluster.Commands + generated_cmd_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attributes.GeneratedCommandList) + events = cluster.Events self.init_test() @@ -1058,25 +1048,23 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 4: TH sends Start command to the DUT self.step(4) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Start)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 5: TH reads from the DUT the CountdownTime attribute self.step(5) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0002")): + if await self.attribute_guard(endpoint=endpoint, attribute=attributes.CountdownTime): initial_countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if initial_countdown_time is not NullValue: # STEP 6: TH reads from the DUT the OperationalState attribute self.step(6) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kRunning) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 7: TH waits for initial-countdown-time self.step(7) @@ -1085,8 +1073,7 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 8: TH sends Stop command to the DUT self.step(8) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Stop)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) @@ -1109,10 +1096,9 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 10: TH reads from the DUT the OperationalState attribute self.step(10) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kStopped) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kStopped) # STEP 11: Restart DUT self.step(11) @@ -1135,33 +1121,29 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 13: TH sends Start command to the DUT self.step(13) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C02.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Start)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Start(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 14: TH reads from the DUT the OperationalState attribute self.step(14) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kRunning) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 15: TH sends Pause command to the DUT self.step(15) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C00.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Pause)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Pause(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 16: TH reads from the DUT the OperationalState attribute self.step(16) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kPaused) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kPaused) # STEP 17: TH waits for half of initial-countdown-time self.step(17) @@ -1169,18 +1151,16 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 18: TH sends Resume command to the DUT self.step(18) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C03.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Resume)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Resume(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) # STEP 19: TH reads from the DUT the OperationalState attribute self.step(19) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")): - await self.read_and_expect_value(endpoint=endpoint, - attribute=attributes.OperationalState, - expected_value=cluster.Enums.OperationalStateEnum.kRunning) + await self.read_and_expect_value(endpoint=endpoint, + attribute=attributes.OperationalState, + expected_value=cluster.Enums.OperationalStateEnum.kRunning) # STEP 20: TH waits for initial-countdown-time self.step(20) @@ -1188,8 +1168,7 @@ async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1): # STEP 21: TH sends Stop command to the DUT self.step(21) - if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.C01.Rsp") and - self.check_pics(f"{self.test_info.pics_code}.S.C04.Tx")): + if ((await self.command_guard(endpoint=endpoint, command=commands.Stop)) and (commands.OperationalCommandResponse.command_id in generated_cmd_list)): await self.send_cmd_expect_response(endpoint=endpoint, cmd=commands.Stop(), expected_response=cluster.Enums.ErrorStateEnum.kNoError) diff --git a/src/python_testing/TC_RVCOPSTATE_2_1.py b/src/python_testing/TC_RVCOPSTATE_2_1.py index 428aa9a5ab14fd..4fc63fbe236632 100644 --- a/src/python_testing/TC_RVCOPSTATE_2_1.py +++ b/src/python_testing/TC_RVCOPSTATE_2_1.py @@ -85,6 +85,8 @@ def TC_RVCOPSTATE_2_1(self) -> list[str]: @async_test_body async def test_TC_RVCOPSTATE_2_1(self): + if self.matter_test_config.endpoint is None or self.matter_test_config.endpoint == 0: + asserts.fail("--endpoint must be set and not set to 0 for this test to run correctly.") self.endpoint = self.get_endpoint() asserts.assert_false(self.endpoint is None, "--endpoint must be included on the command line in.") self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") @@ -94,7 +96,8 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set") self.app_pipe = self.app_pipe + str(app_pid) - attributes = Clusters.RvcOperationalState.Attributes + cluster = Clusters.RvcOperationalState + attributes = cluster.Attributes self.print_step(1, "Commissioning, already done") @@ -102,7 +105,7 @@ async def test_TC_RVCOPSTATE_2_1(self): if self.is_ci: self.write_to_app_pipe({"Name": "Reset"}) - if self.check_pics("RVCOPSTATE.S.A0000"): + if await self.attribute_guard(endpoint=self.endpoint, attribute=attributes.PhaseList): self.print_step(2, "Read PhaseList attribute") phase_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.PhaseList) @@ -115,7 +118,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_less_equal(phase_list_len, 32, "PhaseList length(%d) must be less than 32!" % phase_list_len) - if self.check_pics("RVCOPSTATE.S.A0001"): + if await self.attribute_guard(endpoint=self.endpoint, attribute=attributes.CurrentPhase): self.print_step(3, "Read CurrentPhase attribute") current_phase = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentPhase) logging.info("CurrentPhase: %s" % (current_phase)) @@ -126,7 +129,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(0 <= current_phase < phase_list_len, "CurrentPhase(%s) must be between 0 and %d" % (current_phase, (phase_list_len - 1))) - if self.check_pics("RVCOPSTATE.S.A0002"): + if await self.attribute_guard(endpoint=self.endpoint, attribute=attributes.CountdownTime): self.print_step(4, "Read CountdownTime attribute") countdown_time = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CountdownTime) @@ -136,7 +139,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(countdown_time >= 0 and countdown_time <= 259200, "CountdownTime(%s) must be between 0 and 259200" % countdown_time) - if self.check_pics("RVCOPSTATE.S.A0003"): + if await self.attribute_guard(endpoint=self.endpoint, attribute=attributes.OperationalStateList): self.print_step(5, "Read OperationalStateList attribute") operational_state_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.OperationalStateList) @@ -159,7 +162,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(error_state_present, "The OperationalStateList does not have an ID entry of Error(0x03)") - if self.check_pics("RVCOPSTATE.S.A0004"): + if await self.attribute_guard(endpoint=self.endpoint, attribute=attributes.OperationalState): self.print_step(6, "Read OperationalState attribute") operational_state = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.OperationalState) @@ -226,7 +229,7 @@ async def test_TC_RVCOPSTATE_2_1(self): self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") await self.read_and_validate_opstate(step="6n", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked) - if self.check_pics("RVCOPSTATE.S.A0005"): + if await self.attribute_guard(endpoint=self.endpoint, attribute=attributes.OperationalError): self.print_step(7, "Read OperationalError attribute") operational_error = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.OperationalError) diff --git a/src/python_testing/TC_TestAttrAvail.py b/src/python_testing/TC_TestAttrAvail.py new file mode 100644 index 00000000000000..b2fc40eae600c6 --- /dev/null +++ b/src/python_testing/TC_TestAttrAvail.py @@ -0,0 +1,164 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: +# run1: +# app: ${ALL_CLUSTERS_APP} +# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json +# script-args: > +# --storage-path admin_storage.json +# --manual-code 10054912339 +# --PICS src/app/tests/suites/certification/ci-pics-values +# --trace-to json:${TRACE_TEST_JSON}.json +# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# --endpoint 1 +# factory-reset: true +# quiet: true +# run2: +# app: ${ALL_CLUSTERS_APP} +# app-args: --discriminator 1234 --passcode 20202021 --KVS kvs1 +# script-args: > +# --storage-path admin_storage.json +# --discriminator 1234 +# --passcode 20202021 +# --endpoint 1 +# --commissioning-method on-network +# factory-reset: true +# quiet: true +# run3: +# app: ${ALL_CLUSTERS_APP} +# app-args: --discriminator 1234 --KVS kvs1 +# script-args: > +# --storage-path admin_storage.json +# --endpoint 1 +# --discriminator 1234 +# --passcode 20202021 +# factory-reset: false +# quiet: true +# === END CI TEST ARGUMENTS === + +# Run 1: Tests PASE connection using manual code +# Run 2: Tests CASE connection using manual discriminator and passcode +# Run 3: Tests without factory reset + +import asyncio + +import chip.clusters as Clusters +from chip.testing.matter_testing import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_TestAttrAvail(MatterBaseTest): + # Using get_code and a modified version of setup_class_helper functions from chip.testing.basic_composition module + def get_code(self, dev_ctrl): + created_codes = [] + for idx, discriminator in enumerate(self.matter_test_config.discriminators): + created_codes.append(dev_ctrl.CreateManualCode(discriminator, self.matter_test_config.setup_passcodes[idx])) + + setup_codes = self.matter_test_config.qr_code_content + self.matter_test_config.manual_code + created_codes + if not setup_codes: + return None + asserts.assert_equal(len(setup_codes), 1, + "Require exactly one of either --qr-code, --manual-code or (--discriminator and --passcode).") + return setup_codes[0] + + async def setup_class_helper(self, allow_pase: bool = True): + dev_ctrl = self.default_controller + self.problems = [] + + node_id = self.dut_node_id + + task_list = [] + if allow_pase and self.get_code(dev_ctrl): + setup_code = self.get_code(dev_ctrl) + pase_future = dev_ctrl.EstablishPASESession(setup_code, self.dut_node_id) + task_list.append(asyncio.create_task(pase_future)) + + case_future = dev_ctrl.GetConnectedDevice(nodeid=node_id, allowPASE=False) + task_list.append(asyncio.create_task(case_future)) + + for task in task_list: + asyncio.ensure_future(task) + + done, pending = await asyncio.wait(task_list, return_when=asyncio.FIRST_COMPLETED) + + for task in pending: + try: + task.cancel() + await task + except asyncio.CancelledError: + pass + + wildcard_read = (await dev_ctrl.Read(node_id, [()])) + + # ======= State kept for use by all tests ======= + # All endpoints in "full object" indexing format + self.endpoints = wildcard_read.attributes + + def steps_TC_TestAttrAvail(self) -> list[TestStep]: + return [ + TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "Checking OperationalState attribute is available on endpoint"), + TestStep(3, "Checking Operational Resume command is available on endpoint"), + TestStep(4, "Checking Timezone feature is available on endpoint"), + ] + + def TC_TestAttrAvail(self) -> list[str]: + return ["RVCOPSTATE.S"] + + @async_test_body + async def setup_class(self): + super().setup_class() + await self.setup_class_helper() + + # ======= START OF ACTUAL TESTS ======= + @async_test_body + async def test_TC_TestAttrAvail(self): + self.step(1) + + if self.matter_test_config.endpoint is None or self.matter_test_config.endpoint == 0: + asserts.fail("--endpoint must be set and not set to 0 for this test to run correctly.") + self.endpoint = self.get_endpoint() + asserts.assert_false(self.endpoint is None, "--endpoint must be included on the command line in.") + + cluster = Clusters.RvcOperationalState + attributes = cluster.Attributes + commands = cluster.Commands + self.th1 = self.default_controller + + self.step(2) + attr_should_be_there = await self.attribute_guard(endpoint=self.endpoint, attribute=attributes.OperationalState) + asserts.assert_true(attr_should_be_there, True) + self.print_step("Operational State Attr", attr_should_be_there) + + self.step(3) + cmd_should_be_there = await self.command_guard(endpoint=self.endpoint, command=commands.Resume) + asserts.assert_true(cmd_should_be_there, True) + self.print_step("Operational Resume Command available ", cmd_should_be_there) + + self.step(4) + feat_should_be_there = await self.feature_guard(endpoint=self.endpoint, cluster=Clusters.BooleanStateConfiguration, feature_int=Clusters.BooleanStateConfiguration.Bitmaps.Feature.kAudible) + asserts.assert_true(feat_should_be_there, True) + self.print_step("Boolean State Config Audio Feature available ", feat_should_be_there) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py b/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py index 3b3fb6270b613c..0ad55369e106cb 100644 --- a/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py +++ b/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py @@ -1124,6 +1124,12 @@ def setup_class(self): self.current_step_index = 0 self.step_start_time = datetime.now(timezone.utc) self.step_skipped = False + self.global_wildcard = asyncio.wait_for(self.default_controller.Read(self.dut_node_id, [(Clusters.Descriptor), Attribute.AttributePath(None, None, GlobalAttributeIds.ATTRIBUTE_LIST_ID), Attribute.AttributePath( + None, None, GlobalAttributeIds.FEATURE_MAP_ID), Attribute.AttributePath(None, None, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID)]), timeout=60) + # self.stored_global_wildcard stores value of self.global_wildcard after first async call. + # Because setup_class can be called before commissioning, this variable is lazy-initialized + # where the read is deferred until the first guard function call that requires global attributes. + self.stored_global_wildcard = None def setup_test(self): self.current_step_index = 0 @@ -1455,6 +1461,66 @@ def pics_guard(self, pics_condition: bool): self.mark_current_step_skipped() return pics_condition + async def attribute_guard(self, endpoint: int, attribute: ClusterObjects.ClusterAttributeDescriptor): + """Similar to pics_guard above, except checks a condition and if False marks the test step as skipped and + returns False using attributes against attributes_list, otherwise returns True. + For example can be used to check if a test step should be run: + + self.step("1") + if self.attribute_guard(condition1_needs_to_be_true_to_execute): + # do the test for step 1 + + self.step("2") + if self.attribute_guard(condition2_needs_to_be_false_to_skip_step): + # skip step 2 if condition not met + """ + if self.stored_global_wildcard is None: + self.stored_global_wildcard = await self.global_wildcard + attr_condition = _has_attribute(wildcard=self.stored_global_wildcard, endpoint=endpoint, attribute=attribute) + if not attr_condition: + self.mark_current_step_skipped() + return attr_condition + + async def command_guard(self, endpoint: int, command: ClusterObjects.ClusterCommand): + """Similar to attribute_guard above, except checks a condition and if False marks the test step as skipped and + returns False using command id against AcceptedCmdsList, otherwise returns True. + For example can be used to check if a test step should be run: + + self.step("1") + if self.command_guard(condition1_needs_to_be_true_to_execute): + # do the test for step 1 + + self.step("2") + if self.command_guard(condition2_needs_to_be_false_to_skip_step): + # skip step 2 if condition not met + """ + if self.stored_global_wildcard is None: + self.stored_global_wildcard = await self.global_wildcard + cmd_condition = _has_command(wildcard=self.stored_global_wildcard, endpoint=endpoint, command=command) + if not cmd_condition: + self.mark_current_step_skipped() + return cmd_condition + + async def feature_guard(self, endpoint: int, cluster: ClusterObjects.ClusterObjectDescriptor, feature_int: IntFlag): + """Similar to command_guard and attribute_guard above, except checks a condition and if False marks the test step as skipped and + returns False using feature id against feature_map, otherwise returns True. + For example can be used to check if a test step should be run: + + self.step("1") + if self.feature_guard(condition1_needs_to_be_true_to_execute): + # do the test for step 1 + + self.step("2") + if self.feature_guard(condition2_needs_to_be_false_to_skip_step): + # skip step 2 if condition not met + """ + if self.stored_global_wildcard is None: + self.stored_global_wildcard = await self.global_wildcard + feat_condition = _has_feature(wildcard=self.stored_global_wildcard, endpoint=endpoint, cluster=cluster, feature=feature_int) + if not feat_condition: + self.mark_current_step_skipped() + return feat_condition + def mark_current_step_skipped(self): try: steps = self.get_test_steps(self.current_test_info.name) @@ -2105,6 +2171,41 @@ def has_attribute(attribute: ClusterObjects.ClusterAttributeDescriptor) -> Endpo return partial(_has_attribute, attribute=attribute) +def _has_command(wildcard, endpoint, command: ClusterObjects.ClusterCommand) -> bool: + cluster = get_cluster_from_command(command) + try: + cmd_list = wildcard.attributes[endpoint][cluster][cluster.Attributes.AcceptedCommandList] + if not isinstance(cmd_list, list): + asserts.fail( + f"Failed to read mandatory AcceptedCommandList command value for cluster {cluster} on endpoint {endpoint}: {cmd_list}.") + return command.command_id in cmd_list + except KeyError: + return False + + +def has_command(command: ClusterObjects.ClusterCommand) -> EndpointCheckFunction: + """ EndpointCheckFunction that can be passed as a parameter to the run_if_endpoint_matches decorator. + + Use this function with the run_if_endpoint_matches decorator to run this test on all endpoints with + the specified attribute. For example, given a device with the following conformance + + EP0: cluster A, B, C + EP1: cluster D with command d, E + EP2, cluster D with command d + EP3, cluster D without command d + + And the following test specification: + @run_if_endpoint_matches(has_command(Clusters.D.Commands.d)) + test_mytest(self): + ... + + If you run this test with --endpoint 1 or --endpoint 2, the test will be run. If you run this test + with any other --endpoint the run_if_endpoint_matches decorator will call the on_skip function to + notify the test harness that the test is not applicable to this node and the test will not be run. + """ + return partial(_has_command, command=command) + + def _has_feature(wildcard, endpoint: int, cluster: ClusterObjects.ClusterObjectDescriptor, feature: IntFlag) -> bool: try: feature_map = wildcard.attributes[endpoint][cluster][cluster.Attributes.FeatureMap] diff --git a/src/python_testing/test_metadata.yaml b/src/python_testing/test_metadata.yaml index a6f0ba5bf6174e..dcabdeffdadb1d 100644 --- a/src/python_testing/test_metadata.yaml +++ b/src/python_testing/test_metadata.yaml @@ -97,6 +97,7 @@ slow_tests: - { name: TC_PS_2_3.py, duration: 30 seconds } - { name: TC_RR_1_1.py, duration: 25 seconds } - { name: TC_SWTCH.py, duration: 1 minute } + - { name: TC_TestAttrAvail.py, duration: 30 seconds } - { name: TC_TIMESYNC_2_10.py, duration: 20 seconds } - { name: TC_TIMESYNC_2_11.py, duration: 30 seconds } - { name: TC_TIMESYNC_2_12.py, duration: 20 seconds }