diff --git a/src/controller/python/chip/FabricAdmin.py b/src/controller/python/chip/FabricAdmin.py index 3cfe03ffdb7e95..97a729035f811e 100644 --- a/src/controller/python/chip/FabricAdmin.py +++ b/src/controller/python/chip/FabricAdmin.py @@ -102,7 +102,7 @@ def NewController(self, nodeId: int = None, paaTrustStorePath: str = "", useTest raise RuntimeError(f"Provided NodeId {nodeId} collides with an existing controller instance!") self.logger().warning( - f"Allocating new controller with CaIndex: {self._certificateAuthority.caIndex}, FabricId: 0x{self._fabricId:016X}, NodeId: 0x{nodeId:016X}") + f"Allocating new controller with CaIndex: {self._certificateAuthority.caIndex}, FabricId: 0x{self._fabricId:016X}, NodeId: 0x{nodeId:016X}, CatTags: {catTags}") controller = ChipDeviceCtrl.ChipDeviceController(opCredsContext=self._certificateAuthority.GetOpCredsContext(), fabricId=self._fabricId, nodeId=nodeId, adminVendorId=self._vendorId, paaTrustStorePath=paaTrustStorePath, useTestCommissioner=useTestCommissioner, fabricAdmin=self, catTags=catTags) diff --git a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py index 65d1bf7091b3eb..b743c89a3cebd1 100644 --- a/src/controller/python/chip/utils/CommissioningBuildingBlocks.py +++ b/src/controller/python/chip/utils/CommissioningBuildingBlocks.py @@ -30,7 +30,7 @@ _UINT16_MAX = 65535 -logger = logging.getLogger() +logger = logging.getLogger('CommissioningBuildingBlocks') async def _IsNodeInFabricList(devCtrl, nodeId): @@ -43,7 +43,7 @@ async def _IsNodeInFabricList(devCtrl, nodeId): return False -async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDeviceController, privilege: Clusters.AccessControl.Enums.Privilege, targetNodeId: int): +async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDeviceController, privilege: Clusters.AccessControl.Enums.Privilege, targetNodeId: int, targetCatTags: typing.List[int] = []): ''' Given an existing controller with admin privileges over a target node, grants the specified privilege to the new ChipDeviceController instance to the entire Node. This is achieved by updating the ACL entries on the target. @@ -53,20 +53,29 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic Args: adminCtrl: ChipDeviceController instance with admin privileges over the target node grantedCtrl: ChipDeviceController instance that is being granted the new privilege. - privilege: Privilege to grant to the granted controller + privilege: Privilege to grant to the granted controller. If None, no privilege is granted. targetNodeId: Target node to which the controller is granted privilege. + targetCatTag: Target 32-bit CAT tag that is granted privilege. If provided, this will be used in the subject list instead of the nodeid of that of grantedCtrl. ''' - data = await adminCtrl.ReadAttribute(targetNodeId, [(Clusters.AccessControl.Attributes.Acl)]) if 0 not in data: raise ValueError("Did not get back any data (possible cause: controller has no access..") currentAcls = data[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl] + if len(targetCatTags) != 0: + # Convert to a full-qualified 32-bit Node Identifier + targetSubjects = [tag | 0xFFFF_FFFD_0000_0000 for tag in targetCatTags] + else: + targetSubjects = [grantedCtrl.nodeId] + + if (len(targetSubjects) > 4): + raise ValueError(f"List of target subjects of len {len(targetSubjects)} exceeeded the minima of 4!") + # Step 1: Wipe the subject from all existing ACLs. for acl in currentAcls: if (acl.subjects != NullValue): - acl.subjects = [subject for subject in acl.subjects if subject != grantedCtrl.nodeId] + acl.subjects = [subject for subject in acl.subjects if subject not in targetSubjects] if (privilege): addedPrivilege = False @@ -75,9 +84,11 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic # the existing privilege in that entry matches our desired privilege. for acl in currentAcls: if acl.privilege == privilege: - if grantedCtrl.nodeId not in acl.subjects: - acl.subjects.append(grantedCtrl.nodeId) + subjectSet = set(acl.subjects) + subjectSet.update(targetSubjects) + acl.subjects = list(subjectSet) addedPrivilege = True + break # Step 3: If there isn't an existing entry to add to, make a new one. if (not(addedPrivilege)): @@ -86,10 +97,12 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic f"Cannot add another ACL entry to grant privilege to existing count of {currentAcls} ACLs -- will exceed minimas!") currentAcls.append(Clusters.AccessControl.Structs.AccessControlEntry(privilege=privilege, authMode=Clusters.AccessControl.Enums.AuthMode.kCase, - subjects=[grantedCtrl.nodeId])) + subjects=targetSubjects)) # Step 4: Prune ACLs which have empty subjects. currentAcls = [acl for acl in currentAcls if acl.subjects != NullValue and len(acl.subjects) != 0] + + logger.info(f'GrantPrivilege: Writing acls: {currentAcls}') await adminCtrl.WriteAttribute(targetNodeId, [(0, Clusters.AccessControl.Attributes.Acl(currentAcls))]) @@ -102,14 +115,14 @@ async def CreateControllersOnFabric(fabricAdmin: FabricAdmin, adminDevCtrl: Chip controllerNodeIds: List of desired nodeIds for the controllers. privilege: The specific ACL privilege to grant to the newly minted controllers. targetNodeId: The Node ID of the target. - catTags: CAT Tags to include in the NOC of controller + catTags: CAT Tags to include in the NOC of controller, as well as when setting up the ACLs on the target. ''' controllerList = [] for nodeId in controllerNodeIds: newController = fabricAdmin.NewController(nodeId=nodeId, catTags=catTags) - await GrantPrivilege(adminDevCtrl, newController, privilege, targetNodeId) + await GrantPrivilege(adminDevCtrl, newController, privilege, targetNodeId, catTags) controllerList.append(newController) return controllerList diff --git a/src/controller/python/test/test_scripts/base.py b/src/controller/python/test/test_scripts/base.py index 8488b2e80b6ce6..8665c288276603 100644 --- a/src/controller/python/test/test_scripts/base.py +++ b/src/controller/python/test/test_scripts/base.py @@ -40,6 +40,7 @@ import copy import secrets import faulthandler +import ipdb logger = logging.getLogger('PythonMatterControllerTEST') logger.setLevel(logging.INFO) @@ -389,31 +390,28 @@ def TestFailsafe(self, nodeid: int): async def TestControllerCATValues(self, nodeid: int): ''' This tests controllers using CAT Values ''' - # Allocate a new controller instance with a CAT tag. - newController = self.fabricAdmin.NewController(nodeId=300, catTags=[0x00010001]) + newControllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=self.fabricAdmin, adminDevCtrl=self.devCtrl, controllerNodeIds=[300], targetNodeId=nodeid, privilege=None, catTags=[0x0001_0001]) # Read out an attribute using the new controller. It has no privileges, so this should fail with an UnsupportedAccess error. - res = await newController.ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)]) + res = await newControllers[0].ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)]) if(res[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl].Reason.status != IM.Status.UnsupportedAccess): self.logger.error(f"1: Received data instead of an error:{res}") return False - # Do a read-modify-write operation on the ACL list to add the CAT tag to the ACL list. - aclList = (await self.devCtrl.ReadAttribute(nodeid, [(0, Clusters.AccessControl.Attributes.Acl)]))[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl] - origAclList = copy.deepcopy(aclList) - aclList[0].subjects.append(0xFFFFFFFD00010001) - await self.devCtrl.WriteAttribute(nodeid, [(0, Clusters.AccessControl.Attributes.Acl(aclList))]) + # Grant the new controller privilege by adding the CAT tag to the subject. + await CommissioningBuildingBlocks.GrantPrivilege(adminCtrl=self.devCtrl, grantedCtrl=newControllers[0], privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=nodeid, targetCatTags=[0x0001_0001]) # Read out the attribute again - this time, it should succeed. - res = await newController.ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)]) + res = await newControllers[0].ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)]) if (type(res[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][0]) != Clusters.AccessControl.Structs.AccessControlEntry): self.logger.error(f"2: Received something other than data:{res}") return False - # Write back the old entry to reset ACL list back. - await self.devCtrl.WriteAttribute(nodeid, [(0, Clusters.AccessControl.Attributes.Acl(origAclList))]) - newController.Shutdown() + # Reset the privilege back to pre-test. + await CommissioningBuildingBlocks.GrantPrivilege(adminCtrl=self.devCtrl, grantedCtrl=newControllers[0], privilege=None, targetNodeId=nodeid) + + newControllers[0].Shutdown() return True diff --git a/src/python_testing/TC_RR_1_1.py b/src/python_testing/TC_RR_1_1.py index b5076d5df78722..00dd9a111b8308 100644 --- a/src/python_testing/TC_RR_1_1.py +++ b/src/python_testing/TC_RR_1_1.py @@ -114,7 +114,7 @@ async def test_TC_RR_1_1(self): client_list.append(dev_ctrl) if num_controllers_per_fabric > 1: - new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=dev_ctrl.fabricAdmin, adminDevCtrl=dev_ctrl, controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id) + new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=dev_ctrl.fabricAdmin, adminDevCtrl=dev_ctrl, controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id, catTags=[0x0001_0001]) for controller in new_controllers: controller.name = all_names.pop(0) client_list.extend(new_controllers) @@ -126,14 +126,14 @@ async def test_TC_RR_1_1(self): new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=admin_index) - new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId) + new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId, catTags=[0x0001_0001]) new_admin_ctrl.name = all_names.pop(0) client_list.append(new_admin_ctrl) await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=new_admin_ctrl, existingNodeId=self.dut_node_id, newNodeId=self.dut_node_id) if num_controllers_per_fabric > 1: new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=new_fabric_admin, adminDevCtrl=new_admin_ctrl, - controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id) + controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id, catTags=[0x0001_0001]) for controller in new_controllers: controller.name = all_names.pop(0) @@ -376,14 +376,7 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric): # - Targets field: [{Cluster: 0xFFF1_FC40, DeviceType: 0xFFF1_FC20}, {Cluster: 0xFFF1_FC41, DeviceType: 0xFFF1_FC21}, {Cluster: 0xFFF1_FC02, DeviceType: 0xFFF1_FC42}] # Administer ACL entry - admin_subjects = [0xFFFF_FFFD_0001_0001] - # TODO: Replace the below with [0x2000_0000_0000_0001, 0x2000_0000_0000_0002, 0x2000_0000_0000_0003] once controllers - # all have CAT tag 0001_0001 in their NOC - - # Find node ID of all controllers (up to 3) and make them admin - for subject_idx in range(min(num_controllers_per_fabric, 3)): - subject_name = "RD%d%s" % (fabric_number, chr(ord('A') + subject_idx)) - admin_subjects.append(client_by_name[subject_name].nodeId) + admin_subjects = [0xFFFF_FFFD_0001_0001, 0x2000_0000_0000_0001, 0x2000_0000_0000_0002, 0x2000_0000_0000_0003] admin_targets = [ Clusters.AccessControl.Structs.Target(endpoint=0), @@ -428,4 +421,4 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric): if __name__ == "__main__": - default_matter_test_main(maximize_cert_chains=True) + default_matter_test_main(maximize_cert_chains=True, controller_cat_tags=[0x0001_0001]) diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 3a1286b9c8929a..7c21f0b6471ce8 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -714,7 +714,7 @@ def default_matter_test_main(argv=None, **kwargs): # TODO: If CASE Admin Subject is a CAT tag range, then make sure to issue NOC with that CAT tag default_controller = stack.certificate_authorities[0].adminList[0].NewController(nodeId=matter_test_config.controller_node_id, - paaTrustStorePath=str(matter_test_config.paa_trust_store_path)) + paaTrustStorePath=str(matter_test_config.paa_trust_store_path), catTags=matter_test_config.controller_cat_tags) test_config.user_params["default_controller"] = stash_globally(default_controller) test_config.user_params["matter_test_config"] = stash_globally(matter_test_config)