Skip to content

Commit

Permalink
Ensure that all MTRDevice state/internalState changes happen on the M…
Browse files Browse the repository at this point in the history
…atter queue. (#35490)

This avoids races where we queue blocks to different queues that both try to
change the state, which were resulting in non-deterministic final state.

Fixes #34796
  • Loading branch information
bzbarsky-apple authored Sep 9, 2024
1 parent 2e5b709 commit 3f0f242
Showing 1 changed file with 89 additions and 46 deletions.
135 changes: 89 additions & 46 deletions src/darwin/Framework/CHIP/MTRDevice_Concrete.mm
Original file line number Diff line number Diff line change
Expand Up @@ -757,11 +757,16 @@ - (void)_ensureSubscriptionForExistingDelegates:(NSString *)reason
if ([self _deviceUsesThread]) {
MTR_LOG(" => %@ - device is a thread device, scheduling in pool", self);
[self _scheduleSubscriptionPoolWork:^{
std::lock_guard lock(self->_lock);
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and scheduled subscription is happening", reason]];
[self->_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and scheduled subscription is happening", reason]];
} errorHandler:nil];
} inNanoseconds:0 description:@"MTRDevice setDelegate first subscription"];
} else {
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and subscription is needed", reason]];
[_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);
[self _setupSubscriptionWithReason:[NSString stringWithFormat:@"%@ and subscription is needed", reason]];
} errorHandler:nil];
}
}

Expand Down Expand Up @@ -946,6 +951,15 @@ - (void)_callDelegateDeviceCachePrimed
// assume lock is held
- (void)_changeState:(MTRDeviceState)state
{
// We want to avoid situations where something changes our state and then an
// async block that was queued earlier in response to something changes it
// again, to a value that no longer makes sense. To avoid that:
//
// 1) All state changes happen on the Matter queue.
// 2) All state changes happen synchronously with the event that actually
// triggers the state change.
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);
MTRDeviceState lastState = _state;
_state = state;
Expand All @@ -970,6 +984,15 @@ - (void)_changeState:(MTRDeviceState)state

- (void)_changeInternalState:(MTRInternalDeviceState)state
{
// We want to avoid situations where something changes our state and then an
// async block that was queued earlier in response to something changes it
// again, to a value that no longer makes sense. To avoid that:
//
// 1) All state changes happen on the Matter queue.
// 2) All state changes happen synchronously with the event that actually
// triggers the state change.
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);
MTRInternalDeviceState lastState = _internalDeviceState;
_internalDeviceState = state;
Expand Down Expand Up @@ -1053,12 +1076,16 @@ - (void)_handleSubscriptionEstablished

- (void)_handleSubscriptionError:(NSError *)error
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);
[self _doHandleSubscriptionError:error];
}

- (void)_doHandleSubscriptionError:(NSError *)error
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&_lock);

[self _changeInternalState:MTRInternalDeviceStateUnsubscribed];
Expand Down Expand Up @@ -1174,13 +1201,23 @@ - (void)_scheduleSubscriptionPoolWork:(dispatch_block_t)workBlock inNanoseconds:

- (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs
{
BOOL deviceUsesThread;
assertChipStackLockedByCurrentThread();

os_unfair_lock_lock(&self->_lock);
std::lock_guard lock(_lock);

// Change our state before going async.
[self _changeState:MTRDeviceStateUnknown];
[self _changeInternalState:MTRInternalDeviceStateResubscribing];

dispatch_async(self.queue, ^{
[self _handleResubscriptionNeededWithDelayOnDeviceQueue:resubscriptionDelayMs];
});
}

- (void)_handleResubscriptionNeededWithDelayOnDeviceQueue:(NSNumber *)resubscriptionDelayMs
{
os_unfair_lock_lock(&self->_lock);

// If we are here, then the ReadClient either just detected a subscription
// drop or just tried again and failed. Either way, count it as "tried and
// failed to subscribe": in the latter case it's actually true, and in the
Expand All @@ -1192,7 +1229,7 @@ - (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs
_lastSubscriptionFailureTimeForDescription = _lastSubscriptionFailureTime;
}
[self _notifyDelegateOfPrivateInternalPropertiesChanges];
deviceUsesThread = [self _deviceUsesThread];
BOOL deviceUsesThread = [self _deviceUsesThread];

// If a previous resubscription failed, remove the item from the subscription pool.
[self _clearSubscriptionPoolWork];
Expand Down Expand Up @@ -1228,6 +1265,8 @@ - (void)_handleResubscriptionNeededWithDelay:(NSNumber *)resubscriptionDelayMs

- (void)_handleSubscriptionReset:(NSNumber * _Nullable)retryDelay
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);
[self _doHandleSubscriptionReset:retryDelay];
}
Expand All @@ -1247,6 +1286,8 @@ - (void)_setLastSubscriptionAttemptWait:(uint32_t)lastSubscriptionAttemptWait

- (void)_doHandleSubscriptionReset:(NSNumber * _Nullable)retryDelay
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&_lock);

if (_deviceController.isSuspended) {
Expand Down Expand Up @@ -1309,8 +1350,11 @@ - (void)_doHandleSubscriptionReset:(NSNumber * _Nullable)retryDelay
// Call _reattemptSubscriptionNowIfNeededWithReason when timer fires - if subscription is
// in a better state at that time this will be a no-op.
auto resubscriptionBlock = ^{
std::lock_guard lock(self->_lock);
[self _reattemptSubscriptionNowIfNeededWithReason:@"got subscription reset"];
[self->_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);
[self _reattemptSubscriptionNowIfNeededWithReason:@"got subscription reset"];
}
errorHandler:nil];
};

int64_t resubscriptionDelayNs = static_cast<int64_t>(secondsToWait * NSEC_PER_SEC);
Expand All @@ -1326,6 +1370,8 @@ - (void)_doHandleSubscriptionReset:(NSNumber * _Nullable)retryDelay

- (void)_reattemptSubscriptionNowIfNeededWithReason:(NSString *)reason
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);
if (!self.reattemptingSubscription) {
return;
Expand All @@ -1338,6 +1384,8 @@ - (void)_reattemptSubscriptionNowIfNeededWithReason:(NSString *)reason

- (void)_handleUnsolicitedMessageFromPublisher
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);

[self _changeState:MTRDeviceStateReachable];
Expand All @@ -1358,18 +1406,23 @@ - (void)_handleUnsolicitedMessageFromPublisher

- (void)_markDeviceAsUnreachableIfNeverSubscribed
{
os_unfair_lock_assert_owner(&self->_lock);
[_deviceController asyncDispatchToMatterQueue:^{
std::lock_guard lock(self->_lock);

if (HadSubscriptionEstablishedOnce(_internalDeviceState)) {
return;
}
if (HadSubscriptionEstablishedOnce(self->_internalDeviceState)) {
return;
}

MTR_LOG("%@ still not subscribed, marking the device as unreachable", self);
[self _changeState:MTRDeviceStateUnreachable];
MTR_LOG("%@ still not subscribed, marking the device as unreachable", self);
[self _changeState:MTRDeviceStateUnreachable];
}
errorHandler:nil];
}

- (void)_handleReportBegin
{
assertChipStackLockedByCurrentThread();

std::lock_guard lock(_lock);

_receivingReport = YES;
Expand Down Expand Up @@ -1807,11 +1860,14 @@ - (void)unitTestInjectEventReport:(NSArray<NSDictionary<NSString *, id> *> *)eve

- (void)unitTestInjectAttributeReport:(NSArray<NSDictionary<NSString *, id> *> *)attributeReport fromSubscription:(BOOL)isFromSubscription
{
dispatch_async(self.queue, ^{
[_deviceController asyncDispatchToMatterQueue:^{
[self _handleReportBegin];
[self _handleAttributeReport:attributeReport fromSubscription:isFromSubscription];
[self _handleReportEnd];
});
dispatch_async(self.queue, ^{
[self _handleAttributeReport:attributeReport fromSubscription:isFromSubscription];
[self _handleReportEnd];
});
}
errorHandler:nil];
}
#endif

Expand Down Expand Up @@ -2242,6 +2298,8 @@ - (void)unitTestResetSubscription
// assume lock is held
- (void)_setupSubscriptionWithReason:(NSString *)reason
{
assertChipStackLockedByCurrentThread();

os_unfair_lock_assert_owner(&self->_lock);

if (![self _subscriptionsAllowed]) {
Expand Down Expand Up @@ -2287,7 +2345,6 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, static_cast<int64_t>(kSecondsToWaitBeforeMarkingUnreachableAfterSettingUpSubscription) * static_cast<int64_t>(NSEC_PER_SEC)), self.queue, ^{
mtr_strongify(self);
if (self != nil) {
std::lock_guard lock(self->_lock);
[self _markDeviceAsUnreachableIfNeverSubscribed];
}
});
Expand All @@ -2305,10 +2362,8 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
NSNumber * _Nullable retryDelay) {
if (error != nil) {
MTR_LOG_ERROR("%@ getSessionForNode error %@", self, error);
dispatch_async(self.queue, ^{
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:retryDelay];
});
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:retryDelay];
return;
}

Expand All @@ -2332,17 +2387,13 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
},
^(NSError * error) {
MTR_LOG_ERROR("%@ got subscription error %@", self, error);
dispatch_async(self.queue, ^{
// OnError
[self _handleSubscriptionError:error];
});
// OnError
[self _handleSubscriptionError:error];
},
^(NSError * error, NSNumber * resubscriptionDelayMs) {
MTR_LOG_ERROR("%@ got resubscription error %@ delay %@", self, error, resubscriptionDelayMs);
dispatch_async(self.queue, ^{
// OnResubscriptionNeeded
[self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
});
// OnResubscriptionNeeded
[self _handleResubscriptionNeededWithDelay:resubscriptionDelayMs];
},
^(void) {
MTR_LOG("%@ got subscription established", self);
Expand Down Expand Up @@ -2373,23 +2424,17 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
self->_currentReadClient = nullptr;
self->_currentSubscriptionCallback = nullptr;

dispatch_async(self.queue, ^{
// OnDone
[self _handleSubscriptionReset:nil];
});
// OnDone
[self _doHandleSubscriptionReset:nil];
},
^(void) {
MTR_LOG("%@ got unsolicited message from publisher", self);
dispatch_async(self.queue, ^{
// OnUnsolicitedMessageFromPublisher
[self _handleUnsolicitedMessageFromPublisher];
});
// OnUnsolicitedMessageFromPublisher
[self _handleUnsolicitedMessageFromPublisher];
},
^(void) {
MTR_LOG("%@ got report begin", self);
dispatch_async(self.queue, ^{
[self _handleReportBegin];
});
[self _handleReportBegin];
},
^(void) {
MTR_LOG("%@ got report end", self);
Expand Down Expand Up @@ -2459,10 +2504,8 @@ - (void)_setupSubscriptionWithReason:(NSString *)reason
if (err != CHIP_NO_ERROR) {
NSError * error = [MTRError errorForCHIPErrorCode:err logContext:self];
MTR_LOG_ERROR("%@ SendAutoResubscribeRequest error %@", self, error);
dispatch_async(self.queue, ^{
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:nil];
});
[self _handleSubscriptionError:error];
[self _handleSubscriptionReset:nil];

return;
}
Expand Down

0 comments on commit 3f0f242

Please sign in to comment.