From c971761384875c63671d9dc44af2853a0d862941 Mon Sep 17 00:00:00 2001 From: Jerry Johns Date: Fri, 17 Jun 2022 07:37:25 -0700 Subject: [PATCH] Add a new callback on the `SubscriptionTransaction` object to permit registering for resubscription notifications. --- .../python/chip/clusters/Attribute.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index a70f49dda601e7..92b1ed47cda3a1 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -466,6 +466,7 @@ def UpdateCachedData(self): class SubscriptionTransaction: def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl): + self._onResubscriptionAttemptedCb = DefaultResubscriptionAttemptedCallback self._onAttributeChangeCb = DefaultAttributeChangeCallback self._onEventChangeCb = DefaultEventChangeCallback self._readTransaction = transaction @@ -491,6 +492,15 @@ def GetAttribute(self, path: TypedAttributePath) -> Any: def GetEvents(self): return self._readTransaction.GetAllEventValues() + def SetResubscriptionAttemptedCallback(self, callback: Callable[[SubscriptionTransaction, int, int], None]): + ''' + Sets the callback function that gets invoked anytime a re-subscription is attempted. The callback is expected + to have the following signature: + def Callback(transaction: SubscriptionTransaction, errorEncountered: int, nextResubscribeIntervalMsec: int) + ''' + if callback is not None: + self._onResubscriptionAttemptedCb = callback + def SetAttributeUpdateCallback(self, callback: Callable[[TypedAttributePath, SubscriptionTransaction], None]): ''' Sets the callback function for the attribute value change event, accepts a Callable accepts an attribute path and the cached data. @@ -528,6 +538,10 @@ def __repr__(self): return f'' +def DefaultResubscriptionAttemptedCallback(transaction: SubscriptionTransaction, terminationError, nextResubscribeIntervalMsec): + print(f"Previous subscription failed with Error: {terminationError} - re-subscribing in {nextResubscribeIntervalMsec}ms...") + + def DefaultAttributeChangeCallback(path: TypedAttributePath, transaction: SubscriptionTransaction): data = transaction.GetAttribute(path) value = { @@ -677,8 +691,9 @@ def handleSubscriptionEstablished(self, subscriptionId): self._event_loop.call_soon_threadsafe( self._handleSubscriptionEstablished, subscriptionId) - def handleResubscriptionAttempted(self, terminationCause, nextResubscribeIntervalMsec): - print("would resubscribe with error " + str(terminationCause) + " in " + str(nextResubscribeIntervalMsec)) + def handleResubscriptionAttempted(self, terminationCause: int, nextResubscribeIntervalMsec: int): + self._event_loop.call_soon_threadsafe( + self._subscription_handler._onResubscriptionAttemptedCb, self._subscription_handler, terminationCause, nextResubscribeIntervalMsec) def _handleReportBegin(self): pass @@ -781,7 +796,7 @@ def _OnSubscriptionEstablishedCallback(closure, subscriptionId): @_OnResubscriptionAttemptedCallbackFunct -def _OnResubscriptionAttemptedCallback(closure, terminationCause, nextResubscribeIntervalMsec): +def _OnResubscriptionAttemptedCallback(closure, terminationCause: int, nextResubscribeIntervalMsec: int): closure.handleResubscriptionAttempted(terminationCause, nextResubscribeIntervalMsec)