diff --git a/src/python_testing/TC_IDM_4_2.py b/src/python_testing/TC_IDM_4_2.py index 78a0537a8eeb8f..636180037d4cbc 100644 --- a/src/python_testing/TC_IDM_4_2.py +++ b/src/python_testing/TC_IDM_4_2.py @@ -15,8 +15,8 @@ # limitations under the License. # -import time; -import random; +import time +import random import logging import chip.clusters as Clusters from mobly import asserts @@ -29,45 +29,45 @@ class TC_IDM_4_2(MatterBaseTest): - + async def write_acl(self, acl): result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))]) asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed") print(result) - + async def get_descriptor_server_list(self, ctrl): return await self.read_single_attribute_check_success( - endpoint=0, - dev_ctrl=ctrl, - cluster=Clusters.Descriptor, + endpoint=0, + dev_ctrl=ctrl, + cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.ServerList ) - + async def get_idle_mode_duration(self, ctrl): return await self.read_single_attribute_check_success( - endpoint=0, - dev_ctrl=ctrl, - cluster=Clusters.IcdManagement, + endpoint=0, + dev_ctrl=ctrl, + cluster=Clusters.IcdManagement, attribute=Clusters.IcdManagement.Attributes.IdleModeDuration ) - - def verify_attribute_data(self, sub, cluster, attribute, ep = 0): + + def verify_attribute_data(self, sub, cluster, attribute, ep=0): sub_attrs = sub if isinstance(sub, Clusters.Attribute.SubscriptionTransaction): sub_attrs = sub.GetAttributes() - + asserts.assert_true(ep in sub_attrs, "Must have read endpoint %s data" % ep) asserts.assert_true(cluster in sub_attrs[0], "Must have read %s cluster data" % cluster.__name__) - asserts.assert_true(attribute in sub_attrs[0][cluster], + asserts.assert_true(attribute in sub_attrs[0][cluster], "Must have read back attribute %s" % attribute.__name__) - - def get_attribute_from_sub_dict(self, sub, cluster, attribute, ep = 0): + + def get_attribute_from_sub_dict(self, sub, cluster, attribute, ep=0): return sub[ep][cluster][attribute] - def get_typed_attribute_path(self, attribute, ep = 0): + def get_typed_attribute_path(self, attribute, ep=0): return TypedAttributePath( Path=AttributePath( - EndpointId=ep, + EndpointId=ep, Attribute=attribute ) ) @@ -82,16 +82,16 @@ async def test_TC_IDM_4_2(self): SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = 0 node_label_attr = Clusters.BasicInformation.Attributes.NodeLabel node_label_attr_path = [(0, node_label_attr)] - + # Read ServerList attribute self.print_step("0a", "CR1 reads the Descriptor cluster ServerList attribute from EP0") ep0_servers = await self.get_descriptor_server_list(CR1) - + # Check if ep0_servers contains the ICD Management cluster ID (0x0046) if Clusters.IcdManagement.id in ep0_servers: # Read the IdleModeDuration attribute value from the DUT logging.info("CR1 reads from the DUT the IdleModeDuration attribute and sets SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = IdleModeDuration") - + idleModeDuration = await self.get_idle_mode_duration(CR1) SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = idleModeDuration @@ -100,7 +100,7 @@ async def test_TC_IDM_4_2(self): # Defaulting SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT to 60 minutes logging.info("Set SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = 60 mins") SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = 3600 - + ''' ########## Step 1 @@ -109,7 +109,7 @@ async def test_TC_IDM_4_2(self): self.print_step(1, "CR1 sends a subscription message to the DUT with MaxIntervalCeiling set to a value greater than SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT. DUT sends a report data action to the TH. CR1 sends a success status response to the DUT. DUT sends a Subscribe Response Message to the CR1 to activate the subscription.") min_interval_floor_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT - 60 max_interval_ceiling_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT + 60 - + # Subscribe to attribute sub_cr1_step1 = await CR1.ReadAttribute( nodeid=self.dut_node_id, @@ -117,17 +117,17 @@ async def test_TC_IDM_4_2(self): reportInterval=(min_interval_floor_sec, max_interval_ceiling_sec), keepSubscriptions=False ) - + # Verify attribute data came back self.verify_attribute_data( sub=sub_cr1_step1, cluster=Clusters.BasicInformation, attribute=node_label_attr ) - + # Verify subscriptionId is of uint32 type asserts.assert_true(self.is_uint32(sub_cr1_step1.subscriptionId), "subscriptionId is not of uint32 type.") - + # Verify MaxInterval is of uint32 type sub_cr1_step1_intervals = sub_cr1_step1.GetReportingIntervalsSeconds() sub_cr1_step1_min_interval_floor_sec, sub_cr1_step1_max_interval_ceiling_sec = sub_cr1_step1_intervals @@ -146,7 +146,7 @@ async def test_TC_IDM_4_2(self): self.print_step(2, "CR1 sends a subscription message to the DUT with MaxIntervalCeiling set to a value less than SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT. DUT sends a report data action to the CR1. CR1 sends a success status response to the DUT. DUT sends a Subscribe Response Message to the CR1 to activate the subscription.") min_interval_floor_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT - 60 max_interval_ceiling_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT - 30 - + # Subscribe to attribute sub_cr1_step2 = await CR1.ReadAttribute( nodeid=self.dut_node_id, @@ -154,7 +154,7 @@ async def test_TC_IDM_4_2(self): reportInterval=(min_interval_floor_sec, max_interval_ceiling_sec), keepSubscriptions=False ) - + # Verify attribute data came back self.verify_attribute_data( sub=sub_cr1_step2, @@ -164,7 +164,7 @@ async def test_TC_IDM_4_2(self): # Verify subscriptionId is of uint32 type asserts.assert_true(self.is_uint32(sub_cr1_step2.subscriptionId), "subscriptionId is not of uint32 type.") - + # Verify MaxInterval is of uint32 type sub_cr1_step2_intervals = sub_cr1_step2.GetReportingIntervalsSeconds() sub_cr1_step2_min_interval_floor_sec, sub_cr1_step2_max_interval_ceiling_sec = sub_cr1_step2_intervals @@ -174,14 +174,14 @@ async def test_TC_IDM_4_2(self): asserts.assert_true(sub_cr1_step2_max_interval_ceiling_sec <= max_interval_ceiling_sec, "MaxInterval is not less than or equal to MaxIntervalCeiling") sub_cr1_step2.Shutdown() - + ''' ########## Step 3 - 6 ########## ''' # # TODO: How to trigger desired Status responses - + # # Controller 2 Setup # fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0] # CR2_nodeid = self.matter_test_config.controller_node_id + 1 @@ -206,7 +206,7 @@ async def test_TC_IDM_4_2(self): # attribute=attribute, # error=Status.InvalidAction # ) - + ''' ########## Step 7 @@ -220,7 +220,7 @@ async def test_TC_IDM_4_2(self): attributes=node_label_attr_path, keepSubscriptions=False ) - + # Verify DataVersion attribute data came back self.verify_attribute_data( sub=sub_cr1_empty_dvf, @@ -230,12 +230,12 @@ async def test_TC_IDM_4_2(self): # Get DataVersion data_version = self.get_attribute_from_sub_dict( - sub=sub_cr1_empty_dvf, + sub=sub_cr1_empty_dvf, cluster=Clusters.BasicInformation, attribute=Clusters.Attribute.DataVersion ) data_version_filter = [(0, Clusters.BasicInformation, data_version)] - + # Subscribe to attribute with provided DataVersion sub_cr1_provided_dvf = await CR1.ReadAttribute( nodeid=self.dut_node_id, @@ -244,12 +244,12 @@ async def test_TC_IDM_4_2(self): keepSubscriptions=False, dataVersionFilters=data_version_filter ) - + # Verify that the subscription is activated between CR1 and DUT asserts.assert_true(sub_cr1_provided_dvf.subscriptionId, "Subscription not activated") - + sub_cr1_provided_dvf.Shutdown() - + ''' ########## Step 8 @@ -257,7 +257,7 @@ async def test_TC_IDM_4_2(self): ''' self.print_step(8, "CR1 sends a subscription request action for an attribute and sets the MinIntervalFloor value to be same as MaxIntervalCeiling. Activate the Subscription between CR1 and DUT. Modify the attribute which has been subscribed to on the DUT.") min_max_interval_sec = 3 - + # Subscribe to attribute sub_cr1_update_value = await CR1.ReadAttribute( nodeid=self.dut_node_id, @@ -269,7 +269,7 @@ async def test_TC_IDM_4_2(self): # Modify attribute value new_node_label_write = "NewNodeLabel_" + str(random.randint(1000, 9999)) await CR1.WriteAttribute( - self.dut_node_id, + self.dut_node_id, [(0, node_label_attr(value=new_node_label_write))] ) @@ -282,14 +282,14 @@ async def test_TC_IDM_4_2(self): asserts.assert_equal(new_node_label_read, new_node_label_write, "Attribute value not updated after write operation.") sub_cr1_update_value.Shutdown() - + ''' ########## Step 9 ########## ''' self.print_step(9, "CR1 sends a subscription request action for an attribute and set the MinIntervalFloor value to be greater than MaxIntervalCeiling.") - + # Subscribe to attribute with invalid reportInterval arguments, expect and exception sub_cr1_invalid_intervals = None try: @@ -299,7 +299,7 @@ async def test_TC_IDM_4_2(self): reportInterval=(20, 10), keepSubscriptions=False ) - except ChipStackError as e: + except ChipStackError: # Verify no subscription is established with asserts.assert_raises(AttributeError): sub_cr1_invalid_intervals.subscriptionId @@ -316,18 +316,14 @@ async def test_TC_IDM_4_2(self): # Clusters.BasicInformation.Attributes.NodeLabel # ] # read_paths = [(0, attrib) for attrib in read_contents] - + # # Subscribe to global attribute # sub_cr1_invalid_intervals = await CR1.ReadAttribute( # nodeid=self.dut_node_id, # attributes=read_paths, # reportInterval=(10, 10), # keepSubscriptions=False, - # ) - - - - logging.info("debux - test_TC_IDM_4_2 end") + # ) if __name__ == "__main__": default_matter_test_main()