From 9743194f069e1f86f26320a080ebcfa10cfe018a Mon Sep 17 00:00:00 2001 From: Tennessee Carmel-Veilleux Date: Mon, 2 Sep 2024 17:09:57 -0400 Subject: [PATCH] Fix remaining post-TE2 TC-SWTCH issues (#35291) * Fix remaining post-TE2 TC-SWTCH issues - Fixes #34556 (-2.2, -2.3 and -2.4 to better match test plan) - Fixes #28620 (Better bound checks) - Fixes #35241 * Restyled by clang-format * Restyled by autopep8 * Restyled by isort * Fix step 2 of TC-SWTCH--2.4 --------- Co-authored-by: Restyled.io --- .../linux/AllClustersCommandDelegate.cpp | 37 ++++++ src/python_testing/TC_SWTCH.py | 110 +++++++++++------- src/python_testing/matter_testing_support.py | 26 ++++- 3 files changed, 133 insertions(+), 40 deletions(-) diff --git a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp index 6e5008a38685d8..0d665509407744 100644 --- a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp +++ b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp @@ -57,6 +57,17 @@ bool HasNumericField(Json::Value & jsonValue, const std::string & field) return jsonValue.isMember(field) && jsonValue[field].isNumeric(); } +uint8_t GetNumberOfSwitchPositions(EndpointId endpointId) +{ + // TODO: Move to using public API of cluster. + uint8_t numPositions = 0; + + // On failure, the numPositions won't be changed, so 0 returned. + (void) Switch::Attributes::NumberOfPositions::Get(endpointId, &numPositions); + + return numPositions; +} + /** * Named pipe handler for simulated long press * @@ -99,6 +110,15 @@ void HandleSimulateLongPress(Json::Value & jsonValue) EndpointId endpointId = static_cast(jsonValue["EndpointId"].asUInt()); uint8_t buttonId = static_cast(jsonValue["ButtonId"].asUInt()); + + uint8_t numPositions = GetNumberOfSwitchPositions(endpointId); + if (buttonId >= numPositions) + { + std::string inputJson = jsonValue.toStyledString(); + ChipLogError(NotSpecified, "Invalid ButtonId (out of range) in %s", inputJson.c_str()); + return; + } + System::Clock::Milliseconds32 longPressDelayMillis{ static_cast(jsonValue["LongPressDelayMillis"].asUInt()) }; System::Clock::Milliseconds32 longPressDurationMillis{ static_cast(jsonValue["LongPressDurationMillis"].asUInt()) }; uint32_t featureMap = static_cast(jsonValue["FeatureMap"].asUInt()); @@ -169,6 +189,15 @@ void HandleSimulateMultiPress(Json::Value & jsonValue) EndpointId endpointId = static_cast(jsonValue["EndpointId"].asUInt()); uint8_t buttonId = static_cast(jsonValue["ButtonId"].asUInt()); + + uint8_t numPositions = GetNumberOfSwitchPositions(endpointId); + if (buttonId >= numPositions) + { + std::string inputJson = jsonValue.toStyledString(); + ChipLogError(NotSpecified, "Invalid ButtonId (out of range) in %s", inputJson.c_str()); + return; + } + System::Clock::Milliseconds32 multiPressPressedTimeMillis{ static_cast( jsonValue["MultiPressPressedTimeMillis"].asUInt()) }; System::Clock::Milliseconds32 multiPressReleasedTimeMillis{ static_cast( @@ -227,6 +256,14 @@ void HandleSimulateLatchPosition(Json::Value & jsonValue) EndpointId endpointId = static_cast(jsonValue["EndpointId"].asUInt()); uint8_t positionId = static_cast(jsonValue["PositionId"].asUInt()); + uint8_t numPositions = GetNumberOfSwitchPositions(endpointId); + if (positionId >= numPositions) + { + std::string inputJson = jsonValue.toStyledString(); + ChipLogError(NotSpecified, "Invalid PositionId (out of range) in %s", inputJson.c_str()); + return; + } + uint8_t previousPositionId = 0; Protocols::InteractionModel::Status status = Switch::Attributes::CurrentPosition::Get(endpointId, &previousPositionId); VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 1001402ea2d2de..3db2b213428b39 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -38,8 +38,8 @@ from chip.clusters import ClusterObjects as ClusterObjects from chip.clusters.Attribute import EventReadResult from chip.tlv import uint -from matter_testing_support import (ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest, TestStep, - await_sequence_of_reports, default_matter_test_main, has_feature, per_endpoint_test) +from matter_testing_support import (AttributeValue, ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest, + TestStep, await_sequence_of_reports, default_matter_test_main, has_feature, per_endpoint_test) from mobly import asserts logger = logging.getLogger(__name__) @@ -70,6 +70,10 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._default_pressed_position = self.user_params.get("default_pressed_position", 1) + def setup_test(self): + super().setup_test() + self.is_ci = self._use_button_simulator() + def _send_named_pipe_command(self, command_dict: dict[str, Any]): app_pid = self.matter_test_config.app_pid if app_pid == 0: @@ -259,32 +263,39 @@ def _received_event(self, event_listener: EventChangeCallback, target_event: Clu def steps_TC_SWTCH_2_2(self): return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), - TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), + TestStep(2, "Set up subscription to all events and attributes of Switch cluster on the endpoint"), TestStep(3, "Operator sets switch to first position on the DUT"), - TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), + TestStep(4, "TH reads the CurrentPosition attribute from the DUT.", "Verify that the value is 0."), TestStep(5, "Operator sets switch to second position (one) on the DUT", - "Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT"), - TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"), + "Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT."), + TestStep(6, "TH reads the CurrentPosition attribute from the DUT", + "Verify that the value is 1, and that a subscription report was received for that change."), TestStep(7, "If there are more than 2 positions, test subsequent positions of the DUT"), TestStep(8, "Operator sets switch to first position on the DUT."), TestStep(9, "Wait 10 seconds for event reports stable." "Verify that last SwitchLatched event received is for NewPosition 0."), - TestStep(10, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), + TestStep(10, "TH reads the CurrentPosition attribute from the DUT", + "Verify that the value is 0, and that a subscription report was received for that change."), ] @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kLatchingSwitch)) async def test_TC_SWTCH_2_2(self): post_prompt_settle_delay_seconds = 10.0 + cluster = Clusters.Switch + endpoint_id = self.matter_test_config.endpoint # Step 1: Commissioning - already done self.step(1) - cluster = Clusters.Switch - endpoint_id = self.matter_test_config.endpoint - # Step 2: Set up subscription to all events of Switch cluster on the endpoint. self.step(2) event_listener = EventChangeCallback(cluster) await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + attrib_listener = ClusterAttributeChangeAccumulator(cluster) + await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + + # Pre-get number of positions for step 7 later. + num_positions = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.NumberOfPositions) + asserts.assert_greater(num_positions, 1, "Latching switch has only 1 position, this is impossible.") # Step 3: Operator sets switch to first position on the DUT. self.step(3) @@ -296,25 +307,39 @@ async def test_TC_SWTCH_2_2(self): button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) asserts.assert_equal(button_val, 0, "Switch position value is not 0") - # Step 5: Operator sets switch to second position (one) on the DUT", - # Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT - self.step(5) - expected_switch_position = 1 - self._ask_for_switch_position(endpoint_id, expected_switch_position) - - data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds) - logging.info(f"-> SwitchLatched event last received: {data}") - asserts.assert_equal(data, cluster.Events.SwitchLatched( - newPosition=expected_switch_position), "Did not get expected switch position") - - # Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1 - self.step(6) - button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) - asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}") - - # Step 7: If there are more than 2 positions, test subsequent positions of the DUT - # # TODO(#34656): Implement loop for > 2 total positions - self.skip_step(7) + attrib_listener.reset() + event_listener.reset() + + for expected_switch_position in range(1, num_positions): + # Step 5: Operator sets switch to second position (one) on the DUT", + # Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT + if expected_switch_position == 1: + self.step(5) + self._ask_for_switch_position(endpoint_id, expected_switch_position) + + data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds) + logging.info(f"-> SwitchLatched event last received: {data}") + asserts.assert_equal(data, cluster.Events.SwitchLatched( + newPosition=expected_switch_position), "Did not get expected switch position") + + # Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1 + if expected_switch_position == 1: # Indicate step 7 only once + self.step(6) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}") + logging.info(f"Checking to see if a report for {expected_switch_position} is received") + attrib_listener.await_sequence_of_reports(attribute=cluster.Attributes.CurrentPosition, sequence=[ + expected_switch_position], timeout_sec=post_prompt_settle_delay_seconds) + + # Step 7: If there are more than 2 positions, test subsequent positions of the DUT + if expected_switch_position == 1: + if num_positions > 2: # Indicate step 7 only once + self.step(7) + else: + self.skip_step(7) + + if num_positions > 2: + logging.info("Looping for the other positions") # Step 8: Operator sets switch to first position on the DUT. self.step(8) @@ -324,7 +349,7 @@ async def test_TC_SWTCH_2_2(self): # Step 9: Wait 10 seconds for event reports stable. # Verify that last SwitchLatched event received is for NewPosition 0. self.step(9) - time.sleep(10.0) + time.sleep(10.0 if not self.is_ci else 1.0) expected_switch_position = 0 last_event = event_listener.get_last_event() @@ -337,15 +362,21 @@ async def test_TC_SWTCH_2_2(self): # Step 10: TH reads the CurrentPosition attribute from the DUT. # Verify that the value is 0 self.step(10) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) asserts.assert_equal(button_val, 0, "Button value is not 0") + logging.info(f"Checking to see if a report for {expected_switch_position} is received") + expected_final_value = [AttributeValue( + endpoint_id, attribute=cluster.Attributes.CurrentPosition, value=expected_switch_position)] + attrib_listener.await_all_final_values_reported(expected_final_value, timeout_sec=post_prompt_settle_delay_seconds) + def steps_TC_SWTCH_2_3(self): return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), TestStep(3, "Operator does not operate switch on the DUT"), TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), - TestStep(5, "Operator operates switch (keep it pressed)", + TestStep(5, "Operator operates switch (keep it pressed, and wait at least 5 seconds)", "Verify that the TH receives InitialPress event with NewPosition set to 1 on the DUT"), TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"), TestStep(7, "Operator releases switch on the DUT"), @@ -421,15 +452,15 @@ def desc_TC_SWTCH_2_4(self) -> str: def steps_TC_SWTCH_2_4(self): return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), - TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), + TestStep(2, "Set up subscription to all events and attributes of Switch cluster on the endpoint"), TestStep(3, "Operator does not operate switch on the DUT"), TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), - TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it", + TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, then release it", """ * TH expects receiving a subscription report of CurrentPosition 1, followed by a report of Current Position 0. * TH expects receiving at InitialPress event with NewPosition = 1. - * if MSL or AS feature is supported, TH expect receiving LongPress/LongRelease in that order. - * if MS & (!MSL & !AS & !MSR) features present, TH expects receiving no further events for 10 seconds after release. + * if MSL feature is supported, TH expect receiving LongPress/LongRelease in that order. + * if MS & (!MSL & !AS & !MSR & !MSM) features present, TH expects receiving no further events for 10 seconds after release. * if (MSR & !MSL) features present, TH expects receiving ShortRelease event. """) ] @@ -451,17 +482,18 @@ async def test_TC_SWTCH_2_4(self): has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0 has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0 + has_msm_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchMultiPress) != 0 has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 if not has_ms_feature: logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present") self.skip_all_remaining_steps("2") - # Step 2: Set up subscription to all events of Switch cluster on the endpoint + # Step 2: Set up subscription to all events and attributes of Switch cluster on the endpoint self.step(2) event_listener = EventChangeCallback(cluster) - attrib_listener = ClusterAttributeChangeAccumulator(cluster) await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + attrib_listener = ClusterAttributeChangeAccumulator(cluster) await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) # Step 3: Operator does not operate switch on the DUT @@ -503,8 +535,8 @@ async def test_TC_SWTCH_2_4(self): self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) - # - if MS & (!MSL & !AS & !MSR) features present, expect no further events for 10 seconds after release. - if not has_msl_feature and not has_as_feature and not has_msr_feature: + # - if MS & (!MSL & !AS & !MSR & !MSM) features present, expect no further events for 10 seconds after release. + if not has_msl_feature and not has_as_feature and not has_msr_feature and not has_msm_feature: self._expect_no_events_for_cluster(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0) diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index ea937306cad925..1e0fa26ae8d722 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -301,6 +301,10 @@ def flush_events(self) -> None: _ = self.get_last_event() return + def reset(self) -> None: + """Resets state as if no events had ever been received.""" + self.flush_events() + @property def event_queue(self) -> queue.Queue: return self._q @@ -356,7 +360,7 @@ class AttributeValue: timestamp_utc: Optional[datetime] = None -def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float): +def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float) -> None: """Given a queue.Queue hooked-up to an attribute change accumulator, await a given expected sequence of attribute reports. Args: @@ -418,6 +422,7 @@ def __init__(self, expected_cluster: ClusterObjects.Cluster): self._subscription = None self._lock = threading.Lock() self._q = queue.Queue() + self._endpoint_id = 0 self.reset() def reset(self): @@ -441,6 +446,7 @@ async def start(self, dev_ctrl, node_id: int, endpoint: int, fabric_filtered: bo fabricFiltered=fabric_filtered, keepSubscriptions=keepSubscriptions ) + self._endpoint_id = endpoint self._subscription.SetAttributeUpdateCallback(self.__call__) return self._subscription @@ -513,6 +519,24 @@ def await_all_final_values_reported(self, expected_final_values: Iterable[Attrib logging.info(f" -> {expected_element} found: {last_report_matches.get(expected_idx)}") asserts.fail("Did not find all expected last report values before time-out") + def await_sequence_of_reports(self, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float) -> None: + """Await a given expected sequence of attribute reports in the accumulator for the endpoint associated. + + Args: + - attribute: attribute to match for reports to check. + - sequence: list of attribute values in order that are expected. + - timeout_sec: number of seconds to wait for. + + *** WARNING: The queue contains every report since the sub was established. Use + self.reset() to make it empty. *** + + This will fail current Mobly test with assertion failure if the data is not as expected in order. + + Returns nothing on success so the test can go on. + """ + await_sequence_of_reports(report_queue=self.attribute_queue, endpoint_id=self._endpoint_id, + attribute=attribute, sequence=sequence, timeout_sec=timeout_sec) + @property def attribute_queue(self) -> queue.Queue: return self._q