Skip to content

Commit

Permalink
Restyled by autopep8
Browse files Browse the repository at this point in the history
  • Loading branch information
restyled-commits authored and plauric committed Jul 31, 2024
1 parent 6ec55f1 commit fee9906
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 79 deletions.
88 changes: 44 additions & 44 deletions src/python_testing/TC_SEAR_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ def __init__(self, *args):
self.app_pipe = "/tmp/chip_rvc_fifo_"
self.mapid_list = []

#this must be kept in sync with the definitions from the Common Landmark Semantic Tag Namespace
# this must be kept in sync with the definitions from the Common Landmark Semantic Tag Namespace
self.MAX_LANDMARK_ID = 0x33

#this must be kept in sync with the definitions from the Common Relative Position Semantic Tag Namespace
# this must be kept in sync with the definitions from the Common Relative Position Semantic Tag Namespace
self.MAX_RELPOS_ID = 0x07


async def read_sear_attribute_expect_success(self, endpoint, attribute):
cluster = Clusters.Objects.ServiceArea
return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute)
Expand All @@ -62,15 +61,15 @@ async def read_and_validate_supported_maps(self, step):
endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.SupportedMaps)
logging.info("SupportedMaps: %s" % (supported_maps))
asserts.assert_less_equal(len(supported_maps), 255,
"SupportedMaps should have max 255 entries")
"SupportedMaps should have max 255 entries")

mapid_list = [m.mapID for m in supported_maps]
asserts.assert_true(len(set(mapid_list)) == len(mapid_list), "SupportedMaps must have unique MapID values!")

name_list = [m.name for m in supported_maps]
asserts.assert_true(len(set(name_list)) == len(name_list), "SupportedMaps must have unique Name values!")

#save so other methods can use this if neeeded
# save so other methods can use this if neeeded
self.mapid_list = mapid_list

async def read_and_validate_supported_areas(self, step):
Expand All @@ -79,38 +78,40 @@ async def read_and_validate_supported_areas(self, step):
endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.SupportedAreas)
logging.info("SupportedAreas: %s" % (supported_areas))
asserts.assert_less_equal(len(supported_areas), 255,
"SupportedAreas should have max 255 entries")
"SupportedAreas should have max 255 entries")
areaid_list = []
areadesc_s = set()
for a in supported_areas:
asserts.assert_true(a.areaID not in areaid_list, "SupportedAreas must have unique AreaID values!")

areaid_list.append(a.areaID)

if len(self.mapid_list) > 0:
asserts.assert_is_not(a.mapID, NullValue,
f"SupportedAreas entry with AreaID({a.areaID}) should not have null MapID")
f"SupportedAreas entry with AreaID({a.areaID}) should not have null MapID")
asserts.assert_is(a.mapID in self.mapid_list,
f"SupportedAreas entry with AreaID({a.areaID}) has unknown MapID({a.mapID})")
f"SupportedAreas entry with AreaID({a.areaID}) has unknown MapID({a.mapID})")
k = f"mapID:{a.mapID} areaDesc:{a.areaDesc}"
asserts.assert_true(k not in areadesc_s, f"SupportedAreas must have unique MapID({a.mapID}) + AreaDesc({a.areaDesc}) values!")
asserts.assert_true(k not in areadesc_s,
f"SupportedAreas must have unique MapID({a.mapID}) + AreaDesc({a.areaDesc}) values!")
areadesc_s.add(k)
else:
#empty SupportedMaps
# empty SupportedMaps
asserts.assert_is(a.mapID, NullValue,
f"SupportedAreas entry with AreaID({a.areaID}) should have null MapID")
f"SupportedAreas entry with AreaID({a.areaID}) should have null MapID")
k = f"areaDesc:{a.areaDesc}"
asserts.assert_true(k not in areadesc_s, f"SupportedAreas must have unique AreaDesc({a.areaDesc}) values!")
areadesc_s.add(k)

if a.locationInfo is NullValue and a.landmarkTag is NullValue:
asserts.assert_true(f"SupportedAreas entry with AreaID({a.areaID}) should not have null LocationInfo and null LandmarkTag")
asserts.assert_true(
f"SupportedAreas entry with AreaID({a.areaID}) should not have null LocationInfo and null LandmarkTag")
if a.landmarkTag is not NullValue:
asserts.assert_true(a.landmarkTag <= self.MAX_LANDMARK_ID,
f"SupportedAreas entry with AreaID({a.areaID}) has invalid LandmarkTag({a.landmarkTag})")
f"SupportedAreas entry with AreaID({a.areaID}) has invalid LandmarkTag({a.landmarkTag})")
asserts.assert_true(a.positionTag is NullValue or a.positionTag in range(0, self.MAX_RELPOS_ID),
f"SupportedAreas entry with AreaID({a.areaID}) has invalid PositionTag({a.positionTag})")
#save so other methods can use this if neeeded
f"SupportedAreas entry with AreaID({a.areaID}) has invalid PositionTag({a.positionTag})")
# save so other methods can use this if neeeded
self.areaid_list = areaid_list

async def read_and_validate_selected_areas(self, step):
Expand All @@ -119,17 +120,17 @@ async def read_and_validate_selected_areas(self, step):
endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.SelectedAreas)
logging.info(f"SelectedAreas {selected_areas}")

#TODO how to check if all entries are uint32?
# TODO how to check if all entries are uint32?

asserts.assert_true(len(selected_areas) <= len(self.areaid_list),
f"SelectedAreas(len {len(selected_areas)}) should have at most {len(self.areaid_list)} entries")
f"SelectedAreas(len {len(selected_areas)}) should have at most {len(self.areaid_list)} entries")

asserts.assert_true(len(set(selected_areas)) == len(selected_areas), "SelectedAreas must have unique AreaID values!")
for a in selected_areas:

for a in selected_areas:
asserts.assert_true(a in self.areaid_list,
f"SelectedAreas entry {a} has invalid value")
#save so other methods can use this if neeeded
f"SelectedAreas entry {a} has invalid value")
# save so other methods can use this if neeeded
self.selareaid_list = selected_areas

async def read_and_validate_current_area(self, step):
Expand All @@ -139,12 +140,12 @@ async def read_and_validate_current_area(self, step):
logging.info(f"CurrentArea {current_area}")

asserts.assert_true((len(self.selareaid_list) == 0 and current_area is NullValue)
or
current_area in self.selareaid_list,
f"CurrentArea {current_area} is invalid. SelectedAreas is {self.selareaid_list}.")
#save so other methods can use this if neeeded
or
current_area in self.selareaid_list,
f"CurrentArea {current_area} is invalid. SelectedAreas is {self.selareaid_list}.")
# save so other methods can use this if neeeded
self.current_area = current_area

async def read_and_validate_estimated_end_time(self, step):
import time
read_time = int(time.time())
Expand All @@ -155,21 +156,20 @@ async def read_and_validate_estimated_end_time(self, step):

if self.current_area is NullValue:
asserts.assert_true(estimated_end_time is NullValue,
"EstimatedEndTime should be null if CurrentArea is null.")
"EstimatedEndTime should be null if CurrentArea is null.")
else:
#allow for some clock skew
# allow for some clock skew
asserts.assert_true(estimated_end_time >= read_time - 3*60,
f"EstimatedEndTime({estimated_end_time}) should be greater than the time when it was read({read_time})")
f"EstimatedEndTime({estimated_end_time}) should be greater than the time when it was read({read_time})")


async def read_and_validate_progress(self, step):
self.print_step(step, "Read Progress attribute")
progress = await self.read_sear_attribute_expect_success(
endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.Progress)
logging.info(f"Progress {progress}")

asserts.assert_true(len(progress) <= len(self.areaid_list),
f"Progress(len {len(progress)}) should have at most {len(self.areaid_list)} entries")
f"Progress(len {len(progress)}) should have at most {len(self.areaid_list)} entries")

progareaid_list = []
for p in progress:
Expand All @@ -178,16 +178,16 @@ async def read_and_validate_progress(self, step):
else:
progareaid_list.append(p.areaID)
asserts.assert_true(p.areaID in self.areaid_list,
f"Progress entry has invalid AreaID value ({p.areaID})")
f"Progress entry has invalid AreaID value ({p.areaID})")
asserts.assert_true(p.status in (Clusters.ServiceArea.OperationalStatusEnum.kPending,
Clusters.ServiceArea.OperationalStatusEnum.kOperating,
Clusters.ServiceArea.OperationalStatusEnum.kSkipped,
Clusters.ServiceArea.OperationalStatusEnum.kCompleted),
f"Progress entry has invalid Status value ({p.status})")
f"Progress entry has invalid Status value ({p.status})")
if p.status not in (Clusters.ServiceArea.OperationalStatusEnum.kSkipped, Clusters.ServiceArea.OperationalStatusEnum.kCompleted):
asserts.assert_true(p.totalOperationalTime is NullValue,
f"Progress entry should have a null TotalOperationalTime value (Status is {p.status})")
#TODO how to check that InitialTimeEstimate is either null or uint32?
asserts.assert_true(p.totalOperationalTime is NullValue,
f"Progress entry should have a null TotalOperationalTime value (Status is {p.status})")
# TODO how to check that InitialTimeEstimate is either null or uint32?

# Sends and out-of-band command to the rvc-app
def write_to_app_pipe(self, command):
Expand Down Expand Up @@ -251,7 +251,7 @@ async def test_TC_SEAR_1_2(self):
new_supported_maps = self.mapid_list
asserts.assert_true(len(old_supported_maps) > len(new_supported_maps), "Failed to remove map(s)")

#NOTE the following operations are all part of step 11 - read all these attributes and check the data consistency
# NOTE the following operations are all part of step 11 - read all these attributes and check the data consistency
# after removing map(s)
await self.read_and_validate_supported_areas(step=11)

Expand Down Expand Up @@ -284,7 +284,7 @@ async def test_TC_SEAR_1_2(self):
new_supported_maps = self.mapid_list
asserts.assert_true(len(old_supported_maps) < len(new_supported_maps), "Failed to add map(s)")

#NOTE the following operations are all part of step 15 - read all these attributes and check the data consistency
# NOTE the following operations are all part of step 15 - read all these attributes and check the data consistency
# after adding map(s)
await self.read_and_validate_supported_areas(step=15)

Expand Down Expand Up @@ -317,9 +317,9 @@ async def test_TC_SEAR_1_2(self):
new_supported_areas = self.areaid_list
asserts.assert_true(len(old_supported_areas) > len(new_supported_areas), "Failed to remove area(s)")

#NOTE the following operations are all part of step 19 - read all these attributes and check the data consistency
# NOTE the following operations are all part of step 19 - read all these attributes and check the data consistency
# after removing areas(s)

await self.read_and_validate_selected_areas(step=19)

if self.check_pics("SEAR.S.A0003"):
Expand Down Expand Up @@ -349,9 +349,9 @@ async def test_TC_SEAR_1_2(self):
new_supported_areas = self.areaid_list
asserts.assert_true(len(old_supported_areas) < len(new_supported_areas), "Failed to add area(s)")

#NOTE the following operations are all part of step 23 - read all these attributes and check the data consistency
# NOTE the following operations are all part of step 23 - read all these attributes and check the data consistency
# after removing areas(s)

await self.read_and_validate_selected_areas(step=23)

if self.check_pics("SEAR.S.A0003"):
Expand Down
10 changes: 5 additions & 5 deletions src/python_testing/TC_SEAR_1_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ async def read_selected_areas(self, step):
logging.info(f"SelectedAreas {selected_areas}")

return selected_areas

async def send_cmd_select_areas_expect_response(self, step, new_areas, expected_response):
self.print_step(step, f"Send SelectAreas command with NewAreas({new_areas})")
ret = await self.send_single_cmd(cmd=Clusters.Objects.ServiceArea.Commands.SelectAreas(newAreas=new_areas),
endpoint=self.endpoint)

asserts.assert_equal(ret.commandResponseState.errorStateID,
expected_response,
f"Command response ({ret.commandResponseState}) doesn't match the expected one")
Expand Down Expand Up @@ -107,7 +107,7 @@ async def test_TC_SEAR_1_3(self):

duplicated_areas = [valid_area_id, valid_area_id]

#FIXME need to check if this is the correct name of this status code
# FIXME need to check if this is the correct name of this status code
await self.send_cmd_select_areas_expect_response(step=3, new_areas=duplicated_areas, expected_response=Clusters.ServiceArea.SelectAreasStatus.kDuplicatedAreas)

await self.send_cmd_select_areas_expect_response(step=4, new_areas=[], expected_response=Clusters.ServiceArea.SelectAreasStatus.kSuccess)
Expand All @@ -134,7 +134,8 @@ async def test_TC_SEAR_1_3(self):
await self.send_cmd_select_areas_expect_response(step=10, new_areas=supported_area_ids, expected_response=Clusters.ServiceArea.SelectAreasStatus.kSuccess)

selected_areas = await self.read_selected_areas(step=11)
asserts.assert_true(len(selected_areas) == len(supported_area_ids), f"SelectedAreas({selected_areas}) should match SupportedAreas({supported_area_ids})")
asserts.assert_true(len(selected_areas) == len(supported_area_ids),
f"SelectedAreas({selected_areas}) should match SupportedAreas({supported_area_ids})")

await self.send_cmd_select_areas_expect_response(step=12, new_areas=supported_area_ids, expected_response=Clusters.ServiceArea.SelectAreasStatus.kSuccess)

Expand All @@ -150,6 +151,5 @@ async def test_TC_SEAR_1_3(self):
await self.send_cmd_select_areas_expect_response(step=14, new_areas=[valid_area_id], expected_response=Clusters.ServiceArea.SelectAreasStatus.kInvalidInMode)



if __name__ == "__main__":
default_matter_test_main()
9 changes: 5 additions & 4 deletions src/python_testing/TC_SEAR_1_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ async def test_TC_SEAR_1_4(self):
logging.info("AttributeList: %s" % (attribute_list))

if Clusters.ServiceArea.Attributes.CurrentArea not in attribute_list \
and Clusters.ServiceArea.Attributes.Progress not in attribute_list:
and Clusters.ServiceArea.Attributes.Progress not in attribute_list:

cmd_list = await self.read_sear_attribute_expect_success(
endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.AcceptedCommandList)
endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.AcceptedCommandList)
logging.info("AcceptedCommandList: %s" % (cmd_list))
asserts.assert_true(Clusters.ServiceArea.Commands.SkipArea not in cmd_list, "SkipArea command should not be implemented if both CurrentArea and Progress are not")
asserts.assert_true(Clusters.ServiceArea.Commands.SkipArea not in cmd_list,
"SkipArea command should not be implemented if both CurrentArea and Progress are not")


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit fee9906

Please sign in to comment.