Skip to content

Commit

Permalink
Only send messages to peers that can understand them
Browse files Browse the repository at this point in the history
  • Loading branch information
bboston7 committed Apr 22, 2024
1 parent cc5b1e0 commit a108620
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT = 18;

OVERLAY_PROTOCOL_MIN_VERSION = 32;
OVERLAY_PROTOCOL_VERSION = 32;
OVERLAY_PROTOCOL_VERSION = 33;

VERSION_STR = STELLAR_CORE_VERSION;

Expand Down
14 changes: 13 additions & 1 deletion src/overlay/Floodgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)

// send message to anyone you haven't gotten it from
bool
Floodgate::broadcast(StellarMessage const& msg, std::optional<Hash> const& hash)
Floodgate::broadcast(StellarMessage const& msg, std::optional<Hash> const& hash,
uint32_t minOverlayVersion)
{
ZoneScoped;
if (mShuttingDown)
Expand Down Expand Up @@ -123,6 +124,17 @@ Floodgate::broadcast(StellarMessage const& msg, std::optional<Hash> const& hash)
for (auto peer : peers)
{
releaseAssert(peer.second->isAuthenticated());

if (peer.second->getRemoteOverlayVersion().value_or(
mApp.getConfig().OVERLAY_PROTOCOL_MIN_VERSION) <
minOverlayVersion)
{
// Skip peers running overlay versions that are older than
// `minOverlayVersion`. Treat peers with unknown versions as running
// the minimum overlay protocol version.
continue;
}

bool pullMode = msg.type() == TRANSACTION;
bool hasAdvert = pullMode && peer.second->peerKnowsHash(hash.value());

Expand Down
5 changes: 4 additions & 1 deletion src/overlay/Floodgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ class Floodgate

// returns true if msg was sent to at least one peer
// The hash required for transactions
// `minOverlayVersion` is the minimum overlay version a peer must have in
// order to be sent the message.
bool broadcast(StellarMessage const& msg,
std::optional<Hash> const& hash = std::nullopt);
std::optional<Hash> const& hash = std::nullopt,
uint32_t minOverlayVersion = 0);

// returns the list of peers that sent us the item with hash `msgID`
// NB: `msgID` is the hash of a `StellarMessage`
Expand Down
8 changes: 5 additions & 3 deletions src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ class OverlayManager
// returns true if message was sent to at least one peer
// When passing a transaction message,
// the hash of TransactionEnvelope must be passed also for pull mode.
virtual bool
broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash = std::nullopt) = 0;
// `minOverlayVersion` is the minimum overlay version a peer must have in
// order to be sent the message.
virtual bool broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash = std::nullopt,
uint32_t minOverlayVersion = 0) = 0;

// Make a note in the FloodGate that a given peer has provided us with a
// given broadcast message, so that it is inhibited from being resent to
Expand Down
5 changes: 3 additions & 2 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1160,10 +1160,11 @@ OverlayManagerImpl::forgetFloodedMsg(Hash const& msgID)

bool
OverlayManagerImpl::broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash)
std::optional<Hash> const hash,
uint32_t minOverlayVersion)
{
ZoneScoped;
auto res = mFloodGate.broadcast(msg, hash);
auto res = mFloodGate.broadcast(msg, hash, minOverlayVersion);
if (res)
{
mOverlayMetrics.mMessagesBroadcast.Mark();
Expand Down
6 changes: 3 additions & 3 deletions src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ class OverlayManagerImpl : public OverlayManager
bool recvFloodedMsgID(StellarMessage const& msg, Peer::pointer peer,
Hash& msgID) override;
void forgetFloodedMsg(Hash const& msgID) override;
bool
broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash = std::nullopt) override;
bool broadcastMessage(StellarMessage const& msg,
std::optional<Hash> const hash = std::nullopt,
uint32_t minOverlayVersion = 0) override;
void connectTo(PeerBareAddress const& address) override;

void maybeAddInboundConnection(Peer::pointer peer) override;
Expand Down
43 changes: 30 additions & 13 deletions src/overlay/SurveyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace stellar

uint32_t const SurveyManager::SURVEY_THROTTLE_TIMEOUT_MULT(3);

uint32_t constexpr TIME_SLICED_SURVEY_MIN_OVERLAY_PROTOCOL_VERSION = 33;

namespace
{
// Generate JSON for a single peer
Expand Down Expand Up @@ -103,7 +105,7 @@ bool
SurveyManager::startSurveyReporting(SurveyMessageCommandType type,
std::chrono::seconds surveyDuration)
{
if (mRunningSurveyType)
if (mRunningSurveyReportingPhaseType)
{
return false;
}
Expand All @@ -118,7 +120,8 @@ SurveyManager::startSurveyReporting(SurveyMessageCommandType type,
mPeersToSurvey.clear();
mPeersToSurveyQueue = std::queue<NodeID>();

mRunningSurveyType = std::make_optional<SurveyMessageCommandType>(type);
mRunningSurveyReportingPhaseType =
std::make_optional<SurveyMessageCommandType>(type);

mCurve25519SecretKey = curve25519RandomSecret();
mCurve25519PublicKey = curve25519DerivePublic(mCurve25519SecretKey);
Expand All @@ -133,13 +136,13 @@ SurveyManager::startSurveyReporting(SurveyMessageCommandType type,
void
SurveyManager::stopSurveyReporting()
{
// do nothing if survey isn't running
if (!mRunningSurveyType)
// do nothing if survey isn't running in reporting phase
if (!mRunningSurveyReportingPhaseType)
{
return;
}

mRunningSurveyType.reset();
mRunningSurveyReportingPhaseType.reset();
mSurveyThrottleTimer->cancel();

clearCurve25519Keys(mCurve25519PublicKey, mCurve25519SecretKey);
Expand Down Expand Up @@ -298,7 +301,8 @@ SurveyManager::addNodeToRunningSurveyBacklog(
NodeID const& nodeToSurvey, std::optional<uint32_t> inboundPeersIndex,
std::optional<uint32_t> outboundPeersIndex)
{
if (!mRunningSurveyType || *mRunningSurveyType != type)
if (!mRunningSurveyReportingPhaseType ||
*mRunningSurveyReportingPhaseType != type)
{
throw std::runtime_error("addNodeToRunningSurveyBacklog failed");
}
Expand Down Expand Up @@ -409,7 +413,8 @@ SurveyManager::relayOrProcessResponse(StellarMessage const& msg,
{
// only process if survey is still running and we haven't seen the
// response
if (mRunningSurveyType && *mRunningSurveyType == response.commandType)
if (mRunningSurveyReportingPhaseType &&
*mRunningSurveyReportingPhaseType == response.commandType)
{
try
{
Expand Down Expand Up @@ -573,14 +578,14 @@ SurveyManager::makeOldStyleSurveyRequest(NodeID const& nodeToSurvey) const
void
SurveyManager::sendTopologyRequest(NodeID const& nodeToSurvey)
{
if (!mRunningSurveyType)
if (!mRunningSurveyReportingPhaseType)
{
throw std::runtime_error("Tried to send survey request when no survey "
"is running");
"is running in reporting phase");
}

StellarMessage newMsg;
switch (mRunningSurveyType.value())
switch (mRunningSurveyReportingPhaseType.value())
{
case SURVEY_TOPOLOGY:
newMsg = makeOldStyleSurveyRequest(nodeToSurvey);
Expand Down Expand Up @@ -801,7 +806,19 @@ SurveyManager::processTimeSlicedTopologyRequest(
void
SurveyManager::broadcast(StellarMessage const& msg) const
{
mApp.getOverlayManager().broadcastMessage(msg);
uint32_t minOverlayVersion = 0;
if (!mRunningSurveyReportingPhaseType.has_value() ||
*mRunningSurveyReportingPhaseType == TIME_SLICED_SURVEY_TOPOLOGY)
{
// If running a time sliced survey, only send messages to nodes that can
// understand them.
// NOTE: `mRunningSurveyReportingPhaseType == nullopt` also indicates a
// time sliced survey as only time sliced surveys send messages outside
// of the reporting phase (during the collecting phase).
minOverlayVersion = TIME_SLICED_SURVEY_MIN_OVERLAY_PROTOCOL_VERSION;
}
mApp.getOverlayManager().broadcastMessage(msg, /*hash*/ std::nullopt,
minOverlayVersion);
}

void
Expand Down Expand Up @@ -865,7 +882,7 @@ SurveyManager::clearOldLedgers(uint32_t lastClosedledgerSeq)
Json::Value const&
SurveyManager::getJsonResults()
{
mResults["surveyInProgress"] = mRunningSurveyType.has_value();
mResults["surveyInProgress"] = mRunningSurveyReportingPhaseType.has_value();

auto& jsonBacklog = mResults["backlog"];
jsonBacklog.clear();
Expand Down Expand Up @@ -939,7 +956,7 @@ SurveyManager::topOffRequests(SurveyMessageCommandType type)
// happen if some connections get congested)

uint32_t requestsSentInSchedule = 0;
while (mRunningSurveyType &&
while (mRunningSurveyReportingPhaseType &&
requestsSentInSchedule < MAX_REQUEST_LIMIT_PER_LEDGER &&
!mPeersToSurvey.empty())
{
Expand Down
4 changes: 3 additions & 1 deletion src/overlay/SurveyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ class SurveyManager : public std::enable_shared_from_this<SurveyManager>,
uint32_t const NUM_LEDGERS_BEFORE_IGNORE;
uint32_t const MAX_REQUEST_LIMIT_PER_LEDGER;

std::optional<SurveyMessageCommandType> mRunningSurveyType;
// If a survey is in the reporting phase, this will be set to the type of
// the running survey
std::optional<SurveyMessageCommandType> mRunningSurveyReportingPhaseType;
Curve25519Secret mCurve25519SecretKey;
Curve25519Public mCurve25519PublicKey;
SurveyMessageLimiter mMessageLimiter;
Expand Down

0 comments on commit a108620

Please sign in to comment.