Skip to content

Commit

Permalink
Improve ExchangeMessageDispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
kghost committed Apr 19, 2022
1 parent 03ea72d commit 1efd09b
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 196 deletions.
1 change: 0 additions & 1 deletion src/messaging/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ static_library("messaging") {
"ExchangeContext.cpp",
"ExchangeContext.h",
"ExchangeDelegate.h",
"ExchangeMessageDispatch.cpp",
"ExchangeMessageDispatch.h",
"ExchangeMgr.cpp",
"ExchangeMgr.h",
Expand Down
112 changes: 93 additions & 19 deletions src/messaging/ExchangeContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,6 @@ CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgTyp
// an error arising below. at the end, we have to close it.
ExchangeHandle ref(*this);

// If session requires MRP, NoAutoRequestAck send flag is not specified and is not a group exchange context, request reliable
// transmission.
bool reliableTransmissionRequested =
GetSessionHandle()->RequireMRP() && !sendFlags.Has(SendMessageFlags::kNoAutoRequestAck) && !IsGroupExchangeContext();

// If a response message is expected...
if (sendFlags.Has(SendMessageFlags::kExpectResponse) && !IsGroupExchangeContext())
{
Expand Down Expand Up @@ -184,9 +179,78 @@ CHIP_ERROR ExchangeContext::SendMessage(Protocols::Id protocolId, uint8_t msgTyp
}

// Create a new scope for `err`, to avoid shadowing warning previous `err`.
CHIP_ERROR err = mDispatch.SendMessage(GetExchangeMgr()->GetSessionManager(), mSession.Get(), mExchangeId, IsInitiator(),
GetReliableMessageContext(), reliableTransmissionRequested, protocolId, msgType,
std::move(msgBuf));
CHIP_ERROR err = ([&] {
VerifyOrReturnError(mDispatch.MessagePermitted(protocolId.GetProtocolId(), msgType), CHIP_ERROR_INVALID_ARGUMENT);

PayloadHeader payloadHeader;
payloadHeader.SetExchangeID(mExchangeId).SetMessageType(protocolId, msgType).SetInitiator(IsInitiator());

ReliableMessageContext * reliableMessageContext = GetReliableMessageContext();

// If there is a pending acknowledgment piggyback it on this message.
if (reliableMessageContext->HasPiggybackAckPending())
{
payloadHeader.SetAckMessageCounter(reliableMessageContext->TakePendingPeerAckMessageCounter());

#if !defined(NDEBUG)
if (!payloadHeader.HasMessageType(Protocols::SecureChannel::MsgType::StandaloneAck))
{
ChipLogDetail(ExchangeManager,
"Piggybacking Ack for MessageCounter:" ChipLogFormatMessageCounter
" on exchange: " ChipLogFormatExchangeId,
payloadHeader.GetAckMessageCounter().Value(), ChipLogValueExchangeId(mExchangeId, IsInitiator()));
}
#endif
}

SessionManager * sessionManager = GetExchangeMgr()->GetSessionManager();
SessionHandle session = GetSessionHandle();

// If session requires MRP, NoAutoRequestAck send flag is not specified, request reliable transmission.
if (mSession->RequireMRP() && !sendFlags.Has(SendMessageFlags::kNoAutoRequestAck) && reliableMessageContext->AutoRequestAck())
{
auto * reliableMessageMgr = reliableMessageContext->GetReliableMessageMgr();

payloadHeader.SetNeedsAck(true);

ReliableMessageMgr::RetransTableEntry * entry = nullptr;

// Add to Table for subsequent sending
ReturnErrorOnFailure(reliableMessageMgr->AddToRetransTable(reliableMessageContext, &entry));
auto deleter = [reliableMessageMgr](ReliableMessageMgr::RetransTableEntry * e) {
reliableMessageMgr->ClearRetransTable(*e);
};
std::unique_ptr<ReliableMessageMgr::RetransTableEntry, decltype(deleter)> entryOwner(entry, deleter);

ReturnErrorOnFailure(sessionManager->PrepareMessage(session, payloadHeader, std::move(msgBuf), entryOwner->retainedBuf));
CHIP_ERROR err2 = sessionManager->SendPreparedMessage(session, entryOwner->retainedBuf);
if (err2 == CHIP_ERROR_POSIX(ENOBUFS))
{
// sendmsg on BSD-based systems never blocks, no matter how the
// socket is configured, and will return ENOBUFS in situation in
// which Linux, for example, blocks.
//
// This is typically a transient situation, so we pretend like this
// packet drop happened somewhere on the network instead of inside
// sendmsg and will just resend it in the normal MRP way later.
ChipLogError(ExchangeManager, "Ignoring ENOBUFS: %" CHIP_ERROR_FORMAT " on exchange " ChipLogFormatExchangeId,
err2.Format(), ChipLogValueExchangeId(mExchangeId, IsInitiator()));
err2 = CHIP_NO_ERROR;
}
ReturnErrorOnFailure(err2);
reliableMessageMgr->StartRetransmision(entryOwner.release());
}
else
{
// If the channel itself is providing reliability, let's not request MRP acks
payloadHeader.SetNeedsAck(false);
EncryptedPacketBufferHandle preparedMessage;
ReturnErrorOnFailure(sessionManager->PrepareMessage(session, payloadHeader, std::move(msgBuf), preparedMessage));
ReturnErrorOnFailure(sessionManager->SendPreparedMessage(session, preparedMessage));
}

return CHIP_NO_ERROR;
})();
if (err != CHIP_NO_ERROR && IsResponseExpected())
{
CancelResponseTimer();
Expand Down Expand Up @@ -272,10 +336,10 @@ void ExchangeContextDeletor::Release(ExchangeContext * ec)

ExchangeContext::ExchangeContext(ExchangeManager * em, uint16_t ExchangeId, const SessionHandle & session, bool Initiator,
ExchangeDelegate * delegate) :
mDispatch((delegate != nullptr) ? delegate->GetMessageDispatch() : ApplicationExchangeDispatch::Instance()),
mSession(*this)
mDispatch(ExchangeManager::GetDispatchForDelegate(delegate)), mSession(*this)
{
VerifyOrDie(mExchangeMgr == nullptr);
VerifyOrDie(mDispatch.IsEncryptionRequired() == session->IsEncrypted());

mExchangeMgr = em;
mExchangeId = ExchangeId;
Expand All @@ -286,9 +350,7 @@ ExchangeContext::ExchangeContext(ExchangeManager * em, uint16_t ExchangeId, cons
SetDropAckDebug(false);
SetAckPending(false);
SetMsgRcvdFromPeer(false);

// Do not request Ack for multicast
SetAutoRequestAck(!session->IsGroupSession());
SetAutoRequestAck(true);

#if defined(CHIP_EXCHANGE_CONTEXT_DETAIL_LOGGING)
ChipLogDetail(ExchangeManager, "ec++ id: " ChipLogFormatExchange, ChipLogValueExchange(this));
Expand Down Expand Up @@ -331,10 +393,6 @@ bool ExchangeContext::MatchExchange(const SessionHandle & session, const PacketH
// AND The Session associated with the incoming message matches the Session associated with the exchange.
&& (mSession.Contains(session))

// TODO: This check should be already implied by the equality of session check,
// It should be removed after we have implemented the temporary node id for PASE and CASE sessions
&& (IsEncryptionRequired() == packetHeader.IsEncrypted())

// AND The message was sent by an initiator and the exchange context is a responder (IsInitiator==false)
// OR The message was sent by a responder and the exchange context is an initiator (IsInitiator==true) (for the broadcast
// case, the initiator is ill defined)
Expand Down Expand Up @@ -457,8 +515,24 @@ CHIP_ERROR ExchangeContext::HandleMessage(uint32_t messageCounter, const Payload
MessageHandled();
});

ReturnErrorOnFailure(
mDispatch.OnMessageReceived(messageCounter, payloadHeader, peerAddress, msgFlags, GetReliableMessageContext()));
VerifyOrReturnError(mDispatch.MessagePermitted(payloadHeader.GetProtocolID().GetProtocolId(), payloadHeader.GetMessageType()), CHIP_ERROR_INVALID_ARGUMENT);

if (mSession->RequireMRP())
{
ReliableMessageContext * reliableMessageContext = GetReliableMessageContext();

if (!msgFlags.Has(MessageFlagValues::kDuplicateMessage) && payloadHeader.IsAckMsg() &&
payloadHeader.GetAckMessageCounter().HasValue())
{
reliableMessageContext->HandleRcvdAck(payloadHeader.GetAckMessageCounter().Value());
}

if (payloadHeader.NeedsAck())
{
// An acknowledgment needs to be sent back to the peer for this message on this exchange,
ReturnErrorOnFailure(reliableMessageContext->HandleNeedsAck(messageCounter, msgFlags));
}
}

if (IsAckPending() && !mDelegate)
{
Expand Down
2 changes: 0 additions & 2 deletions src/messaging/ExchangeContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class DLL_EXPORT ExchangeContext : public ReliableMessageContext,
*/
bool IsInitiator() const;

bool IsEncryptionRequired() const { return mDispatch.IsEncryptionRequired(); }

bool IsGroupExchangeContext() const { return mSession && mSession->IsGroupSession(); }

// Implement SessionReleaseDelegate
Expand Down
143 changes: 0 additions & 143 deletions src/messaging/ExchangeMessageDispatch.cpp

This file was deleted.

14 changes: 0 additions & 14 deletions src/messaging/ExchangeMessageDispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,9 @@ class ReliableMessageContext;
class ExchangeMessageDispatch
{
public:
ExchangeMessageDispatch() {}
virtual ~ExchangeMessageDispatch() {}

virtual bool IsEncryptionRequired() const { return true; }

CHIP_ERROR SendMessage(SessionManager * sessionManager, const SessionHandle & session, uint16_t exchangeId, bool isInitiator,
ReliableMessageContext * reliableMessageContext, bool isReliableTransmission, Protocols::Id protocol,
uint8_t type, System::PacketBufferHandle && message);
CHIP_ERROR OnMessageReceived(uint32_t messageCounter, const PayloadHeader & payloadHeader,
const Transport::PeerAddress & peerAddress, MessageFlags msgFlags,
ReliableMessageContext * reliableMessageContext);

protected:
virtual bool MessagePermitted(uint16_t protocol, uint8_t type) = 0;

// TODO: remove IsReliableTransmissionAllowed, this function should be provided over session.
virtual bool IsReliableTransmissionAllowed() const { return true; }
};

} // namespace Messaging
Expand Down
13 changes: 6 additions & 7 deletions src/messaging/ExchangeMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ void ExchangeManager::OnMessageReceived(const PacketHeader & packetHeader, const
}
}

if (GetDispatchForDelegate(delegate).IsEncryptionRequired() != session->IsEncrypted())
{
ChipLogError(ExchangeManager, "OnMessageReceived failed, err = %s", ErrorStr(CHIP_ERROR_NO_UNSOLICITED_MESSAGE_HANDLER));
return;
}

// If rcvd msg is from initiator then this exchange is created as not Initiator.
// If rcvd msg is not from initiator then this exchange is created as Initiator.
// Note that if matchingUMH is not null then rcvd msg if from initiator.
Expand All @@ -310,13 +316,6 @@ void ExchangeManager::OnMessageReceived(const PacketHeader & packetHeader, const
ChipLogDetail(ExchangeManager, "Handling via exchange: " ChipLogFormatExchange ", Delegate: %p", ChipLogValueExchange(ec),
ec->GetDelegate());

if (ec->IsEncryptionRequired() != packetHeader.IsEncrypted())
{
ChipLogError(ExchangeManager, "OnMessageReceived failed, err = %s", ErrorStr(CHIP_ERROR_INVALID_MESSAGE_TYPE));
ec->Close();
return;
}

CHIP_ERROR err = ec->HandleMessage(packetHeader.GetMessageCounter(), payloadHeader, source, msgFlags, std::move(msgBuf));
if (err != CHIP_NO_ERROR)
{
Expand Down
5 changes: 5 additions & 0 deletions src/messaging/ExchangeMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ class DLL_EXPORT ExchangeManager : public SessionMessageDelegate

size_t GetNumActiveExchanges() { return mContextPool.Allocated(); }

static ExchangeMessageDispatch & GetDispatchForDelegate(ExchangeDelegate * delegate)
{
return (delegate != nullptr) ? delegate->GetMessageDispatch() : ApplicationExchangeDispatch::Instance();
}

private:
enum class State
{
Expand Down
Loading

0 comments on commit 1efd09b

Please sign in to comment.