Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Minimum Outbound-Only Peers Requirement #2356

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 139 additions & 7 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1;
/// them in lighthouse.
const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1;

/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections.
const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1;

/// The main struct that handles peer's reputation and connection status.
pub struct PeerManager<TSpec: EthSpec> {
/// Storage of network globals to access the `PeerDB`.
Expand Down Expand Up @@ -923,7 +926,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// NOTE: Discovery will only add a new query if one isn't already queued.
fn heartbeat(&mut self) {
let peer_count = self.network_globals.connected_or_dialing_peers();
if peer_count < self.target_peers {
let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
let min_outbound_only_target =
(self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize;

if peer_count < self.target_peers || outbound_only_peer_count < min_outbound_only_target {
// If we need more peers, queue a discovery lookup.
if self.discovery.started {
debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
Expand All @@ -940,19 +947,31 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let connected_peer_count = self.network_globals.connected_peers();
if connected_peer_count > self.target_peers {
//remove excess peers with the worst scores, but keep subnet peers
for (peer_id, _) in self
//must also ensure that the outbound-only peer count does not go below the minimum threshold
let mut peers_removed = 0;
let mut n_outbound_removed = 0;
for (peer_id, info) in self
.network_globals
.peers
.read()
.worst_connected_peers()
.iter()
.filter(|(_, info)| !info.has_future_duty())
.take(connected_peer_count - self.target_peers)
//we only need to disconnect peers with healthy scores, since the others got already
//disconnected in update_peer_scores
.filter(|(_, info)| info.score_state() == ScoreState::Healthy)
{
disconnecting_peers.push(**peer_id);
if peers_removed == connected_peer_count - self.target_peers {
break;
}
if info.is_outbound_only() {
if n_outbound_removed < outbound_only_peer_count - min_outbound_only_target {
kevlu93 marked this conversation as resolved.
Show resolved Hide resolved
kevlu93 marked this conversation as resolved.
Show resolved Hide resolved
n_outbound_removed += 1;
kevlu93 marked this conversation as resolved.
Show resolved Hide resolved
} else {
continue;
}
}
peers_removed += 1;
if info.score_state() == ScoreState::Healthy {
disconnecting_peers.push(**peer_id);
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down Expand Up @@ -1053,3 +1072,116 @@ enum ConnectingType {
multiaddr: Multiaddr,
},
}

#[cfg(test)]
mod tests {
use super::*;
use crate::discovery::enr::build_enr;
use crate::discovery::enr_ext::CombinedKeyExt;
use crate::rpc::methods::MetaData;
use crate::Enr;
use discv5::enr::CombinedKey;
use slog::{o, Drain};
use std::net::UdpSocket;
use types::{EnrForkId, MinimalEthSpec};

type E = MinimalEthSpec;

pub fn unused_port() -> u16 {
let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket");
let local_addr = socket.local_addr().expect("should read udp socket");
local_addr.port()
}

pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();

if enabled {
slog::Logger::root(drain.filter_level(level).fuse(), o!())
} else {
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
}
}

async fn build_peer_manager() -> PeerManager<E> {
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let config = NetworkConfig {
discovery_port: unused_port(),
target_peers: 5,
..Default::default()
};
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();
let enr: Enr = build_enr::<E>(&enr_key, &config, EnrForkId::default()).unwrap();
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new(
enr,
9000,
9000,
MetaData {
seq_number: 0,
attnets: Default::default(),
},
vec![],
&log,
);
PeerManager::new(&keypair, &config, Arc::new(globals), &log)
.await
.unwrap()
}

#[tokio::test]
async fn test_peer_manager_disconnects_peers_when_target_reached() {
let mut peer_manager = build_peer_manager().await;

//create 6 peers to connect too
//1 will be outbound-only, and have the lowest score.
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let peer3 = PeerId::random();
let peer4 = PeerId::random();
let outbound_only_peer = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&peer2, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer3, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&peer3, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer4, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&peer4, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap());

//set the outbound-only peer to have the lowest score
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&outbound_only_peer)
.unwrap()
.add_to_score(-1.0);

//check initial connected peers
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 6);

peer_manager.heartbeat();

//check that we disconnected from one peer
//and that we did not disconnect the outbound peer even though it was the worst peer
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5);
assert!(peer_manager
.network_globals
.peers
.read()
.is_connected(&outbound_only_peer));

peer_manager.heartbeat();

//check that if we are at target number of peers, we do not disconnect any
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5);
}
}
5 changes: 5 additions & 0 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ impl<T: EthSpec> PeerInfo<T> {
matches!(self.connection_status, Disconnected { .. })
}

/// Checks if the peer is outbound-only
pub fn is_outbound_only(&self) -> bool {
matches!(self.connection_status, Connected {n_in, n_out} if n_in == 0 && n_out > 0)
}

/// Returns the number of connections with this peer.
pub fn connections(&self) -> (u8, u8) {
match self.connection_status {
Expand Down
27 changes: 27 additions & 0 deletions beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.map(|(peer_id, _)| peer_id)
}

///Connected outbound-only peers
pub fn connected_outbound_only_peers(&self) -> impl Iterator<Item = &PeerId> {
self.peers
.iter()
.filter(|(_, info)| info.is_outbound_only())
.map(|(peer_id, _)| peer_id)
}

/// Gives the `peer_id` of all known connected and synced peers.
pub fn synced_peers(&self) -> impl Iterator<Item = &PeerId> {
self.peers
Expand Down Expand Up @@ -677,6 +685,25 @@ mod tests {
assert_eq!(peer_info.unwrap().connections(), (n_in, n_out));
}

#[test]
fn test_outbound_only_peers_counted_correctly() {
let mut pdb = get_db();
let p0 = PeerId::random();
let p1 = PeerId::random();
let p2 = PeerId::random();
//create peer with no connections
let _p3 = PeerId::random();

pdb.connect_ingoing(&p0, "/ip4/0.0.0.0".parse().unwrap(), None);
pdb.connect_ingoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None);
pdb.connect_outgoing(&p1, "/ip4/0.0.0.0".parse().unwrap(), None);
pdb.connect_outgoing(&p2, "/ip4/0.0.0.0".parse().unwrap(), None);

// we should only have one outbound-only peer (p2).
// peers that are inbound-only, have both types of connections, or no connections should not be counted
assert_eq!(pdb.connected_outbound_only_peers().count(), 1);
}

#[test]
fn test_disconnected_are_bounded() {
let mut pdb = get_db();
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/eth2_libp2p/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
self.peers.read().connected_peer_ids().count()
}

/// Returns the number of libp2p connected peers with outbound-only connections
pub fn connected_outbound_only_peers(&self) -> usize {
self.peers.read().connected_outbound_only_peers().count()
}

/// Returns the number of libp2p peers that are either connected or being dialed.
pub fn connected_or_dialing_peers(&self) -> usize {
self.peers.read().connected_or_dialing_peers().count()
Expand Down