Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Make peer evictions less aggressive (#14619)
Browse files Browse the repository at this point in the history
* Make peer evictions less aggressive

The original implementation of peer eviction prioritized aliveness over
connection stability which made the peer count unstable for some users.

As this may cause discomfort or infrastructure alerts if stability is
tracked, adjust the eviction to be less aggressive by only evicting
peers when the node has fully stalled. This causes the node to have some
peers who are inactive and won't send any block announcements.
These nodes are removed if the local node is able to receive at least
one block announcement from one of its peers as the inactivity of the
substream is detected when a notification is sent.

If the node won't send or receive any block annoucements for 30 seconds,
it's considered stalled and it will evict all peers,
causing `ProtocolController` to accept and establish connections from new
peers.

* Update client/network/sync/src/engine.rs

Co-authored-by: Dmitry Markin <[email protected]>

* Track last send and received notification simultaneously

---------

Co-authored-by: Dmitry Markin <[email protected]>
Co-authored-by: parity-processbot <>
  • Loading branch information
altonen and dmitry-markin authored Aug 15, 2023
1 parent 19971bd commit 5e76587
Showing 1 changed file with 20 additions and 43 deletions.
63 changes: 20 additions & 43 deletions client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,6 @@ pub struct Peer<B: BlockT> {
pub known_blocks: LruHashSet<B::Hash>,
/// Notification sink.
sink: NotificationsSink,
/// Instant when the last notification was sent to peer.
last_notification_sent: Instant,
/// Instant when the last notification was received from peer.
last_notification_received: Instant,
/// Is the peer inbound.
inbound: bool,
}
Expand Down Expand Up @@ -220,9 +216,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,

/// Evicted peers
evicted: HashSet<PeerId>,

/// List of nodes for which we perform additional logging because they are important for the
/// user.
important_peers: HashSet<PeerId>,
Expand Down Expand Up @@ -263,6 +256,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Stored as an `Option<Instant>` so once the initial wait has passed, `SyncingEngine`
/// can reset the peer timers and continue with the normal eviction process.
syncing_started: Option<Instant>,

/// Instant when the last notification was sent or received.
last_notification_io: Instant,
}

impl<B: BlockT, Client> SyncingEngine<B, Client>
Expand Down Expand Up @@ -391,7 +387,6 @@ where
chain_sync,
network_service,
peers: HashMap::new(),
evicted: HashSet::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
block_announce_protocol_name,
num_connected: num_connected.clone(),
Expand All @@ -410,6 +405,7 @@ where
event_streams: Vec::new(),
tick_timeout: Delay::new(TICK_TIMEOUT),
syncing_started: None,
last_notification_io: Instant::now(),
metrics: if let Some(r) = metrics_registry {
match Metrics::register(r, is_major_syncing.clone()) {
Ok(metrics) => Some(metrics),
Expand Down Expand Up @@ -522,7 +518,6 @@ where
},
};
peer.known_blocks.insert(hash);
peer.last_notification_received = Instant::now();

if peer.info.roles.is_full() {
let is_best = match announce.state.unwrap_or(BlockState::Best) {
Expand Down Expand Up @@ -573,7 +568,7 @@ where
data: Some(data.clone()),
};

peer.last_notification_sent = Instant::now();
self.last_notification_io = Instant::now();
peer.sink.send_sync_notification(message.encode());
}
}
Expand Down Expand Up @@ -616,42 +611,26 @@ where
continue
}

// reset the peer activity timers so they don't expire right away after
// the initial wait is done.
for info in self.peers.values_mut() {
info.last_notification_received = Instant::now();
info.last_notification_sent = Instant::now();
}

self.syncing_started = None;
self.last_notification_io = Instant::now();
}

// go over all connected peers and check if any of them have been idle for a while. Idle
// in this case means that we haven't sent or received block announcements to/from this
// peer. If that is the case, because of #5685, it could be that the block announces
// substream is not actually open and and this peer is just wasting a slot and is should
// be replaced with some other node that is willing to send us block announcements.
for (id, peer) in self.peers.iter() {
// because of a delay between disconnecting a peer in `SyncingEngine` and getting
// the response back from `Protocol`, a peer might be reported and disconnect
// multiple times. To prevent this from happening (until the underlying issue is
// fixed), keep track of evicted peers and report and disconnect them only once.
if self.evicted.contains(id) {
continue
}
// if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`,
// it means the local node has stalled and is connected to peers who either don't
// consider it connected or are also all stalled. In order to unstall the node,
// disconnect all peers and allow `ProtocolController` to establish new connections.
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
log::debug!(target: "sync", "syncing has halted due to inactivity, evicting all peers");

let last_received_late =
peer.last_notification_received.elapsed() > INACTIVITY_EVICT_THRESHOLD;
let last_sent_late =
peer.last_notification_sent.elapsed() > INACTIVITY_EVICT_THRESHOLD;

if last_received_late && last_sent_late {
log::debug!(target: "sync", "evict peer {id} since it has been idling for too long");
self.network_service.report_peer(*id, rep::INACTIVE_SUBSTREAM);
for peer in self.peers.keys() {
self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
self.network_service
.disconnect_peer(*id, self.block_announce_protocol_name.clone());
self.evicted.insert(*id);
.disconnect_peer(*peer, self.block_announce_protocol_name.clone());
}

// after all the peers have been evicted, start timer again to prevent evicting
// new peers that join after the old peer have been evicted
self.last_notification_io = Instant::now();
}
}

Expand Down Expand Up @@ -749,7 +728,6 @@ where
},
},
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
self.evicted.remove(&remote);
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
Expand All @@ -762,6 +740,7 @@ where
for message in messages {
if self.peers.contains_key(&remote) {
if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
self.last_notification_io = Instant::now();
self.push_block_announce_validation(remote, announce);

// Make sure that the newly added block announce validation future
Expand Down Expand Up @@ -928,8 +907,6 @@ where
NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
),
sink,
last_notification_sent: Instant::now(),
last_notification_received: Instant::now(),
inbound,
};

Expand Down

0 comments on commit 5e76587

Please sign in to comment.