Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ExchangeMessageDispatch #17521

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/messaging/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ static_library("messaging") {
"ExchangeContext.cpp",
"ExchangeContext.h",
"ExchangeDelegate.h",
"ExchangeMessageDispatch.cpp",
"ExchangeMessageDispatch.h",
"ExchangeMgr.cpp",
"ExchangeMgr.h",
Expand Down
112 changes: 93 additions & 19 deletions src/messaging/ExchangeContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,6 @@ CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgTyp
// an error arising below. at the end, we have to close it.
ExchangeHandle ref(*this);

// If session requires MRP, NoAutoRequestAck send flag is not specified and is not a group exchange context, request reliable
// transmission.
bool reliableTransmissionRequested =
GetSessionHandle()->RequireMRP() && !sendFlags.Has(SendMessageFlags::kNoAutoRequestAck) && !IsGroupExchangeContext();

// If a response message is expected...
if (sendFlags.Has(SendMessageFlags::kExpectResponse) && !IsGroupExchangeContext())
{
Expand Down Expand Up @@ -184,9 +179,78 @@ CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgTyp
}

// Create a new scope for `err`, to avoid shadowing warning previous `err`.
CHIP_ERROR err = mDispatch.SendMessage(GetExchangeMgr()->GetSessionManager(), mSession.Get(), mExchangeId, IsInitiator(),
GetReliableMessageContext(), reliableTransmissionRequested, protocolId, msgType,
std::move(msgBuf));
CHIP_ERROR err = ([&] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes an already-large method much larger and quite hard to read/understand, what with the various "return" bits that return out of the lambda, not the overall method. What is the benefit of inlining this instead of having it as a separate function (possibly on ExchangeContext if we don't think it should be on the message dispatch)?

VerifyOrReturnError(mDispatch.MessagePermitted(protocolId.GetProtocolId(), msgType), CHIP_ERROR_INVALID_ARGUMENT);

PayloadHeader payloadHeader;
payloadHeader.SetExchangeID(mExchangeId).SetMessageType(protocolId, msgType).SetInitiator(IsInitiator());

ReliableMessageContext * reliableMessageContext = GetReliableMessageContext();

// If there is a pending acknowledgment piggyback it on this message.
if (reliableMessageContext->HasPiggybackAckPending())
{
payloadHeader.SetAckMessageCounter(reliableMessageContext->TakePendingPeerAckMessageCounter());

#if !defined(NDEBUG)
if (!payloadHeader.HasMessageType(Protocols::SecureChannel::MsgType::StandaloneAck))
{
ChipLogDetail(ExchangeManager,
"Piggybacking Ack for MessageCounter:" ChipLogFormatMessageCounter
" on exchange: " ChipLogFormatExchangeId,
payloadHeader.GetAckMessageCounter().Value(), ChipLogValueExchangeId(mExchangeId, IsInitiator()));
}
#endif
}

SessionManager * sessionManager = GetExchangeMgr()->GetSessionManager();
SessionHandle session = GetSessionHandle();

// If session requires MRP, NoAutoRequestAck send flag is not specified, request reliable transmission.
if (mSession->RequireMRP() && !sendFlags.Has(SendMessageFlags::kNoAutoRequestAck) && reliableMessageContext->AutoRequestAck())
{
auto * reliableMessageMgr = reliableMessageContext->GetReliableMessageMgr();

payloadHeader.SetNeedsAck(true);

ReliableMessageMgr::RetransTableEntry * entry = nullptr;

// Add to Table for subsequent sending
ReturnErrorOnFailure(reliableMessageMgr->AddToRetransTable(reliableMessageContext, &entry));
auto deleter = [reliableMessageMgr](ReliableMessageMgr::RetransTableEntry * e) {
reliableMessageMgr->ClearRetransTable(*e);
};
std::unique_ptr<ReliableMessageMgr::RetransTableEntry, decltype(deleter)> entryOwner(entry, deleter);

ReturnErrorOnFailure(sessionManager->PrepareMessage(session, payloadHeader, std::move(msgBuf), entryOwner->retainedBuf));
CHIP_ERROR err2 = sessionManager->SendPreparedMessage(session, entryOwner->retainedBuf);
if (err2 == CHIP_ERROR_POSIX(ENOBUFS))
{
// sendmsg on BSD-based systems never blocks, no matter how the
// socket is configured, and will return ENOBUFS in situation in
// which Linux, for example, blocks.
//
// This is typically a transient situation, so we pretend like this
// packet drop happened somewhere on the network instead of inside
// sendmsg and will just resend it in the normal MRP way later.
ChipLogError(ExchangeManager, "Ignoring ENOBUFS: %" CHIP_ERROR_FORMAT " on exchange " ChipLogFormatExchangeId,
err2.Format(), ChipLogValueExchangeId(mExchangeId, IsInitiator()));
err2 = CHIP_NO_ERROR;
}
ReturnErrorOnFailure(err2);
reliableMessageMgr->StartRetransmision(entryOwner.release());
}
else
{
// If the channel itself is providing reliability, let's not request MRP acks
payloadHeader.SetNeedsAck(false);
EncryptedPacketBufferHandle preparedMessage;
ReturnErrorOnFailure(sessionManager->PrepareMessage(session, payloadHeader, std::move(msgBuf), preparedMessage));
ReturnErrorOnFailure(sessionManager->SendPreparedMessage(session, preparedMessage));
}

return CHIP_NO_ERROR;
})();
if (err != CHIP_NO_ERROR && IsResponseExpected())
{
CancelResponseTimer();
Expand Down Expand Up @@ -272,10 +336,10 @@ void ExchangeContextDeletor::Release(ExchangeContext * ec)

ExchangeContext::ExchangeContext(ExchangeManager * em, uint16_t ExchangeId, const SessionHandle & session, bool Initiator,
ExchangeDelegate * delegate) :
mDispatch((delegate != nullptr) ? delegate->GetMessageDispatch() : ApplicationExchangeDispatch::Instance()),
mSession(*this)
mDispatch(ExchangeManager::GetDispatchForDelegate(delegate)), mSession(*this)
{
VerifyOrDie(mExchangeMgr == nullptr);
VerifyOrDie(mDispatch.IsEncryptionRequired() == session->IsEncrypted());

mExchangeMgr = em;
mExchangeId = ExchangeId;
Expand All @@ -286,9 +350,7 @@ ExchangeContext::ExchangeContext(ExchangeManager * em, uint16_t ExchangeId, cons
SetDropAckDebug(false);
SetAckPending(false);
SetMsgRcvdFromPeer(false);

// Do not request Ack for multicast
SetAutoRequestAck(!session->IsGroupSession());
SetAutoRequestAck(true);

#if defined(CHIP_EXCHANGE_CONTEXT_DETAIL_LOGGING)
ChipLogDetail(ExchangeManager, "ec++ id: " ChipLogFormatExchange, ChipLogValueExchange(this));
Expand Down Expand Up @@ -331,10 +393,6 @@ bool ExchangeContext::MatchExchange(const SessionHandle & session, const PacketH
// AND The Session associated with the incoming message matches the Session associated with the exchange.
&& (mSession.Contains(session))

// TODO: This check should be already implied by the equality of session check,
// It should be removed after we have implemented the temporary node id for PASE and CASE sessions
&& (IsEncryptionRequired() == packetHeader.IsEncrypted())

// AND The message was sent by an initiator and the exchange context is a responder (IsInitiator==false)
// OR The message was sent by a responder and the exchange context is an initiator (IsInitiator==true) (for the broadcast
// case, the initiator is ill defined)
Expand Down Expand Up @@ -457,8 +515,24 @@ CHIP_ERROR ExchangeContext::HandleMessage(uint32_t messageCounter, const Payload
MessageHandled();
});

ReturnErrorOnFailure(
mDispatch.OnMessageReceived(messageCounter, payloadHeader, peerAddress, msgFlags, GetReliableMessageContext()));
VerifyOrReturnError(mDispatch.MessagePermitted(payloadHeader.GetProtocolID().GetProtocolId(), payloadHeader.GetMessageType()), CHIP_ERROR_INVALID_ARGUMENT);

if (mSession->RequireMRP())
{
ReliableMessageContext * reliableMessageContext = GetReliableMessageContext();

if (!msgFlags.Has(MessageFlagValues::kDuplicateMessage) && payloadHeader.IsAckMsg() &&
payloadHeader.GetAckMessageCounter().HasValue())
{
reliableMessageContext->HandleRcvdAck(payloadHeader.GetAckMessageCounter().Value());
}

if (payloadHeader.NeedsAck())
{
// An acknowledgment needs to be sent back to the peer for this message on this exchange,
ReturnErrorOnFailure(reliableMessageContext->HandleNeedsAck(messageCounter, msgFlags));
}
}

if (IsAckPending() && !mDelegate)
{
Expand Down
2 changes: 0 additions & 2 deletions src/messaging/ExchangeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class DLL_EXPORT ExchangeContext : public ReliableMessageContext,
*/
bool IsInitiator() const;

bool IsEncryptionRequired() const { return mDispatch.IsEncryptionRequired(); }

bool IsGroupExchangeContext() const { return mSession && mSession->IsGroupSession(); }

// Implement SessionReleaseDelegate
Expand Down
143 changes: 0 additions & 143 deletions src/messaging/ExchangeMessageDispatch.cpp

This file was deleted.

14 changes: 0 additions & 14 deletions src/messaging/ExchangeMessageDispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,9 @@ class ReliableMessageContext;
class ExchangeMessageDispatch
{
public:
ExchangeMessageDispatch() {}
virtual ~ExchangeMessageDispatch() {}

virtual bool IsEncryptionRequired() const { return true; }

CHIP_ERROR SendMessage(SessionManager * sessionManager, const SessionHandle & session, uint16_t exchangeId, bool isInitiator,
ReliableMessageContext * reliableMessageContext, bool isReliableTransmission, Protocols::Id protocol,
uint8_t type, System::PacketBufferHandle && message);
CHIP_ERROR OnMessageReceived(uint32_t messageCounter, const PayloadHeader & payloadHeader,
const Transport::PeerAddress & peerAddress, MessageFlags msgFlags,
ReliableMessageContext * reliableMessageContext);

protected:
virtual bool MessagePermitted(uint16_t protocol, uint8_t type) = 0;

// TODO: remove IsReliableTransmissionAllowed, this function should be provided over session.
virtual bool IsReliableTransmissionAllowed() const { return true; }
};

} // namespace Messaging
Expand Down
13 changes: 6 additions & 7 deletions src/messaging/ExchangeMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ void ExchangeManager::OnMessageReceived(const PacketHeader & packetHeader, const
}
}

if (GetDispatchForDelegate(delegate).IsEncryptionRequired() != session->IsEncrypted())
{
ChipLogError(ExchangeManager, "OnMessageReceived failed, err = %s", ErrorStr(CHIP_ERROR_NO_UNSOLICITED_MESSAGE_HANDLER));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not the error type we used to have here. Why is this the right type?

Also, this check will always fail if delegate is null, right? As long as we are moving this around, can we skip this check for the !delegate case where we're just trying to send a standalone ack? Or will something go terribly wrong if the session is not in fact encrypted and we try to send an ack via ApplicationExchangeDispatch?

return;
}

// If rcvd msg is from initiator then this exchange is created as not Initiator.
// If rcvd msg is not from initiator then this exchange is created as Initiator.
// Note that if matchingUMH is not null then rcvd msg if from initiator.
Expand All @@ -310,13 +316,6 @@ void ExchangeManager::OnMessageReceived(const PacketHeader & packetHeader, const
ChipLogDetail(ExchangeManager, "Handling via exchange: " ChipLogFormatExchange ", Delegate: %p", ChipLogValueExchange(ec),
ec->GetDelegate());

if (ec->IsEncryptionRequired() != packetHeader.IsEncrypted())
{
ChipLogError(ExchangeManager, "OnMessageReceived failed, err = %s", ErrorStr(CHIP_ERROR_INVALID_MESSAGE_TYPE));
ec->Close();
return;
}

CHIP_ERROR err = ec->HandleMessage(packetHeader.GetMessageCounter(), payloadHeader, source, msgFlags, std::move(msgBuf));
if (err != CHIP_NO_ERROR)
{
Expand Down
5 changes: 5 additions & 0 deletions src/messaging/ExchangeMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ class DLL_EXPORT ExchangeManager : public SessionMessageDelegate

size_t GetNumActiveExchanges() { return mContextPool.Allocated(); }

static ExchangeMessageDispatch & GetDispatchForDelegate(ExchangeDelegate * delegate)
{
return (delegate != nullptr) ? delegate->GetMessageDispatch() : ApplicationExchangeDispatch::Instance();
}

private:
enum class State
{
Expand Down
Loading