Skip to content

Commit

Permalink
Make WriteHandler an ExchangeDelegate (#14821)
Browse files Browse the repository at this point in the history
* Make WriteHandler an ExchangeDelegate

This is a meant to setup the WriteHandler's messaging processing
pathways correctly to permit chunking to get implemented correctly
(which will utilize this feature).

* Removed the `EnableResponseGeneration` method that I added to the
WriteHandler since that required enabling CONFIG_IM_BUILD_FOR_UNIT_TEST
when building the src/app/tests/*. This caused issues when building
these tests on EFR32 platforms which didn't have that defined.

So rather than disable those tests outright for those platforms, or do
some other hackery, I decided to just enable async dispatch for just the
test I was modifying to make it more representative of real-world
scenarios and consequently, avoid the special logic.

* Added some comments

* Fix comment typo.

Co-authored-by: Michael Sandstedt <[email protected]>

Co-authored-by: Boris Zbarsky <[email protected]>
Co-authored-by: Michael Sandstedt <[email protected]>
  • Loading branch information
3 people authored and pull[bot] committed Dec 20, 2023
1 parent e24fde7 commit 9df12c2
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void InteractionModelEngine::Shutdown()

for (auto & writeHandler : mWriteHandlers)
{
VerifyOrDie(writeHandler.IsFree());
writeHandler.Abort();
}

mReportingEngine.Shutdown();
Expand Down
81 changes: 78 additions & 3 deletions src/app/WriteHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/

#include "messaging/ExchangeContext.h"
#include <app/AppBuildConfig.h>
#include <app/InteractionModelEngine.h>
#include <app/MessageDef/EventPathIB.h>
Expand Down Expand Up @@ -48,19 +49,66 @@ CHIP_ERROR WriteHandler::Init()
return CHIP_NO_ERROR;
}

void WriteHandler::Shutdown()
void WriteHandler::Close()
{
VerifyOrReturn(mState != State::Uninitialized);

mMessageWriter.Reset();
mpExchangeCtx = nullptr;

if (mpExchangeCtx != nullptr)
{
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx = nullptr;
}

ClearState();
}

void WriteHandler::Abort()
{
#if 0
// TODO: When chunking gets added, we should add this back.
//
// If the exchange context hasn't already been gracefully closed
// (signaled by setting it to null), then we need to forcibly
// tear it down.
//
if (mpExchangeCtx != nullptr)
{
// We might be a delegate for this exchange, and we don't want the
// OnExchangeClosing notification in that case. Null out the delegate
// to avoid that.
//
// TODO: This makes all sorts of assumptions about what the delegate is
// (notice the "might" above!) that might not hold in practice. We
// really need a better solution here....
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx->Abort();
mpExchangeCtx = nullptr;
}

ClearState();
#else
//
// The WriteHandler should get synchronously allocated and destroyed in the same execution
// context given that it's just a 2 message exchange (request + response). Consequently, we should
// never arrive at a situation where we have active handlers at any time Abort() is called.
//
VerifyOrDie(mState == State::Uninitialized);
#endif
}

Status WriteHandler::OnWriteRequest(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload,
bool aIsTimedWrite)
{
mpExchangeCtx = apExchangeContext;

//
// Let's take over further message processing on this exchange from the IM.
// This is only relevant during chunked requests.
//
mpExchangeCtx->SetDelegate(this);

Status status = ProcessWriteRequest(std::move(aPayload), aIsTimedWrite);

// Do not send response on Group Write
Expand All @@ -73,10 +121,37 @@ Status WriteHandler::OnWriteRequest(Messaging::ExchangeContext * apExchangeConte
}
}

Shutdown();
Close();
return status;
}

CHIP_ERROR WriteHandler::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
System::PacketBufferHandle && aPayload)
{
//
// As part of write handling, the exchange should get closed sychronously given there is always
// just a single response to a Write Request message before the exchange gets closed. There-after,
// even if we get any more messages on that exchange from a non-compliant client, our exchange layer
// should correctly discard those. If there is a bug there, this function here may get invoked.
//
// NOTE: Once chunking gets implemented, this will no longer be true.
//
VerifyOrDieWithMsg(false, DataManagement, "This function should never get invoked");
}

void WriteHandler::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext)
{
//
// As part of write handling, the exchange should get closed sychronously given there is always
// just a single response to a Write Request message before the exchange gets closed. That response
// does not solicit any further responses back. Consequently, we never expect to get notified
// of any response timeouts.
//
// NOTE: Once chunking gets implemented, this will no longer be true.
//
VerifyOrDieWithMsg(false, DataManagement, "This function should never get invoked");
}

CHIP_ERROR WriteHandler::FinalizeMessage(System::PacketBufferHandle & packet)
{
VerifyOrReturnError(mState == State::AddStatus, CHIP_ERROR_INCORRECT_STATE);
Expand Down
20 changes: 16 additions & 4 deletions src/app/WriteHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ namespace app {
/**
* @brief The write handler is responsible for processing a write request and sending a write reply.
*/
class WriteHandler
class WriteHandler : public Messaging::ExchangeDelegate
{
public:
/**
* Initialize the WriteHandler. Within the lifetime
* of this instance, this method is invoked once after object
* construction until a call to Shutdown is made to terminate the
* construction until a call to Close is made to terminate the
* instance.
*
* @retval #CHIP_ERROR_INCORRECT_STATE If the state is not equal to
Expand All @@ -53,7 +53,7 @@ class WriteHandler

/**
* Process a write request. Parts of the processing may end up being asynchronous, but the WriteHandler
* guarantees that it will call Shutdown on itself when processing is done (including if OnWriteRequest
* guarantees that it will call Close on itself when processing is done (including if OnWriteRequest
* returns an error).
*
* @param[in] apExchangeContext A pointer to the ExchangeContext.
Expand All @@ -66,6 +66,12 @@ class WriteHandler
Protocols::InteractionModel::Status OnWriteRequest(Messaging::ExchangeContext * apExchangeContext,
System::PacketBufferHandle && aPayload, bool aIsTimedWrite);

/*
* This forcibly closes the exchange context if a valid one is pointed to and de-initializes the object. Such a situation does
* not arise during normal message processing flows that all normally call Close() below.
*/
void Abort();

bool IsFree() const { return mState == State::Uninitialized; }

virtual ~WriteHandler() = default;
Expand Down Expand Up @@ -111,8 +117,14 @@ class WriteHandler
/**
* Clean up state when we are done sending the write response.
*/
void Shutdown();
void Close();

private: // ExchangeDelegate
CHIP_ERROR OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
System::PacketBufferHandle && aPayload) override;
void OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext) override;

private:
Messaging::ExchangeContext * mpExchangeCtx = nullptr;
WriteResponseMessage::Builder mWriteResponseBuilder;
System::PacketBufferTLVWriter mMessageWriter;
Expand Down
40 changes: 32 additions & 8 deletions src/app/tests/TestWriteInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,22 @@ void TestWriteInteraction::TestWriteHandler(nlTestSuite * apSuite, void * apCont

TestContext & ctx = *static_cast<TestContext *>(apContext);

//
// We have to enable async dispatch here to ensure that the exchange
// gets correctly closed out in the test below. Otherwise, the following happens:
//
// 1. WriteHandler generates a response upon OnWriteRequest being called.
// 2. Since there is no matching active client-side exchange for that request, the IM engine
// handles it incorrectly and treats it like an unsolicited message.
// 3. It is invalid to receive a WriteResponse as an unsolicited message so it correctly sends back
// a StatusResponse containing an error to that message.
// 4. Without unwinding the existing call stack, a response is received on the same exchange that the handler
// generated a WriteResponse on. This exchange should have been closed in a normal execution model, but in
// a synchronous model, the exchange is still open, and the status response is sent to the WriteHandler.
// 5. WriteHandler::OnMessageReceived is invoked, and it correctly asserts.
//
ctx.EnableAsyncDispatch();

constexpr bool allBooleans[] = { true, false };
for (auto messageIsTimed : allBooleans)
{
Expand All @@ -278,27 +294,35 @@ void TestWriteInteraction::TestWriteHandler(nlTestSuite * apSuite, void * apCont

TestExchangeDelegate delegate;
Messaging::ExchangeContext * exchange = ctx.NewExchangeToBob(&delegate);
Status status = writeHandler.OnWriteRequest(exchange, std::move(buf), transactionIsTimed);

Status status = writeHandler.OnWriteRequest(exchange, std::move(buf), transactionIsTimed);
if (messageIsTimed == transactionIsTimed)
{
NL_TEST_ASSERT(apSuite, status == Status::Success);
}
else
{
NL_TEST_ASSERT(apSuite, status == Status::UnsupportedAccess);
// In the normal code flow, the exchange would now get closed
// when we send the error status on it (of if that fails when
// the stack unwinds). In the success case it's been closed
// already by the WriteHandler sending the response on it, but
// if we are in the error case we need to make sure it gets
// closed.
//
// In a normal execution flow, the exchange manager would have closed out the exchange after the
// message dispatch call path had unwound. In this test however, we've manually allocated the exchange
// ourselves (as opposed to the exchange manager), so we need to take ownership of closing out the exchange.
//
// Note that this doesn't happen in the success case above, since that results in a call to send a message through
// the exchange context, which results in the exchange manager correctly closing it.
//
exchange->Close();
NL_TEST_ASSERT(apSuite, status == Status::UnsupportedAccess);
}

ctx.DrainAndServiceIO();
ctx.DrainAndServiceIO();

Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);
}
}

ctx.DisableAsyncDispatch();
}

CHIP_ERROR WriteSingleClusterData(const Access::SubjectDescriptor & aSubjectDescriptor, ClusterInfo & aClusterInfo,
Expand Down
4 changes: 2 additions & 2 deletions src/controller/tests/data_model/TestRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscription
auto onFailureCb = [&apSuite, &numFailureCalls](const app::ConcreteAttributePath * attributePath, CHIP_ERROR aError) {
numFailureCalls++;

NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(Protocols::InteractionModel::Status::ResourceExhausted));
NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(ResourceExhausted));
NL_TEST_ASSERT(apSuite, attributePath == nullptr);
};

Expand Down Expand Up @@ -499,7 +499,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads(nlTest
auto onFailureCb = [&apSuite, &numFailureCalls](const app::ConcreteAttributePath * attributePath, CHIP_ERROR aError) {
numFailureCalls++;

NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(Protocols::InteractionModel::Status::ResourceExhausted));
NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(ResourceExhausted));
NL_TEST_ASSERT(apSuite, attributePath == nullptr);
};

Expand Down
8 changes: 6 additions & 2 deletions src/lib/core/CHIPError.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,13 @@ using CHIP_ERROR = ::chip::ChipError;

#define CHIP_CORE_ERROR(e) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kCore, (e))

#define CHIP_IM_GLOBAL_STATUS(e) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMGlobalStatus, to_underlying(e))
#define CHIP_IM_GLOBAL_STATUS(type) \
CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMGlobalStatus, to_underlying(Protocols::InteractionModel::Status::type))

#define CHIP_IM_CLUSTER_STATUS(e) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMClusterStatus, e)
//
// type must be a compile-time constant as mandated by CHIP_SDK_ERROR.
//
#define CHIP_IM_CLUSTER_STATUS(type) CHIP_SDK_ERROR(::chip::ChipError::SdkPart::kIMClusterStatus, type)

// clang-format off

Expand Down
14 changes: 14 additions & 0 deletions src/messaging/tests/MessagingContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,20 @@ class LoopbackMessagingContext : public MessagingContext
impl.EnableAsyncDispatch(&mIOContext.GetSystemLayer());
}

/*
* Reset the dispatch back to a model that synchronously dispatches received messages up the stack.
*
* NOTE: This results in highly atypical/complex call stacks that are not representative of what happens on real
* devices and can cause subtle and complex bugs to either appear or get masked in the system. Where possible, please
* use this sparingly!
*
*/
void DisableAsyncDispatch()
{
auto & impl = GetLoopback();
impl.DisableAsyncDispatch();
}

/*
* This drives the servicing of events using the embedded IOContext while there are pending
* messages in the loopback transport's pending message queue. This should run to completion
Expand Down
9 changes: 9 additions & 0 deletions src/transport/raw/tests/NetworkTestHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ class LoopbackTransport : public Transport::Base
mAsyncMessageDispatch = true;
}

/*
* Reset the dispatch back to a model that synchronously dispatches received messages up the stack.
*
* NOTE: This results in highly atypical/complex call stacks that are not representative of what happens on real
* devices and can cause subtle and complex bugs to either appear or get masked in the system. Where possible, please
* use this sparingly!
*/
void DisableAsyncDispatch() { mAsyncMessageDispatch = false; }

bool HasPendingMessages() { return !mPendingMessageQueue.empty(); }

static void OnMessageReceived(System::Layer * aSystemLayer, void * aAppState)
Expand Down

0 comments on commit 9df12c2

Please sign in to comment.