From 405010748dc337c084a7c6a982d7361c4389e26f Mon Sep 17 00:00:00 2001 From: Boris Zbarsky Date: Fri, 18 Feb 2022 13:38:40 -0500 Subject: [PATCH] Stop dropping reads on the floor if more than one happens at once. (#15338) Fixes https://github.com/project-chip/connectedhomeip/issues/15304 The fix for #15304 is the change in the while loop condition in Engine::Run. Before that change, we would compare numReadHandled to the current count of allocated read handlers. But processing a read handler would deallocate it, so we would only end up processing half the read handlers that were outstanding on entry to Run (because after that numReadHandled would be larger than the remaining number allocated). The change in InteractionModelEngine::OnDone and the management of mRunningReadHandler are to handle a slightly more complicated situation I ran into while writing some tests for this code. If we have at least two subscriptions and some number of reads after them pending when Engine::Run is entered, when we would process the first read, it would be deallocated, mCurReadHandlerIdx would get reset to 0, we would then increment it by 1, and end up walking all but the first subscription again. Which means that our numReadHandled would increase to the point where the loop terminates before we had gotten to all the read handlers. If we had N subscriptions we would miss N-1 read handlers. Those could then get stuck in limbo indefinitely, until either a subscription heartbeat had to happen or someone else issued a read. The change in Engine::OnReportConfirm is to handle the case when we have more than CHIP_IM_MAX_REPORTS_IN_FLIGHT subscriptions that all need reporting, fire off the first CHIP_IM_MAX_REPORTS_IN_FLIGHT of them, and then never call ScheduleRun() after that, so all the other subscriptions get stuck. The issue with CHIP_IM_MAX_REPORTS_IN_FLIGHT was not being caught by the existing tests because those tests manually called Run() on the reporting engine (in a loop, in fact). That was needed because we could end up in a situation where DrainAndServiceIO() has processed all the pending messages, but we have a queued task (from ScheduleRun) that is not a message and will not get run, so Engine::Run was not getting called properly via the "normal" codepath in the test. This was fixed by removing all the manual Run() calls and making DrainAndServiceIO() do a better job of handling async things queued by message reception. TestReadAttributeTimeout had to be modified slightly because in the new setup we could no longer rely on DrainAndServiceIO() after we send the reads not triggering the reports and giving us a chance to tear down the session before the reports did get triggered. So instead of first expiring the client-to-server session, we expire the server-to-client one _before_ doing DrainAndServiceIO. This ensures that we never get replies to our reads, and then we can proceed to expire the client-to-server session, which gets treated as a timeout. --- src/app/InteractionModelEngine.cpp | 10 +- src/app/reporting/Engine.cpp | 18 +- src/app/reporting/Engine.h | 26 ++- src/controller/tests/data_model/TestRead.cpp | 228 +++++++++++++------ src/messaging/tests/MessagingContext.h | 29 ++- 5 files changed, 235 insertions(+), 76 deletions(-) diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index 6cf158eca18c7d..68ea7c7e986448 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -210,14 +210,14 @@ void InteractionModelEngine::OnDone(CommandHandler & apCommandObj) void InteractionModelEngine::OnDone(ReadHandler & apReadObj) { - mReadHandlers.ReleaseObject(&apReadObj); - // // Deleting an item can shift down the contents of the underlying pool storage, - // rendering any tracker using positional indexes invalid. Let's reset it and - // have it start from index 0. + // rendering any tracker using positional indexes invalid. Let's reset it, + // based on which readHandler we are getting rid of. // - mReportingEngine.ResetReadHandlerTracker(); + mReportingEngine.ResetReadHandlerTracker(&apReadObj); + + mReadHandlers.ReleaseObject(&apReadObj); } CHIP_ERROR InteractionModelEngine::OnInvokeCommandRequest(Messaging::ExchangeContext * apExchangeContext, diff --git a/src/app/reporting/Engine.cpp b/src/app/reporting/Engine.cpp index 667a5909647dd6..004a5222f458d7 100644 --- a/src/app/reporting/Engine.cpp +++ b/src/app/reporting/Engine.cpp @@ -528,14 +528,19 @@ void Engine::Run() mRunScheduled = false; - while ((mNumReportsInFlight < CHIP_IM_MAX_REPORTS_IN_FLIGHT) && (numReadHandled < imEngine->mReadHandlers.Allocated())) + // We may be deallocating read handlers as we go. Track how many we had + // initially, so we make sure to go through all of them. + size_t initialAllocated = imEngine->mReadHandlers.Allocated(); + while ((mNumReportsInFlight < CHIP_IM_MAX_REPORTS_IN_FLIGHT) && (numReadHandled < initialAllocated)) { ReadHandler * readHandler = imEngine->ActiveHandlerAt(mCurReadHandlerIdx % (uint32_t) imEngine->mReadHandlers.Allocated()); VerifyOrDie(readHandler != nullptr); if (readHandler->IsReportable()) { - CHIP_ERROR err = BuildAndSendSingleReportData(readHandler); + mRunningReadHandler = readHandler; + CHIP_ERROR err = BuildAndSendSingleReportData(readHandler); + mRunningReadHandler = nullptr; if (err != CHIP_NO_ERROR) { return; @@ -543,6 +548,9 @@ void Engine::Run() } numReadHandled++; + // If readHandler removed itself from our list, we also decremented + // mCurReadHandlerIdx to account for that removal, so it's safe to + // increment here. mCurReadHandlerIdx++; } @@ -678,6 +686,12 @@ void Engine::OnReportConfirm() { VerifyOrDie(mNumReportsInFlight > 0); + if (mNumReportsInFlight == CHIP_IM_MAX_REPORTS_IN_FLIGHT) + { + // We could have other things waiting to go now that this report is no + // longer in flight. + ScheduleRun(); + } mNumReportsInFlight--; ChipLogDetail(DataManagement, " OnReportConfirm: NumReports = %" PRIu32, mNumReportsInFlight); } diff --git a/src/app/reporting/Engine.h b/src/app/reporting/Engine.h index 759d6d50dc8f09..ff5bbf42dd2302 100644 --- a/src/app/reporting/Engine.h +++ b/src/app/reporting/Engine.h @@ -98,8 +98,25 @@ class Engine /* * Resets the tracker that tracks the currently serviced read handler. - */ - void ResetReadHandlerTracker() { mCurReadHandlerIdx = 0; } + * apReadHandler can be non-null to indicate that the reset is due to a + * specific ReadHandler being deallocated. + */ + void ResetReadHandlerTracker(ReadHandler * apReadHandlerBeingDeleted) + { + if (apReadHandlerBeingDeleted == mRunningReadHandler) + { + // Just decrement, so our increment after we finish running it will + // do the right thing. + --mCurReadHandlerIdx; + } + else + { + // No idea what to do here to make the indexing sane. Just start at + // the beginning. We need to do better here; see + // https://github.com/project-chip/connectedhomeip/issues/13809 + mCurReadHandlerIdx = 0; + } + } private: friend class TestReportingEngine; @@ -175,6 +192,11 @@ class Engine */ uint32_t mCurReadHandlerIdx = 0; + /** + * The read handler we're calling BuildAndSendSingleReportData on right now. + */ + ReadHandler * mRunningReadHandler = nullptr; + /** * mGlobalDirtySet is used to track the set of attribute/event paths marked dirty for reporting purposes. * diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 7799519a34001b..9e325a26a444e0 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -46,6 +46,10 @@ enum ResponseDirective ResponseDirective responseDirective; +// Number of reads of TestCluster::Attributes::Int16u that we have observed. +// Every read will increment this count by 1 and return the new value. +uint16_t totalReadCount = 0; + } // namespace namespace chip { @@ -73,6 +77,16 @@ CHIP_ERROR ReadSingleClusterData(const Access::SubjectDescriptor & aSubjectDescr return CHIP_NO_ERROR; }); } + else if (aPath.mClusterId == app::Clusters::TestCluster::Id && + aPath.mAttributeId == app::Clusters::TestCluster::Attributes::Int16u::Id) + { + AttributeValueEncoder::AttributeEncodeState state = + (apEncoderState == nullptr ? AttributeValueEncoder::AttributeEncodeState() : *apEncoderState); + AttributeValueEncoder valueEncoder(aAttributeReports, aSubjectDescriptor.fabricIndex, aPath, 0 /* data version */, + aIsFabricFiltered, state); + + return valueEncoder.Encode(++totalReadCount); + } else { AttributeReportIB::Builder & attributeReport = aAttributeReports.CreateAttributeReport(); @@ -155,11 +169,20 @@ class TestReadInteraction static void TestReadFabricScopedWithoutFabricFilter(nlTestSuite * apSuite, void * apContext); static void TestReadFabricScopedWithFabricFilter(nlTestSuite * apSuite, void * apContext); static void TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext); + static void TestReadHandler_MultipleReads(nlTestSuite * apSuite, void * apContext); + static void TestReadHandler_OneSubscribeMultipleReads(nlTestSuite * apSuite, void * apContext); + static void TestReadHandler_TwoSubscribesMultipleReads(nlTestSuite * apSuite, void * apContext); static void TestReadHandler_MultipleSubscriptionsWithDataVersionFilter(nlTestSuite * apSuite, void * apContext); static void TestReadHandlerResourceExhaustion_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext); static void TestReadHandlerResourceExhaustion_MultipleReads(nlTestSuite * apSuite, void * apContext); private: + // Issue the given number of reads in parallel and wait for them all to + // succeed. + static void MultipleReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aReadCount); + // Establish the given number of subscriptions, then issue the given number + // of reads in parallel and wait for them all to succeed. + static void SubscribeThenReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aSubscribeCount, size_t aReadCount); }; void TestReadInteraction::TestReadAttributeResponse(nlTestSuite * apSuite, void * apContext) @@ -198,8 +221,6 @@ void TestReadInteraction::TestReadAttributeResponse(nlTestSuite * apSuite, void kTestEndpointId, onSuccessCb, onFailureCb); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -243,8 +264,6 @@ void TestReadInteraction::TestReadDataVersionFilter(nlTestSuite * apSuite, void &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, true, dataVersion); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, !onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -276,8 +295,6 @@ void TestReadInteraction::TestReadEventResponse(nlTestSuite * apSuite, void * ap onSuccessCb, onFailureCb); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -310,8 +327,6 @@ void TestReadInteraction::TestReadAttributeError(nlTestSuite * apSuite, void * a kTestEndpointId, onSuccessCb, onFailureCb); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, !onSuccessCbInvoked && onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -343,9 +358,11 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * Controller::ReadAttribute(&ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb); + ctx.ExpireSessionAliceToBob(); + ctx.DrainAndServiceIO(); - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 2); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 1); ctx.ExpireSessionBobToAlice(); @@ -353,13 +370,7 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * NL_TEST_ASSERT(apSuite, !onSuccessCbInvoked && onFailureCbInvoked); - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 1); - - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - - ctx.ExpireSessionAliceToBob(); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 0); @@ -370,10 +381,7 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * ctx.CreateSessionAliceToBob(); ctx.CreateSessionBobToAlice(); - // - // TODO: Figure out why I cannot enable this line below. - // - // NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext) @@ -415,16 +423,7 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * ap onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); } - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10 && (numSubscriptionEstablishedCalls != (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, numSuccessCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); @@ -434,6 +433,135 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * ap NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } +void TestReadInteraction::TestReadHandler_MultipleReads(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT <= CHIP_IM_MAX_NUM_READ_HANDLER, + "How can we have more reports in flight than read handlers?"); + + MultipleReadHelper(apSuite, ctx, CHIP_IM_MAX_REPORTS_IN_FLIGHT); + + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); +} + +void TestReadInteraction::TestReadHandler_OneSubscribeMultipleReads(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT <= CHIP_IM_MAX_NUM_READ_HANDLER, + "How can we have more reports in flight than read handlers?"); + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT > 1, "We won't do any reads"); + + SubscribeThenReadHelper(apSuite, ctx, 1, CHIP_IM_MAX_REPORTS_IN_FLIGHT - 1); + + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); +} + +void TestReadInteraction::TestReadHandler_TwoSubscribesMultipleReads(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT <= CHIP_IM_MAX_NUM_READ_HANDLER, + "How can we have more reports in flight than read handlers?"); + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT > 2, "We won't do any reads"); + + SubscribeThenReadHelper(apSuite, ctx, 2, CHIP_IM_MAX_REPORTS_IN_FLIGHT - 2); + + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); +} + +void TestReadInteraction::SubscribeThenReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aSubscribeCount, + size_t aReadCount) +{ + auto sessionHandle = aCtx.GetSessionBobToAlice(); + uint32_t numSuccessCalls = 0; + uint32_t numSubscriptionEstablishedCalls = 0; + + responseDirective = kSendDataResponse; + + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's + // not safe to do so. + auto onSuccessCb = [&numSuccessCalls](const app::ConcreteDataAttributePath & attributePath, const auto & dataResponse) { + numSuccessCalls++; + }; + + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's + // not safe to do so. + auto onFailureCb = [&apSuite](const app::ConcreteDataAttributePath * attributePath, CHIP_ERROR aError) { + // + // We shouldn't be encountering any failures in this test. + // + NL_TEST_ASSERT(apSuite, false); + }; + + auto onSubscriptionEstablishedCb = [&numSubscriptionEstablishedCalls, &apSuite, &aCtx, aSubscribeCount, aReadCount]() { + numSubscriptionEstablishedCalls++; + if (numSubscriptionEstablishedCalls == aSubscribeCount) + { + MultipleReadHelper(apSuite, aCtx, aReadCount); + } + }; + + for (size_t i = 0; i < aSubscribeCount; ++i) + { + NL_TEST_ASSERT(apSuite, + Controller::SubscribeAttribute( + &aCtx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 10, + onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); + } + + aCtx.DrainAndServiceIO(); + + NL_TEST_ASSERT(apSuite, numSuccessCalls == aSubscribeCount); + NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == aSubscribeCount); +} + +void TestReadInteraction::MultipleReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aReadCount) +{ + auto sessionHandle = aCtx.GetSessionBobToAlice(); + uint32_t numSuccessCalls = 0; + uint32_t numFailureCalls = 0; + + responseDirective = kSendDataResponse; + + uint16_t firstExpectedResponse = totalReadCount + 1; + + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's + // not safe to do so. + auto onFailureCb = [&apSuite, &numFailureCalls](const app::ConcreteDataAttributePath * attributePath, CHIP_ERROR aError) { + numFailureCalls++; + + NL_TEST_ASSERT(apSuite, attributePath == nullptr); + }; + + for (size_t i = 0; i < aReadCount; ++i) + { + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, + // it's not safe to do so. + auto onSuccessCb = [&numSuccessCalls, &apSuite, firstExpectedResponse, + i](const app::ConcreteDataAttributePath & attributePath, const auto & dataResponse) { + NL_TEST_ASSERT(apSuite, dataResponse == firstExpectedResponse + i); + numSuccessCalls++; + }; + + NL_TEST_ASSERT(apSuite, + Controller::ReadAttribute( + &aCtx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb) == CHIP_NO_ERROR); + } + + aCtx.DrainAndServiceIO(); + + NL_TEST_ASSERT(apSuite, numSuccessCalls == aReadCount); + NL_TEST_ASSERT(apSuite, numFailureCalls == 0); +} + void TestReadInteraction::TestReadHandler_MultipleSubscriptionsWithDataVersionFilter(nlTestSuite * apSuite, void * apContext) { TestContext & ctx = *static_cast(apContext); @@ -476,16 +604,7 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptionsWithDataVersionFi onSubscriptionEstablishedCb, false, true, dataVersion) == CHIP_NO_ERROR); } - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10 && (numSubscriptionEstablishedCalls != (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, numSuccessCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); @@ -537,16 +656,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscription &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 10, onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10; i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, numSuccessCalls == 1); NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == 1); @@ -589,16 +699,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads(nlTest Controller::ReadAttribute( &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb) == CHIP_NO_ERROR); - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10; i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); app::InteractionModelEngine::GetInstance()->SetHandlerCapacity(-1); app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); @@ -649,8 +750,6 @@ void TestReadInteraction::TestReadFabricScopedWithoutFabricFilter(nlTestSuite * &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, false /* fabric filtered */); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -707,8 +806,6 @@ void TestReadInteraction::TestReadFabricScopedWithFabricFilter(nlTestSuite * apS &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, true /* fabric filtered */); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -727,6 +824,9 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadFabricScopedWithFabricFilter", TestReadInteraction::TestReadFabricScopedWithFabricFilter), NL_TEST_DEF("TestReadHandler_MultipleSubscriptions", TestReadInteraction::TestReadHandler_MultipleSubscriptions), NL_TEST_DEF("TestReadHandler_MultipleSubscriptionsWithDataVersionFilter", TestReadInteraction::TestReadHandler_MultipleSubscriptionsWithDataVersionFilter), + NL_TEST_DEF("TestReadHandler_MultipleReads", TestReadInteraction::TestReadHandler_MultipleReads), + NL_TEST_DEF("TestReadHandler_OneSubscribeMultipleReads", TestReadInteraction::TestReadHandler_OneSubscribeMultipleReads), + NL_TEST_DEF("TestReadHandler_TwoSubscribesMultipleReads", TestReadInteraction::TestReadHandler_TwoSubscribesMultipleReads), NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleSubscriptions", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscriptions), NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleReads", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads), NL_TEST_DEF("TestReadAttributeTimeout", TestReadInteraction::TestReadAttributeTimeout), diff --git a/src/messaging/tests/MessagingContext.h b/src/messaging/tests/MessagingContext.h index 2f543db4191359..0b498c5ec0038c 100644 --- a/src/messaging/tests/MessagingContext.h +++ b/src/messaging/tests/MessagingContext.h @@ -259,21 +259,44 @@ class LoopbackMessagingContext : public MessagingContext * Consequently, this is guarded with a user-provided timeout to ensure we don't have unit-tests that stall * in CI due to bugs in the code that is being tested. * - * This DOES NOT ensure that all pending events are serviced to completion (i.e timers, any ScheduleWork calls). + * This DOES NOT ensure that all pending events are serviced to completion + * (i.e timers, any ScheduleWork calls), but does: * + * 1) Guarantee that every call will make some progress on ready-to-run + * things, by calling DriveIO at least once. + * 2) Try to ensure that any ScheduleWork calls that happend directly as a + * result of message reception, and any messages those async tasks send, + * get handled before DrainAndServiceIO returns. */ void DrainAndServiceIO(System::Clock::Timeout maxWait = chip::System::Clock::Seconds16(5)) { auto & impl = GetLoopback(); System::Clock::Timestamp startTime = System::SystemClock().GetMonotonicTimestamp(); - while (impl.HasPendingMessages()) + while (true) { + bool hadPendingMessages = impl.HasPendingMessages(); + while (impl.HasPendingMessages()) + { + mIOContext.DriveIO(); + if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= maxWait) + { + return; + } + } + // Processing those messages might have queued some run-ASAP async + // work. Make sure to process that too, in case it generates + // response messages. mIOContext.DriveIO(); - if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= maxWait) + if (!hadPendingMessages && !impl.HasPendingMessages()) { + // We're not making any progress on messages. Just stop. break; } + // No need to check our timer here: either impl.HasPendingMessages() + // is true and we will check it next iteration, or it's false and we + // will either stop on the next iteration or it will become true and + // we will check the timer then. } }