diff --git a/src/python_testing/TC_RR_1_1.py b/src/python_testing/TC_RR_1_1.py index 026220c9ceb9ba..1f691b8e925ab9 100644 --- a/src/python_testing/TC_RR_1_1.py +++ b/src/python_testing/TC_RR_1_1.py @@ -75,12 +75,19 @@ async def test_TC_RR_1_1(self): skip_user_label_cluster_steps = self.user_params.get("skip_user_label_cluster_steps", False) # Whether to do the local session ID comparison checks to prove new sessions have not been established. check_local_session_id_unchanged = self.user_params.get("check_local_session_id_unchanged", False) + # Whether to check heap statistics. Add `--bool-arg check_heap_watermarks:true` to command line to enable + check_heap_watermarks = self.user_params.get("check_heap_watermarks", False) BEFORE_LABEL = "Before Subscriptions 12345678912" AFTER_LABEL = "After Subscriptions 123456789123" # Pre-conditions + # Do a read-out of heap statistics before the test begins + if check_heap_watermarks: + logging.info("Read Heap info before stress test") + high_watermark_before, current_usage_before = await self.read_heap_statistics(dev_ctrl) + # Make sure all certificates are installed with maximal size dev_ctrl.fabricAdmin.certificateAuthority.maximizeCertChains = True @@ -406,6 +413,14 @@ async def test_TC_RR_1_1(self): num_fabrics_to_commission, fabric_unique_clients, group_key_map, groups_cluster_endpoints, indicated_max_groups_per_fabric) await self.validate_group_table(num_fabrics_to_commission, fabric_unique_clients, group_table_written) + # Read heap watermarks after the test + if check_heap_watermarks: + logging.info("Read Heap info after stress test") + high_watermark_after, current_usage_after = await self.read_heap_statistics(dev_ctrl) + logging.info("=== Heap Usage Diagnostics ===\nHigh watermark: {} (before) / {} (after)\n" + "Current usage: {} (before) / {} (after)".format(high_watermark_before, high_watermark_after, + current_usage_before, current_usage_after)) + def random_string(self, length) -> str: rnd = self._pseudo_random_generator return "".join([rnd.choice("abcdef0123456789") for _ in range(length)])[:length] @@ -619,7 +634,7 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric): # - Subjects field: [0x3000_0000_0000_0001, 0x3000_0000_0000_0002, 0x3000_0000_0000_0003, 0x3000_0000_0000_0004] # - Targets field: [{Cluster: 0xFFF1_FC40, DeviceType: 0xFFF1_FC20}, {Cluster: 0xFFF1_FC41, DeviceType: 0xFFF1_FC21}, {Cluster: 0xFFF1_FC02, DeviceType: 0xFFF1_FC42}] # . struct - # - Privilege field: View (3) + # - Privilege field: View (1) # - AuthMode field: CASE (2) # - Subjects field: [0x4000_0000_0000_0001, 0x4000_0000_0000_0002, 0x4000_0000_0000_0003, 0x4000_0000_0000_0004] # - Targets field: [{Cluster: 0xFFF1_FC80, DeviceType: 0xFFF1_FC20}, {Cluster: 0xFFF1_FC81, DeviceType: 0xFFF1_FC21}, {Cluster: 0xFFF1_FC82, DeviceType: 0xFFF1_FC22}] @@ -666,7 +681,7 @@ def build_acl(self, fabric_number, client_by_name, num_controllers_per_fabric): targets=operate_targets) acl.append(operate_acl_entry) - # Operate ACL entry + # View ACL entry view_subjects = [0x4000_0000_0000_0001, 0x4000_0000_0000_0002, 0x4000_0000_0000_0003, 0x4000_0000_0000_0004] view_targets = [ Clusters.AccessControl.Structs.Target(cluster=0xFFF1_FC80, deviceType=0xFFF1_BC20), @@ -701,6 +716,25 @@ def build_group_key(self, fabric_index: int, group_key_index: int, keys_per_fabr epochKey2=self.random_string(16).encode(), epochStartTime2=(set_id * 4 + 2)) + async def read_heap_statistics(self, dev_ctrl): + diagnostics_contents = [ + Clusters.SoftwareDiagnostics.Attributes.CurrentHeapHighWatermark, + Clusters.SoftwareDiagnostics.Attributes.CurrentHeapUsed, + ] + diagnostics_paths = [(0, attrib) for attrib in diagnostics_contents] + swdiag_info = await dev_ctrl.ReadAttribute(self.dut_node_id, diagnostics_paths) + + # Make sure everything came back from the read that we expected + asserts.assert_true(0 in swdiag_info.keys(), "Must have read endpoint 0 data") + asserts.assert_true(Clusters.SoftwareDiagnostics in swdiag_info[0].keys( + ), "Must have read Software Diagnostics cluster data") + for attribute in diagnostics_contents: + asserts.assert_true(attribute in swdiag_info[0][Clusters.SoftwareDiagnostics], + "Must have read back attribute %s" % (attribute.__name__)) + high_watermark = swdiag_info[0][Clusters.SoftwareDiagnostics][Clusters.SoftwareDiagnostics.Attributes.CurrentHeapHighWatermark] + current_usage = swdiag_info[0][Clusters.SoftwareDiagnostics][Clusters.SoftwareDiagnostics.Attributes.CurrentHeapUsed] + return high_watermark, current_usage + if __name__ == "__main__": default_matter_test_main(maximize_cert_chains=True, controller_cat_tags=[0x0001_0001])