From cb224f75c275993e9400a95faf2c3f1b7d9f7830 Mon Sep 17 00:00:00 2001 From: Thomas Lea <35579828+tleacmcsa@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:09:00 -0500 Subject: [PATCH] Added TC-ACL-2-11 test script (#35163) Implement the M-ACL / Managed NIM / ARL test plan. The test plan (currently in review form) can be found here: CHIP-Specifications/chip-test-plans#4316 -------- * Added TC-ACL-2-11 test script * Restyled by autopep8 * Restyled by isort * fixed linter issues * review updates * Use network-manager-app for TC_ACL_2_11.py * Fix REPL build commands for network manager * add trace support to network-manager-app --------- Co-authored-by: Restyled.io Co-authored-by: tennessee.carmelveilleux@gmail.com --- .github/workflows/tests.yaml | 2 + examples/network-manager-app/linux/args.gni | 2 + src/python_testing/TC_ACL_2_11.py | 172 ++++++++++++++++++++ 3 files changed, 176 insertions(+) create mode 100644 src/python_testing/TC_ACL_2_11.py diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index dbbda0aa36580f..b37ed3c6ca4370 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -485,6 +485,7 @@ jobs: --target linux-x64-energy-management-ipv6only-no-ble-no-wifi-tsan-clang-test \ --target linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test \ --target linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test \ + --target linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test \ --target linux-x64-python-bindings \ build \ --copy-artifacts-to objdir-clone \ @@ -498,6 +499,7 @@ jobs: echo "LIT_ICD_APP: out/linux-x64-lit-icd-ipv6only-no-ble-no-wifi-tsan-clang-test/lit-icd-app" >> /tmp/test_env.yaml echo "CHIP_MICROWAVE_OVEN_APP: out/linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-microwave-oven-app" >> /tmp/test_env.yaml echo "CHIP_RVC_APP: out/linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-rvc-app" >> /tmp/test_env.yaml + echo "NETWORK_MANAGEMENT_APP: out/linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test/matter-network-manager-app" >> /tmp/test_env.yaml echo "TRACE_APP: out/trace_data/app-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml echo "TRACE_TEST_JSON: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml echo "TRACE_TEST_PERFETTO: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml diff --git a/examples/network-manager-app/linux/args.gni b/examples/network-manager-app/linux/args.gni index e97ddb13e7c46e..e463c7d01aaff4 100644 --- a/examples/network-manager-app/linux/args.gni +++ b/examples/network-manager-app/linux/args.gni @@ -25,3 +25,5 @@ chip_config_network_layer_ble = false # This enables AccessRestrictionList (ARL) support used by the NIM sample app chip_enable_access_restrictions = true + +matter_enable_tracing_support = true diff --git a/src/python_testing/TC_ACL_2_11.py b/src/python_testing/TC_ACL_2_11.py new file mode 100644 index 00000000000000..a979b1ca3f5b1a --- /dev/null +++ b/src/python_testing/TC_ACL_2_11.py @@ -0,0 +1,172 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${NETWORK_MANAGEMENT_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json --commissioning-arl-entries "[{\"endpoint\": 1,\"cluster\": 1105,\"restrictions\": [{\"type\": 0,\"id\": 0}]}]" --arl-entries "[{\"endpoint\": 1,\"cluster\": 1105,\"restrictions\": [{\"type\": 0,\"id\": 0}]}]" +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# === END CI TEST ARGUMENTS === + +import logging +import queue + +import chip.clusters as Clusters +from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction +from chip.clusters.ClusterObjects import ALL_ACCEPTED_COMMANDS, ALL_ATTRIBUTES, ALL_CLUSTERS, ClusterEvent +from chip.clusters.Objects import AccessControl +from chip.interaction_model import Status +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts + + +class EventChangeCallback: + def __init__(self, expected_event: ClusterEvent, output: queue.Queue): + self._output = output + self._expected_cluster_id = expected_event.cluster_id + self._expected_event_id = expected_event.event_id + + def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction): + if res.Status == Status.Success and res.Header.ClusterId == self._expected_cluster_id and res.Header.EventId == self._expected_event_id: + logging.info( + f'Got subscription report for event {self._expected_event_id} on cluster {self._expected_cluster_id}: {res.Data}') + self._output.put(res) + + +def WaitForEventReport(q: queue.Queue, expected_event: ClusterEvent): + try: + res = q.get(block=True, timeout=10) + except queue.Empty: + asserts.fail("Failed to receive a report for the event {}".format(expected_event)) + + asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report") + asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report") + + +class TC_ACL_2_11(MatterBaseTest): + + def desc_TC_ACL_2_11(self) -> str: + return "[TC-ACL-2.11] Verification of Managed Device feature" + + def steps_TC_ACL_2_11(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commissioning, already done"), + TestStep(2, "TH1 reads DUT Endpoint 0 AccessControl cluster CommissioningARL attribute"), + TestStep(3, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute"), + TestStep(4, "For each entry in ARL, iterate over each restriction and attempt access the restriction's ID on the Endpoint and Cluster in the ARL entry.", + "If the restriction is Type AttributeAccessForbidden, read the restriction's attribute ID and verify the response is UNSUPPORTED_ACCESS." + "If the restriction is Type AttributeWriteForbidden, write restriction's the attribute ID and verify the response is UNSUPPORTED_ACCESS." + "If the restriction is Type CommandForbidden, invoke the restriction's command ID and verify the response is UNSUPPORTED_ACCESS."), + TestStep(5, "TH1 sends DUT Endpoint 0 AccessControl cluster command ReviewFabricRestrictions"), + TestStep(6, "Wait for up to 1 hour. Follow instructions provided by device maker to remove all access restrictions", + "AccessRestrictionReviewUpdate event is received"), + TestStep(7, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute", "ARL is empty") + ] + return steps + + @async_test_body + async def test_TC_ACL_2_11(self): + self.step(1) + self.step(2) + await self.read_single_attribute_check_success( + endpoint=0, + cluster=Clusters.AccessControl, + attribute=Clusters.AccessControl.Attributes.CommissioningARL + ) + self.step(3) + arl = await self.read_single_attribute_check_success( + endpoint=0, + cluster=Clusters.AccessControl, + attribute=Clusters.AccessControl.Attributes.Arl + ) + self.step(4) + + care_struct = None + + for arl_entry in arl: + E1 = arl_entry.endpoint + C1 = arl_entry.cluster + R1 = arl_entry.restrictions + + care_struct = Clusters.AccessControl.Structs.AccessRestrictionEntryStruct(E1, C1, R1) + + cluster = ALL_CLUSTERS[C1] + + for restriction in R1: + restriction_type = restriction.type + ID1 = restriction.id + + attribute = ALL_ATTRIBUTES[C1][ID1] + command = ALL_ACCEPTED_COMMANDS[C1][ID1] + + if restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kAttributeAccessForbidden: + await self.read_single_attribute_expect_error(cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess, endpoint=E1) + elif restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kAttributeWriteForbidden: + status = await self.write_single_attribute(attribute_value=attribute, endpoint_id=E1) + asserts.assert_equal(status, Status.UnsupportedAccess, + f"Failed to verify UNSUPPORTED_ACCESS when writing to Attribute {ID1} Cluster {C1} Endpoint {E1}") + elif restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kCommandForbidden: + result = await self.send_single_cmd(cmd=command, endpoint=E1) + asserts.assert_equal(result.status, Status.UnsupportedAccess, + f"Failed to verify UNSUPPORTED_ACCESS when sending command {ID1} to Cluster {C1} Endpoint {E1}") + + # Belongs to step 6, but needs to be subscribed before executing step 5: begin + arru_queue = queue.Queue() + arru_cb = EventChangeCallback(Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, arru_queue) + + urgent = 1 + subscription_arru = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False) + subscription_arru.SetEventUpdateCallback(callback=arru_cb) + # end + + # Belongs to step 7, but needs to be subscribed before executing step 5: begin + arec_queue = queue.Queue() + arec_cb = EventChangeCallback(Clusters.AccessControl.Events.AccessRestrictionEntryChanged, arec_queue) + + urgent = 1 + subscription_arec = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessRestrictionEntryChanged, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False) + subscription_arec.SetEventUpdateCallback(callback=arec_cb) + # end + + self.step(5) + response = await self.send_single_cmd(cmd=Clusters.AccessControl.Commands.ReviewFabricRestrictions([care_struct]), endpoint=0) + asserts.assert_true(isinstance(response, Clusters.AccessControl.Commands.ReviewFabricRestrictionsResponse), + "Result is not of type ReviewFabricRestrictionsResponse") + + self.step(6) + logging.info("Please follow instructions provided by the product maker to remove all ARL entries") + WaitForEventReport(arru_queue, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate) + + self.step(7) + cluster = Clusters.AccessControl + attribute = Clusters.AccessControl.Attributes.Arl + arl = await self.read_single_attribute_check_success( + node_id=self.dut_node_id, + endpoint=0, + cluster=cluster, + attribute=attribute + ) + asserts.assert_equal(arl, [], "Unexpected Arl; Not empty") + + +if __name__ == "__main__": + default_matter_test_main()