Skip to content

Commit

Permalink
Make exchanges handle closing on message receipt more automatically.
Browse files Browse the repository at this point in the history
The idea is that the cases that want to keep the exchange open should
do so explicitly and in all other cases the exchange should close.
This helps avoid exchange leaks and makes it much clearer when an
exchange is being kept open.
  • Loading branch information
bzbarsky-apple committed Jul 1, 2021
1 parent 08b5650 commit 675c69d
Show file tree
Hide file tree
Showing 24 changed files with 169 additions and 78 deletions.
1 change: 0 additions & 1 deletion examples/shell/shell_common/cmd_send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class MockAppDelegate : public Messaging::ExchangeDelegate
streamer_printf(sout, "Response received: len=%u time=%.3fms\n", buffer->DataLength(),
static_cast<double>(transitTime) / 1000);

gExchangeCtx->Close();
gExchangeCtx = nullptr;
return CHIP_NO_ERROR;
}
Expand Down
2 changes: 2 additions & 0 deletions src/app/CommandHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ CHIP_ERROR CommandHandler::SendCommandResponse()
MoveToState(CommandState::Sending);

exit:
// Keep Shutdown() from double-closing our exchange.
mpExchangeCtx = nullptr;
Shutdown();
ChipLogFunctError(err);
return err;
Expand Down
5 changes: 2 additions & 3 deletions src/app/CommandSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ CHIP_ERROR CommandSender::OnMessageReceived(Messaging::ExchangeContext * apExcha
exit:
ChipLogFunctError(err);

// Close the exchange cleanly so that the ExchangeManager will send an ack for the message we just received.
// This needs to be done before the Reset() call, because Reset() aborts mpExchangeCtx if its not null.
mpExchangeCtx->Close();
// Null out mpExchangeCtx, so our Shutdown() call below won't try to abort
// it and fail to send an ack for the message we just received.
mpExchangeCtx = nullptr;

if (mpDelegate != nullptr)
Expand Down
1 change: 0 additions & 1 deletion src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ CHIP_ERROR InteractionModelEngine::OnUnknownMsgType(Messaging::ExchangeContext *
// err = SendStatusReport(ec, kChipProfile_Common, kStatus_UnsupportedMessage);
// SuccessOrExit(err);

apExchangeContext->Close();
apExchangeContext = nullptr;

ChipLogFunctError(err);
Expand Down
4 changes: 2 additions & 2 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchange
exit:
ChipLogFunctError(err);

// Close the exchange cleanly so that the ExchangeManager will send an ack for the message we just received.
mpExchangeCtx->Close();
// Null out mpExchangeCtx, so our Shutdown() call below won't try to abort
// it and fail to send an ack for the message we just received.
mpExchangeCtx = nullptr;
MoveToState(ClientState::Initialized);

Expand Down
10 changes: 10 additions & 0 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ CHIP_ERROR ReadHandler::OnReadRequest(Messaging::ExchangeContext * apExchangeCon
if (err != CHIP_NO_ERROR)
{
ChipLogFunctError(err);
// Keep Shutdown() from double-closing our exchange.
mpExchangeCtx = nullptr;
Shutdown();
}

Expand Down Expand Up @@ -153,6 +155,14 @@ CHIP_ERROR ReadHandler::ProcessReadRequest(System::PacketBufferHandle && aPayloa
MoveToState(HandlerState::Reportable);

err = InteractionModelEngine::GetInstance()->GetReportingEngine().ScheduleRun();
SuccessOrExit(err);

// mpExchangeCtx can be null here due to
// https://github.com/project-chip/connectedhomeip/issues/8031
if (mpExchangeCtx)
{
mpExchangeCtx->WillSendMessage();
}

exit:
ChipLogFunctError(err);
Expand Down
10 changes: 3 additions & 7 deletions src/app/WriteClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,16 @@ CHIP_ERROR WriteClient::OnMessageReceived(Messaging::ExchangeContext * apExchang

VerifyOrDie(apExchangeContext == mpExchangeCtx);

// We are done with this exchange, and it will be closing itself.
mpExchangeCtx = nullptr;

// Verify that the message is an Invoke Command Response.
// If not, close the exchange and free the payload.
if (!aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::WriteResponse))
{
apExchangeContext->Close();
mpExchangeCtx = nullptr;
ExitNow();
}

// Close the current exchange after receiving the response since the response message marks the
// end of conversation represented by the exchange. We should create an new exchange for a new
// conversation defined in Interaction Model protocol.
ClearExistingExchangeContext();

err = ProcessWriteResponseMessage(std::move(aPayload));

exit:
Expand Down
2 changes: 2 additions & 0 deletions src/app/WriteHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ CHIP_ERROR WriteHandler::OnWriteRequest(Messaging::ExchangeContext * apExchangeC

exit:
ChipLogFunctError(err);
// Keep Shutdown() from double-closing our exchange.
mpExchangeCtx = nullptr;
Shutdown();
return err;
}
Expand Down
1 change: 0 additions & 1 deletion src/app/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ class ServerCallback : public ExchangeDelegate
}

exit:
exchangeContext->Close();
return err;
}

Expand Down
1 change: 0 additions & 1 deletion src/controller/CHIPDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ CHIP_ERROR Device::OnMessageReceived(Messaging::ExchangeContext * exchange, cons
HandleDataModelMessage(exchange, std::move(msgBuf));
}
}
exchange->Close();
return CHIP_NO_ERROR;
}

Expand Down
6 changes: 0 additions & 6 deletions src/controller/CHIPDeviceController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ CHIP_ERROR DeviceController::OnMessageReceived(Messaging::ExchangeContext * ec,
const PayloadHeader & payloadHeader, System::PacketBufferHandle && msgBuf)
{
uint16_t index;
bool needClose = true;

VerifyOrExit(mState == State::Initialized, ChipLogError(Controller, "OnMessageReceived was called in incorrect state"));

Expand All @@ -582,14 +581,9 @@ CHIP_ERROR DeviceController::OnMessageReceived(Messaging::ExchangeContext * ec,
index = FindDeviceIndex(packetHeader.GetSourceNodeId().Value());
VerifyOrExit(index < kNumMaxActiveDevices, ChipLogError(Controller, "OnMessageReceived was called for unknown device object"));

needClose = false; // Device will handle it
mActiveDevices[index].OnMessageReceived(ec, packetHeader, payloadHeader, std::move(msgBuf));

exit:
if (needClose)
{
ec->Close();
}
return CHIP_NO_ERROR;
}

Expand Down
43 changes: 41 additions & 2 deletions src/messaging/ExchangeContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ void ExchangeContext::SetResponseTimeout(Timeout timeout)
CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgType, PacketBufferHandle && msgBuf,
const SendFlags & sendFlags)
{
// If we were waiting for a message send, this is it.
mFlags.Clear(Flags::kFlagWillSendMessage);

CHIP_ERROR err = CHIP_NO_ERROR;
Transport::PeerConnectionState * state = nullptr;

Expand Down Expand Up @@ -143,6 +146,8 @@ CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgTyp

void ExchangeContext::DoClose(bool clearRetransTable)
{
mFlags.Set(Flags::kFlagClosed);

// Clear protocol callbacks
if (mDelegate != nullptr)
{
Expand Down Expand Up @@ -378,18 +383,26 @@ CHIP_ERROR ExchangeContext::HandleMessage(const PacketHeader & packetHeader, con
// layer has completed its work on the ExchangeContext.
Retain();

// Keep track of whether we're nested under an outer HandleMessage
// invocation.
bool alreadyHandlingMessage = mFlags.Has(Flags::kFlagHandlingMessage);
mFlags.Set(Flags::kFlagHandlingMessage);

bool isStandaloneAck = payloadHeader.HasMessageType(Protocols::SecureChannel::MsgType::StandaloneAck);
bool isDuplicate = msgFlags.Has(MessageFlagValues::kDuplicateMessage);

CHIP_ERROR err = mDispatch->OnMessageReceived(payloadHeader, packetHeader.GetMessageId(), peerAddress, msgFlags,
GetReliableMessageContext());
SuccessOrExit(err);

// The SecureChannel::StandaloneAck message type is only used for MRP; do not pass such messages to the application layer.
if (payloadHeader.HasMessageType(Protocols::SecureChannel::MsgType::StandaloneAck))
if (isStandaloneAck)
{
ExitNow(err = CHIP_NO_ERROR);
}

// Since the message is duplicate, let's not forward it up the stack
if (msgFlags.Has(MessageFlagValues::kDuplicateMessage))
if (isDuplicate)
{
ExitNow(err = CHIP_NO_ERROR);
}
Expand All @@ -412,6 +425,32 @@ CHIP_ERROR ExchangeContext::HandleMessage(const PacketHeader & packetHeader, con
}

exit:
// Don't close ourselves if we're already closed.
//
// Don't close ourselves if this message is a standalone ack. We're still
// not closed and getting an ack should not affect that. In particular,
// since the standalone ack was not passed to the delegate, the delegate
// never got a chance to say "stay open". The one exception here is if
// mDelegate is null: in that case this is an unsolicited message and we
// were just created to ack it and close after that.
//
// Don't close for duplicates for similar reasons, with the same exception.
//
// Also don't close if there's an outer HandleMessage invocation. It'll
// deal with the closing.
if (!mFlags.Has(Flags::kFlagClosed) && !mFlags.Has(Flags::kFlagWillSendMessage) && !IsResponseExpected() &&
(!isStandaloneAck || (mDelegate == nullptr)) && (!isDuplicate || (mDelegate == nullptr)) && !alreadyHandlingMessage)
{
Close();
}

if (!alreadyHandlingMessage)
{
// We are the outermost HandleMessage invocation. We're not handling a
// message anymore.
mFlags.Clear(Flags::kFlagHandlingMessage);
}

// Release the reference to the ExchangeContext that was held at the beginning of this function.
// This call should also do the needful of closing the ExchangeContext if the protocol has
// already made a prior call to Close().
Expand Down
6 changes: 6 additions & 0 deletions src/messaging/ExchangeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ class DLL_EXPORT ExchangeContext : public ReliableMessageContext, public Referen
std::move(msgPayload), sendFlags);
}

/**
* A notification that we will have SendMessage called on us in the future
* (and should stay open until that happens).
*/
void WillSendMessage() { mFlags.Set(Flags::kFlagWillSendMessage); }

/**
* Handle a received CHIP message on this exchange.
*
Expand Down
13 changes: 12 additions & 1 deletion src/messaging/ExchangeDelegate.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,18 @@ class DLL_EXPORT ExchangeDelegate

/**
* @brief
* This function is the protocol callback for handling a received CHIP message.
* This function is the protocol callback for handling a received CHIP
* message.
*
* After calling this method an exchange will close itself unless one of
* two things happens:
*
* 1) A call to SendMessage on the exchange with the kExpectResponse flag
* set.
* 2) A call to WillSendMessage on the exchange.
*
* Consumers that don't do one of those things MUST NOT retain a pointer
* to the exchange.
*
* @param[in] ec A pointer to the ExchangeContext object.
* @param[in] packetHeader A reference to the PacketHeader object.
Expand Down
34 changes: 10 additions & 24 deletions src/messaging/ExchangeMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ void ExchangeManager::OnMessageReceived(const PacketHeader & packetHeader, const
{
CHIP_ERROR err = CHIP_NO_ERROR;
UnsolicitedMessageHandler * matchingUMH = nullptr;
bool sendAckAndCloseExchange = false;

ChipLogProgress(ExchangeManager, "Received message of type %d and protocolId %" PRIu32 " on exchange %d",
payloadHeader.GetMessageType(), payloadHeader.GetProtocolID().ToFullyQualifiedSpecForm(),
Expand Down Expand Up @@ -269,36 +268,23 @@ void ExchangeManager::OnMessageReceived(const PacketHeader & packetHeader, const
ExitNow(err = CHIP_ERROR_UNSOLICITED_MSG_NO_ORIGINATOR);
}

// If we didn't find an existing exchange that matches the message, and no unsolicited message handler registered
// to hand this message, we need to create a temporary exchange to send an ack for this message and then close this exchange.
sendAckAndCloseExchange = payloadHeader.NeedsAck() && (matchingUMH == nullptr);

// If we found a handler or we need to create a new exchange context (EC).
if (matchingUMH != nullptr || sendAckAndCloseExchange)
// If we found a handler or we need to send an ack, create an exchange to
// handle the message.
if (matchingUMH != nullptr || payloadHeader.NeedsAck())
{
ExchangeContext * ec = nullptr;

if (sendAckAndCloseExchange)
{
// If rcvd msg is from initiator then this exchange is created as not Initiator.
// If rcvd msg is not from initiator then this exchange is created as Initiator.
// TODO: Figure out which channel to use for the received message
ec = mContextPool.CreateObject(this, payloadHeader.GetExchangeID(), session, !payloadHeader.IsInitiator(), nullptr);
}
else
{
ec = mContextPool.CreateObject(this, payloadHeader.GetExchangeID(), session, false, matchingUMH->Delegate);
}
ExchangeDelegate * delegate = matchingUMH ? matchingUMH->Delegate : nullptr;
// If rcvd msg is from initiator then this exchange is created as not Initiator.
// If rcvd msg is not from initiator then this exchange is created as Initiator.
// Note that if matchingUMH is not null then rcvd msg if from initiator.
// TODO: Figure out which channel to use for the received message
ExchangeContext * ec =
mContextPool.CreateObject(this, payloadHeader.GetExchangeID(), session, !payloadHeader.IsInitiator(), delegate);

VerifyOrExit(ec != nullptr, err = CHIP_ERROR_NO_MEMORY);

ChipLogDetail(ExchangeManager, "ec id: %d, Delegate: 0x%p", ec->GetExchangeId(), ec->GetDelegate());

ec->HandleMessage(packetHeader, payloadHeader, source, msgFlags, std::move(msgBuf));

// Close exchange if it was created only to send ack for a duplicate message.
if (sendAckAndCloseExchange)
ec->Close();
}

exit:
Expand Down
8 changes: 8 additions & 0 deletions src/messaging/ReliableMessageContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ class ReliableMessageContext

/// When set, signifies that at least one message has been received from peer on this exchange context.
kFlagMsgRcvdFromPeer = 0x0040,

/// When set, signifies that this exchange is waiting for a call to SendMessage.
kFlagWillSendMessage = 0x0080,

/// When set, signifies that we are currently in the middle of HandleMessage.
kFlagHandlingMessage = 0x0100,
/// When set, we have had Close() or Abort() called on us already.
kFlagClosed = 0x0200,
};

BitFlags<Flags> mFlags; // Internal state flags
Expand Down
1 change: 0 additions & 1 deletion src/messaging/tests/TestExchangeMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class MockAppDelegate : public ExchangeDelegate
System::PacketBufferHandle && buffer) override
{
IsOnMessageReceivedCalled = true;
ec->Close();
return CHIP_NO_ERROR;
}

Expand Down
Loading

0 comments on commit 675c69d

Please sign in to comment.