Skip to content

Commit

Permalink
python test framework: PICS 2.0 (#34384)
Browse files Browse the repository at this point in the history
* [WIP] python test framework PICS 2.0

* typo

* use ms int for test duration

* Add documentation to decorators and helpers

* Remove unused parameters

* Restyled by autopep8

* Restyled by isort

* linter

* fix merge conflict

* formatting of imports

* address review comments

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Oct 25, 2024
1 parent 18a59a1 commit 5479857
Show file tree
Hide file tree
Showing 6 changed files with 559 additions and 44 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,8 @@ jobs:
scripts/run_in_python_env.sh out/venv 'python3 ./src/python_testing/TestConformanceSupport.py'
scripts/run_in_python_env.sh out/venv 'python3 ./src/python_testing/test_testing/test_IDM_10_4.py'
scripts/run_in_python_env.sh out/venv 'python3 ./src/python_testing/test_testing/test_TC_SC_7_1.py'
scripts/run_in_python_env.sh out/venv 'python3 ./src/python_testing/test_testing/TestDecorators.py'
- name: Uploading core files
uses: actions/upload-artifact@v4
Expand Down
6 changes: 6 additions & 0 deletions scripts/py_matter_yamltests/matter_yamltests/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ def show_prompt(self,
"""
pass

def test_skipped(self, filename: str, name: str):
"""
This method is called when the test script determines that the test is not applicable for the DUT.
"""
pass


class WebSocketRunnerHooks():
def connecting(self, url: str):
Expand Down
55 changes: 24 additions & 31 deletions src/python_testing/TC_TIMESYNC_2_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,53 +32,46 @@

import chip.clusters as Clusters
from chip.clusters.Types import NullValue
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, utc_time_in_matter_epoch
from matter_testing_support import (MatterBaseTest, default_matter_test_main, has_attribute, has_cluster, per_endpoint_test,
utc_time_in_matter_epoch)
from mobly import asserts


class TC_TIMESYNC_2_1(MatterBaseTest):
async def read_ts_attribute_expect_success(self, endpoint, attribute):
async def read_ts_attribute_expect_success(self, attribute):
cluster = Clusters.Objects.TimeSynchronization
return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute)
return await self.read_single_attribute_check_success(endpoint=None, cluster=cluster, attribute=attribute)

def pics_TC_TIMESYNC_2_1(self) -> list[str]:
return ["TIMESYNC.S"]

@async_test_body
@per_endpoint_test(has_cluster(Clusters.TimeSynchronization) and has_attribute(Clusters.TimeSynchronization.Attributes.TimeSource))
async def test_TC_TIMESYNC_2_1(self):
endpoint = 0

features = await self.read_single_attribute(dev_ctrl=self.default_controller, node_id=self.dut_node_id,
endpoint=endpoint, attribute=Clusters.TimeSynchronization.Attributes.FeatureMap)
attributes = Clusters.TimeSynchronization.Attributes
features = await self.read_ts_attribute_expect_success(attribute=attributes.FeatureMap)

self.supports_time_zone = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeZone)
self.supports_ntpc = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kNTPClient)
self.supports_ntps = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kNTPServer)
self.supports_trusted_time_source = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeSyncClient)

time_cluster = Clusters.TimeSynchronization
timesync_attr_list = time_cluster.Attributes.AttributeList
attribute_list = await self.read_single_attribute_check_success(endpoint=endpoint, cluster=time_cluster, attribute=timesync_attr_list)
timesource_attr_id = time_cluster.Attributes.TimeSource.attribute_id
timesync_attr_list = attributes.AttributeList
attribute_list = await self.read_ts_attribute_expect_success(attribute=timesync_attr_list)
timesource_attr_id = attributes.TimeSource.attribute_id

self.print_step(1, "Commissioning, already done")
attributes = Clusters.TimeSynchronization.Attributes

self.print_step(2, "Read Granularity attribute")
granularity_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.Granularity)
granularity_dut = await self.read_ts_attribute_expect_success(attribute=attributes.Granularity)
asserts.assert_less(granularity_dut, Clusters.TimeSynchronization.Enums.GranularityEnum.kUnknownEnumValue,
"Granularity is not in valid range")

self.print_step(3, "Read TimeSource")
if timesource_attr_id in attribute_list:
time_source = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeSource)
time_source = await self.read_ts_attribute_expect_success(attribute=attributes.TimeSource)
asserts.assert_less(time_source, Clusters.TimeSynchronization.Enums.TimeSourceEnum.kUnknownEnumValue,
"TimeSource is not in valid range")

self.print_step(4, "Read TrustedTimeSource")
if self.supports_trusted_time_source:
trusted_time_source = await self.read_ts_attribute_expect_success(endpoint=endpoint,
attribute=attributes.TrustedTimeSource)
trusted_time_source = await self.read_ts_attribute_expect_success(attribute=attributes.TrustedTimeSource)
if trusted_time_source is not NullValue:
asserts.assert_less_equal(trusted_time_source.fabricIndex, 0xFE,
"FabricIndex for the TrustedTimeSource is out of range")
Expand All @@ -87,7 +80,7 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(5, "Read DefaultNTP")
if self.supports_ntpc:
default_ntp = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DefaultNTP)
default_ntp = await self.read_ts_attribute_expect_success(attribute=attributes.DefaultNTP)
if default_ntp is not NullValue:
asserts.assert_less_equal(len(default_ntp), 128, "DefaultNTP length must be less than 128")
# Assume this is a valid web address if it has at least one . in the name
Expand All @@ -102,7 +95,7 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(6, "Read TimeZone")
if self.supports_time_zone:
tz_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZone)
tz_dut = await self.read_ts_attribute_expect_success(attribute=attributes.TimeZone)
asserts.assert_greater_equal(len(tz_dut), 1, "TimeZone must have at least one entry in the list")
asserts.assert_less_equal(len(tz_dut), 2, "TimeZone may have a maximum of two entries in the list")
for entry in tz_dut:
Expand All @@ -117,7 +110,7 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(7, "Read DSTOffset")
if self.supports_time_zone:
dst_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DSTOffset)
dst_dut = await self.read_ts_attribute_expect_success(attribute=attributes.DSTOffset)
last_valid_until = -1
last_valid_starting = -1
for dst in dst_dut:
Expand All @@ -131,7 +124,7 @@ async def test_TC_TIMESYNC_2_1(self):
asserts.assert_equal(dst, dst_dut[-1], "DSTOffset list must have Null ValidUntil at the end")

self.print_step(8, "Read UTCTime")
utc_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.UTCTime)
utc_dut = await self.read_ts_attribute_expect_success(attribute=attributes.UTCTime)
if utc_dut is NullValue:
asserts.assert_equal(granularity_dut, Clusters.TimeSynchronization.Enums.GranularityEnum.kNoTimeGranularity)
else:
Expand All @@ -146,8 +139,8 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(9, "Read LocalTime")
if self.supports_time_zone:
utc_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.UTCTime)
local_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.LocalTime)
utc_dut = await self.read_ts_attribute_expect_success(attribute=attributes.UTCTime)
local_dut = await self.read_ts_attribute_expect_success(attribute=attributes.LocalTime)
if utc_dut is NullValue:
asserts.assert_true(local_dut is NullValue, "LocalTime must be Null if UTC time is Null")
elif len(dst_dut) == 0:
Expand All @@ -161,30 +154,30 @@ async def test_TC_TIMESYNC_2_1(self):

self.print_step(10, "Read TimeZoneDatabase")
if self.supports_time_zone:
tz_db_dut = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZoneDatabase)
tz_db_dut = await self.read_ts_attribute_expect_success(attribute=attributes.TimeZoneDatabase)
asserts.assert_less(tz_db_dut, Clusters.TimeSynchronization.Enums.TimeZoneDatabaseEnum.kUnknownEnumValue,
"TimeZoneDatabase is not in valid range")

self.print_step(11, "Read NTPServerAvailable")
if self.supports_ntps:
# bool typechecking happens in the test read functions, so all we need to do here is do the read
await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.NTPServerAvailable)
await self.read_ts_attribute_expect_success(attribute=attributes.NTPServerAvailable)

self.print_step(12, "Read TimeZoneListMaxSize")
if self.supports_time_zone:
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.TimeZoneListMaxSize)
size = await self.read_ts_attribute_expect_success(attribute=attributes.TimeZoneListMaxSize)
asserts.assert_greater_equal(size, 1, "TimeZoneListMaxSize must be at least 1")
asserts.assert_less_equal(size, 2, "TimeZoneListMaxSize must be max 2")

self.print_step(13, "Read DSTOffsetListMaxSize")
if self.supports_time_zone:
size = await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.DSTOffsetListMaxSize)
size = await self.read_ts_attribute_expect_success(attribute=attributes.DSTOffsetListMaxSize)
asserts.assert_greater_equal(size, 1, "DSTOffsetListMaxSize must be at least 1")

self.print_step(14, "Read SupportsDNSResolve")
# bool typechecking happens in the test read functions, so all we need to do here is do the read
if self.supports_ntpc:
await self.read_ts_attribute_expect_success(endpoint=endpoint, attribute=attributes.SupportsDNSResolve)
await self.read_ts_attribute_expect_success(attribute=attributes.SupportsDNSResolve)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 5479857

Please sign in to comment.