diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index eeef06d0791..a111826028e 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1; /// them in lighthouse. const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; +/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections. +const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. @@ -843,7 +846,6 @@ impl PeerManager { /// NOTE: This is experimental and will likely be adjusted fn update_peer_scores(&mut self) { /* Check how long have peers been in this state and update their reputations if needed */ - let mut to_ban_peers = Vec::new(); let mut to_unban_peers = Vec::new(); @@ -918,12 +920,16 @@ impl PeerManager { /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. /// /// It will request discovery queries if the peer count has not reached the desired number of - /// peers. + /// overall peers, as well as the desired number of outbound-only peers. /// /// NOTE: Discovery will only add a new query if one isn't already queued. fn heartbeat(&mut self) { let peer_count = self.network_globals.connected_or_dialing_peers(); - if peer_count < self.target_peers { + let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let min_outbound_only_target = + (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize; + + if peer_count < self.target_peers || outbound_only_peer_count < min_outbound_only_target { // If we need more peers, queue a discovery lookup. if self.discovery.started { debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers); @@ -939,19 +945,28 @@ impl PeerManager { let connected_peer_count = self.network_globals.connected_peers(); if connected_peer_count > self.target_peers { - //remove excess peers with the worst scores, but keep subnet peers - for (peer_id, _) in self + // Remove excess peers with the worst scores, but keep subnet peers. + // Must also ensure that the outbound-only peer count does not go below the minimum threshold. + outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); + let mut n_outbound_removed = 0; + for (peer_id, info) in self .network_globals .peers .read() .worst_connected_peers() .iter() .filter(|(_, info)| !info.has_future_duty()) - .take(connected_peer_count - self.target_peers) - //we only need to disconnect peers with healthy scores, since the others got already - //disconnected in update_peer_scores - .filter(|(_, info)| info.score_state() == ScoreState::Healthy) { + if disconnecting_peers.len() == connected_peer_count - self.target_peers { + break; + } + if info.is_outbound_only() { + if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed { + n_outbound_removed += 1; + } else { + continue; + } + } disconnecting_peers.push(**peer_id); } } @@ -1053,3 +1068,296 @@ enum ConnectingType { multiaddr: Multiaddr, }, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::discovery::enr::build_enr; + use crate::discovery::enr_ext::CombinedKeyExt; + use crate::rpc::methods::MetaData; + use crate::Enr; + use discv5::enr::CombinedKey; + use slog::{o, Drain}; + use std::net::UdpSocket; + use types::{EnrForkId, MinimalEthSpec}; + + type E = MinimalEthSpec; + + pub fn unused_port() -> u16 { + let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket"); + let local_addr = socket.local_addr().expect("should read udp socket"); + local_addr.port() + } + + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } + } + + async fn build_peer_manager(target: usize) -> PeerManager { + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let config = NetworkConfig { + discovery_port: unused_port(), + target_peers: target, + ..Default::default() + }; + let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); + let enr: Enr = build_enr::(&enr_key, &config, EnrForkId::default()).unwrap(); + let log = build_log(slog::Level::Debug, false); + let globals = NetworkGlobals::new( + enr, + 9000, + 9000, + MetaData { + seq_number: 0, + attnets: Default::default(), + }, + vec![], + &log, + ); + PeerManager::new(&keypair, &config, Arc::new(globals), &log) + .await + .unwrap() + } + + #[tokio::test] + async fn test_peer_manager_disconnects_correctly_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 5 peers to connect to. + // 2 will be outbound-only, and have the lowest score. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + let outbound_only_peer2 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap()); + + // Set the outbound-only peers to have the lowest score. + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer1) + .unwrap() + .add_to_score(-1.0); + + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&outbound_only_peer2) + .unwrap() + .add_to_score(-2.0); + + // Check initial connected peers. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5); + + peer_manager.heartbeat(); + + // Check that we disconnected from two peers. + // Check that one outbound-only peer was removed because it had the worst score + // and that we did not disconnect the other outbound peer due to the minimum outbound quota. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + assert!(peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer1)); + assert!(!peer_manager + .network_globals + .peers + .read() + .is_connected(&outbound_only_peer2)); + + peer_manager.heartbeat(); + + // Check that if we are at target number of peers, we do not disconnect any. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + } + + #[tokio::test] + async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() { + let mut peer_manager = build_peer_manager(20).await; + + // Connect to 20 ingoing-only peers. + for _i in 0..19 { + let peer = PeerId::random(); + peer_manager.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap()); + } + + // Connect an outbound-only peer. + // Give it the lowest score so that it is evaluated first in the disconnect list iterator. + let outbound_only_peer = PeerId::random(); + peer_manager.connect_ingoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer)) + .unwrap() + .add_to_score(-1.0); + // After heartbeat, we will have removed one peer. + // Having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection. + peer_manager.heartbeat(); + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + 20 + ); + } + + #[tokio::test] + async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 3 peers to connect to. + let peer0 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + + // Connect to two peers that are on the threshold of being disconnected. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + // Update the gossipsub scores to induce connection downgrade + // during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection. + // If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected, + // then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); + } + + #[tokio::test] + async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 4 peers to connect to. + // One pair will be unhealthy inbound only and outbound only peers. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + + // Connect to two peers that are on the threshold of being disconnected. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(outbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + peer_manager.heartbeat(); + // Tests that when we are over the target peer limit, after disconnecting two unhealthy peers, + // the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target). + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2); + } + + #[tokio::test] + async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() { + let mut peer_manager = build_peer_manager(3).await; + + // Create 5 peers to connect to. + // One will be unhealthy inbound only and outbound only peers. + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let inbound_only_peer1 = PeerId::random(); + let outbound_only_peer1 = PeerId::random(); + + peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + // Have one peer be on the verge of disconnection. + peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap()); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .add_to_score(-19.9); + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&(inbound_only_peer1)) + .unwrap() + .set_gossipsub_score(-85.0); + + peer_manager.heartbeat(); + // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer, + // the number of connected peers updates and we will not remove too many peers. + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + } +} diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 49546776a8f..43570c5aeee 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -182,6 +182,11 @@ impl PeerInfo { matches!(self.connection_status, Disconnected { .. }) } + /// Checks if the peer is outbound-only + pub fn is_outbound_only(&self) -> bool { + matches!(self.connection_status, Connected {n_in, n_out} if n_in == 0 && n_out > 0) + } + /// Returns the number of connections with this peer. pub fn connections(&self) -> (u8, u8) { match self.connection_status { @@ -306,6 +311,11 @@ impl PeerInfo { self.score.test_add(score) } } + + #[cfg(test)] + pub fn set_gossipsub_score(&mut self, score: f64) { + self.score.set_gossipsub_score(score); + } } #[derive(Clone, Debug, Serialize)] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 6803f49ed11..d4a8cecf13a 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -221,6 +221,14 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Connected outbound-only peers + pub fn connected_outbound_only_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_outbound_only()) + .map(|(peer_id, _)| peer_id) + } + /// Gives the `peer_id` of all known connected and synced peers. pub fn synced_peers(&self) -> impl Iterator { self.peers @@ -677,6 +685,25 @@ mod tests { assert_eq!(peer_info.unwrap().connections(), (n_in, n_out)); } + #[test] + fn test_outbound_only_peers_counted_correctly() { + let mut pdb = get_db(); + let p0 = PeerId::random(); + let p1 = PeerId::random(); + let p2 = PeerId::random(); + // Create peer with no connections. + let _p3 = PeerId::random(); + + pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None); + + // We should only have one outbound-only peer (p2). + // Peers that are inbound-only, have both types of connections, or no connections should not be counted. + assert_eq!(pdb.connected_outbound_only_peers().count(), 1); + } + #[test] fn test_disconnected_are_bounded() { let mut pdb = get_db(); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 38fecad3af9..02479bef067 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -216,6 +216,13 @@ impl RealScore { self.set_lighthouse_score(0f64); } + // Set the gossipsub_score to a specific f64. + // Used in testing to induce score status changes during a heartbeat. + #[cfg(test)] + pub fn set_gossipsub_score(&mut self, score: f64) { + self.gossipsub_score = score; + } + /// Applies time-based logic such as decay rates to the score. /// This function should be called periodically. pub fn update(&mut self) { @@ -291,6 +298,8 @@ apply!(update_gossipsub_score, new_score: f64, ignore: bool); apply!(test_add, score: f64); #[cfg(test)] apply!(test_reset); +#[cfg(test)] +apply!(set_gossipsub_score, score: f64); impl Score { pub fn score(&self) -> f64 { diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 67abcf77242..4055e53b205 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -84,6 +84,11 @@ impl NetworkGlobals { self.peers.read().connected_peer_ids().count() } + /// Returns the number of libp2p connected peers with outbound-only connections. + pub fn connected_outbound_only_peers(&self) -> usize { + self.peers.read().connected_outbound_only_peers().count() + } + /// Returns the number of libp2p peers that are either connected or being dialed. pub fn connected_or_dialing_peers(&self) -> usize { self.peers.read().connected_or_dialing_peers().count()