Skip to content

Commit

Permalink
Update IDM_1_4 to be more accurate and correct (#31686)
Browse files Browse the repository at this point in the history
* Post code freeze crunch fixes

* Fix things

* Restyled by autopep8

* Small fix

* Restyled by autopep8

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed May 24, 2024
1 parent b4b6937 commit 2356240
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
66 changes: 37 additions & 29 deletions src/python_testing/TC_IDM_1_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from chip import ChipUtility
from chip.exceptions import ChipStackError
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, type_matches
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
from mobly import asserts


Expand All @@ -35,21 +35,33 @@ def must_use_timed_invoke(cls) -> bool:

class TC_IDM_1_4(MatterBaseTest):

def steps_TC_IDM_1_4(self) -> list[TestStep]:
steps = [TestStep(1, "Get remote node's MaxPathsPerInvoke", is_commissioning=True),
TestStep(2, "Sending `MaxPathsPerInvoke + 1` InvokeRequest if it fits into single MTU"),
TestStep(3, "Sending two InvokeRequests with identical paths"),
TestStep(4, "Sending two InvokeRequests with unique paths, but identical CommandRefs"),
TestStep(5, "Verify DUT responds to InvokeRequestMessage containing two valid paths"),
TestStep(6, "Verify DUT responds to InvokeRequestMessage containing one valid paths, and one InvokeRequest to unsupported endpoint"),
TestStep(7, "Verify DUT responds to InvokeRequestMessage containing two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true, but never sending preceding Timed Invoke Action"),
TestStep(8, "Verify DUT responds to InvokeRequestMessage containing two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true"),
TestStep(9, "Verify DUT capable of responding to request with multiple InvokeResponseMessages")]
return steps

@async_test_body
async def test_TC_IDM_1_4(self):
dev_ctrl = self.default_controller
dut_node_id = self.dut_node_id

self.print_step(0, "Commissioning - already done")

self.print_step(1, "Get remote node's MaxPathsPerInvoke")
self.step(1)
session_parameters = dev_ctrl.GetRemoteSessionParameters(dut_node_id)
max_paths_per_invoke = session_parameters.maxPathsPerInvoke

asserts.assert_greater_equal(max_paths_per_invoke, 1, "Unexpected error returned from unsupported endpoint")
asserts.assert_less_equal(max_paths_per_invoke, 65535, "Unexpected error returned from unsupported endpoint")
asserts.assert_greater_equal(max_paths_per_invoke, 1, "Max Paths per Invoke less than spec min")
asserts.assert_less_equal(max_paths_per_invoke, 65535, "Max Paths per Invoke greater than spec max")

self.print_step(2, "Sending `MaxPathsPerInvoke + 1` InvokeRequest if it fits into single MTU")
self.step(2)
# In practice, it was noticed that we could only fit 57 commands before we hit the MTU limit as a result we
# conservatively try putting up to 100 commands into an Invoke request. We are expecting one of 2 things to
# happen if max_paths_per_invoke + 1 is greater than what cap_for_batch_commands is set to:
Expand All @@ -59,7 +71,7 @@ async def test_TC_IDM_1_4(self):
# 2. Client (TH) able to send command. While unexpected we will hit two different test failure depending on
# what the server does.
# a. Server successfully handle command and send InvokeResponse with results of all individual commands
# being failure. In this case, test fails on unexpected successes like this
# being failure. In this case, test already will fail on unexpected successes.
# b. Server fails to handle command that is between cap_for_batch_commands and max_paths_per_invoke + 1.
# In this case, test fails as device should have actually succeeded and been caught in 2.a.
cap_for_batch_commands = 100
Expand All @@ -80,42 +92,39 @@ async def test_TC_IDM_1_4(self):
"Step 2 is always expected to try sending at least 2 command, something wrong with test logic")
try:
await dev_ctrl.TestOnlySendBatchCommands(dut_node_id, list_of_commands_to_send, remoteMaxPathsPerInvoke=number_of_commands_to_send)
# If you get the assert below it is likely because cap_for_batch_commands is actually too low.
# This might happen after TCP is enabled and DUT supports TCP.
# This might happen after TCP is enabled and DUT supports TCP. See comment above `cap_for_batch_commands`
# for more information.
asserts.assert_not_equal(number_of_commands_to_send, cap_for_batch_commands,
"Test needs to be updated! Soft cap `cap_for_batch_commands` used in test is no longer correct")
asserts.fail(
f"Unexpected success return from sending too many commands, we sent {number_of_commands_to_send}, test capped at {cap_for_batch_commands}")
except InteractionModelError as e:
# This check is for 2.a., mentioned above introduction of variable cap_for_batch_commands.
# This check is for 2.b, mentioned above. If this assert occurs, test likely needs updating. Although DUT
# is still going to fail since it seemingly is failing to process a smaller number then it is reporting
# that it is capable of processing.
asserts.assert_equal(number_of_commands_to_send, max_paths_per_invoke + 1,
"Test didn't send as many command as max_paths_per_invoke + 1, likely due to MTU cap_for_batch_commands, but we still got an error from server. This should have been a success from server")
asserts.assert_equal(e.status, Status.InvalidAction,
"DUT sent back an unexpected error, we were expecting InvalidAction")
self.print_step(2, "DUT successfully failed to process `MaxPathsPerInvoke + 1` InvokeRequests")
logging.info("DUT successfully failed to process `MaxPathsPerInvoke + 1` InvokeRequests")
except ChipStackError as e:
chip_error_no_memory = 0x0b
asserts.assert_equal(e.err, chip_error_no_memory, "Unexpected error while trying to send InvokeRequest")
# TODO it is possible we want to confirm DUT can handle up to MTU max. But that is not in test plan as of right now.
# Additionally CommandSender is not currently set up to enable caller to fill up to MTU. This might be coming soon,
# just that it is not supported today.
self.print_step(2, "DUTs reported MaxPathsPerInvoke + 1 is larger than what fits into MTU. Test step is skipped")
logging.info("DUTs reported MaxPathsPerInvoke + 1 is larger than what fits into MTU. Test step is skipped")

if max_paths_per_invoke == 1:
# TODO(#31139) After issue is resolved use that API properly to mark tests steps as skipped
self.print_step(3, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(4, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(5, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(6, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(7, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(8, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(9, "Skipping test step as max_paths_per_invoke == 1")
self.skip_all_remaining_steps(3)
else:
await self.steps_3_to_9(False)

async def steps_3_to_9(self, dummy_value):
dev_ctrl = self.default_controller
dut_node_id = self.dut_node_id

self.print_step(3, "Sending sending two InvokeRequests with idential paths")
self.step(3)
command = Clusters.BasicInformation.Commands.MfgSpecificPing()
endpoint = 0
invoke_request_1 = Clusters.Command.InvokeRequestInfo(endpoint, command)
Expand All @@ -127,7 +136,7 @@ async def steps_3_to_9(self, dummy_value):
"DUT sent back an unexpected error, we were expecting InvalidAction")
logging.info("DUT successfully failed to process two InvokeRequests that contains non-unique paths")

self.print_step(4, "Sending two InvokeRequests with unique paths, but identical CommandRefs")
self.step(4)
endpoint = 0
command = Clusters.OperationalCredentials.Commands.CertificateChainRequest(
Clusters.OperationalCredentials.Enums.CertificateChainTypeEnum.kDACCertificate)
Expand All @@ -144,7 +153,7 @@ async def steps_3_to_9(self, dummy_value):
"DUT sent back an unexpected error, we were expecting InvalidAction")
logging.info("DUT successfully failed to process two InvokeRequests that contains non-unique CommandRef")

self.print_step(5, "Verify DUT is able to responsed to InvokeRequestMessage that contains two valid paths")
self.step(5)
endpoint = 0
command = Clusters.OperationalCredentials.Commands.CertificateChainRequest(
Clusters.OperationalCredentials.Enums.CertificateChainTypeEnum.kDACCertificate)
Expand All @@ -164,8 +173,7 @@ async def steps_3_to_9(self, dummy_value):
except InteractionModelError:
asserts.fail("DUT failed to successfully responded to a InvokeRequest action with two valid commands")

self.print_step(
6, "Verify DUT is able to responsed to InvokeRequestMessage that contains one paths InvokeRequest, and one InvokeRequest to unsupported endpoint")
self.step(6)
# First finding non-existent endpoint
wildcard_descriptor = await dev_ctrl.ReadAttribute(dut_node_id, [(Clusters.Descriptor)])
endpoints = list(wildcard_descriptor.keys())
Expand Down Expand Up @@ -195,7 +203,7 @@ async def steps_3_to_9(self, dummy_value):
except InteractionModelError:
asserts.fail("DUT failed to successfully responded to a InvokeRequest action with two valid commands")

self.print_step(7, "Verify DUT is able to responsed to InvokeRequestMessage that contains two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true, but never sent preceding Timed Invoke Action")
self.step(7)
endpoint = 0
command = Clusters.GroupKeyManagement.Commands.KeySetRead(0)
invoke_request_1 = Clusters.Command.InvokeRequestInfo(endpoint, command)
Expand All @@ -214,7 +222,7 @@ async def steps_3_to_9(self, dummy_value):
asserts.assert_equal(e.status, Status.TimedRequestMismatch,
"Unexpected error response from Invoke with TimedRequest flag and no TimedInvoke")

self.print_step(8, "Verify DUT is able to responsed to InvokeRequestMessage that contains two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true")
self.step(8)
endpoint = 0
command = Clusters.GroupKeyManagement.Commands.KeySetRead(0)
invoke_request_1 = Clusters.Command.InvokeRequestInfo(endpoint, command)
Expand All @@ -235,12 +243,12 @@ async def steps_3_to_9(self, dummy_value):
window_not_open_cluster_error = 4
asserts.assert_equal(result[1].clusterStatus, window_not_open_cluster_error,
"Timed command, RevokeCommissioning, failed with incorrect cluster code")
self.print_step(
8, "DUT successfully responded to a InvokeRequest action with two valid commands. One of which required timed invoke, and TimedRequest in InvokeResponseMessage was set to true")
logging.info("DUT successfully responded to a InvokeRequest action with two valid commands. One of which required timed invoke, and TimedRequest in InvokeResponseMessage was set to true")
except InteractionModelError:
asserts.fail("DUT failed with non-path specific error when path specific error was expected")

self.print_step(9, "Skipping test until https://github.com/project-chip/connectedhomeip/issues/31434 resolved")
# Skipping test until https://github.com/project-chip/connectedhomeip/issues/31434 resolved
self.skip_step(9)


if __name__ == "__main__":
Expand Down
12 changes: 11 additions & 1 deletion src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,6 @@ def mark_current_step_skipped(self):
steps = self.get_test_steps(self.current_test_info.name)
if self.current_step_index == 0:
asserts.fail("Script error: mark_current_step_skipped cannot be called before step()")
print(self.current_step_index-1)
num = steps[self.current_step_index-1].test_plan_number
except KeyError:
num = self.current_step_index
Expand All @@ -906,6 +905,17 @@ def skip_step(self, step):
self.step(step)
self.mark_current_step_skipped()

def skip_all_remaining_steps(self, starting_step):
''' Skips all remaining test steps starting with provided starting step
starting_step must be provided, and is not derived intentionally. By providing argument
test is more deliberately identifying where test skips are starting from, making
it easier to validate against the test plan for correctness.
'''
last_step = len(self.get_test_steps(self.current_test_info.name)) + 1
for index in range(starting_step, last_step):
self.skip_step(index)

def step(self, step: typing.Union[int, str]):
test_name = self.current_test_info.name
steps = self.get_test_steps(test_name)
Expand Down

0 comments on commit 2356240

Please sign in to comment.