Skip to content

Commit

Permalink
[python] update cached attributes incrementally (#26774)
Browse files Browse the repository at this point in the history
* [python] update cached attributes incrementally

Instead of rebuilding the cache from scratch on every report, just
update the Endpoint/Cluster/Attributes which actually changed.
Obviously, this is significantly faster, especially for small updates.

* Update src/controller/python/chip/clusters/Attribute.py

Co-authored-by: Marcel van der Veldt <[email protected]>

* Address review feedback

* Address one more unnecessary if bracket

* Add attribute cache tests using integration tests

* Fix lintinig issues

* Apply same report interval

* Increase timeout for mobile-device-test.py

---------

Co-authored-by: Marcel van der Veldt <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Jul 24, 2023
1 parent 53905b5 commit 4747296
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 52 deletions.
106 changes: 55 additions & 51 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V

clusterCache[path.AttributeId] = data

def UpdateCachedData(self):
def UpdateCachedData(self, changedPathSet: set[AttributePath]):
''' This converts the raw TLV data into a cluster object format.
Two formats are available:
Expand All @@ -401,68 +401,72 @@ def UpdateCachedData(self):
tlvCache = self.attributeTLVCache
attributeCache = self.attributeCache

for endpoint in tlvCache:
if (endpoint not in attributeCache):
attributeCache[endpoint] = {}
for attributePath in changedPathSet:
endpointId = attributePath.EndpointId

endpointCache = attributeCache[endpoint]
if endpointId not in attributeCache:
attributeCache[endpointId] = {}

for cluster in tlvCache[endpoint]:
if cluster not in _ClusterIndex:
endpointCache = attributeCache[endpointId]

clusterId = attributePath.ClusterId

if clusterId not in _ClusterIndex:
#
# #22599 tracks dealing with unknown clusters more
# gracefully so that clients can still access this data.
#
continue

clusterType = _ClusterIndex[clusterId]

if clusterType not in endpointCache:
endpointCache[clusterType] = {}

clusterCache = endpointCache[clusterType]
clusterDataVersion = self.versionList.get(
endpointId, {}).get(clusterId, None)

if self.returnClusterObject:
try:
# Since the TLV data is already organized by attribute tags, we can trivially convert to a cluster object representation.
endpointCache[clusterType] = clusterType.FromDict(
data=clusterType.descriptor.TagDictToLabelDict([], tlvCache[endpointId][clusterId]))
endpointCache[clusterType].SetDataVersion(
clusterDataVersion)
except Exception as ex:
decodedValue = ValueDecodeFailure(
tlvCache[endpointId][clusterId], ex)
endpointCache[clusterType] = decodedValue
else:
clusterCache[DataVersion] = clusterDataVersion

attributeId = attributePath.AttributeId

value = tlvCache[endpointId][clusterId][attributeId]

if (clusterId, attributeId) not in _AttributeIndex:
#
# #22599 tracks dealing with unknown clusters more
# gracefully so that clients can still access this data.
#
continue

clusterType = _ClusterIndex[cluster]

if (clusterType not in endpointCache):
endpointCache[clusterType] = {}
attributeType = _AttributeIndex[(clusterId, attributeId)][0]

clusterCache = endpointCache[clusterType]
clusterDataVersion = self.versionList.get(
endpoint, {}).get(cluster, None)
if attributeType not in clusterCache:
clusterCache[attributeType] = {}

if (self.returnClusterObject):
if isinstance(value, ValueDecodeFailure):
clusterCache[attributeType] = value
else:
try:
# Since the TLV data is already organized by attribute tags, we can trivially convert to a cluster object representation.
endpointCache[clusterType] = clusterType.FromDict(
data=clusterType.descriptor.TagDictToLabelDict([], tlvCache[endpoint][cluster]))
endpointCache[clusterType].SetDataVersion(
clusterDataVersion)
decodedValue = attributeType.FromTagDictOrRawValue(
tlvCache[endpointId][clusterId][attributeId])
except Exception as ex:
decodedValue = ValueDecodeFailure(
tlvCache[endpoint][cluster], ex)
endpointCache[clusterType] = decodedValue
else:
clusterCache[DataVersion] = clusterDataVersion
for attribute in tlvCache[endpoint][cluster]:
value = tlvCache[endpoint][cluster][attribute]

if (cluster, attribute) not in _AttributeIndex:
#
# #22599 tracks dealing with unknown clusters more
# gracefully so that clients can still access this data.
#
continue

attributeType = _AttributeIndex[(
cluster, attribute)][0]

if (attributeType not in clusterCache):
clusterCache[attributeType] = {}

if (type(value) is ValueDecodeFailure):
clusterCache[attributeType] = value
else:
try:
decodedValue = attributeType.FromTagDictOrRawValue(
tlvCache[endpoint][cluster][attribute])
except Exception as ex:
decodedValue = ValueDecodeFailure(value, ex)
decodedValue = ValueDecodeFailure(value, ex)

clusterCache[attributeType] = decodedValue
clusterCache[attributeType] = decodedValue


class SubscriptionTransaction:
Expand Down Expand Up @@ -766,7 +770,7 @@ def _handleReportBegin(self):
pass

def _handleReportEnd(self):
self._cache.UpdateCachedData()
self._cache.UpdateCachedData(self._changedPathSet)

if (self._subscription_handler is not None):
for change in self._changedPathSet:
Expand Down
81 changes: 81 additions & 0 deletions src/controller/python/test/test_scripts/cluster_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,85 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):

sub.Shutdown()

@ classmethod
@ base.test_case
async def TestAttributeCacheAttributeView(cls, devCtrl):
logger.info("Test AttributeCache Attribute-View")
sub: SubscriptionTransaction = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[(1, Clusters.OnOff.Attributes.OnOff)], returnClusterObject=False, reportInterval=(3, 10))

event = asyncio.Event()

def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):
event.set()

sub.SetAttributeUpdateCallback(subUpdate)

try:
data = sub.GetAttributes()
req = Clusters.OnOff.Commands.On()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)

await asyncio.wait_for(event.wait(), timeout=11)

if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 1):
raise ValueError("Current On/Off state should be 1")

event.clear()

req = Clusters.OnOff.Commands.Off()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)

await asyncio.wait_for(event.wait(), timeout=11)

if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 0):
raise ValueError("Current On/Off state should be 0")

except TimeoutError:
raise AssertionError("Did not receive updated attribute")
finally:
sub.Shutdown()

@ classmethod
@ base.test_case
async def TestAttributeCacheClusterView(cls, devCtrl):
logger.info("Test AttributeCache Cluster-View")
sub: SubscriptionTransaction = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[(1, Clusters.OnOff.Attributes.OnOff)], returnClusterObject=True, reportInterval=(3, 10))

event = asyncio.Event()

def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):
event.set()

sub.SetAttributeUpdateCallback(subUpdate)

try:
data = sub.GetAttributes()

req = Clusters.OnOff.Commands.On()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)

await asyncio.wait_for(event.wait(), timeout=11)

cluster: Clusters.OnOff = data[1][Clusters.OnOff]
if (not cluster.onOff):
raise ValueError("Current On/Off state should be True")

event.clear()

req = Clusters.OnOff.Commands.Off()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)

await asyncio.wait_for(event.wait(), timeout=11)

cluster: Clusters.OnOff = data[1][Clusters.OnOff]
if (cluster.onOff):
raise ValueError("Current On/Off state should be False")

except TimeoutError:
raise AssertionError("Did not receive updated attribute")
finally:
sub.Shutdown()

@ classmethod
@ base.test_case
async def TestSubscribeZeroMinInterval(cls, devCtrl):
Expand Down Expand Up @@ -638,6 +717,8 @@ async def RunTest(cls, devCtrl):
await cls.TestReadAttributeRequests(devCtrl)
await cls.TestSubscribeZeroMinInterval(devCtrl)
await cls.TestSubscribeAttribute(devCtrl)
await cls.TestAttributeCacheAttributeView(devCtrl)
await cls.TestAttributeCacheClusterView(devCtrl)
await cls.TestMixedReadAttributeAndEvents(devCtrl)
# Note: Write will change some attribute values, always put it after read tests
await cls.TestWriteRequest(devCtrl)
Expand Down
2 changes: 1 addition & 1 deletion src/test_driver/linux-cirque/MobileDeviceTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def run_controller_test(self):
CHIP_REPO, "out/debug/linux_x64_gcc/controller/python/chip_repl-0.0-py3-none-any.whl")))

command = ("gdb -batch -return-child-result -q -ex run -ex \"thread apply all bt\" "
"--args python3 {} -t 240 -a {} --paa-trust-store-path {}").format(
"--args python3 {} -t 300 -a {} --paa-trust-store-path {}").format(
os.path.join(
CHIP_REPO, "src/controller/python/test/test_scripts/mobile-device-test.py"), ethernet_ip,
os.path.join(CHIP_REPO, MATTER_DEVELOPMENT_PAA_ROOT_CERTS))
Expand Down

0 comments on commit 4747296

Please sign in to comment.