Skip to content

Commit

Permalink
[IM] Add refcount to CommandHandler for async commands (#11367)
Browse files Browse the repository at this point in the history
* [IM] Add refcount methnism for async background work

* Invalid CommandHandler::Handle when chip stack shutdown

* mRefCount -> mPendingWork

* Upd
  • Loading branch information
erjiaqing authored and pull[bot] committed Jun 14, 2023
1 parent 79622aa commit 73faef2
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 31 deletions.
77 changes: 66 additions & 11 deletions src/app/CommandHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,19 @@ CHIP_ERROR CommandHandler::AllocateBuffer()
CHIP_ERROR CommandHandler::OnInvokeCommandRequest(Messaging::ExchangeContext * ec, const PayloadHeader & payloadHeader,
System::PacketBufferHandle && payload)
{
CHIP_ERROR err = CHIP_NO_ERROR;
System::PacketBufferHandle response;

VerifyOrReturnError(mState == CommandState::Idle, CHIP_ERROR_INCORRECT_STATE);

// NOTE: we already know this is an InvokeCommand Request message because we explicitly registered with the
// Exchange Manager for unsolicited InvokeCommand Requests.

mpExchangeCtx = ec;

err = ProcessInvokeRequest(std::move(payload));
SuccessOrExit(err);

err = SendCommandResponse();
SuccessOrExit(err);
// Use the RAII feature, if this is the only Handle when this function returns, DecrementHoldOff will trigger sending response.
Handle workHandle(this);
mpExchangeCtx->WillSendMessage();
ReturnErrorOnFailure(ProcessInvokeRequest(std::move(payload)));

exit:
Close();
return err;
return CHIP_NO_ERROR;
}

CHIP_ERROR CommandHandler::ProcessInvokeRequest(System::PacketBufferHandle && payload)
Expand Down Expand Up @@ -125,6 +119,12 @@ void CommandHandler::Close()
mSuppressResponse = false;
MoveToState(CommandState::AwaitingDestruction);

// We must finish all async work before we can shut down a CommandHandler. The actual CommandHandler MUST finish their work
// in reasonable time or there is a bug. The only case for releasing CommandHandler without CommandHandler::Handle releasing its
// reference is the stack shutting down, in which case Close() is not called. So the below check should always pass.
VerifyOrDieWithMsg(mPendingWork == 0, DataManagement, "CommandHandler::Close() called with %zu unfinished async work items",
mPendingWork);

Command::Close();

if (mpCallback)
Expand All @@ -133,10 +133,37 @@ void CommandHandler::Close()
}
}

void CommandHandler::IncrementHoldOff()
{
mPendingWork++;
}

void CommandHandler::DecrementHoldOff()
{
mPendingWork--;
ChipLogDetail(DataManagement, "Decreasing reference count for CommandHandler, remaining %zu", mPendingWork);
if (mPendingWork != 0)
{
return;
}
CHIP_ERROR err = SendCommandResponse();
if (err != CHIP_NO_ERROR)
{
ChipLogError(DataManagement, "Failed to send command response: %" CHIP_ERROR_FORMAT, err.Format());
// We marked the exchange as "WillSendMessage", need to shutdown the exchange manually to avoid leaking exchanges.
if (mpExchangeCtx != nullptr)
{
mpExchangeCtx->Close();
}
}
Close();
}

CHIP_ERROR CommandHandler::SendCommandResponse()
{
System::PacketBufferHandle commandPacket;

VerifyOrReturnError(mPendingWork == 0, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mState == CommandState::AddedCommand, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mpExchangeCtx != nullptr, CHIP_ERROR_INCORRECT_STATE);

Expand Down Expand Up @@ -343,6 +370,34 @@ TLV::TLVWriter * CommandHandler::GetCommandDataIBTLVWriter()
}
}

CommandHandler * CommandHandler::Handle::Get()
{
return (mMagic == InteractionModelEngine::GetInstance()->GetMagicNumber()) ? mpHandler : nullptr;
}

void CommandHandler::Handle::Release()
{
if (mpHandler != nullptr)
{
if (mMagic == InteractionModelEngine::GetInstance()->GetMagicNumber())
{
mpHandler->DecrementHoldOff();
}
mpHandler = nullptr;
mMagic = 0;
}
}

CommandHandler::Handle::Handle(CommandHandler * handle)
{
if (handle != nullptr)
{
handle->IncrementHoldOff();
mpHandler = handle;
mMagic = InteractionModelEngine::GetInstance()->GetMagicNumber();
}
}

} // namespace app
} // namespace chip

Expand Down
72 changes: 67 additions & 5 deletions src/app/CommandHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,51 @@ class CommandHandler : public Command
virtual bool CommandExists(const ConcreteCommandPath & aCommandPath) = 0;
};

class Handle
{
public:
Handle() {}
Handle(const Handle & handle) = delete;
Handle(Handle && handle)
{
mpHandler = handle.mpHandler;
mMagic = handle.mMagic;
handle.mpHandler = nullptr;
handle.mMagic = 0;
}
Handle(decltype(nullptr)) {}
Handle(CommandHandler * handle);
~Handle() { Release(); }

Handle & operator=(Handle && handle)
{
Release();
mpHandler = handle.mpHandler;
mMagic = handle.mMagic;
handle.mpHandler = nullptr;
handle.mMagic = 0;
return *this;
}

Handle & operator=(decltype(nullptr))
{
Release();
return *this;
}

/**
* Get the CommandHandler object it holds. Get() may return a nullptr if the CommandHandler object is holds is no longer
* valid.
*/
CommandHandler * Get();

void Release();

private:
CommandHandler * mpHandler = nullptr;
uint32_t mMagic = 0;
};

/*
* Constructor.
*
Expand Down Expand Up @@ -122,6 +167,22 @@ class CommandHandler : public Command

private:
friend class TestCommandInteraction;
friend class CommandHandler::Handle;

/**
* IncrementHoldOff will increase the inner refcount of the CommandHandler.
*
* Users should use CommandHandler::Handle for management the lifespan of the CommandHandler.
* DefRef should be released in reasonable time, and Close() should only be called when the refcount reached 0.
*/
void IncrementHoldOff();

/**
* DecrementHoldOff is used by CommandHandler::Handle for decreasing the refcount of the CommandHandler.
* When refcount reached 0, CommandHandler will send the response to the peer and shutdown.
*/
void DecrementHoldOff();

/*
* Allocates a packet buffer used for encoding an invoke response payload.
*
Expand All @@ -130,11 +191,11 @@ class CommandHandler : public Command
*/
CHIP_ERROR AllocateBuffer();

//
// Called internally to signal the completion of all work on this object, gracefully close the
// exchange (by calling into the base class) and finally, signal to a registerd callback that it's
// safe to release this object.
//
/**
* Called internally to signal the completion of all work on this object, gracefully close the
* exchange (by calling into the base class) and finally, signal to a registerd callback that it's
* safe to release this object.
*/
void Close();

CHIP_ERROR ProcessCommandDataIB(CommandDataIB::Parser & aCommandElement);
Expand All @@ -146,6 +207,7 @@ class CommandHandler : public Command
Callback * mpCallback = nullptr;
InvokeResponseMessage::Builder mInvokeResponseBuilder;
TLV::TLVType mDataElementContainerType = TLV::kTLVType_NotSpecified;
size_t mPendingWork = 0;
bool mSuppressResponse = false;
bool mTimedRequest = false;
};
Expand Down
31 changes: 16 additions & 15 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeM
mClusterInfoPool[CHIP_IM_SERVER_MAX_NUM_PATH_GROUPS - 1].mpNext = nullptr;
mpNextAvailableClusterInfo = mClusterInfoPool;

mMagic++;

return CHIP_NO_ERROR;
}

Expand All @@ -76,21 +78,20 @@ void InteractionModelEngine::Shutdown()

mCommandHandlerList = nullptr;

//
// Since modifying the pool during iteration is generally frowned upon,
// I've chosen to just destroy the object but not necessarily de-allocate it.
//
// This poses a problem when shutting down and restarting the stack, since the
// IMEngine is a statically constructed singleton, and this lingering state will
// cause issues.
//
// This doesn't pose a problem right now because there shouldn't be any actual objects
// left here due to the synchronous nature of command handling.
//
// Filed #10332 to track this.
//
mCommandHandlerObjs.ForEachActiveObject([](CommandHandler * obj) -> bool {
obj->~CommandHandler();
// Increase magic number to invalidate all Handle-s.
mMagic++;

mCommandHandlerObjs.ForEachActiveObject([this](CommandHandler * obj) -> bool {
// Modifying the pool during iteration is generally frowned upon.
// This is almost safe since mCommandHandlerObjs is a BitMapObjectPool which won't malfunction when modifying the inner
// record while during traversal. But this behavior is not guranteed, so we should fix this by implementing DeallocateAll.
//
// Deallocate an CommandHandler will call its destructor (and abort the exchange context it holds) without calling
// Shutdown().
//
// TODO(@kghost, #10332) Implement DeallocateAll and replace this.

mCommandHandlerObjs.Deallocate(obj);
return true;
});

Expand Down
9 changes: 9 additions & 0 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman

uint16_t GetWriteClientArrayIndex(const WriteClient * const apWriteClient) const;

/**
* The Magic number of this InteractionModelEngine, the magic number is set during Init()
*/
uint32_t GetMagicNumber() { return mMagic; }

reporting::Engine & GetReportingEngine() { return mReportingEngine; }

void ReleaseClusterInfoList(ClusterInfo *& aClusterInfo);
Expand Down Expand Up @@ -245,6 +250,10 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman
reporting::Engine mReportingEngine;
ClusterInfo mClusterInfoPool[CHIP_IM_SERVER_MAX_NUM_PATH_GROUPS];
ClusterInfo * mpNextAvailableClusterInfo = nullptr;

// A magic number for tracking values between stack Shutdown()-s and Init()-s.
// An ObjectHandle is valid iff. its magic equals to this one.
uint32_t mMagic = 0;
};

void DispatchSingleClusterCommand(const ConcreteCommandPath & aCommandPath, chip::TLV::TLVReader & aReader,
Expand Down
42 changes: 42 additions & 0 deletions src/app/tests/TestCommandInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ namespace {
bool isCommandDispatched = false;

bool sendResponse = true;
bool asyncCommand = false;

constexpr EndpointId kTestEndpointId = 1;
constexpr ClusterId kTestClusterId = 3;
Expand All @@ -61,6 +62,9 @@ constexpr CommandId kTestNonExistCommandId = 0;
} // namespace

namespace app {

CommandHandler::Handle asyncCommandHandle;

bool ServerClusterCommandExists(const ConcreteCommandPath & aCommandPath)
{
// Mock cluster catalog, only support one command on one cluster on one endpoint.
Expand All @@ -75,6 +79,12 @@ void DispatchSingleClusterCommand(const ConcreteCommandPath & aCommandPath, chip
"Received Cluster Command: Endpoint=%" PRIx16 " Cluster=" ChipLogFormatMEI " Command=" ChipLogFormatMEI,
aCommandPath.mEndpointId, ChipLogValueMEI(aCommandPath.mClusterId), ChipLogValueMEI(aCommandPath.mCommandId));

if (asyncCommand)
{
asyncCommandHandle = apCommandObj;
asyncCommand = false;
}

if (sendResponse)
{
if (aCommandPath.mCommandId == kTestCommandId)
Expand Down Expand Up @@ -167,6 +177,7 @@ class TestCommandInteraction
static void TestCommandHandlerWithProcessReceivedEmptyDataMsg(nlTestSuite * apSuite, void * apContext);

static void TestCommandSenderCommandSuccessResponseFlow(nlTestSuite * apSuite, void * apContext);
static void TestCommandSenderCommandAsyncSuccessResponseFlow(nlTestSuite * apSuite, void * apContext);
static void TestCommandSenderCommandFailureResponseFlow(nlTestSuite * apSuite, void * apContext);
static void TestCommandSenderCommandSpecificResponseFlow(nlTestSuite * apSuite, void * apContext);

Expand Down Expand Up @@ -593,6 +604,36 @@ void TestCommandInteraction::TestCommandSenderCommandSuccessResponseFlow(nlTestS
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

void TestCommandInteraction::TestCommandSenderCommandAsyncSuccessResponseFlow(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
CHIP_ERROR err = CHIP_NO_ERROR;

mockCommandSenderDelegate.ResetCounter();
app::CommandSender commandSender(&mockCommandSenderDelegate, &ctx.GetExchangeManager());

AddInvokeRequestData(apSuite, apContext, &commandSender);
asyncCommand = true;
err = commandSender.SendCommandRequest(ctx.GetSessionBobToAlice());

NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
NL_TEST_ASSERT(apSuite,
mockCommandSenderDelegate.onResponseCalledTimes == 0 && mockCommandSenderDelegate.onFinalCalledTimes == 0 &&
mockCommandSenderDelegate.onErrorCalledTimes == 0);

NL_TEST_ASSERT(apSuite, GetNumActiveHandlerObjects() == 1);
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 2);

// Decrease CommandHandler refcount and send response
asyncCommandHandle = nullptr;
NL_TEST_ASSERT(apSuite,
mockCommandSenderDelegate.onResponseCalledTimes == 1 && mockCommandSenderDelegate.onFinalCalledTimes == 1 &&
mockCommandSenderDelegate.onErrorCalledTimes == 0);

NL_TEST_ASSERT(apSuite, GetNumActiveHandlerObjects() == 0);
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

void TestCommandInteraction::TestCommandSenderCommandSpecificResponseFlow(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
Expand Down Expand Up @@ -692,6 +733,7 @@ const nlTest sTests[] =
NL_TEST_DEF("TestCommandHandlerWithProcessReceivedEmptyDataMsg", chip::app::TestCommandInteraction::TestCommandHandlerWithProcessReceivedEmptyDataMsg),

NL_TEST_DEF("TestCommandSenderCommandSuccessResponseFlow", chip::app::TestCommandInteraction::TestCommandSenderCommandSuccessResponseFlow),
NL_TEST_DEF("TestCommandSenderCommandAsyncSuccessResponseFlow", chip::app::TestCommandInteraction::TestCommandSenderCommandAsyncSuccessResponseFlow),
NL_TEST_DEF("TestCommandSenderCommandSpecificResponseFlow", chip::app::TestCommandInteraction::TestCommandSenderCommandSpecificResponseFlow),
NL_TEST_DEF("TestCommandSenderCommandFailureResponseFlow", chip::app::TestCommandInteraction::TestCommandSenderCommandFailureResponseFlow),
NL_TEST_DEF("TestCommandSenderAbruptDestruction", chip::app::TestCommandInteraction::TestCommandSenderAbruptDestruction),
Expand Down
Loading

0 comments on commit 73faef2

Please sign in to comment.