Skip to content

Commit

Permalink
Fix remaining post-TE2 TC-SWTCH issues (#35291)
Browse files Browse the repository at this point in the history
* Fix remaining post-TE2 TC-SWTCH issues

- Fixes #34556 (-2.2, -2.3 and -2.4 to better match test plan)
- Fixes #28620 (Better bound checks)
- Fixes #35241

* Restyled by clang-format

* Restyled by autopep8

* Restyled by isort

* Fix step 2 of TC-SWTCH--2.4

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Sep 25, 2024
1 parent bc45a29 commit 1433016
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 40 deletions.
37 changes: 37 additions & 0 deletions examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ bool HasNumericField(Json::Value & jsonValue, const std::string & field)
return jsonValue.isMember(field) && jsonValue[field].isNumeric();
}

uint8_t GetNumberOfSwitchPositions(EndpointId endpointId)
{
// TODO: Move to using public API of cluster.
uint8_t numPositions = 0;

// On failure, the numPositions won't be changed, so 0 returned.
(void) Switch::Attributes::NumberOfPositions::Get(endpointId, &numPositions);

return numPositions;
}

/**
* Named pipe handler for simulated long press
*
Expand Down Expand Up @@ -99,6 +110,15 @@ void HandleSimulateLongPress(Json::Value & jsonValue)

EndpointId endpointId = static_cast<EndpointId>(jsonValue["EndpointId"].asUInt());
uint8_t buttonId = static_cast<uint8_t>(jsonValue["ButtonId"].asUInt());

uint8_t numPositions = GetNumberOfSwitchPositions(endpointId);
if (buttonId >= numPositions)
{
std::string inputJson = jsonValue.toStyledString();
ChipLogError(NotSpecified, "Invalid ButtonId (out of range) in %s", inputJson.c_str());
return;
}

System::Clock::Milliseconds32 longPressDelayMillis{ static_cast<unsigned>(jsonValue["LongPressDelayMillis"].asUInt()) };
System::Clock::Milliseconds32 longPressDurationMillis{ static_cast<unsigned>(jsonValue["LongPressDurationMillis"].asUInt()) };
uint32_t featureMap = static_cast<uint32_t>(jsonValue["FeatureMap"].asUInt());
Expand Down Expand Up @@ -169,6 +189,15 @@ void HandleSimulateMultiPress(Json::Value & jsonValue)

EndpointId endpointId = static_cast<EndpointId>(jsonValue["EndpointId"].asUInt());
uint8_t buttonId = static_cast<uint8_t>(jsonValue["ButtonId"].asUInt());

uint8_t numPositions = GetNumberOfSwitchPositions(endpointId);
if (buttonId >= numPositions)
{
std::string inputJson = jsonValue.toStyledString();
ChipLogError(NotSpecified, "Invalid ButtonId (out of range) in %s", inputJson.c_str());
return;
}

System::Clock::Milliseconds32 multiPressPressedTimeMillis{ static_cast<unsigned>(
jsonValue["MultiPressPressedTimeMillis"].asUInt()) };
System::Clock::Milliseconds32 multiPressReleasedTimeMillis{ static_cast<unsigned>(
Expand Down Expand Up @@ -227,6 +256,14 @@ void HandleSimulateLatchPosition(Json::Value & jsonValue)
EndpointId endpointId = static_cast<EndpointId>(jsonValue["EndpointId"].asUInt());
uint8_t positionId = static_cast<uint8_t>(jsonValue["PositionId"].asUInt());

uint8_t numPositions = GetNumberOfSwitchPositions(endpointId);
if (positionId >= numPositions)
{
std::string inputJson = jsonValue.toStyledString();
ChipLogError(NotSpecified, "Invalid PositionId (out of range) in %s", inputJson.c_str());
return;
}

uint8_t previousPositionId = 0;
Protocols::InteractionModel::Status status = Switch::Attributes::CurrentPosition::Get(endpointId, &previousPositionId);
VerifyOrReturn(Protocols::InteractionModel::Status::Success == status,
Expand Down
110 changes: 71 additions & 39 deletions src/python_testing/TC_SWTCH.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import EventReadResult
from chip.tlv import uint
from matter_testing_support import (ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest, TestStep,
await_sequence_of_reports, default_matter_test_main, has_feature, per_endpoint_test)
from matter_testing_support import (AttributeValue, ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest,
TestStep, await_sequence_of_reports, default_matter_test_main, has_feature, per_endpoint_test)
from mobly import asserts

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -70,6 +70,10 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._default_pressed_position = self.user_params.get("default_pressed_position", 1)

def setup_test(self):
super().setup_test()
self.is_ci = self._use_button_simulator()

def _send_named_pipe_command(self, command_dict: dict[str, Any]):
app_pid = self.matter_test_config.app_pid
if app_pid == 0:
Expand Down Expand Up @@ -259,32 +263,39 @@ def _received_event(self, event_listener: EventChangeCallback, target_event: Clu

def steps_TC_SWTCH_2_2(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
TestStep(2, "Set up subscription to all events and attributes of Switch cluster on the endpoint"),
TestStep(3, "Operator sets switch to first position on the DUT"),
TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
TestStep(4, "TH reads the CurrentPosition attribute from the DUT.", "Verify that the value is 0."),
TestStep(5, "Operator sets switch to second position (one) on the DUT",
"Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT"),
TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"),
"Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT."),
TestStep(6, "TH reads the CurrentPosition attribute from the DUT",
"Verify that the value is 1, and that a subscription report was received for that change."),
TestStep(7, "If there are more than 2 positions, test subsequent positions of the DUT"),
TestStep(8, "Operator sets switch to first position on the DUT."),
TestStep(9, "Wait 10 seconds for event reports stable." "Verify that last SwitchLatched event received is for NewPosition 0."),
TestStep(10, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
TestStep(10, "TH reads the CurrentPosition attribute from the DUT",
"Verify that the value is 0, and that a subscription report was received for that change."),
]

@per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kLatchingSwitch))
async def test_TC_SWTCH_2_2(self):
post_prompt_settle_delay_seconds = 10.0
cluster = Clusters.Switch
endpoint_id = self.matter_test_config.endpoint

# Step 1: Commissioning - already done
self.step(1)

cluster = Clusters.Switch
endpoint_id = self.matter_test_config.endpoint

# Step 2: Set up subscription to all events of Switch cluster on the endpoint.
self.step(2)
event_listener = EventChangeCallback(cluster)
await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
attrib_listener = ClusterAttributeChangeAccumulator(cluster)
await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)

# Pre-get number of positions for step 7 later.
num_positions = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.NumberOfPositions)
asserts.assert_greater(num_positions, 1, "Latching switch has only 1 position, this is impossible.")

# Step 3: Operator sets switch to first position on the DUT.
self.step(3)
Expand All @@ -296,25 +307,39 @@ async def test_TC_SWTCH_2_2(self):
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, 0, "Switch position value is not 0")

# Step 5: Operator sets switch to second position (one) on the DUT",
# Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT
self.step(5)
expected_switch_position = 1
self._ask_for_switch_position(endpoint_id, expected_switch_position)

data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds)
logging.info(f"-> SwitchLatched event last received: {data}")
asserts.assert_equal(data, cluster.Events.SwitchLatched(
newPosition=expected_switch_position), "Did not get expected switch position")

# Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1
self.step(6)
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}")

# Step 7: If there are more than 2 positions, test subsequent positions of the DUT
# # TODO(#34656): Implement loop for > 2 total positions
self.skip_step(7)
attrib_listener.reset()
event_listener.reset()

for expected_switch_position in range(1, num_positions):
# Step 5: Operator sets switch to second position (one) on the DUT",
# Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT
if expected_switch_position == 1:
self.step(5)
self._ask_for_switch_position(endpoint_id, expected_switch_position)

data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds)
logging.info(f"-> SwitchLatched event last received: {data}")
asserts.assert_equal(data, cluster.Events.SwitchLatched(
newPosition=expected_switch_position), "Did not get expected switch position")

# Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1
if expected_switch_position == 1: # Indicate step 7 only once
self.step(6)
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}")
logging.info(f"Checking to see if a report for {expected_switch_position} is received")
attrib_listener.await_sequence_of_reports(attribute=cluster.Attributes.CurrentPosition, sequence=[
expected_switch_position], timeout_sec=post_prompt_settle_delay_seconds)

# Step 7: If there are more than 2 positions, test subsequent positions of the DUT
if expected_switch_position == 1:
if num_positions > 2: # Indicate step 7 only once
self.step(7)
else:
self.skip_step(7)

if num_positions > 2:
logging.info("Looping for the other positions")

# Step 8: Operator sets switch to first position on the DUT.
self.step(8)
Expand All @@ -324,7 +349,7 @@ async def test_TC_SWTCH_2_2(self):
# Step 9: Wait 10 seconds for event reports stable.
# Verify that last SwitchLatched event received is for NewPosition 0.
self.step(9)
time.sleep(10.0)
time.sleep(10.0 if not self.is_ci else 1.0)

expected_switch_position = 0
last_event = event_listener.get_last_event()
Expand All @@ -337,15 +362,21 @@ async def test_TC_SWTCH_2_2(self):
# Step 10: TH reads the CurrentPosition attribute from the DUT.
# Verify that the value is 0
self.step(10)

button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, 0, "Button value is not 0")

logging.info(f"Checking to see if a report for {expected_switch_position} is received")
expected_final_value = [AttributeValue(
endpoint_id, attribute=cluster.Attributes.CurrentPosition, value=expected_switch_position)]
attrib_listener.await_all_final_values_reported(expected_final_value, timeout_sec=post_prompt_settle_delay_seconds)

def steps_TC_SWTCH_2_3(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
TestStep(3, "Operator does not operate switch on the DUT"),
TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
TestStep(5, "Operator operates switch (keep it pressed)",
TestStep(5, "Operator operates switch (keep it pressed, and wait at least 5 seconds)",
"Verify that the TH receives InitialPress event with NewPosition set to 1 on the DUT"),
TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"),
TestStep(7, "Operator releases switch on the DUT"),
Expand Down Expand Up @@ -421,15 +452,15 @@ def desc_TC_SWTCH_2_4(self) -> str:

def steps_TC_SWTCH_2_4(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
TestStep(2, "Set up subscription to all events and attributes of Switch cluster on the endpoint"),
TestStep(3, "Operator does not operate switch on the DUT"),
TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it",
TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, then release it",
"""
* TH expects receiving a subscription report of CurrentPosition 1, followed by a report of Current Position 0.
* TH expects receiving at InitialPress event with NewPosition = 1.
* if MSL or AS feature is supported, TH expect receiving LongPress/LongRelease in that order.
* if MS & (!MSL & !AS & !MSR) features present, TH expects receiving no further events for 10 seconds after release.
* if MSL feature is supported, TH expect receiving LongPress/LongRelease in that order.
* if MS & (!MSL & !AS & !MSR & !MSM) features present, TH expects receiving no further events for 10 seconds after release.
* if (MSR & !MSL) features present, TH expects receiving ShortRelease event.
""")
]
Expand All @@ -451,17 +482,18 @@ async def test_TC_SWTCH_2_4(self):
has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0
has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0
has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0
has_msm_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchMultiPress) != 0
has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0

if not has_ms_feature:
logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present")
self.skip_all_remaining_steps("2")

# Step 2: Set up subscription to all events of Switch cluster on the endpoint
# Step 2: Set up subscription to all events and attributes of Switch cluster on the endpoint
self.step(2)
event_listener = EventChangeCallback(cluster)
attrib_listener = ClusterAttributeChangeAccumulator(cluster)
await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
attrib_listener = ClusterAttributeChangeAccumulator(cluster)
await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)

# Step 3: Operator does not operate switch on the DUT
Expand Down Expand Up @@ -503,8 +535,8 @@ async def test_TC_SWTCH_2_4(self):
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)

# - if MS & (!MSL & !AS & !MSR) features present, expect no further events for 10 seconds after release.
if not has_msl_feature and not has_as_feature and not has_msr_feature:
# - if MS & (!MSL & !AS & !MSR & !MSM) features present, expect no further events for 10 seconds after release.
if not has_msl_feature and not has_as_feature and not has_msr_feature and not has_msm_feature:
self._expect_no_events_for_cluster(event_queue=event_listener.event_queue,
endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0)

Expand Down
26 changes: 25 additions & 1 deletion src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ def flush_events(self) -> None:
_ = self.get_last_event()
return

def reset(self) -> None:
"""Resets state as if no events had ever been received."""
self.flush_events()

@property
def event_queue(self) -> queue.Queue:
return self._q
Expand Down Expand Up @@ -356,7 +360,7 @@ class AttributeValue:
timestamp_utc: Optional[datetime] = None


def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float):
def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float) -> None:
"""Given a queue.Queue hooked-up to an attribute change accumulator, await a given expected sequence of attribute reports.
Args:
Expand Down Expand Up @@ -418,6 +422,7 @@ def __init__(self, expected_cluster: ClusterObjects.Cluster):
self._subscription = None
self._lock = threading.Lock()
self._q = queue.Queue()
self._endpoint_id = 0
self.reset()

def reset(self):
Expand All @@ -441,6 +446,7 @@ async def start(self, dev_ctrl, node_id: int, endpoint: int, fabric_filtered: bo
fabricFiltered=fabric_filtered,
keepSubscriptions=keepSubscriptions
)
self._endpoint_id = endpoint
self._subscription.SetAttributeUpdateCallback(self.__call__)
return self._subscription

Expand Down Expand Up @@ -513,6 +519,24 @@ def await_all_final_values_reported(self, expected_final_values: Iterable[Attrib
logging.info(f" -> {expected_element} found: {last_report_matches.get(expected_idx)}")
asserts.fail("Did not find all expected last report values before time-out")

def await_sequence_of_reports(self, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float) -> None:
"""Await a given expected sequence of attribute reports in the accumulator for the endpoint associated.
Args:
- attribute: attribute to match for reports to check.
- sequence: list of attribute values in order that are expected.
- timeout_sec: number of seconds to wait for.
*** WARNING: The queue contains every report since the sub was established. Use
self.reset() to make it empty. ***
This will fail current Mobly test with assertion failure if the data is not as expected in order.
Returns nothing on success so the test can go on.
"""
await_sequence_of_reports(report_queue=self.attribute_queue, endpoint_id=self._endpoint_id,
attribute=attribute, sequence=sequence, timeout_sec=timeout_sec)

@property
def attribute_queue(self) -> queue.Queue:
return self._q
Expand Down

0 comments on commit 1433016

Please sign in to comment.