diff --git a/src/python_testing/TC_ACL_2_11.py b/src/python_testing/TC_ACL_2_11.py index 2ac145d7645d17..91df448217e2f9 100644 --- a/src/python_testing/TC_ACL_2_11.py +++ b/src/python_testing/TC_ACL_2_11.py @@ -76,15 +76,18 @@ def steps_TC_ACL_2_11(self) -> list[TestStep]: "If the restriction is Type AttributeAccessForbidden, read the restriction's attribute ID and verify the response is ACCESS_RESTRICTED." "If the restriction is Type AttributeWriteForbidden, write restriction's the attribute ID and verify the response is ACCESS_RESTRICTED." "If the restriction is Type CommandForbidden, invoke the restriction's command ID and verify the response is ACCESS_RESTRICTED."), - TestStep(5, "TH1 sends DUT Endpoint 0 AccessControl cluster command ReviewFabricRestrictions"), - TestStep(6, "Wait for up to 1 hour. Follow instructions provided by device maker to remove all access restrictions", + TestStep(5, "Ensure protected attributes are accessible"), + TestStep(6, "TH1 sends DUT Endpoint 0 AccessControl cluster command ReviewFabricRestrictions"), + TestStep(7, "Wait for up to 1 hour. Follow instructions provided by device maker to remove all access restrictions", "AccessRestrictionReviewUpdate event is received"), - TestStep(7, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute", "ARL is empty") + TestStep(8, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute", "ARL is empty") ] return steps @async_test_body async def test_TC_ACL_2_11(self): + dev_ctrl = self.default_controller + dut_node_id = self.dut_node_id self.step(1) self.step(2) await self.read_single_attribute_check_success( @@ -134,7 +137,7 @@ async def test_TC_ACL_2_11(self): arru_cb = EventChangeCallback(Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, arru_queue) urgent = 1 - subscription_arru = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False) + subscription_arru = await dev_ctrl.ReadEvent(dut_node_id, events=[(0, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False) subscription_arru.SetEventUpdateCallback(callback=arru_cb) # end @@ -143,20 +146,37 @@ async def test_TC_ACL_2_11(self): arec_cb = EventChangeCallback(Clusters.AccessControl.Events.AccessRestrictionEntryChanged, arec_queue) urgent = 1 - subscription_arec = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessRestrictionEntryChanged, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False) + subscription_arec = await dev_ctrl.ReadEvent(dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessRestrictionEntryChanged, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False) subscription_arec.SetEventUpdateCallback(callback=arec_cb) # end self.step(5) + root_node_endpoint = 0 + root_part_list = await dev_ctrl.ReadAttribute(dut_node_id, [(root_node_endpoint, Clusters.Descriptor.Attributes.PartsList)]) + set_of_endpoints = set(root_part_list[root_node_endpoint] + [Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]) + for endpoint in set_of_endpoints: + ret = await dev_ctrl.ReadAttribute(dut_node_id, [(endpoint, Clusters.Descriptor.Attributes.ServerList)]) + server_list = ret[endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList] + for server in server_list: + cluster = Clusters.ClusterObjects.ALL_CLUSTERS[server] + await dev_ctrl.ReadAttribute(dut_node_id, [endpoint, + cluster.Attributes.GeneratedCommandList, + cluster.Attributes.AcceptedCommandList, + cluster.Attributes.AttributeList, + cluster.Attributes.FeatureMap, + cluster.Attributes.ClusterRevision]) + + self.step(6) response = await self.send_single_cmd(cmd=Clusters.AccessControl.Commands.ReviewFabricRestrictions([care_struct]), endpoint=0) asserts.assert_true(isinstance(response, Clusters.AccessControl.Commands.ReviewFabricRestrictionsResponse), "Result is not of type ReviewFabricRestrictionsResponse") - self.step(6) + self.step(7) logging.info("Please follow instructions provided by the product maker to remove all ARL entries") WaitForEventReport(arru_queue, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate) - self.step(7) + self.step(8) cluster = Clusters.AccessControl attribute = Clusters.AccessControl.Attributes.Arl arl = await self.read_single_attribute_check_success(