Skip to content

Commit

Permalink
[Darwin] MTRDeviceController to limit concurrent subscriptions to Thr…
Browse files Browse the repository at this point in the history
…ead-enabled devices
  • Loading branch information
jtung-apple committed May 15, 2024
1 parent 0608519 commit 95cd7c5
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 65 deletions.
11 changes: 4 additions & 7 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
mClusterStateCache = std::move(aClusterStateCache);
}

// Used to reset Resubscription backoff on events that indicate likely availability of device to come back online
void ResetResubscriptionBackoff() { mResubscriptionNumRetries = 0; }

protected:
// Report an error, which may be due to issues in our own internal state or
// due to the OnError callback happening.
Expand Down Expand Up @@ -147,6 +144,10 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
NSMutableArray * _Nullable mAttributeReports = nil;
NSMutableArray * _Nullable mEventReports = nil;

void CallResubscriptionScheduledHandler(NSError * error, NSNumber * resubscriptionDelay);
// Copied from ReadClient and customized for MTRDevice resubscription time reset
uint32_t ComputeTimeTillNextSubscription();

private:
DataReportCallback _Nullable mAttributeReportCallback = nil;
DataReportCallback _Nullable mEventReportCallback = nil;
Expand Down Expand Up @@ -181,10 +182,6 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
bool mHaveQueuedDeletion = false;
OnDoneHandler _Nullable mOnDoneHandler = nil;
dispatch_block_t mInterimReportBlock = nil;

// Copied from ReadClient and customized for
uint32_t ComputeTimeTillNextSubscription();
uint32_t mResubscriptionNumRetries = 0;
};

NS_ASSUME_NONNULL_END
46 changes: 9 additions & 37 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
Original file line number Diff line number Diff line change
Expand Up @@ -125,57 +125,29 @@

void MTRBaseSubscriptionCallback::OnSubscriptionEstablished(SubscriptionId aSubscriptionId)
{
// ReadClient resets it at ProcessSubscribeResponse after calling OnSubscriptionEstablished, so this is equivalent
mResubscriptionNumRetries = 0;
if (mSubscriptionEstablishedHandler) {
auto subscriptionEstablishedHandler = mSubscriptionEstablishedHandler;
subscriptionEstablishedHandler();
}
}

uint32_t MTRBaseSubscriptionCallback::ComputeTimeTillNextSubscription()
CHIP_ERROR MTRBaseSubscriptionCallback::OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause)
{
uint32_t maxWaitTimeInMsec = 0;
uint32_t waitTimeInMsec = 0;
uint32_t minWaitTimeInMsec = 0;

if (mResubscriptionNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) {
maxWaitTimeInMsec = GetFibonacciForIndex(mResubscriptionNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
} else {
maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
}
CHIP_ERROR err = ClusterStateCache::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause);
ReturnErrorOnFailure(err);

if (maxWaitTimeInMsec != 0) {
minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
}

return waitTimeInMsec;
auto error = [MTRError errorForCHIPErrorCode:aTerminationCause];
auto delayMs = @(apReadClient->ComputeTimeTillNextSubscription());
CallResubscriptionScheduledHandler(error, delayMs);
return CHIP_NO_ERROR;
}

CHIP_ERROR MTRBaseSubscriptionCallback::OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause)
void MTRBaseSubscriptionCallback::CallResubscriptionScheduledHandler(NSError * error, NSNumber * resubscriptionDelay)
{
// No need to check ReadClient internal state is Idle because ReadClient only calls OnResubscriptionNeeded after calling ClearActiveSubscriptionState(), which sets the state to Idle.

// This part is copied from ReadClient's DefaultResubscribePolicy:
auto timeTillNextResubscription = ComputeTimeTillNextSubscription();
ChipLogProgress(DataManagement,
"Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
"ms due to error %" CHIP_ERROR_FORMAT,
apReadClient->GetFabricIndex(), ChipLogValueX64(apReadClient->GetPeerNodeId()), mResubscriptionNumRetries, timeTillNextResubscription,
aTerminationCause.Format());
ReturnErrorOnFailure(apReadClient->ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT));

// Not as good a place to increment as when resubscription timer fires, but as is, this should be as good, because OnResubscriptionNeeded is only called from ReadClient's Close() while Idle, and nothing should cause this to happen
mResubscriptionNumRetries++;

if (mResubscriptionCallback != nil) {
auto callback = mResubscriptionCallback;
auto error = [MTRError errorForCHIPErrorCode:aTerminationCause];
auto delayMs = @(apReadClient->ComputeTimeTillNextSubscription());
callback(error, delayMs);
callback(error, resubscriptionDelay);
}
return CHIP_NO_ERROR;
}

void MTRBaseSubscriptionCallback::OnUnsolicitedMessageFromPublisher(ReadClient *)
Expand Down
195 changes: 187 additions & 8 deletions src/darwin/Framework/CHIP/MTRDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "lib/core/CHIPError.h"
#include "lib/core/DataModelTypes.h"
#include <app/ConcreteAttributePath.h>
#include <lib/support/FibonacciUtils.h>

#include <app/AttributePathParams.h>
#include <app/BufferedReadCallback.h>
Expand Down Expand Up @@ -122,10 +123,19 @@ - (id)strongObject
{
}

// Used to reset Resubscription backoff on events that indicate likely availability of device to come back online
void ResetResubscriptionBackoff() { mResubscriptionNumRetries = 0; }

private:
void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override;

void OnAttributeData(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus) override;

CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override;

// Copied from ReadClient and customized for MTRDevice resubscription time reset
uint32_t ComputeTimeTillNextSubscription();
uint32_t mResubscriptionNumRetries = 0;
};

} // anonymous namespace
Expand Down Expand Up @@ -382,6 +392,12 @@ @implementation MTRDevice {
// This boolean keeps track of any device configuration changes received in an attribute report.
// If this is true when the report ends, we notify the delegate.
BOOL _deviceConfigurationChanged;

// The completion block is set when the subscription / resubscription work is enqueued, and called / cleared when:
// 1. Subscription establishes
// 2. When OnResubscriptionNeeded is called
// 3. On subscription reset (including when getSessionForNode fails)
MTRAsyncWorkCompletionBlock _subscriptionPoolWorkCompletionBlock;
}

- (instancetype)initWithNodeID:(NSNumber *)nodeID controller:(MTRDeviceController *)controller
Expand Down Expand Up @@ -884,6 +900,9 @@ - (void)_handleSubscriptionEstablished
{
os_unfair_lock_lock(&self->_lock);

// We have completed the subscription work - remove from the subscription pool.
[self _clearSubscriptionPoolWork];

// reset subscription attempt wait time when subscription succeeds
_lastSubscriptionAttemptWait = 0;
_internalDeviceState = MTRInternalDeviceStateInitalSubscriptionEstablished;
Expand Down Expand Up @@ -919,9 +938,79 @@ - (void)_handleSubscriptionError:(NSError *)error
[self _changeState:MTRDeviceStateUnreachable];
}

- (void)_handleResubscriptionNeeded
// Heuristic used to decide whether subscriptions to this device should be throttled through a
// pool of subscriptions: the device is treated as Thread-enabled when a Thread Network
// Diagnostics cluster is known on endpoint 0.
// If products appear that have both Thread and Wi-Fi enabled but are primarily on Wi-Fi, this
// method will need to be updated to reflect that.
//
// The caller must already hold _lock. This method must not take the lock itself:
// os_unfair_lock is not recursive, and callers invoke this method while holding it, so
// acquiring it again here would abort the process.
//
// Returns YES if a Thread Network Diagnostics cluster is among the known clusters on
// endpoint 0, NO otherwise.
- (BOOL)_deviceUsesThread
{
    os_unfair_lock_assert_owner(&self->_lock);

    for (MTRClusterPath * path in [self _knownClusters]) {
        // Only endpoint 0 is considered.
        if (path.endpoint.unsignedShortValue != 0) {
            continue;
        }

        if (path.cluster.unsignedLongValue == MTRClusterIDTypeThreadNetworkDiagnosticsID) {
            return YES;
        }
    }

    return NO;
}

// Completes and forgets the pending subscription pool work item, if there is one.
// The caller must hold _lock.
- (void)_clearSubscriptionPoolWork
{
    os_unfair_lock_assert_owner(&self->_lock);

    MTRAsyncWorkCompletionBlock pendingCompletion = _subscriptionPoolWorkCompletionBlock;
    if (pendingCompletion == nil) {
        // Nothing is stored, so there is nothing to complete.
        return;
    }

    // Drop the stored block first, then report the work item as complete.
    _subscriptionPoolWorkCompletionBlock = nil;
    pendingCompletion(MTRAsyncWorkComplete);
}

// Schedules workBlock to run through the device controller's concurrentSubscriptionPool after a delay.
//
// workBlock:     the work to run once the work item's ready handler is invoked.
// inNanoseconds: delay before the work item is created and enqueued.
// description:   passed to the pool when enqueuing, and included in log messages.
//
// The caller must hold _lock. If a pool completion block is already stored for this device,
// the request is logged and ignored.
- (void)_scheduleSubscriptionPoolWork:(void (^)(void))workBlock inNanoseconds:(int64_t)inNanoseconds description:(NSString *)description
{
os_unfair_lock_assert_owner(&self->_lock);

// Sanity check we are not scheduling for this device multiple times in the pool
if (_subscriptionPoolWorkCompletionBlock) {
MTR_LOG_DEFAULT("%@ already scheduled in subscription pool for this device - ignoring: %@", self, description);
return;
}

// Wait the required amount of time, then put it in the subscription pool to wait additionally for a spot, if needed
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, inNanoseconds), dispatch_get_main_queue(), ^{
// In the case where a resubscription triggering event happened and already established, running the work block should result in a no-op
MTRAsyncWorkItem * workItem = [[MTRAsyncWorkItem alloc] initWithQueue:self.queue];
[workItem setReadyHandler:^(id _Nonnull context, NSInteger retryCount, MTRAsyncWorkCompletionBlock _Nonnull completion) {
// The completion block is only stored here, when the ready handler runs, so the check made at
// scheduling time is repeated under the lock before claiming the slot.
os_unfair_lock_lock(&self->_lock);
if (self->_subscriptionPoolWorkCompletionBlock) {
// This means a resubscription triggering event happened and is now in-progress
MTR_LOG_DEFAULT("%@ timer fired but already running in subscription pool - ignoring: %@", self, description);
os_unfair_lock_unlock(&self->_lock);

// call completion as complete to remove from queue
completion(MTRAsyncWorkComplete);
return;
}

// Otherwise, save the completion block
self->_subscriptionPoolWorkCompletionBlock = completion;
os_unfair_lock_unlock(&self->_lock);

// The lock has been released before the work block is invoked.
workBlock();
}];
[self->_deviceController.concurrentSubscriptionPool enqueueWorkItem:workItem description:description];
});
}

- (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs
{
BOOL deviceUsesThread;

os_unfair_lock_lock(&self->_lock);

[self _changeState:MTRDeviceStateUnknown];

Expand All @@ -932,6 +1021,36 @@ - (void)_handleResubscriptionNeeded
// retries immediately.
_lastSubscriptionFailureTime = [NSDate now];

deviceUsesThread = [self _deviceUsesThread];

// If a previous resubscription failed, remove the item from the subscription pool.
[self _clearSubscriptionPoolWork];

os_unfair_lock_unlock(&self->_lock);

// Use the existing _triggerResubscribeWithReason mechanism, which does the right checks when
// this block is run -- if other triggering events had happened, this would become a no-op.
auto resubscriptionBlock = ^{
[self->_deviceController asyncDispatchToMatterQueue:^{
[self _triggerResubscribeWithReason:"ResubscriptionNeeded timer fired" nodeLikelyReachable:NO];
} errorHandler:^(NSError * _Nonnull error) {
// If controller is not running, clear work item from the subscription queue
MTR_LOG_INFO("%@ could not dispatch to matter queue for resubscription - error %@", self, error);
std::lock_guard lock(self->_lock);
[self _clearSubscriptionPoolWork];
}];
};

int64_t resubscriptionDelayNs = static_cast<int64_t>(resubscriptionDelayMs.unsignedIntValue * NSEC_PER_MSEC);
if (deviceUsesThread) {
std::lock_guard lock(_lock);
// For Thread-enabled devices, schedule the _triggerResubscribeWithReason call to run in the subscription pool
[self _scheduleSubscriptionPoolWork:resubscriptionBlock inNanoseconds:resubscriptionDelayNs description:@"ReadClient resubscription"];
} else {
// For non-Thread-enabled devices, just call the resubscription block after the specified time
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, resubscriptionDelayNs), self.queue, resubscriptionBlock);
}

// Set up connectivity monitoring in case network routability changes for the positive, to accelerate resubscription
[self _setupConnectivityMonitoring];
}
Expand Down Expand Up @@ -984,11 +1103,26 @@ - (void)_handleSubscriptionReset:(NSNumber * _Nullable)retryDelay
}

MTR_LOG_DEFAULT("%@ scheduling to reattempt subscription in %f seconds", self, secondsToWait);
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t) (secondsToWait * NSEC_PER_SEC)), self.queue, ^{

// If we started subscription or session establishment but failed, remove item from the subscription pool so we can re-queue.
[self _clearSubscriptionPoolWork];

// Call _reattemptSubscriptionNowIfNeeded when timer fires - if subscription is
// in a better state at that time this will be a no-op.
auto resubscriptionBlock = ^{
os_unfair_lock_lock(&self->_lock);
[self _reattemptSubscriptionNowIfNeeded];
os_unfair_lock_unlock(&self->_lock);
});
};

int64_t resubscriptionDelayNs = static_cast<int64_t>(secondsToWait * NSEC_PER_SEC);
if ([self _deviceUsesThread]) {
// For Thread-enabled devices, schedule the _reattemptSubscriptionNowIfNeeded call to run in the subscription pool
[self _scheduleSubscriptionPoolWork:resubscriptionBlock inNanoseconds:resubscriptionDelayNs description:@"MTRDevice resubscription"];
} else {
// For non-Thread-enabled devices, just call the resubscription block after the specified time
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, resubscriptionDelayNs), self.queue, resubscriptionBlock);
}
}

- (void)_reattemptSubscriptionNowIfNeeded
Expand Down Expand Up @@ -1423,7 +1557,7 @@ - (void)_setupConnectivityMonitoring
self->_connectivityMonitor = [[MTRDeviceConnectivityMonitor alloc] initWithCompressedFabricID:compressedFabricID nodeID:self.nodeID];
[self->_connectivityMonitor startMonitoringWithHandler:^{
[self->_deviceController asyncDispatchToMatterQueue:^{
[self _triggerResubscribeWithReason:"read-through skipped while not subscribed" nodeLikelyReachable:YES];
[self _triggerResubscribeWithReason:"device connectivity changed" nodeLikelyReachable:YES];
}
errorHandler:nil];
} queue:self.queue];
Expand Down Expand Up @@ -1512,11 +1646,11 @@ - (void)_setupSubscription
[self _handleSubscriptionError:error];
});
},
^(NSError * error, NSNumber * resubscriptionDelay) {
MTR_LOG_DEFAULT("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelay);
^(NSError * error, NSNumber * resubscriptionDelayMs) {
MTR_LOG_DEFAULT("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs);
dispatch_async(self.queue, ^{
// OnResubscriptionNeeded
[self _handleResubscriptionNeeded];
[self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
});
},
^(void) {
Expand Down Expand Up @@ -3070,4 +3204,49 @@ - (void)invokeCommandWithEndpointID:(NSNumber *)endpointID

QueueInterimReport();
}

// Computes a randomized wait, in milliseconds, before the next resubscription attempt.
//
// The upper bound is GetFibonacciForIndex(mResubscriptionNumRetries) scaled by
// CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS while the retry count is at most
// CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX, and CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS
// after that. The lower bound is CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP
// percent of the upper bound. Returns 0 when the upper bound is 0.
uint32_t SubscriptionCallback::ComputeTimeTillNextSubscription()
{
    const uint32_t upperBoundMs = (mResubscriptionNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
        ? GetFibonacciForIndex(mResubscriptionNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS
        : CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;

    // A zero upper bound means no wait at all; this also avoids a modulo by zero below.
    if (upperBoundMs == 0) {
        return 0;
    }

    const uint32_t lowerBoundMs = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * upperBoundMs) / 100;

    // Pick a random point between the lower bound (inclusive) and the upper bound (exclusive).
    return lowerBoundMs + (Crypto::GetRandU32() % (upperBoundMs - lowerBoundMs));
}

// Handles a resubscription request from the ReadClient.
//
// Computes the backoff delay, asks the ReadClient to schedule a resubscription with the maximum
// possible delay, increments the retry counter, and reports the termination cause together with
// the computed delay through CallResubscriptionScheduledHandler.
//
// Returns the error from ScheduleResubscription if it fails, CHIP_NO_ERROR otherwise.
CHIP_ERROR SubscriptionCallback::OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause)
{
    // The ReadClient state does not need to be checked for Idle here: ReadClient only calls
    // OnResubscriptionNeeded after ClearActiveSubscriptionState(), which sets the state to Idle.

    // Mirrors ReadClient's DefaultResubscribePolicy.
    const uint32_t backoffDelayMs = ComputeTimeTillNextSubscription();
    ChipLogProgress(DataManagement,
        "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
        "ms due to error %" CHIP_ERROR_FORMAT,
        apReadClient->GetFabricIndex(), ChipLogValueX64(apReadClient->GetPeerNodeId()), mResubscriptionNumRetries, backoffDelayMs,
        aTerminationCause.Format());

    // Schedule with the maximum delay; the actual attempt is expected to be triggered with
    // TriggerResubscribeIfScheduled after a separate timer. Going through ScheduleResubscription
    // saves the aReestablishCASE value and keeps its sanity checks in effect.
    const bool reestablishCASE = (aTerminationCause == CHIP_ERROR_TIMEOUT);
    ReturnErrorOnFailure(apReadClient->ScheduleResubscription(UINT32_MAX, NullOptional, reestablishCASE));

    // Incrementing when the resubscription timer fires would be a better place, but this should be
    // equivalent: OnResubscriptionNeeded is only called from ReadClient's Close() while Idle.
    mResubscriptionNumRetries++;

    NSError * terminationError = [MTRError errorForCHIPErrorCode:aTerminationCause];
    CallResubscriptionScheduledHandler(terminationError, @(backoffDelayMs));

    return CHIP_NO_ERROR;
}
} // anonymous namespace
Loading

0 comments on commit 95cd7c5

Please sign in to comment.