Skip to content

Commit

Permalink
[ReadHandler] Removed Scheduling of report from OnReadHandlerCreated (#…
Browse files Browse the repository at this point in the history
…28536)

* Removed Scheduling of report from OnReadHandlerCreated since it caused immediate scheduling before the intervals are negotiated

* Separated Read reports from Subscription reports and renamed flags and accessors for clarity

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Put back rescheduling in the OnReadHandlerSubscribed, added a CanEmitReadReport() method for readhandler for read request. Fixed call order in TestReportScheduler tests

* Removed redundancy by removing un-necessary OnSubscriptionAction(), added comment for OnReadHandlerSubscribed and modified reporting condition for Read reports to be sent independently from the report scheduler

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Appplied change to method name and fixe condition in SetStateFlag

* Update src/app/ReadHandler.h

Co-authored-by: Boris Zbarsky <[email protected]>

* Removed un-necessary check in SetStateFlag

---------

Co-authored-by: Boris Zbarsky <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Sep 12, 2023
1 parent 774c102 commit 2567157
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 128 deletions.
42 changes: 23 additions & 19 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

VerifyOrDie(observer != nullptr);
mObserver = observer;
mObserver->OnReadHandlerCreated(this);
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand All @@ -92,7 +91,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :

VerifyOrDie(observer != nullptr);
mObserver = observer;
mObserver->OnReadHandlerCreated(this);
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
Expand Down Expand Up @@ -235,6 +233,7 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
{
appCallback->OnSubscriptionEstablished(*this);
}
mObserver->OnSubscriptionEstablished(this);
}
}
else
Expand All @@ -246,10 +245,10 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
return CHIP_NO_ERROR;
}

MoveToState(HandlerState::GeneratingReports);
MoveToState(HandlerState::CanStartReporting);
break;

case HandlerState::GeneratingReports:
case HandlerState::CanStartReporting:
case HandlerState::Idle:
default:
err = CHIP_ERROR_INCORRECT_STATE;
Expand All @@ -262,7 +261,7 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange

CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aStatus)
{
VerifyOrReturnLogError(mState == HandlerState::GeneratingReports, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnLogError(mState == HandlerState::CanStartReporting, CHIP_ERROR_INCORRECT_STATE);
if (IsPriming() || IsChunkedReport())
{
mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
Expand All @@ -286,7 +285,7 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt

CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks)
{
VerifyOrReturnLogError(mState == HandlerState::GeneratingReports, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnLogError(mState == HandlerState::CanStartReporting, CHIP_ERROR_INCORRECT_STATE);
VerifyOrDie(!IsAwaitingReportResponse()); // Should not be reportable!
if (IsPriming() || IsChunkedReport())
{
Expand Down Expand Up @@ -335,7 +334,7 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
// Priming reports are handled when we send a SubscribeResponse.
if (IsType(InteractionType::Subscribe) && !IsPriming() && !IsChunkedReport())
{
mObserver->OnSubscriptionAction(this);
mObserver->OnSubscriptionReportSent(this);
}
}
if (!aMoreChunks)
Expand Down Expand Up @@ -456,7 +455,7 @@ CHIP_ERROR ReadHandler::ProcessReadRequest(System::PacketBufferHandle && aPayloa
ReturnErrorOnFailure(readRequestParser.GetIsFabricFiltered(&isFabricFiltered));
SetStateFlag(ReadHandlerFlags::FabricFiltered, isFabricFiltered);
ReturnErrorOnFailure(readRequestParser.ExitContainer());
MoveToState(HandlerState::GeneratingReports);
MoveToState(HandlerState::CanStartReporting);

mExchangeCtx->WillSendMessage();

Expand Down Expand Up @@ -574,8 +573,8 @@ const char * ReadHandler::GetStateStr() const
return "Idle";
case HandlerState::AwaitingDestruction:
return "AwaitingDestruction";
case HandlerState::GeneratingReports:
return "GeneratingReports";
case HandlerState::CanStartReporting:
return "CanStartReporting";

case HandlerState::AwaitingReportResponse:
return "AwaitingReportResponse";
Expand Down Expand Up @@ -603,10 +602,17 @@ void ReadHandler::MoveToState(const HandlerState aTargetState)
// If we just unblocked sending reports, let's go ahead and schedule the reporting
// engine to run to kick that off.
//
if (aTargetState == HandlerState::GeneratingReports)
if (aTargetState == HandlerState::CanStartReporting)
{
// mObserver will take care of scheduling the report as soon as allowed
mObserver->OnBecameReportable(this);
if (ShouldReportUnscheduled())
{
InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
}
else
{
// If we became reportable, the scheduler will schedule a run as soon as allowed
mObserver->OnBecameReportable(this);
}
}
}

Expand Down Expand Up @@ -647,8 +653,6 @@ CHIP_ERROR ReadHandler::SendSubscribeResponse()
ReturnErrorOnFailure(writer.Finalize(&packet));
VerifyOrReturnLogError(mExchangeCtx, CHIP_ERROR_INCORRECT_STATE);

mObserver->OnSubscriptionAction(this);

ClearStateFlag(ReadHandlerFlags::PrimingReports);
return mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::SubscribeResponse, std::move(packet));
}
Expand Down Expand Up @@ -785,7 +789,7 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP
SetStateFlag(ReadHandlerFlags::FabricFiltered, isFabricFiltered);
ReturnErrorOnFailure(Crypto::DRBG_get_bytes(reinterpret_cast<uint8_t *>(&mSubscriptionId), sizeof(mSubscriptionId)));
ReturnErrorOnFailure(subscribeRequestParser.ExitContainer());
MoveToState(HandlerState::GeneratingReports);
MoveToState(HandlerState::CanStartReporting);

mExchangeCtx->WillSendMessage();

Expand Down Expand Up @@ -872,11 +876,11 @@ void ReadHandler::ForceDirtyState()

void ReadHandler::SetStateFlag(ReadHandlerFlags aFlag, bool aValue)
{
bool oldReportable = IsReportable();
bool oldReportable = ShouldStartReporting();
mFlags.Set(aFlag, aValue);

// If we became reportable, schedule a reporting run.
if (!oldReportable && IsReportable())
if (!oldReportable && ShouldStartReporting())
{
// If we became reportable, the scheduler will schedule a run as soon as allowed
mObserver->OnBecameReportable(this);
Expand All @@ -895,7 +899,7 @@ void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManag

_this->mSessionHandle.Grab(sessionHandle);

_this->MoveToState(HandlerState::GeneratingReports);
_this->MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
Expand Down
44 changes: 28 additions & 16 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,23 @@ class ReadHandler : public Messaging::ExchangeDelegate
public:
virtual ~Observer() = default;

/// @brief Callback invoked to notify a ReadHandler was created and can be registered
/// @param[in] apReadHandler ReadHandler getting added
virtual void OnReadHandlerCreated(ReadHandler * apReadHandler) = 0;

/// @brief Callback invoked when a ReadHandler went from a non reportable state to a reportable state so a report can be
/// sent immediately if the minimal interval allows it. Otherwise the report should be rescheduled to the earliest time
/// allowed.
/// @param[in] apReadHandler ReadHandler that became dirty
/// @brief Callback invoked to notify a subscription was successfully established for the ReadHandler
/// @param[in] apReadHandler ReadHandler that completed its subscription
virtual void OnSubscriptionEstablished(ReadHandler * apReadHandler) = 0;

/// @brief Callback invoked when a ReadHandler went from a non reportable state to a reportable state. Indicates to the
/// observer that a report should be emitted when the min interval allows it.
///
/// This will only be invoked for subscribe-type ReadHandler objects, and only after
/// OnSubscriptionEstablished has been called.
///
/// @param[in] apReadHandler ReadHandler that became dirty and in HandlerState::CanStartReporting state
virtual void OnBecameReportable(ReadHandler * apReadHandler) = 0;

/// @brief Callback invoked when the read handler needs to make sure to send a message to the subscriber within the next
/// maxInterval time period.
/// @param[in] apReadHandler ReadHandler that has generated a report
virtual void OnSubscriptionAction(ReadHandler * apReadHandler) = 0;
virtual void OnSubscriptionReportSent(ReadHandler * apReadHandler) = 0;

/// @brief Callback invoked when a ReadHandler is getting removed so it can be unregistered
/// @param[in] apReadHandler ReadHandler getting destroyed
Expand Down Expand Up @@ -333,13 +336,22 @@ class ReadHandler : public Messaging::ExchangeDelegate
bool IsIdle() const { return mState == HandlerState::Idle; }

/// @brief Returns whether the ReadHandler is in a state where it can send a report and there is data to report.
bool IsReportable() const
bool ShouldStartReporting() const
{
// Important: Anything that changes the state IsReportable must call mObserver->OnBecameReportable(this) for the scheduler
// to plan the next run accordingly.
return mState == HandlerState::GeneratingReports && IsDirty();
// Important: Anything that changes ShouldStartReporting() from false to true
// (which can only happen for subscriptions) must call
// mObserver->OnBecameReportable(this).
return CanStartReporting() && (ShouldReportUnscheduled() || IsDirty());
}
/// @brief CanStartReporting() is true if the ReadHandler is in a state where it could generate
/// a (possibly empty) report if someone asked it to.
bool CanStartReporting() const { return mState == HandlerState::CanStartReporting; }
/// @brief ShouldReportUnscheduled() is true if the ReadHandler should be asked to generate reports
/// without consulting the report scheduler.
bool ShouldReportUnscheduled() const
{
return CanStartReporting() && (IsType(ReadHandler::InteractionType::Read) || IsPriming());
}
bool IsGeneratingReports() const { return mState == HandlerState::GeneratingReports; }
bool IsAwaitingReportResponse() const { return mState == HandlerState::AwaitingReportResponse; }

// Resets the path iterator to the beginning of the whole report for generating a series of new reports.
Expand Down Expand Up @@ -418,14 +430,14 @@ class ReadHandler : public Messaging::ExchangeDelegate
friend class chip::app::reporting::Engine;
friend class chip::app::InteractionModelEngine;

// The report scheduler needs to be able to access StateFlag private functions IsReportable(), IsGeneratingReports(),
// The report scheduler needs to be able to access StateFlag private functions ShouldStartReporting(), CanStartReporting(),
// ForceDirtyState() and IsDirty() to know when to schedule a run so it is declared as a friend class.
friend class chip::app::reporting::ReportScheduler;

enum class HandlerState : uint8_t
{
Idle, ///< The handler has been initialized and is ready
GeneratingReports, ///< The handler has is now capable of generating reports and may generate one immediately
CanStartReporting, ///< The handler has is now capable of generating reports and may generate one immediately
///< or later when other criteria are satisfied (e.g hold-off for min reporting interval).
AwaitingReportResponse, ///< The handler has sent the report to the client and is awaiting a status response.
AwaitingDestruction, ///< The object has completed its work and is awaiting destruction by the application.
Expand Down
4 changes: 2 additions & 2 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ void Engine::Run()
ReadHandler * readHandler = imEngine->ActiveHandlerAt(mCurReadHandlerIdx % (uint32_t) imEngine->mReadHandlers.Allocated());
VerifyOrDie(readHandler != nullptr);

if (imEngine->GetReportScheduler()->IsReportableNow(readHandler))
if (readHandler->ShouldReportUnscheduled() || imEngine->GetReportScheduler()->IsReportableNow(readHandler))
{
mRunningReadHandler = readHandler;
CHIP_ERROR err = BuildAndSendSingleReportData(readHandler);
Expand Down Expand Up @@ -831,7 +831,7 @@ CHIP_ERROR Engine::SetDirty(AttributePathParams & aAttributePath)
// We call AttributePathIsDirty for both read interactions and subscribe interactions, since we may send inconsistent
// attribute data between two chunks. AttributePathIsDirty will not schedule a new run for read handlers which are
// waiting for a response to the last message chunk for read interactions.
if (handler->IsGeneratingReports() || handler->IsAwaitingReportResponse())
if (handler->CanStartReporting() || handler->IsAwaitingReportResponse())
{
for (auto object = handler->GetAttributePathList(); object != nullptr; object = object->mpNext)
{
Expand Down
10 changes: 7 additions & 3 deletions src/app/reporting/ReportScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ReportScheduler : public ReadHandler::Observer, public ICDStateObserver
{
Timestamp now = mTimerDelegate->GetCurrentMonotonicTimestamp();

return (mReadHandler->IsGeneratingReports() &&
return (mReadHandler->CanStartReporting() &&
(now >= mMinTimestamp && (mReadHandler->IsDirty() || now >= mMaxTimestamp || now >= mSyncTimestamp)));
}

Expand Down Expand Up @@ -139,9 +139,13 @@ class ReportScheduler : public ReadHandler::Observer, public ICDStateObserver

/// @brief Check whether a ReadHandler is reportable right now, taking into account its minimum and maximum intervals.
/// @param aReadHandler read handler to check
bool IsReportableNow(ReadHandler * aReadHandler) { return FindReadHandlerNode(aReadHandler)->IsReportableNow(); }
bool IsReportableNow(ReadHandler * aReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(aReadHandler);
return (nullptr != node) ? node->IsReportableNow() : false;
}
/// @brief Check if a ReadHandler is reportable without considering the timing
bool IsReadHandlerReportable(ReadHandler * aReadHandler) const { return aReadHandler->IsReportable(); }
bool IsReadHandlerReportable(ReadHandler * aReadHandler) const { return aReadHandler->ShouldStartReporting(); }
/// @brief Sets the ForceDirty flag of a ReadHandler
void HandlerForceDirtyState(ReadHandler * aReadHandler) { aReadHandler->ForceDirtyState(); }

Expand Down
13 changes: 5 additions & 8 deletions src/app/reporting/ReportSchedulerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ void ReportSchedulerImpl::OnEnterActiveMode()
#endif
}

/// @brief When a ReadHandler is added, register it, which will schedule an engine run
void ReportSchedulerImpl::OnReadHandlerCreated(ReadHandler * aReadHandler)
/// @brief When a ReadHandler is added, register it in the scheduler node pool. Scheduling the report here is un-necessary since the
/// ReadHandler will call MoveToState(HandlerState::CanStartReporting);, which will call OnBecameReportable() and schedule the
/// report.
void ReportSchedulerImpl::OnSubscriptionEstablished(ReadHandler * aReadHandler)
{
ReadHandlerNode * newNode = FindReadHandlerNode(aReadHandler);
// Handler must not be registered yet; it's just being constructed.
Expand All @@ -68,11 +70,6 @@ void ReportSchedulerImpl::OnReadHandlerCreated(ReadHandler * aReadHandler)
"Registered a ReadHandler that will schedule a report between system Timestamp: %" PRIu64
" and system Timestamp %" PRIu64 ".",
newNode->GetMinTimestamp().count(), newNode->GetMaxTimestamp().count());

Milliseconds32 newTimeout;
// No need to check for error here, since the node is already in the list otherwise we would have Died
CalculateNextReportTimeout(newTimeout, newNode);
ScheduleReport(newTimeout, newNode);
}

/// @brief When a ReadHandler becomes reportable, schedule, recalculate and reschedule the report.
Expand All @@ -85,7 +82,7 @@ void ReportSchedulerImpl::OnBecameReportable(ReadHandler * aReadHandler)
ScheduleReport(newTimeout, node);
}

void ReportSchedulerImpl::OnSubscriptionAction(ReadHandler * aReadHandler)
void ReportSchedulerImpl::OnSubscriptionReportSent(ReadHandler * aReadHandler)
{
ReadHandlerNode * node = FindReadHandlerNode(aReadHandler);
VerifyOrReturn(nullptr != node);
Expand Down
4 changes: 2 additions & 2 deletions src/app/reporting/ReportSchedulerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class ReportSchedulerImpl : public ReportScheduler
void OnEnterActiveMode() override;

// ReadHandlerObserver
void OnReadHandlerCreated(ReadHandler * aReadHandler) final;
void OnSubscriptionEstablished(ReadHandler * aReadHandler) final;
void OnBecameReportable(ReadHandler * aReadHandler) final;
void OnSubscriptionAction(ReadHandler * aReadHandler) final;
void OnSubscriptionReportSent(ReadHandler * aReadHandler) final;
void OnReadHandlerDestroyed(ReadHandler * aReadHandler) override;

bool IsReportScheduled(ReadHandler * aReadHandler);
Expand Down
8 changes: 4 additions & 4 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,7 @@ void TestReadInteraction::TestSubscribeUrgentWildcardEvent(nlTestSuite * apSuite
reportScheduler->GetMinTimestampForHandler(delegate.mpReadHandler) <
System::SystemClock().GetMonotonicTimestamp());
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->IsDirty());
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->IsReportable());
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->ShouldStartReporting());

// And the non-urgent one should not have changed state either, since
// it's waiting for the max-interval.
Expand All @@ -2245,7 +2245,7 @@ void TestReadInteraction::TestSubscribeUrgentWildcardEvent(nlTestSuite * apSuite
reportScheduler->GetMaxTimestampForHandler(nonUrgentDelegate.mpReadHandler) >
System::SystemClock().GetMonotonicTimestamp());
NL_TEST_ASSERT(apSuite, !nonUrgentDelegate.mpReadHandler->IsDirty());
NL_TEST_ASSERT(apSuite, !nonUrgentDelegate.mpReadHandler->IsReportable());
NL_TEST_ASSERT(apSuite, !nonUrgentDelegate.mpReadHandler->ShouldStartReporting());

// There should be no reporting run scheduled. This is very important;
// otherwise we can get a false-positive pass below because the run was
Expand All @@ -2257,12 +2257,12 @@ void TestReadInteraction::TestSubscribeUrgentWildcardEvent(nlTestSuite * apSuite

// Urgent read handler should now be dirty, and reportable.
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsReportable());
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->ShouldStartReporting());
NL_TEST_ASSERT(apSuite, reportScheduler->IsReadHandlerReportable(delegate.mpReadHandler));

// Non-urgent read handler should not be reportable.
NL_TEST_ASSERT(apSuite, !nonUrgentDelegate.mpReadHandler->IsDirty());
NL_TEST_ASSERT(apSuite, !nonUrgentDelegate.mpReadHandler->IsReportable());
NL_TEST_ASSERT(apSuite, !nonUrgentDelegate.mpReadHandler->ShouldStartReporting());

// Still no reporting should have happened.
NL_TEST_ASSERT(apSuite, !delegate.mGotEventResponse);
Expand Down
Loading

0 comments on commit 2567157

Please sign in to comment.