Skip to content

Commit

Permalink
Fix linting
Browse files Browse the repository at this point in the history
  • Loading branch information
raul-marquez-csa committed Jan 18, 2024
1 parent 0faca3a commit 2774322
Showing 1 changed file with 46 additions and 50 deletions.
96 changes: 46 additions & 50 deletions src/python_testing/TC_IDM_4_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# limitations under the License.
#

import time;
import random;
import time
import random
import logging
import chip.clusters as Clusters
from mobly import asserts
Expand All @@ -29,45 +29,45 @@


class TC_IDM_4_2(MatterBaseTest):

async def write_acl(self, acl):
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))])
asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")
print(result)

async def get_descriptor_server_list(self, ctrl):
return await self.read_single_attribute_check_success(
endpoint=0,
dev_ctrl=ctrl,
cluster=Clusters.Descriptor,
endpoint=0,
dev_ctrl=ctrl,
cluster=Clusters.Descriptor,
attribute=Clusters.Descriptor.Attributes.ServerList
)

async def get_idle_mode_duration(self, ctrl):
return await self.read_single_attribute_check_success(
endpoint=0,
dev_ctrl=ctrl,
cluster=Clusters.IcdManagement,
endpoint=0,
dev_ctrl=ctrl,
cluster=Clusters.IcdManagement,
attribute=Clusters.IcdManagement.Attributes.IdleModeDuration
)
def verify_attribute_data(self, sub, cluster, attribute, ep = 0):

def verify_attribute_data(self, sub, cluster, attribute, ep=0):
sub_attrs = sub
if isinstance(sub, Clusters.Attribute.SubscriptionTransaction):
sub_attrs = sub.GetAttributes()

asserts.assert_true(ep in sub_attrs, "Must have read endpoint %s data" % ep)
asserts.assert_true(cluster in sub_attrs[0], "Must have read %s cluster data" % cluster.__name__)
asserts.assert_true(attribute in sub_attrs[0][cluster],
asserts.assert_true(attribute in sub_attrs[0][cluster],
"Must have read back attribute %s" % attribute.__name__)
def get_attribute_from_sub_dict(self, sub, cluster, attribute, ep = 0):

def get_attribute_from_sub_dict(self, sub, cluster, attribute, ep=0):
return sub[ep][cluster][attribute]

def get_typed_attribute_path(self, attribute, ep = 0):
def get_typed_attribute_path(self, attribute, ep=0):
return TypedAttributePath(
Path=AttributePath(
EndpointId=ep,
EndpointId=ep,
Attribute=attribute
)
)
Expand All @@ -82,16 +82,16 @@ async def test_TC_IDM_4_2(self):
SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = 0
node_label_attr = Clusters.BasicInformation.Attributes.NodeLabel
node_label_attr_path = [(0, node_label_attr)]

# Read ServerList attribute
self.print_step("0a", "CR1 reads the Descriptor cluster ServerList attribute from EP0")
ep0_servers = await self.get_descriptor_server_list(CR1)

# Check if ep0_servers contains the ICD Management cluster ID (0x0046)
if Clusters.IcdManagement.id in ep0_servers:
# Read the IdleModeDuration attribute value from the DUT
logging.info("CR1 reads from the DUT the IdleModeDuration attribute and sets SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = IdleModeDuration")

idleModeDuration = await self.get_idle_mode_duration(CR1)

SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = idleModeDuration
Expand All @@ -100,7 +100,7 @@ async def test_TC_IDM_4_2(self):
# Defaulting SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT to 60 minutes
logging.info("Set SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = 60 mins")
SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT = 3600

'''
##########
Step 1
Expand All @@ -109,25 +109,25 @@ async def test_TC_IDM_4_2(self):
self.print_step(1, "CR1 sends a subscription message to the DUT with MaxIntervalCeiling set to a value greater than SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT. DUT sends a report data action to the TH. CR1 sends a success status response to the DUT. DUT sends a Subscribe Response Message to the CR1 to activate the subscription.")
min_interval_floor_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT - 60
max_interval_ceiling_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT + 60

# Subscribe to attribute
sub_cr1_step1 = await CR1.ReadAttribute(
nodeid=self.dut_node_id,
attributes=node_label_attr_path,
reportInterval=(min_interval_floor_sec, max_interval_ceiling_sec),
keepSubscriptions=False
)

# Verify attribute data came back
self.verify_attribute_data(
sub=sub_cr1_step1,
cluster=Clusters.BasicInformation,
attribute=node_label_attr
)

# Verify subscriptionId is of uint32 type
asserts.assert_true(self.is_uint32(sub_cr1_step1.subscriptionId), "subscriptionId is not of uint32 type.")

# Verify MaxInterval is of uint32 type
sub_cr1_step1_intervals = sub_cr1_step1.GetReportingIntervalsSeconds()
sub_cr1_step1_min_interval_floor_sec, sub_cr1_step1_max_interval_ceiling_sec = sub_cr1_step1_intervals
Expand All @@ -146,15 +146,15 @@ async def test_TC_IDM_4_2(self):
self.print_step(2, "CR1 sends a subscription message to the DUT with MaxIntervalCeiling set to a value less than SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT. DUT sends a report data action to the CR1. CR1 sends a success status response to the DUT. DUT sends a Subscribe Response Message to the CR1 to activate the subscription.")
min_interval_floor_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT - 60
max_interval_ceiling_sec = SUBSCRIPTION_MAX_INTERVAL_PUBLISHER_LIMIT - 30

# Subscribe to attribute
sub_cr1_step2 = await CR1.ReadAttribute(
nodeid=self.dut_node_id,
attributes=node_label_attr_path,
reportInterval=(min_interval_floor_sec, max_interval_ceiling_sec),
keepSubscriptions=False
)

# Verify attribute data came back
self.verify_attribute_data(
sub=sub_cr1_step2,
Expand All @@ -164,7 +164,7 @@ async def test_TC_IDM_4_2(self):

# Verify subscriptionId is of uint32 type
asserts.assert_true(self.is_uint32(sub_cr1_step2.subscriptionId), "subscriptionId is not of uint32 type.")

# Verify MaxInterval is of uint32 type
sub_cr1_step2_intervals = sub_cr1_step2.GetReportingIntervalsSeconds()
sub_cr1_step2_min_interval_floor_sec, sub_cr1_step2_max_interval_ceiling_sec = sub_cr1_step2_intervals
Expand All @@ -174,14 +174,14 @@ async def test_TC_IDM_4_2(self):
asserts.assert_true(sub_cr1_step2_max_interval_ceiling_sec <= max_interval_ceiling_sec, "MaxInterval is not less than or equal to MaxIntervalCeiling")

sub_cr1_step2.Shutdown()

'''
##########
Step 3 - 6
##########
'''
# # TODO: How to trigger desired Status responses

# # Controller 2 Setup
# fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0]
# CR2_nodeid = self.matter_test_config.controller_node_id + 1
Expand All @@ -206,7 +206,7 @@ async def test_TC_IDM_4_2(self):
# attribute=attribute,
# error=Status.InvalidAction
# )

'''
##########
Step 7
Expand All @@ -220,7 +220,7 @@ async def test_TC_IDM_4_2(self):
attributes=node_label_attr_path,
keepSubscriptions=False
)

# Verify DataVersion attribute data came back
self.verify_attribute_data(
sub=sub_cr1_empty_dvf,
Expand All @@ -230,12 +230,12 @@ async def test_TC_IDM_4_2(self):

# Get DataVersion
data_version = self.get_attribute_from_sub_dict(
sub=sub_cr1_empty_dvf,
sub=sub_cr1_empty_dvf,
cluster=Clusters.BasicInformation,
attribute=Clusters.Attribute.DataVersion
)
data_version_filter = [(0, Clusters.BasicInformation, data_version)]

# Subscribe to attribute with provided DataVersion
sub_cr1_provided_dvf = await CR1.ReadAttribute(
nodeid=self.dut_node_id,
Expand All @@ -244,20 +244,20 @@ async def test_TC_IDM_4_2(self):
keepSubscriptions=False,
dataVersionFilters=data_version_filter
)

# Verify that the subscription is activated between CR1 and DUT
asserts.assert_true(sub_cr1_provided_dvf.subscriptionId, "Subscription not activated")

sub_cr1_provided_dvf.Shutdown()

'''
##########
Step 8
##########
'''
self.print_step(8, "CR1 sends a subscription request action for an attribute and sets the MinIntervalFloor value to be same as MaxIntervalCeiling. Activate the Subscription between CR1 and DUT. Modify the attribute which has been subscribed to on the DUT.")
min_max_interval_sec = 3

# Subscribe to attribute
sub_cr1_update_value = await CR1.ReadAttribute(
nodeid=self.dut_node_id,
Expand All @@ -269,7 +269,7 @@ async def test_TC_IDM_4_2(self):
# Modify attribute value
new_node_label_write = "NewNodeLabel_" + str(random.randint(1000, 9999))
await CR1.WriteAttribute(
self.dut_node_id,
self.dut_node_id,
[(0, node_label_attr(value=new_node_label_write))]
)

Expand All @@ -282,14 +282,14 @@ async def test_TC_IDM_4_2(self):
asserts.assert_equal(new_node_label_read, new_node_label_write, "Attribute value not updated after write operation.")

sub_cr1_update_value.Shutdown()

'''
##########
Step 9
##########
'''
self.print_step(9, "CR1 sends a subscription request action for an attribute and set the MinIntervalFloor value to be greater than MaxIntervalCeiling.")

# Subscribe to attribute with invalid reportInterval arguments, expect and exception
sub_cr1_invalid_intervals = None
try:
Expand All @@ -299,7 +299,7 @@ async def test_TC_IDM_4_2(self):
reportInterval=(20, 10),
keepSubscriptions=False
)
except ChipStackError as e:
except ChipStackError:
# Verify no subscription is established
with asserts.assert_raises(AttributeError):
sub_cr1_invalid_intervals.subscriptionId
Expand All @@ -316,18 +316,14 @@ async def test_TC_IDM_4_2(self):
# Clusters.BasicInformation.Attributes.NodeLabel
# ]
# read_paths = [(0, attrib) for attrib in read_contents]

# # Subscribe to global attribute
# sub_cr1_invalid_intervals = await CR1.ReadAttribute(
# nodeid=self.dut_node_id,
# attributes=read_paths,
# reportInterval=(10, 10),
# keepSubscriptions=False,
# )



logging.info("debux - test_TC_IDM_4_2 end")
# )

if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 2774322

Please sign in to comment.