Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update IDM_1_4 to be more accurate and correct #31686

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions src/python_testing/TC_IDM_1_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from chip import ChipUtility
from chip.exceptions import ChipStackError
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, type_matches
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches
from mobly import asserts


Expand All @@ -35,21 +35,33 @@ def must_use_timed_invoke(cls) -> bool:

class TC_IDM_1_4(MatterBaseTest):

def steps_TC_IDM_1_4(self) -> list[TestStep]:
steps = [TestStep(1, "Get remote node's MaxPathsPerInvoke", is_commissioning=True),
TestStep(2, "Sending `MaxPathsPerInvoke + 1` InvokeRequest if it fits into single MTU"),
TestStep(3, "Sending two InvokeRequests with identical paths"),
TestStep(4, "Sending two InvokeRequests with unique paths, but identical CommandRefs"),
TestStep(5, "Verify DUT responds to InvokeRequestMessage containing two valid paths"),
TestStep(6, "Verify DUT responds to InvokeRequestMessage containing one valid paths, and one InvokeRequest to unsupported endpoint"),
TestStep(7, "Verify DUT responds to InvokeRequestMessage containing two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true, but never sending preceding Timed Invoke Action"),
TestStep(8, "Verify DUT responds to InvokeRequestMessage containing two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true"),
TestStep(9, "Verify DUT capable of responding to request with multiple InvokeResponseMessages")]
return steps

@async_test_body
async def test_TC_IDM_1_4(self):
dev_ctrl = self.default_controller
dut_node_id = self.dut_node_id

self.print_step(0, "Commissioning - already done")

self.print_step(1, "Get remote node's MaxPathsPerInvoke")
self.step(1)
session_parameters = dev_ctrl.GetRemoteSessionParameters(dut_node_id)
max_paths_per_invoke = session_parameters.maxPathsPerInvoke

asserts.assert_greater_equal(max_paths_per_invoke, 1, "Unexpected error returned from unsupported endpoint")
asserts.assert_less_equal(max_paths_per_invoke, 65535, "Unexpected error returned from unsupported endpoint")
asserts.assert_greater_equal(max_paths_per_invoke, 1, "Max Paths per Invoke less than spec min")
asserts.assert_less_equal(max_paths_per_invoke, 65535, "Max Paths per Invoke greater than spec max")

self.print_step(2, "Sending `MaxPathsPerInvoke + 1` InvokeRequest if it fits into single MTU")
self.step(2)
# In practice, it was noticed that we could only fit 57 commands before we hit the MTU limit as a result we
# conservatively try putting up to 100 commands into an Invoke request. We are expecting one of 2 things to
# happen if max_paths_per_invoke + 1 is greater than what cap_for_batch_commands is set to:
Expand All @@ -59,7 +71,7 @@ async def test_TC_IDM_1_4(self):
# 2. Client (TH) able to send command. While unexpected we will hit two different test failure depending on
# what the server does.
# a. Server successfully handle command and send InvokeResponse with results of all individual commands
# being failure. In this case, test fails on unexpected successes like this
# being failure. In this case, test already will fail on unexpected successes.
# b. Server fails to handle command that is between cap_for_batch_commands and max_paths_per_invoke + 1.
# In this case, test fails as device should have actually succeeded and been caught in 2.a.
cap_for_batch_commands = 100
Expand All @@ -80,42 +92,43 @@ async def test_TC_IDM_1_4(self):
"Step 2 is always expected to try sending at least 2 command, something wrong with test logic")
try:
await dev_ctrl.TestOnlySendBatchCommands(dut_node_id, list_of_commands_to_send, remoteMaxPathsPerInvoke=number_of_commands_to_send)
# If you get the assert below it is likely because cap_for_batch_commands is actually too low.
# This might happen after TCP is enabled and DUT supports TCP.
# This might happen after TCP is enabled and DUT supports TCP. See comment above `cap_for_batch_commands`
# for more information.
asserts.assert_not_equal(number_of_commands_to_send, cap_for_batch_commands,
"Test needs to be updated! Soft cap `cap_for_batch_commands` added in test is no longer correct")
asserts.fail(
f"Unexpected success return from sending too many commands, we sent {number_of_commands_to_send}, test capped at {cap_for_batch_commands}")
except InteractionModelError as e:
# This check is for 2.a., mentioned above introduction of variable cap_for_batch_commands.
# This check is for 2.b, mentioned above. If this assert occurs test likely needs updating. Although DUT is still going to fail.
asserts.assert_equal(number_of_commands_to_send, max_paths_per_invoke + 1,
"Test didn't send as many command as max_paths_per_invoke + 1, likely due to MTU cap_for_batch_commands, but we still got an error from server. This should have been a success from server")
asserts.assert_equal(e.status, Status.InvalidAction,
"DUT sent back an unexpected error, we were expecting InvalidAction")
self.print_step(2, "DUT successfully failed to process `MaxPathsPerInvoke + 1` InvokeRequests")
logging.info("DUT successfully failed to process `MaxPathsPerInvoke + 1` InvokeRequests")
except ChipStackError as e:
chip_error_no_memory = 0x0b
asserts.assert_equal(e.err, chip_error_no_memory, "Unexpected error while trying to send InvokeRequest")
# TODO it is possible we want to confirm DUT can handle up to MTU max. But that is not in test plan as of right now.
# Additionally CommandSender is not currently set up to enable caller to fill up to MTU. This might be coming soon,
# just that it is not supported today.
self.print_step(2, "DUTs reported MaxPathsPerInvoke + 1 is larger than what fits into MTU. Test step is skipped")
logging.info("DUTs reported MaxPathsPerInvoke + 1 is larger than what fits into MTU. Test step is skipped")

if max_paths_per_invoke == 1:
# TODO(#31139) After issue is resolved use that API properly to mark tests steps as skipped
self.print_step(3, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(4, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(5, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(6, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(7, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(8, "Skipping test step as max_paths_per_invoke == 1")
self.print_step(9, "Skipping test step as max_paths_per_invoke == 1")
self.skip_step(3)
tehampson marked this conversation as resolved.
Show resolved Hide resolved
self.skip_step(4)
self.skip_step(5)
self.skip_step(6)
self.skip_step(7)
self.skip_step(8)
self.skip_step(9)
else:
await self.steps_3_to_9(False)

async def steps_3_to_9(self, dummy_value):
dev_ctrl = self.default_controller
dut_node_id = self.dut_node_id

self.print_step(3, "Sending sending two InvokeRequests with idential paths")
self.step(3)
command = Clusters.BasicInformation.Commands.MfgSpecificPing()
endpoint = 0
invoke_request_1 = Clusters.Command.InvokeRequestInfo(endpoint, command)
Expand All @@ -127,7 +140,7 @@ async def steps_3_to_9(self, dummy_value):
"DUT sent back an unexpected error, we were expecting InvalidAction")
logging.info("DUT successfully failed to process two InvokeRequests that contains non-unique paths")

self.print_step(4, "Sending two InvokeRequests with unique paths, but identical CommandRefs")
self.step(4)
endpoint = 0
command = Clusters.OperationalCredentials.Commands.CertificateChainRequest(
Clusters.OperationalCredentials.Enums.CertificateChainTypeEnum.kDACCertificate)
Expand All @@ -144,7 +157,7 @@ async def steps_3_to_9(self, dummy_value):
"DUT sent back an unexpected error, we were expecting InvalidAction")
logging.info("DUT successfully failed to process two InvokeRequests that contains non-unique CommandRef")

self.print_step(5, "Verify DUT is able to responsed to InvokeRequestMessage that contains two valid paths")
self.step(5)
endpoint = 0
command = Clusters.OperationalCredentials.Commands.CertificateChainRequest(
Clusters.OperationalCredentials.Enums.CertificateChainTypeEnum.kDACCertificate)
Expand All @@ -164,8 +177,7 @@ async def steps_3_to_9(self, dummy_value):
except InteractionModelError:
asserts.fail("DUT failed to successfully responded to a InvokeRequest action with two valid commands")

self.print_step(
6, "Verify DUT is able to responsed to InvokeRequestMessage that contains one paths InvokeRequest, and one InvokeRequest to unsupported endpoint")
self.step(6)
# First finding non-existent endpoint
wildcard_descriptor = await dev_ctrl.ReadAttribute(dut_node_id, [(Clusters.Descriptor)])
endpoints = list(wildcard_descriptor.keys())
Expand Down Expand Up @@ -195,7 +207,7 @@ async def steps_3_to_9(self, dummy_value):
except InteractionModelError:
asserts.fail("DUT failed to successfully responded to a InvokeRequest action with two valid commands")

self.print_step(7, "Verify DUT is able to responsed to InvokeRequestMessage that contains two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true, but never sent preceding Timed Invoke Action")
self.step(7)
endpoint = 0
command = Clusters.GroupKeyManagement.Commands.KeySetRead(0)
invoke_request_1 = Clusters.Command.InvokeRequestInfo(endpoint, command)
Expand All @@ -214,7 +226,7 @@ async def steps_3_to_9(self, dummy_value):
asserts.assert_equal(e.status, Status.TimedRequestMismatch,
"Unexpected error response from Invoke with TimedRequest flag and no TimedInvoke")

self.print_step(8, "Verify DUT is able to responsed to InvokeRequestMessage that contains two valid paths. One of which requires timed invoke, and TimedRequest in InvokeResponseMessage set to true")
self.step(8)
endpoint = 0
command = Clusters.GroupKeyManagement.Commands.KeySetRead(0)
invoke_request_1 = Clusters.Command.InvokeRequestInfo(endpoint, command)
Expand All @@ -235,12 +247,12 @@ async def steps_3_to_9(self, dummy_value):
window_not_open_cluster_error = 4
asserts.assert_equal(result[1].clusterStatus, window_not_open_cluster_error,
"Timed command, RevokeCommissioning, failed with incorrect cluster code")
self.print_step(
8, "DUT successfully responded to a InvokeRequest action with two valid commands. One of which required timed invoke, and TimedRequest in InvokeResponseMessage was set to true")
logging.info("DUT successfully responded to a InvokeRequest action with two valid commands. One of which required timed invoke, and TimedRequest in InvokeResponseMessage was set to true")
except InteractionModelError:
asserts.fail("DUT failed with non-path specific error when path specific error was expected")

self.print_step(9, "Skipping test until https://github.com/project-chip/connectedhomeip/issues/31434 resolved")
# Skipping test until https://github.com/project-chip/connectedhomeip/issues/31434 resolved
self.skip_step(9)


if __name__ == "__main__":
Expand Down
Loading