Skip to content

Commit

Permalink
Fixed CAT tag testing
Browse files Browse the repository at this point in the history
  • Loading branch information
mrjerryjohns authored and tcarmelveilleux committed Aug 19, 2022
1 parent 47df515 commit 1376091
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/controller/python/chip/FabricAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def NewController(self, nodeId: int = None, paaTrustStorePath: str = "", useTest
raise RuntimeError(f"Provided NodeId {nodeId} collides with an existing controller instance!")

self.logger().warning(
f"Allocating new controller with CaIndex: {self._certificateAuthority.caIndex}, FabricId: 0x{self._fabricId:016X}, NodeId: 0x{nodeId:016X}")
f"Allocating new controller with CaIndex: {self._certificateAuthority.caIndex}, FabricId: 0x{self._fabricId:016X}, NodeId: 0x{nodeId:016X}, CatTags: {catTags}")

controller = ChipDeviceCtrl.ChipDeviceController(opCredsContext=self._certificateAuthority.GetOpCredsContext(), fabricId=self._fabricId, nodeId=nodeId,
adminVendorId=self._vendorId, paaTrustStorePath=paaTrustStorePath, useTestCommissioner=useTestCommissioner, fabricAdmin=self, catTags=catTags)
Expand Down
33 changes: 23 additions & 10 deletions src/controller/python/chip/utils/CommissioningBuildingBlocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

_UINT16_MAX = 65535

logger = logging.getLogger()
logger = logging.getLogger('CommissioningBuildingBlocks')


async def _IsNodeInFabricList(devCtrl, nodeId):
Expand All @@ -43,7 +43,7 @@ async def _IsNodeInFabricList(devCtrl, nodeId):
return False


async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDeviceController, privilege: Clusters.AccessControl.Enums.Privilege, targetNodeId: int):
async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDeviceController, privilege: Clusters.AccessControl.Enums.Privilege, targetNodeId: int, targetCatTags: typing.List[int] = []):
''' Given an existing controller with admin privileges over a target node, grants the specified privilege to the new ChipDeviceController instance to the entire Node. This is achieved
by updating the ACL entries on the target.
Expand All @@ -53,20 +53,29 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic
Args:
adminCtrl: ChipDeviceController instance with admin privileges over the target node
grantedCtrl: ChipDeviceController instance that is being granted the new privilege.
privilege: Privilege to grant to the granted controller
privilege: Privilege to grant to the granted controller. If None, no privilege is granted.
targetNodeId: Target node to which the controller is granted privilege.
targetCatTag: Target 32-bit CAT tag that is granted privilege. If provided, this will be used in the subject list instead of the nodeid of that of grantedCtrl.
'''

data = await adminCtrl.ReadAttribute(targetNodeId, [(Clusters.AccessControl.Attributes.Acl)])
if 0 not in data:
raise ValueError("Did not get back any data (possible cause: controller has no access..")

currentAcls = data[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]

if len(targetCatTags) != 0:
# Convert to a full-qualified 32-bit Node Identifier
targetSubjects = [tag | 0xFFFF_FFFD_0000_0000 for tag in targetCatTags]
else:
targetSubjects = [grantedCtrl.nodeId]

if (len(targetSubjects) > 4):
raise ValueError(f"List of target subjects of len {len(targetSubjects)} exceeeded the minima of 4!")

# Step 1: Wipe the subject from all existing ACLs.
for acl in currentAcls:
if (acl.subjects != NullValue):
acl.subjects = [subject for subject in acl.subjects if subject != grantedCtrl.nodeId]
acl.subjects = [subject for subject in acl.subjects if subject not in targetSubjects]

if (privilege):
addedPrivilege = False
Expand All @@ -75,9 +84,11 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic
# the existing privilege in that entry matches our desired privilege.
for acl in currentAcls:
if acl.privilege == privilege:
if grantedCtrl.nodeId not in acl.subjects:
acl.subjects.append(grantedCtrl.nodeId)
subjectSet = set(acl.subjects)
subjectSet.update(targetSubjects)
acl.subjects = list(subjectSet)
addedPrivilege = True
break

# Step 3: If there isn't an existing entry to add to, make a new one.
if (not(addedPrivilege)):
Expand All @@ -86,10 +97,12 @@ async def GrantPrivilege(adminCtrl: ChipDeviceController, grantedCtrl: ChipDevic
f"Cannot add another ACL entry to grant privilege to existing count of {currentAcls} ACLs -- will exceed minimas!")

currentAcls.append(Clusters.AccessControl.Structs.AccessControlEntry(privilege=privilege, authMode=Clusters.AccessControl.Enums.AuthMode.kCase,
subjects=[grantedCtrl.nodeId]))
subjects=targetSubjects))

# Step 4: Prune ACLs which have empty subjects.
currentAcls = [acl for acl in currentAcls if acl.subjects != NullValue and len(acl.subjects) != 0]

logger.info(f'GrantPrivilege: Writing acls: {currentAcls}')
await adminCtrl.WriteAttribute(targetNodeId, [(0, Clusters.AccessControl.Attributes.Acl(currentAcls))])


Expand All @@ -102,14 +115,14 @@ async def CreateControllersOnFabric(fabricAdmin: FabricAdmin, adminDevCtrl: Chip
controllerNodeIds: List of desired nodeIds for the controllers.
privilege: The specific ACL privilege to grant to the newly minted controllers.
targetNodeId: The Node ID of the target.
catTags: CAT Tags to include in the NOC of controller
catTags: CAT Tags to include in the NOC of controller, as well as when setting up the ACLs on the target.
'''

controllerList = []

for nodeId in controllerNodeIds:
newController = fabricAdmin.NewController(nodeId=nodeId, catTags=catTags)
await GrantPrivilege(adminDevCtrl, newController, privilege, targetNodeId)
await GrantPrivilege(adminDevCtrl, newController, privilege, targetNodeId, catTags)
controllerList.append(newController)

return controllerList
Expand Down
22 changes: 10 additions & 12 deletions src/controller/python/test/test_scripts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import copy
import secrets
import faulthandler
import ipdb

logger = logging.getLogger('PythonMatterControllerTEST')
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -389,31 +390,28 @@ def TestFailsafe(self, nodeid: int):
async def TestControllerCATValues(self, nodeid: int):
''' This tests controllers using CAT Values
'''

# Allocate a new controller instance with a CAT tag.
newController = self.fabricAdmin.NewController(nodeId=300, catTags=[0x00010001])
newControllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=self.fabricAdmin, adminDevCtrl=self.devCtrl, controllerNodeIds=[300], targetNodeId=nodeid, privilege=None, catTags=[0x0001_0001])

# Read out an attribute using the new controller. It has no privileges, so this should fail with an UnsupportedAccess error.
res = await newController.ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)])
res = await newControllers[0].ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)])
if(res[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl].Reason.status != IM.Status.UnsupportedAccess):
self.logger.error(f"1: Received data instead of an error:{res}")
return False

# Do a read-modify-write operation on the ACL list to add the CAT tag to the ACL list.
aclList = (await self.devCtrl.ReadAttribute(nodeid, [(0, Clusters.AccessControl.Attributes.Acl)]))[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]
origAclList = copy.deepcopy(aclList)
aclList[0].subjects.append(0xFFFFFFFD00010001)
await self.devCtrl.WriteAttribute(nodeid, [(0, Clusters.AccessControl.Attributes.Acl(aclList))])
# Grant the new controller privilege by adding the CAT tag to the subject.
await CommissioningBuildingBlocks.GrantPrivilege(adminCtrl=self.devCtrl, grantedCtrl=newControllers[0], privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=nodeid, targetCatTags=[0x0001_0001])

# Read out the attribute again - this time, it should succeed.
res = await newController.ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)])
res = await newControllers[0].ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)])
if (type(res[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl][0]) != Clusters.AccessControl.Structs.AccessControlEntry):
self.logger.error(f"2: Received something other than data:{res}")
return False

# Write back the old entry to reset ACL list back.
await self.devCtrl.WriteAttribute(nodeid, [(0, Clusters.AccessControl.Attributes.Acl(origAclList))])
newController.Shutdown()
# Reset the privilege back to pre-test.
await CommissioningBuildingBlocks.GrantPrivilege(adminCtrl=self.devCtrl, grantedCtrl=newControllers[0], privilege=None, targetNodeId=nodeid)

newControllers[0].Shutdown()

return True

Expand Down
17 changes: 5 additions & 12 deletions src/python_testing/TC_RR_1_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def test_TC_RR_1_1(self):
client_list.append(dev_ctrl)

if num_controllers_per_fabric > 1:
new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=dev_ctrl.fabricAdmin, adminDevCtrl=dev_ctrl, controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id)
new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=dev_ctrl.fabricAdmin, adminDevCtrl=dev_ctrl, controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id, catTags=[0x0001_0001])
for controller in new_controllers:
controller.name = all_names.pop(0)
client_list.extend(new_controllers)
Expand All @@ -126,14 +126,14 @@ async def test_TC_RR_1_1(self):
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=admin_index)

new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId)
new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId, catTags=[0x0001_0001])
new_admin_ctrl.name = all_names.pop(0)
client_list.append(new_admin_ctrl)
await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=new_admin_ctrl, existingNodeId=self.dut_node_id, newNodeId=self.dut_node_id)

if num_controllers_per_fabric > 1:
new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(fabricAdmin=new_fabric_admin, adminDevCtrl=new_admin_ctrl,
controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id)
controllerNodeIds=node_ids, privilege=Clusters.AccessControl.Enums.Privilege.kAdminister, targetNodeId=self.dut_node_id, catTags=[0x0001_0001])
for controller in new_controllers:
controller.name = all_names.pop(0)

Expand Down Expand Up @@ -376,14 +376,7 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric):
# - Targets field: [{Cluster: 0xFFF1_FC40, DeviceType: 0xFFF1_FC20}, {Cluster: 0xFFF1_FC41, DeviceType: 0xFFF1_FC21}, {Cluster: 0xFFF1_FC02, DeviceType: 0xFFF1_FC42}]

# Administer ACL entry
admin_subjects = [0xFFFF_FFFD_0001_0001]
# TODO: Replace the below with [0x2000_0000_0000_0001, 0x2000_0000_0000_0002, 0x2000_0000_0000_0003] once controllers
# all have CAT tag 0001_0001 in their NOC

# Find node ID of all controllers (up to 3) and make them admin
for subject_idx in range(min(num_controllers_per_fabric, 3)):
subject_name = "RD%d%s" % (fabric_number, chr(ord('A') + subject_idx))
admin_subjects.append(client_by_name[subject_name].nodeId)
admin_subjects = [0xFFFF_FFFD_0001_0001, 0x2000_0000_0000_0001, 0x2000_0000_0000_0002, 0x2000_0000_0000_0003]

admin_targets = [
Clusters.AccessControl.Structs.Target(endpoint=0),
Expand Down Expand Up @@ -428,4 +421,4 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric):


if __name__ == "__main__":
default_matter_test_main(maximize_cert_chains=True)
default_matter_test_main(maximize_cert_chains=True, controller_cat_tags=[0x0001_0001])
2 changes: 1 addition & 1 deletion src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ def default_matter_test_main(argv=None, **kwargs):
# TODO: If CASE Admin Subject is a CAT tag range, then make sure to issue NOC with that CAT tag

default_controller = stack.certificate_authorities[0].adminList[0].NewController(nodeId=matter_test_config.controller_node_id,
paaTrustStorePath=str(matter_test_config.paa_trust_store_path))
paaTrustStorePath=str(matter_test_config.paa_trust_store_path), catTags=matter_test_config.controller_cat_tags)
test_config.user_params["default_controller"] = stash_globally(default_controller)

test_config.user_params["matter_test_config"] = stash_globally(matter_test_config)
Expand Down

0 comments on commit 1376091

Please sign in to comment.