Skip to content

Commit

Permalink
Add onResubscriptionAttemptedCallback (#19196)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhanw-google authored Jun 8, 2022
1 parent 5e11686 commit dce8007
Show file tree
Hide file tree
Showing 18 changed files with 1,314 additions and 1,124 deletions.
5 changes: 5 additions & 0 deletions src/app/BufferedReadCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class BufferedReadCallback : public ReadClient::Callback
mCallback.OnSubscriptionEstablished(aSubscriptionId);
}

void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override
{
mCallback.OnResubscriptionAttempt(aTerminationCause, aNextResubscribeIntervalMsec);
}

void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override
{
return mCallback.OnDeallocatePaths(std::move(aReadPrepareParams));
Expand Down
5 changes: 5 additions & 0 deletions src/app/ClusterStateCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,11 @@ class ClusterStateCache : protected ReadClient::Callback
mCallback.OnSubscriptionEstablished(aSubscriptionId);
}

void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override
{
mCallback.OnResubscriptionAttempt(aTerminationCause, aNextResubscribeIntervalMsec);
}

void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override
{
mCallback.OnDeallocatePaths(std::move(aReadPrepareParams));
Expand Down
2 changes: 1 addition & 1 deletion src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ CHIP_ERROR InteractionModelEngine::OnUnsolicitedReportData(Messaging::ExchangeCo

for (auto * readClient = mpActiveReadClientList; readClient != nullptr; readClient = readClient->GetNextClient())
{
if (!readClient->IsSubscriptionIdle())
if (!readClient->IsSubscriptionActive())
{
continue;
}
Expand Down
41 changes: 26 additions & 15 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,31 @@ void ReadClient::Close(CHIP_ERROR aError)

mpExchangeCtx = nullptr;

if (aError != CHIP_NO_ERROR)
if (IsReadType())
{
if (ResubscribeIfNeeded())
if (aError != CHIP_NO_ERROR)
{
ClearActiveSubscriptionState();
return;
mpCallback.OnError(aError);
}
mpCallback.OnError(aError);
}

if (mReadPrepareParams.mResubscribePolicy != nullptr)
else
{
if (aError != CHIP_NO_ERROR)
{
uint32_t nextResubscribeMsec = 0;

if (ResubscribeIfNeeded(nextResubscribeMsec))
{
ChipLogProgress(DataManagement,
"Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
"ms due to error %" CHIP_ERROR_FORMAT,
mFabricIndex, ChipLogValueX64(mPeerNodeId), mNumRetries, nextResubscribeMsec, aError.Format());
mpCallback.OnResubscriptionAttempt(aError, nextResubscribeMsec);
ClearActiveSubscriptionState();
return;
}
mpCallback.OnError(aError);
}
StopResubscription();
}

Expand Down Expand Up @@ -582,7 +595,7 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)

if (!suppressResponse)
{
bool noResponseExpected = IsSubscriptionIdle() && !mPendingMoreChunks;
bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks;
err = StatusResponse::Send(err == CHIP_NO_ERROR ? Protocols::InteractionModel::Status::Success
: Protocols::InteractionModel::Status::InvalidSubscription,
mpExchangeCtx, !noResponseExpected);
Expand Down Expand Up @@ -957,10 +970,11 @@ void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void
_this->mNumRetries++;
}

bool ReadClient::ResubscribeIfNeeded()
bool ReadClient::ResubscribeIfNeeded(uint32_t & aNextResubscribeIntervalMsec)
{
bool shouldResubscribe = true;
uint32_t intervalMsec = 0;
bool shouldResubscribe = true;
uint32_t intervalMsec = 0;
aNextResubscribeIntervalMsec = 0;
if (mReadPrepareParams.mResubscribePolicy == nullptr)
{
ChipLogDetail(DataManagement, "mResubscribePolicy is null");
Expand All @@ -980,10 +994,7 @@ bool ReadClient::ResubscribeIfNeeded()
return false;
}

ChipLogProgress(DataManagement,
"Will try to Resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 "ms",
mFabricIndex, ChipLogValueX64(mPeerNodeId), mNumRetries, intervalMsec);

aNextResubscribeIntervalMsec = intervalMsec;
return true;
}

Expand Down
27 changes: 24 additions & 3 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class ReadClient : public Messaging::ExchangeDelegate

/**
* OnSubscriptionEstablished will be called when a subscription is established for the given subscription transaction.
* If using auto resubscription, OnSubscriptionEstablished will be called whenever resubscription is established.
*
* This object MUST continue to exist after this call is completed. The application shall wait until it
* receives an OnDone call to destroy the object.
Expand All @@ -126,6 +127,16 @@ class ReadClient : public Messaging::ExchangeDelegate
*/
virtual void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) {}

/**
* OnResubscriptionAttempt will be called when a re-subscription has been scheduled as a result of the termination of an
* in-progress or previously active subscription. This object MUST continue to exist after this call is completed. The
* application shall wait until it receives an OnDone call to destroy the object.
*
* @param[in] aTerminationCause The cause of failure of the subscription that just terminated.
* @param[in] aNextResubscribeIntervalMsec How long we will wait before trying to auto-resubscribe.
*/
virtual void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) {}

/**
* OnError will be called when an error occurs *after* a successful call to SendRequest(). The following
* errors will be delivered through this call in the aError field:
Expand Down Expand Up @@ -276,7 +287,7 @@ class ReadClient : public Messaging::ExchangeDelegate
CHIP_ERROR GetReportingIntervals(uint16_t & aMinIntervalFloorSeconds, uint16_t & aMaxIntervalCeilingSeconds) const
{
VerifyOrReturnError(IsSubscriptionType(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(IsSubscriptionIdle(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(IsSubscriptionActive(), CHIP_ERROR_INCORRECT_STATE);

aMinIntervalFloorSeconds = mMinIntervalFloorSeconds;
aMaxIntervalCeilingSeconds = mMaxIntervalCeilingSeconds;
Expand Down Expand Up @@ -338,7 +349,7 @@ class ReadClient : public Messaging::ExchangeDelegate
*
*/
bool IsIdle() const { return mState == ClientState::Idle; }
bool IsSubscriptionIdle() const { return mState == ClientState::SubscriptionActive; }
bool IsSubscriptionActive() const { return mState == ClientState::SubscriptionActive; }
bool IsAwaitingInitialReport() const { return mState == ClientState::AwaitingInitialReport; }
bool IsAwaitingSubscribeResponse() const { return mState == ClientState::AwaitingSubscribeResponse; }

Expand All @@ -365,7 +376,17 @@ class ReadClient : public Messaging::ExchangeDelegate
CHIP_ERROR ProcessAttributePath(AttributePathIB::Parser & aAttributePath, ConcreteDataAttributePath & aClusterInfo);
CHIP_ERROR ProcessReportData(System::PacketBufferHandle && aPayload);
const char * GetStateStr() const;
bool ResubscribeIfNeeded();

/*
* Checks if we should re-subscribe based on the specified re-subscription policy. If we should, re-subscription is scheduled
* aNextResubscribeIntervalMsec is updated accordingly, and true is returned.
*
* If we should not resubscribe, false is returned.
*
* @param[out] aNextResubscribeIntervalMsec How long we will wait before trying to auto-resubscribe.
*/
bool ResubscribeIfNeeded(uint32_t & aNextResubscribeIntervalMsec);

// Specialized request-sending functions.
CHIP_ERROR SendReadRequest(ReadPrepareParams & aReadPrepareParams);
// SendSubscribeRequest performs som validation on aSubscribePrepareParams
Expand Down
2 changes: 2 additions & 0 deletions src/app/tests/integration/chip_im_initiator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class MockInteractionModelApp : public ::chip::app::CommandSender::Callback,
}
}
}

void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {}
void OnAttributeData(const chip::app::ConcreteDataAttributePath & aPath, chip::TLV::TLVReader * aData,
const chip::app::StatusIB & status) override
{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ void InteractionModel::OnSubscriptionEstablished(SubscriptionId subscriptionId)
ContinueOnChipMainThread(CHIP_NO_ERROR);
}

void InteractionModel::OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) {}

/////////// WriteClient Callback Interface /////////
void InteractionModel::OnResponse(const WriteClient * client, const ConcreteDataAttributePath & path, StatusIB status)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ class InteractionModel : public InteractionModelReports,
void OnError(CHIP_ERROR error) override;
void OnDone(chip::app::ReadClient * aReadClient) override;
void OnSubscriptionEstablished(chip::SubscriptionId subscriptionId) override;

void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override;
/////////// WriteClient Callback Interface /////////
void OnResponse(const chip::app::WriteClient * client, const chip::app::ConcreteDataAttributePath & path,
chip::app::StatusIB status) override;
Expand Down
33 changes: 27 additions & 6 deletions src/controller/CHIPCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ using ReadResponseSuccessCallback = void (*)(void * context, T responseData)
using ReadResponseFailureCallback = void (*)(void * context, CHIP_ERROR err);
using ReadDoneCallback = void (*)(void * context);
using SubscriptionEstablishedCallback = void (*)(void * context);
using ResubscriptionAttemptCallback = void (*)(void * context, CHIP_ERROR aError, uint32_t aNextResubscribeIntervalMsec);

class DLL_EXPORT ClusterBase
{
Expand Down Expand Up @@ -263,20 +264,23 @@ class DLL_EXPORT ClusterBase
CHIP_ERROR
SubscribeAttribute(void * context, ReadResponseSuccessCallback<typename AttributeInfo::DecodableArgType> reportCb,
ReadResponseFailureCallback failureCb, uint16_t minIntervalFloorSeconds, uint16_t maxIntervalCeilingSeconds,
SubscriptionEstablishedCallback subscriptionEstablishedCb = nullptr, bool aIsFabricFiltered = true,
SubscriptionEstablishedCallback subscriptionEstablishedCb = nullptr,
ResubscriptionAttemptCallback resubscriptionAttemptCb = nullptr, bool aIsFabricFiltered = true,
bool aKeepPreviousSubscriptions = false, const Optional<DataVersion> & aDataVersion = NullOptional)
{
return SubscribeAttribute<typename AttributeInfo::DecodableType, typename AttributeInfo::DecodableArgType>(
context, AttributeInfo::GetClusterId(), AttributeInfo::GetAttributeId(), reportCb, failureCb, minIntervalFloorSeconds,
maxIntervalCeilingSeconds, subscriptionEstablishedCb, aIsFabricFiltered, aKeepPreviousSubscriptions, aDataVersion);
maxIntervalCeilingSeconds, subscriptionEstablishedCb, resubscriptionAttemptCb, aIsFabricFiltered,
aKeepPreviousSubscriptions, aDataVersion);
}

template <typename DecodableType, typename DecodableArgType>
CHIP_ERROR SubscribeAttribute(void * context, ClusterId clusterId, AttributeId attributeId,
ReadResponseSuccessCallback<DecodableArgType> reportCb, ReadResponseFailureCallback failureCb,
uint16_t minIntervalFloorSeconds, uint16_t maxIntervalCeilingSeconds,
SubscriptionEstablishedCallback subscriptionEstablishedCb = nullptr,
bool aIsFabricFiltered = true, bool aKeepPreviousSubscriptions = false,
ResubscriptionAttemptCallback resubscriptionAttemptCb = nullptr, bool aIsFabricFiltered = true,
bool aKeepPreviousSubscriptions = false,
const Optional<DataVersion> & aDataVersion = NullOptional)
{
VerifyOrReturnError(mDevice != nullptr, CHIP_ERROR_INCORRECT_STATE);
Expand All @@ -302,10 +306,18 @@ class DLL_EXPORT ClusterBase
}
};

auto onResubscriptionAttemptCb = [context, resubscriptionAttemptCb](const app::ReadClient & readClient, CHIP_ERROR aError,
uint32_t aNextResubscribeIntervalMsec) {
if (resubscriptionAttemptCb != nullptr)
{
resubscriptionAttemptCb(context, aError, aNextResubscribeIntervalMsec);
}
};

return Controller::SubscribeAttribute<DecodableType>(
mDevice->GetExchangeManager(), mDevice->GetSecureSession().Value(), mEndpoint, clusterId, attributeId, onReportCb,
onFailureCb, minIntervalFloorSeconds, maxIntervalCeilingSeconds, onSubscriptionEstablishedCb, aIsFabricFiltered,
aKeepPreviousSubscriptions, aDataVersion);
onFailureCb, minIntervalFloorSeconds, maxIntervalCeilingSeconds, onSubscriptionEstablishedCb, onResubscriptionAttemptCb,
aIsFabricFiltered, aKeepPreviousSubscriptions, aDataVersion);
}

/**
Expand Down Expand Up @@ -351,6 +363,7 @@ class DLL_EXPORT ClusterBase
ReadResponseFailureCallback failureCb, uint16_t minIntervalFloorSeconds,
uint16_t maxIntervalCeilingSeconds,
SubscriptionEstablishedCallback subscriptionEstablishedCb = nullptr,
ResubscriptionAttemptCallback resubscriptionAttemptCb = nullptr,
bool aKeepPreviousSubscriptions = false, bool aIsUrgentEvent = false)
{
VerifyOrReturnError(mDevice != nullptr, CHIP_ERROR_INCORRECT_STATE);
Expand All @@ -376,10 +389,18 @@ class DLL_EXPORT ClusterBase
}
};

auto onResubscriptionAttemptCb = [context, resubscriptionAttemptCb](const app::ReadClient & readClient, CHIP_ERROR aError,
uint32_t aNextResubscribeIntervalMsec) {
if (resubscriptionAttemptCb != nullptr)
{
resubscriptionAttemptCb(context, aError, aNextResubscribeIntervalMsec);
}
};

return Controller::SubscribeEvent<DecodableType>(mDevice->GetExchangeManager(), mDevice->GetSecureSession().Value(),
mEndpoint, onReportCb, onFailureCb, minIntervalFloorSeconds,
maxIntervalCeilingSeconds, onSubscriptionEstablishedCb,
aKeepPreviousSubscriptions, aIsUrgentEvent);
onResubscriptionAttemptCb, aKeepPreviousSubscriptions, aIsUrgentEvent);
}

protected:
Expand Down
Loading

0 comments on commit dce8007

Please sign in to comment.