Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update TC_ACL_2_11 for global attributes #35286

Merged
merged 3 commits into from
Aug 29, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions src/python_testing/TC_ACL_2_11.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,21 @@ def steps_TC_ACL_2_11(self) -> list[TestStep]:
TestStep(2, "TH1 reads DUT Endpoint 0 AccessControl cluster CommissioningARL attribute"),
TestStep(3, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute"),
TestStep(4, "For each entry in ARL, iterate over each restriction and attempt access the restriction's ID on the Endpoint and Cluster in the ARL entry.",
"If the restriction is Type AttributeAccessForbidden, read the restriction's attribute ID and verify the response is UNSUPPORTED_ACCESS."
"If the restriction is Type AttributeWriteForbidden, write restriction's the attribute ID and verify the response is UNSUPPORTED_ACCESS."
"If the restriction is Type CommandForbidden, invoke the restriction's command ID and verify the response is UNSUPPORTED_ACCESS."),
TestStep(5, "TH1 sends DUT Endpoint 0 AccessControl cluster command ReviewFabricRestrictions"),
TestStep(6, "Wait for up to 1 hour. Follow instructions provided by device maker to remove all access restrictions",
"If the restriction is Type AttributeAccessForbidden, read the restriction's attribute ID and verify the response is ACCESS_RESTRICTED."
"If the restriction is Type AttributeWriteForbidden, write restriction's the attribute ID and verify the response is ACCESS_RESTRICTED."
"If the restriction is Type CommandForbidden, invoke the restriction's command ID and verify the response is ACCESS_RESTRICTED."),
TestStep(5, "Ensure protected attributes are accessible"),
TestStep(6, "TH1 sends DUT Endpoint 0 AccessControl cluster command ReviewFabricRestrictions"),
TestStep(7, "Wait for up to 1 hour. Follow instructions provided by device maker to remove all access restrictions",
"AccessRestrictionReviewUpdate event is received"),
TestStep(7, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute", "ARL is empty")
TestStep(8, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute", "ARL is empty")
]
return steps

@async_test_body
async def test_TC_ACL_2_11(self):
dev_ctrl = self.default_controller
dut_node_id = self.dut_node_id
self.step(1)
self.step(2)
await self.read_single_attribute_check_success(
Expand Down Expand Up @@ -119,22 +122,22 @@ async def test_TC_ACL_2_11(self):
command = ALL_ACCEPTED_COMMANDS[C1][ID1]

if restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kAttributeAccessForbidden:
await self.read_single_attribute_expect_error(cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess, endpoint=E1)
await self.read_single_attribute_expect_error(cluster=cluster, attribute=attribute, error=Status.AccessRestricted, endpoint=E1)
elif restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kAttributeWriteForbidden:
status = await self.write_single_attribute(attribute_value=attribute, endpoint_id=E1)
asserts.assert_equal(status, Status.UnsupportedAccess,
f"Failed to verify UNSUPPORTED_ACCESS when writing to Attribute {ID1} Cluster {C1} Endpoint {E1}")
asserts.assert_equal(status, Status.AccessRestricted,
f"Failed to verify ACCESS_RESTRICTED when writing to Attribute {ID1} Cluster {C1} Endpoint {E1}")
elif restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kCommandForbidden:
result = await self.send_single_cmd(cmd=command, endpoint=E1)
asserts.assert_equal(result.status, Status.UnsupportedAccess,
f"Failed to verify UNSUPPORTED_ACCESS when sending command {ID1} to Cluster {C1} Endpoint {E1}")
asserts.assert_equal(result.status, Status.AccessRestricted,
f"Failed to verify ACCESS_RESTRICTED when sending command {ID1} to Cluster {C1} Endpoint {E1}")

# Belongs to step 6, but needs to be subscribed before executing step 5: begin
arru_queue = queue.Queue()
arru_cb = EventChangeCallback(Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, arru_queue)

urgent = 1
subscription_arru = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False)
subscription_arru = await dev_ctrl.ReadEvent(dut_node_id, events=[(0, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False)
subscription_arru.SetEventUpdateCallback(callback=arru_cb)
# end

Expand All @@ -143,20 +146,37 @@ async def test_TC_ACL_2_11(self):
arec_cb = EventChangeCallback(Clusters.AccessControl.Events.AccessRestrictionEntryChanged, arec_queue)

urgent = 1
subscription_arec = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessRestrictionEntryChanged, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False)
subscription_arec = await dev_ctrl.ReadEvent(dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessRestrictionEntryChanged, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False)
subscription_arec.SetEventUpdateCallback(callback=arec_cb)
# end

self.step(5)
root_node_endpoint = 0
root_part_list = await dev_ctrl.ReadAttribute(dut_node_id, [(root_node_endpoint, Clusters.Descriptor.Attributes.PartsList)])
set_of_endpoints = set(root_part_list[root_node_endpoint]
[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList])
for endpoint in set_of_endpoints:
ret = await dev_ctrl.ReadAttribute(dut_node_id, [(endpoint, Clusters.Descriptor.Attributes.ServerList)])
server_list = ret[endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList]
for server in server_list:
cluster = Clusters.ClusterObjects.ALL_CLUSTERS[server]
attribute_data = await dev_ctrl.ReadAttribute(dut_node_id, [endpoint,
cluster.Attributes.GeneratedCommandList,
cluster.Attributes.AcceptedCommandList,
cluster.Attributes.AttributeList,
cluster.Attributes.FeatureMap,
cluster.Attributes.ClusterRevision])

self.step(6)
response = await self.send_single_cmd(cmd=Clusters.AccessControl.Commands.ReviewFabricRestrictions([care_struct]), endpoint=0)
asserts.assert_true(isinstance(response, Clusters.AccessControl.Commands.ReviewFabricRestrictionsResponse),
"Result is not of type ReviewFabricRestrictionsResponse")

self.step(6)
self.step(7)
logging.info("Please follow instructions provided by the product maker to remove all ARL entries")
WaitForEventReport(arru_queue, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate)

self.step(7)
self.step(8)
cluster = Clusters.AccessControl
attribute = Clusters.AccessControl.Attributes.Arl
arl = await self.read_single_attribute_check_success(
Expand Down
Loading