Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

urgent event needs to honor min interval for subscription. #21938

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
if (!aMoreChunks)
{
mPreviousReportsBeginGeneration = mCurrentReportsBeginGeneration;
ClearDirty();
ClearForceDirtyFlag();
InteractionModelEngine::GetInstance()->ReleaseDataVersionFilterList(mpDataVersionFilterList);
}

Expand Down
16 changes: 5 additions & 11 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
enum class ReadHandlerFlags : uint8_t
{
// mHoldReport is used to prevent subscription data delivery while we are
// waiting for the min reporting interval to elapse. If we have to send a
// report immediately due to an urgent event being queued,
// UnblockUrgentEventDelivery can be used to force mHoldReport to false.
// waiting for the min reporting interval to elapse.
HoldReport = (1 << 0),

// mHoldSync is used to prevent subscription empty report delivery while we
Expand All @@ -219,14 +217,15 @@ class ReadHandler : public Messaging::ExchangeDelegate
PrimingReports = (1 << 3),
ActiveSubscription = (1 << 4),
FabricFiltered = (1 << 5),

// For subscriptions, we record the dirty set generation when we started to generate the last report.
// The mCurrentReportsBeginGeneration records the generation at the start of the current report. This only/
// has a meaningful value while IsReporting() is true.
//
// mPreviousReportsBeginGeneration will be set to mCurrentReportsBeginGeneration after we send the last
// chunk of the current report. Anything that was dirty with a generation earlier than
// mPreviousReportsBeginGeneration has had its value sent to the client.
// when receiving initial request, it needs mark current handler as dirty.
// when there is urgent event, it needs mark current handler as dirty.
ForceDirty = (1 << 6),

// Don't need the response for report data if true
Expand Down Expand Up @@ -300,8 +299,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
{
return (mDirtyGeneration > mPreviousReportsBeginGeneration) || mFlags.Has(ReadHandlerFlags::ForceDirty);
}
void ClearDirty() { mFlags.Clear(ReadHandlerFlags::ForceDirty); }

void ClearForceDirtyFlag() { mFlags.Clear(ReadHandlerFlags::ForceDirty); }
NodeId GetInitiatorNodeId() const
{
auto session = GetSession();
Expand All @@ -319,11 +317,7 @@ class ReadHandler : public Messaging::ExchangeDelegate

auto GetTransactionStartGeneration() const { return mTransactionStartGeneration; }

void UnblockUrgentEventDelivery()
{
mFlags.Clear(ReadHandlerFlags::HoldReport);
mFlags.Set(ReadHandlerFlags::ForceDirty);
}
void UnblockUrgentEventDelivery() { mFlags.Set(ReadHandlerFlags::ForceDirty); }

const AttributeValueEncoder::AttributeEncodeState & GetAttributeEncodeState() const { return mAttributeEncoderState; }
void SetAttributeEncodeState(const AttributeValueEncoder::AttributeEncodeState & aState) { mAttributeEncoderState = aState; }
Expand Down
42 changes: 3 additions & 39 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,7 @@ void Engine::Run()

bool allReadClean = true;

imEngine->mReadHandlers.ForEachActiveObject([this, &allReadClean](ReadHandler * handler) {
UpdateReadHandlerDirty(*handler);
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved
imEngine->mReadHandlers.ForEachActiveObject([&allReadClean](ReadHandler * handler) {
if (handler->IsDirty())
{
allReadClean = false;
Expand Down Expand Up @@ -850,41 +849,6 @@ CHIP_ERROR Engine::SetDirty(AttributePathParams & aAttributePath)
return CHIP_NO_ERROR;
}

void Engine::UpdateReadHandlerDirty(ReadHandler & aReadHandler)
{
if (!aReadHandler.IsDirty())
{
return;
}

if (!aReadHandler.IsType(ReadHandler::InteractionType::Subscribe))
{
return;
}

bool intersected = false;
for (auto object = aReadHandler.GetAttributePathList(); object != nullptr; object = object->mpNext)
{
mGlobalDirtySet.ForEachActiveObject([&](auto * path) {
if (path->Intersects(object->mValue) && path->mGeneration > aReadHandler.mPreviousReportsBeginGeneration)
{
intersected = true;
return Loop::Break;
}
return Loop::Continue;
});
if (intersected)
{
break;
}
}
if (!intersected)
{
aReadHandler.ClearDirty();
ChipLogDetail(InteractionModel, "clear read handler dirty in UpdateReadHandlerDirty!");
}
}

CHIP_ERROR Engine::SendReport(ReadHandler * apReadHandler, System::PacketBufferHandle && aPayload, bool aHasMoreChunks)
{
CHIP_ERROR err = CHIP_NO_ERROR;
Expand Down Expand Up @@ -974,8 +938,8 @@ CHIP_ERROR Engine::ScheduleEventDelivery(ConcreteEventPath & aPath, uint32_t aBy

if (isUrgentEvent)
{
ChipLogDetail(DataManagement, "urgent event schedule run");
return ScheduleRun();
ChipLogDetail(DataManagement, "urgent event would be sent after min interval");
return CHIP_NO_ERROR;
}

return ScheduleBufferPressureEventDelivery(aBytesWritten);
Expand Down
6 changes: 0 additions & 6 deletions src/app/reporting/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,6 @@ class Engine
bool IsClusterDataVersionMatch(const ObjectList<DataVersionFilter> * aDataVersionFilterList,
const ConcreteReadAttributePath & aPath);

/**
* Check all active subscription, if the subscription has no paths that intersect with global dirty set,
* it would clear dirty flag for that subscription
*
*/
void UpdateReadHandlerDirty(ReadHandler & aReadHandler);
/**
* Send Report via ReadHandler
*
Expand Down
50 changes: 33 additions & 17 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1514,9 +1514,8 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();
Expand All @@ -1531,8 +1530,6 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = kTestClusterId;
dirtyPath1.mEndpointId = kTestEndpointId;
Expand Down Expand Up @@ -1562,6 +1559,7 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
// Test report with 2 different path
delegate.mpReadHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
delegate.mGotEventResponse = false;
delegate.mNumAttributeResponse = 0;

printf("HereHere\n");
Expand All @@ -1574,6 +1572,7 @@ void TestReadInteraction::TestSubscribeRoundtrip(nlTestSuite * apSuite, void * a
ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse == true);
yunhanw-google marked this conversation as resolved.
Show resolved Hide resolved
NL_TEST_ASSERT(apSuite, delegate.mNumAttributeResponse == 2);

// Test report with 2 different path, and 1 same path
Expand Down Expand Up @@ -1700,6 +1699,8 @@ void TestReadInteraction::TestSubscribeUrgentWildcardEvent(nlTestSuite * apSuite

ctx.DrainAndServiceIO();

System::Clock::Timestamp startTime = System::SystemClock().GetMonotonicTimestamp();

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 1);
NL_TEST_ASSERT(apSuite, engine->ActiveHandlerAt(0) != nullptr);
delegate.mpReadHandler = engine->ActiveHandlerAt(0);
Expand All @@ -1710,12 +1711,36 @@ void TestReadInteraction::TestSubscribeUrgentWildcardEvent(nlTestSuite * apSuite
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty() == true);
delegate.mGotEventResponse = false;
delegate.mGotReport = false;
ctx.DrainAndServiceIO();
NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse);

// wait for min interval 2 seconds(in test, we use 1.9second considering the time variation), expect no event is received,
// then wait for 0.5 seconds, then the urgent event would be sent out
while (true)
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved
{
ctx.GetIOContext().DriveIO(); // at least one IO loop is guaranteed

if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= System::Clock::Milliseconds32(1900))
{
break;
}
}

NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse != true);

startTime = System::SystemClock().GetMonotonicTimestamp();
while (true)
{
ctx.GetIOContext().DriveIO(); // at least one IO loop is guaranteed

if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= System::Clock::Milliseconds32(500))
{
break;
}
}
NL_TEST_ASSERT(apSuite, delegate.mGotEventResponse == true);
}

// By now we should have closed all exchanges and sent all pending acks, so
Expand Down Expand Up @@ -2224,7 +2249,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripStatusReportTimeout(nlTestSu
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
printf("\nSend first subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
Expand All @@ -2242,8 +2266,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripStatusReportTimeout(nlTestSu
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = kTestClusterId;
dirtyPath1.mEndpointId = kTestEndpointId;
Expand Down Expand Up @@ -2544,7 +2566,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout(nlT
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
printf("\nSend first subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
Expand All @@ -2561,8 +2582,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout(nlT
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = Test::MockClusterId(2);
dirtyPath1.mEndpointId = Test::kMockEndpoint3;
Expand Down Expand Up @@ -2641,7 +2660,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout(nlTestSui
{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);
readPrepareParams.mpEventPathParamsList[0].mIsUrgentEvent = true;
printf("\nSend first subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);
delegate.mGotReport = false;
err = readClient.SendRequest(readPrepareParams);
Expand All @@ -2658,8 +2676,6 @@ void TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout(nlTestSui
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);

GenerateEvents(apSuite, apContext);
NL_TEST_ASSERT(apSuite, !delegate.mpReadHandler->mFlags.Has(ReadHandler::ReadHandlerFlags::HoldReport));
NL_TEST_ASSERT(apSuite, delegate.mpReadHandler->IsDirty());
chip::app::AttributePathParams dirtyPath1;
dirtyPath1.mClusterId = Test::MockClusterId(2);
dirtyPath1.mEndpointId = Test::kMockEndpoint3;
Expand Down