Skip to content

Commit

Permalink
Merge pull request stellar#4120 from marta-lokhova/drop_old_peers_pos…
Browse files Browse the repository at this point in the history
…t_upgrade

Drop old peers post v20 upgrade

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita authored Jan 5, 2024
2 parents a15c993 + de714e6 commit d60d3c3
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 12 deletions.
15 changes: 15 additions & 0 deletions src/herder/HerderUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "herder/HerderUtils.h"
#include "scp/Slot.h"
#include "util/ProtocolVersion.h"
#include "xdr/Stellar-ledger.h"
#include <algorithm>
#include <xdrpp/marshal.h>
Expand Down Expand Up @@ -40,4 +41,18 @@ getStellarValues(SCPStatement const& statement)

return result;
}

bool
shouldDropPeerPredicate(Peer::pointer peer, uint32_t protocolVersion)
{
bool upgraded =
protocolVersionStartsFrom(protocolVersion, SOROBAN_PROTOCOL_VERSION);
auto ovVersion = peer->getRemoteOverlayVersion();
if (upgraded && ovVersion &&
ovVersion.value() < Peer::FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20)
{
return true;
}
return false;
}
}
2 changes: 2 additions & 0 deletions src/herder/HerderUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "overlay/Peer.h"
#include "xdr/Stellar-types.h"
#include <vector>

Expand All @@ -16,4 +17,5 @@ struct StellarValue;

std::vector<Hash> getTxSetHashes(SCPEnvelope const& envelope);
std::vector<StellarValue> getStellarValues(SCPStatement const& envelope);
bool shouldDropPeerPredicate(Peer::pointer peer, uint32_t protocolVersion);
}
7 changes: 5 additions & 2 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "database/Database.h"
#include "herder/Herder.h"
#include "herder/HerderPersistence.h"
#include "herder/HerderUtils.h"
#include "herder/LedgerCloseData.h"
#include "herder/TxSetFrame.h"
#include "herder/Upgrades.h"
Expand Down Expand Up @@ -885,10 +886,12 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData)
CLOG_ERROR(Ledger, "Unknown exception during upgrade");
}
}
if (protocolVersionStartsFrom(ltx.loadHeader().current().ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
auto maybeNewVersion = ltx.loadHeader().current().ledgerVersion;
if (protocolVersionStartsFrom(maybeNewVersion, SOROBAN_PROTOCOL_VERSION))
{
updateNetworkConfig(ltx);
mApp.getOverlayManager().dropPeersIf(
shouldDropPeerPredicate, maybeNewVersion, "version too old");
}

ledgerClosed(ltx, ledgerCloseMeta, initialLedgerVers);
Expand Down
8 changes: 7 additions & 1 deletion src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class OverlayManager

// Return a list of random peers from the set of authenticated peers.
virtual std::vector<Peer::pointer> getRandomAuthenticatedPeers() = 0;
virtual std::vector<Peer::pointer>
getAuthenticatedPeers(bool randomize) = 0;

// Return a list of random peers from the set of inbound authenticated
// peers.
Expand Down Expand Up @@ -204,9 +206,13 @@ class OverlayManager

virtual size_t getMaxAdvertSize() const = 0;

virtual AdjustedFlowControlConfig getFlowControlBytesConfig() const = 0;

virtual void
dropPeersIf(std::function<bool(Peer::pointer, uint32_t)> predicate,
uint32_t version, std::string const& reason) = 0;
virtual ~OverlayManager()
{
}
virtual AdjustedFlowControlConfig getFlowControlBytesConfig() const = 0;
};
}
42 changes: 41 additions & 1 deletion src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,37 @@ OverlayManagerImpl::getFlowControlBytesConfig() const
cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES};
}

void
OverlayManagerImpl::dropPeersIf(
std::function<bool(Peer::pointer, uint32_t)> predicate, uint32_t version,
std::string const& reason)
{
auto maybeDrop = [](auto peers,
std::function<bool(Peer::pointer, uint32_t)> predicate,
uint32_t version, std::string const& reason) {
std::vector<Peer::pointer> peersToDrop;
for (auto it = peers.begin(); it != peers.end(); ++it)
{
if (predicate(*it, version))
{
peersToDrop.emplace_back(*it);
}
}

for (auto& peer : peersToDrop)
{
// Drop will cleanup peer lists and remove peer references from
// overlay manager
peer->drop(reason, Peer::DropDirection::WE_DROPPED_REMOTE,
Peer::DropMode::IGNORE_WRITE_QUEUE);
}
};

maybeDrop(getPendingPeers(), predicate, version, reason);
maybeDrop(getAuthenticatedPeers(/* randomize */ false), predicate, version,
reason);
}

void
OverlayManagerImpl::connectTo(PeerBareAddress const& address)
{
Expand Down Expand Up @@ -1047,13 +1078,22 @@ OverlayManagerImpl::isFloodMessage(StellarMessage const& msg)
}
std::vector<Peer::pointer>
OverlayManagerImpl::getRandomAuthenticatedPeers()
{
return getAuthenticatedPeers(true);
}

std::vector<Peer::pointer>
OverlayManagerImpl::getAuthenticatedPeers(bool randomize)
{
std::vector<Peer::pointer> result;
result.reserve(mInboundPeers.mAuthenticated.size() +
mOutboundPeers.mAuthenticated.size());
extractPeersFromMap(mInboundPeers.mAuthenticated, result);
extractPeersFromMap(mOutboundPeers.mAuthenticated, result);
shufflePeerList(result);
if (randomize)
{
shufflePeerList(result);
}
return result;
}

Expand Down
5 changes: 5 additions & 0 deletions src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ class OverlayManagerImpl : public OverlayManager
Peer::pointer getConnectedPeer(PeerBareAddress const& address) override;

std::vector<Peer::pointer> getRandomAuthenticatedPeers() override;
std::vector<Peer::pointer> getAuthenticatedPeers(bool randomize) override;

std::vector<Peer::pointer> getRandomInboundAuthenticatedPeers() override;
std::vector<Peer::pointer> getRandomOutboundAuthenticatedPeers() override;

Expand Down Expand Up @@ -236,6 +238,9 @@ class OverlayManagerImpl : public OverlayManager
void shufflePeerList(std::vector<Peer::pointer>& peerList);
AdjustedFlowControlConfig getFlowControlBytesConfig() const override;

void dropPeersIf(std::function<bool(Peer::pointer, uint32_t)> predicate,
uint32_t version, std::string const& reason) override;

// Returns `true` iff the overlay can accept the outbound peer at `address`.
// Logs whenever a peer cannot be accepted.
bool canAcceptOutboundPeer(PeerBareAddress const& address) const;
Expand Down
22 changes: 16 additions & 6 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "util/ProtocolVersion.h"
#include "util/XDROperators.h"

#include "herder/HerderUtils.h"
#include "medida/meter.h"
#include "medida/metrics_registry.h"
#include "medida/timer.h"
Expand Down Expand Up @@ -58,7 +59,7 @@ Peer::Peer(Application& app, PeerRole role)
, mState(role == WE_CALLED_REMOTE ? CONNECTING : CONNECTED)
, mFlowControl(std::make_shared<FlowControl>(mApp))
, mRemoteOverlayMinVersion(0)
, mRemoteOverlayVersion(0)
, mRemoteOverlayVersion(std::nullopt)
, mCreationTime(app.getClock().now())
, mRecurringTimer(app)
, mDelayedExecutionTimer(app)
Expand Down Expand Up @@ -299,7 +300,10 @@ Peer::getJsonInfo(bool compact) const
res["elapsed"] = (int)getLifeTime().count();
res["latency"] = (int)getPing().count();
res["ver"] = getRemoteVersion();
res["olver"] = (int)getRemoteOverlayVersion();
if (getRemoteOverlayVersion())
{
res["olver"] = getRemoteOverlayVersion().value();
}
if (mFlowControl)
{
res["flow_control"] = mFlowControl->getFlowControlJsonInfo(compact);
Expand Down Expand Up @@ -1431,12 +1435,18 @@ Peer::recvHello(Hello const& elo)
dropMode = Peer::DropMode::FLUSH_WRITE_QUEUE;
}

if (mRemoteOverlayMinVersion > mRemoteOverlayVersion ||
mRemoteOverlayVersion < mApp.getConfig().OVERLAY_PROTOCOL_MIN_VERSION ||
mRemoteOverlayMinVersion > mApp.getConfig().OVERLAY_PROTOCOL_VERSION)
bool rejectBasedOnLedgerVersion = shouldDropPeerPredicate(
shared_from_this(), mApp.getLedgerManager()
.getLastClosedLedgerHeader()
.header.ledgerVersion);
if (mRemoteOverlayMinVersion > mRemoteOverlayVersion.value() ||
mRemoteOverlayVersion.value() <
mApp.getConfig().OVERLAY_PROTOCOL_MIN_VERSION ||
mRemoteOverlayMinVersion > mApp.getConfig().OVERLAY_PROTOCOL_VERSION ||
rejectBasedOnLedgerVersion)
{
CLOG_DEBUG(Overlay, "Protocol = [{},{}] expected: [{},{}]",
mRemoteOverlayMinVersion, mRemoteOverlayVersion,
mRemoteOverlayMinVersion, mRemoteOverlayVersion.value(),
mApp.getConfig().OVERLAY_PROTOCOL_MIN_VERSION,
mApp.getConfig().OVERLAY_PROTOCOL_VERSION);
sendErrorAndDrop(ERR_CONF, "wrong protocol version", dropMode);
Expand Down
5 changes: 3 additions & 2 deletions src/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Peer : public std::enable_shared_from_this<Peer>,
std::chrono::seconds(1);
static constexpr uint32_t FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES =
28;
static constexpr uint32_t FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20 = 31;

// The reporting will be based on the previous
// PEER_METRICS_WINDOW_SIZE-second time window.
Expand Down Expand Up @@ -212,7 +213,7 @@ class Peer : public std::enable_shared_from_this<Peer>,

std::string mRemoteVersion;
uint32_t mRemoteOverlayMinVersion;
uint32_t mRemoteOverlayVersion;
std::optional<uint32_t> mRemoteOverlayVersion;
PeerBareAddress mAddress;

VirtualClock::time_point mCreationTime;
Expand Down Expand Up @@ -375,7 +376,7 @@ class Peer : public std::enable_shared_from_this<Peer>,
return mRemoteOverlayMinVersion;
}

uint32_t
std::optional<uint32_t>
getRemoteOverlayVersion() const
{
return mRemoteOverlayVersion;
Expand Down
73 changes: 73 additions & 0 deletions src/overlay/test/OverlayTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,56 @@ TEST_CASE("loopback peer send auth before hello", "[overlay][connections]")
testutil::shutdownWorkScheduler(*app1);
}

TEST_CASE("overlay version peer drop post-protocol-20")
{
auto cfg1 = getTestConfig(0);
auto cfg2 = getTestConfig(1);

cfg1.OVERLAY_PROTOCOL_VERSION =
Peer::FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20 - 1;
cfg1.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;
cfg2.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;

VirtualClock clock;
auto app1 = createTestApplication(clock, cfg1);
auto app2 = createTestApplication(clock, cfg2);

auto result = LedgerUpgrade{LEDGER_UPGRADE_VERSION};
result.newLedgerVersion() = static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION);
LoopbackPeerConnection conn(*app1, *app2);

std::string expectedDropReason = "version too old";
SECTION("fully authenticated")
{
testutil::crankSome(clock);
REQUIRE(conn.getInitiator()->isAuthenticated());
REQUIRE(conn.getAcceptor()->isAuthenticated());
}
SECTION("pending peers")
{
REQUIRE(!conn.getInitiator()->isAuthenticated());
REQUIRE(!conn.getAcceptor()->isAuthenticated());
expectedDropReason = "wrong protocol version";
}

txtest::executeUpgrade(*app1, result);
txtest::executeUpgrade(*app2, result);

testutil::crankSome(clock);
REQUIRE(!conn.getInitiator()->isConnected());
REQUIRE(!conn.getAcceptor()->isConnected());
REQUIRE(conn.getAcceptor()->getDropReason() == expectedDropReason);

// Post-upgrade, try to re-connect and get rejected due to old protocol
LoopbackPeerConnection conn2(*app1, *app2);
testutil::crankSome(clock);
REQUIRE(!conn2.getInitiator()->isConnected());
REQUIRE(!conn2.getAcceptor()->isConnected());
REQUIRE(conn2.getAcceptor()->getDropReason() == "wrong protocol version");
}

TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]")
{
VirtualClock clock;
Expand Down Expand Up @@ -283,6 +333,14 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]")
{
cfg1.OVERLAY_PROTOCOL_VERSION =
Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES;
cfg2.OVERLAY_PROTOCOL_VERSION =
Peer::FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20 - 1;
// Mixed versions do not work in v20, as older clients are banned
cfg1.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;
cfg2.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;

cfg1.PEER_FLOOD_READING_CAPACITY_BYTES =
2 * getTxSize(tx1) + Herder::FLOW_CONTROL_BYTES_EXTRA_BUFFER;
cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = getTxSize(tx1);
Expand All @@ -298,6 +356,11 @@ TEST_CASE("flow control byte capacity", "[overlay][flowcontrol]")
Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES;
cfg2.OVERLAY_PROTOCOL_VERSION =
Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES;
// Mixed versions do not work in v20, as older clients are banned
cfg1.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;
cfg2.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;
cfg1.PEER_FLOOD_READING_CAPACITY_BYTES =
2 * getTxSize(tx1) + Herder::FLOW_CONTROL_BYTES_EXTRA_BUFFER;
cfg1.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES = getTxSize(tx1);
Expand Down Expand Up @@ -659,6 +722,10 @@ TEST_CASE("loopback peer flow control activation", "[overlay][flowcontrol]")
{
cfg1.OVERLAY_PROTOCOL_VERSION =
Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES;
cfg1.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;
cfg2.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;
runTest({cfg1, cfg2}, false);
}
SECTION("bad peer")
Expand Down Expand Up @@ -2290,6 +2357,12 @@ TEST_CASE("overlay flow control", "[overlay][flowcontrol]")
{
configs[2].OVERLAY_PROTOCOL_VERSION =
Peer::FIRST_VERSION_SUPPORTING_FLOW_CONTROL_IN_BYTES - 1;
for (auto& cfg : configs)
{
cfg.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
static_cast<uint32_t>(SOROBAN_PROTOCOL_VERSION) - 1;
}

setupSimulation();
}
SECTION("one peer disables")
Expand Down

0 comments on commit d60d3c3

Please sign in to comment.