Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Minimum Outbound-Only Peers Requirement #2356

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 325 additions & 9 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1;
/// them in lighthouse.
const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1;

/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections.
const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1;

/// The main struct that handles peer's reputation and connection status.
pub struct PeerManager<TSpec: EthSpec> {
/// Storage of network globals to access the `PeerDB`.
Expand Down Expand Up @@ -843,7 +846,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// NOTE: This is experimental and will likely be adjusted
fn update_peer_scores(&mut self) {
/* Check how long have peers been in this state and update their reputations if needed */

let mut to_ban_peers = Vec::new();
let mut to_unban_peers = Vec::new();

Expand Down Expand Up @@ -923,7 +925,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// NOTE: Discovery will only add a new query if one isn't already queued.
fn heartbeat(&mut self) {
let peer_count = self.network_globals.connected_or_dialing_peers();
if peer_count < self.target_peers {
let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
let min_outbound_only_target =
(self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize;

if peer_count < self.target_peers || outbound_only_peer_count < min_outbound_only_target {
// If we need more peers, queue a discovery lookup.
if self.discovery.started {
debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
Expand All @@ -939,20 +945,34 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

let connected_peer_count = self.network_globals.connected_peers();
if connected_peer_count > self.target_peers {
//remove excess peers with the worst scores, but keep subnet peers
for (peer_id, _) in self
// Remove excess peers with the worst scores, but keep subnet peers.
kevlu93 marked this conversation as resolved.
Show resolved Hide resolved
// Must also ensure that the outbound-only peer count does not go below the minimum threshold.
let mut peers_removed = 0;
let mut n_outbound_removed = 0;
for (peer_id, info) in self
.network_globals
.peers
.read()
.worst_connected_peers()
.iter()
.filter(|(_, info)| !info.has_future_duty())
.take(connected_peer_count - self.target_peers)
//we only need to disconnect peers with healthy scores, since the others got already
//disconnected in update_peer_scores
.filter(|(_, info)| info.score_state() == ScoreState::Healthy)
{
disconnecting_peers.push(**peer_id);
if peers_removed == connected_peer_count - self.target_peers {
break;
}
if info.is_outbound_only() {
if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed {
n_outbound_removed += 1;
kevlu93 marked this conversation as resolved.
Show resolved Hide resolved
} else {
continue;
}
}
peers_removed += 1;
// Only add healthy peers to disconnect,
// update_peer_scores would have already disconnected unhealthy peers.
if info.score_state() == ScoreState::Healthy {
disconnecting_peers.push(**peer_id);
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down Expand Up @@ -1053,3 +1073,299 @@ enum ConnectingType {
multiaddr: Multiaddr,
},
}

#[cfg(test)]
mod tests {
use super::*;
use crate::discovery::enr::build_enr;
use crate::discovery::enr_ext::CombinedKeyExt;
use crate::rpc::methods::MetaData;
use crate::Enr;
use discv5::enr::CombinedKey;
use slog::{o, Drain};
use std::net::UdpSocket;
use types::{EnrForkId, MinimalEthSpec};

type E = MinimalEthSpec;

pub fn unused_port() -> u16 {
let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket");
let local_addr = socket.local_addr().expect("should read udp socket");
local_addr.port()
}

pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();

if enabled {
slog::Logger::root(drain.filter_level(level).fuse(), o!())
} else {
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
}
}

async fn build_peer_manager(target: usize) -> PeerManager<E> {
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let config = NetworkConfig {
discovery_port: unused_port(),
target_peers: target,
..Default::default()
};
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();
let enr: Enr = build_enr::<E>(&enr_key, &config, EnrForkId::default()).unwrap();
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new(
enr,
9000,
9000,
MetaData {
seq_number: 0,
attnets: Default::default(),
},
vec![],
&log,
);
PeerManager::new(&keypair, &config, Arc::new(globals), &log)
.await
.unwrap()
}

#[tokio::test]
async fn test_peer_manager_disconnects_correctly_during_heartbeat() {
let mut peer_manager = build_peer_manager(3).await;

// Create 5 peers to connect to.
// 2 will be outbound-only, and have the lowest score.
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let outbound_only_peer1 = PeerId::random();
let outbound_only_peer2 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap());

// Set the outbound-only peers to have the lowest score.
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&outbound_only_peer1)
.unwrap()
.add_to_score(-1.0);

peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&outbound_only_peer2)
.unwrap()
.add_to_score(-2.0);

// Check initial connected peers.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5);

peer_manager.heartbeat();

// Check that we disconnected from two peers.
// Check that one outbound-only peer was removed because it had the worst score
// and that we did not disconnect the other outbound peer due to the minimum outbound quota.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
assert!(peer_manager
.network_globals
.peers
.read()
.is_connected(&outbound_only_peer1));
assert!(!peer_manager
.network_globals
.peers
.read()
.is_connected(&outbound_only_peer2));

peer_manager.heartbeat();

// Check that if we are at target number of peers who are all healthy, we do not disconnect any.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
}

#[tokio::test]
async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() {
let mut peer_manager = build_peer_manager(20).await;

// Connect to 20 ingoing-only peers.
for _i in 0..19 {
let peer = PeerId::random();
peer_manager.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap());
}

// Connect an outbound-only peer.
// Give it the lowest score so that it is evaluated first in the disconnect list iterator.
let outbound_only_peer = PeerId::random();
peer_manager.connect_ingoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer))
.unwrap()
.add_to_score(-1.0);
// After heartbeat, we will have removed one peer.
// Having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection.
peer_manager.heartbeat();
assert_eq!(
peer_manager.network_globals.connected_or_dialing_peers(),
20
);
}

#[tokio::test]
async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() {
let mut peer_manager = build_peer_manager(3).await;

// Create 3 peers to connect to.
// One pair will be unhealthy inbound only and outbound only peers.
let peer0 = PeerId::random();
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());

// Connect to two peers that are on the threshold of being disconnected.
peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
// Update the gossipsub scores to induce connection downgrade
// during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection.
// If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected,
// then handle_state_transitions will not change the connection status to disconnected because the score state has not changed.
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);

peer_manager.heartbeat();

// Checks that update_peer_score peer disconnection and the disconnecting_peers logic
// work together to remove the correct number of peers.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1);
}

#[tokio::test]
async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() {
let mut peer_manager = build_peer_manager(3).await;

// Create 4 peers to connect to.
// One pair will be unhealthy inbound only and outbound only peers.
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());

// Connect to two peers that are on the threshold of being disconnected.
peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager.heartbeat();
// Tests that when we are over the target peer limit, after disconnecting two unhealthy peers,
// the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target).
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2);
}

#[tokio::test]
async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() {
let mut peer_manager = build_peer_manager(3).await;

// Create 5 peers to connect to.
// One will be unhealthy inbound only and outbound only peers.
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
// Have one peer be on the verge of disconnection.
peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);

peer_manager.heartbeat();
// Tests that when we are over the target peer limit, even though one peer was already disconnected for being unhealthy,
// the loop to check for disconnecting peers will still remove the correct amount of peers.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
}
}
10 changes: 10 additions & 0 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ impl<T: EthSpec> PeerInfo<T> {
matches!(self.connection_status, Disconnected { .. })
}

/// Checks if the peer is outbound-only
pub fn is_outbound_only(&self) -> bool {
matches!(self.connection_status, Connected {n_in, n_out} if n_in == 0 && n_out > 0)
}

/// Returns the number of connections with this peer.
pub fn connections(&self) -> (u8, u8) {
match self.connection_status {
Expand Down Expand Up @@ -306,6 +311,11 @@ impl<T: EthSpec> PeerInfo<T> {
self.score.test_add(score)
}
}

#[cfg(test)]
pub fn set_gossipsub_score(&mut self, score: f64) {
self.score.set_gossipsub_score(score);
}
}

#[derive(Clone, Debug, Serialize)]
Expand Down
Loading