Minimum Outbound-Only Peers Requirement (#2356)
## Issue Addressed

#2325 

## Proposed Changes

This pull request changes the behavior of the Peer Manager by adding a minimum outbound-only peers requirement. The peer manager will continue querying for peers if this outbound-only target hasn't been met. Additionally, when excess peers are being removed, an outbound-only peer will not be disconnected if doing so would bring us below the minimum.
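
For reference, a minimal standalone sketch of the two checks described above. It reuses the constant and helper naming added in this PR (`MIN_OUTBOUND_ONLY_FACTOR`, a `connected_outbound_only_peers`-style count), but the `PeerCounts` struct and its methods are simplified stand-ins for illustration, not the actual `PeerManager` implementation:

```rust
/// Fraction of `target_peers` that must be outbound-only connections.
const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1;

/// Simplified stand-in for the counts the peer manager consults in its heartbeat.
struct PeerCounts {
    target_peers: usize,
    connected_or_dialing: usize,
    outbound_only: usize,
}

impl PeerCounts {
    /// Minimum number of outbound-only peers we want to keep.
    fn min_outbound_only_target(&self) -> usize {
        (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize
    }

    /// Queue a discovery query if either the overall target or the
    /// outbound-only target has not been met.
    fn needs_discovery(&self) -> bool {
        self.connected_or_dialing < self.target_peers
            || self.outbound_only < self.min_outbound_only_target()
    }

    /// When pruning excess peers, only allow removing an outbound-only peer
    /// if doing so keeps us above the outbound-only minimum.
    fn may_disconnect_outbound(&self, already_removed: usize) -> bool {
        self.min_outbound_only_target() < self.outbound_only.saturating_sub(already_removed)
    }
}

fn main() {
    let counts = PeerCounts {
        target_peers: 50,
        connected_or_dialing: 50,
        outbound_only: 4,
    };
    // The overall target is met, but 4 < ceil(50 * 0.1) = 5, so keep discovering.
    assert!(counts.needs_discovery());
    // With only 4 outbound-only peers we must not remove any of them.
    assert!(!counts.may_disconnect_outbound(0));
    println!("min outbound-only target: {}", counts.min_outbound_only_target());
}
```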

## Additional Info

A unit test for the heartbeat function checks that the disconnection behavior is correct. Continued querying for peers when the outbound-only target hasn't been met is not tested directly, but indirectly through a unit test of the helper function that counts the number of outbound-only peers.

EDIT: I'm concerned about the behavior of `update_peer_scores`. If we connect to a peer whose score is already below the disconnection threshold (-20), its connection status will remain connected while its score state changes to disconnected.

```rust
let previous_state = info.score_state();
// Update scores
info.score_update();
Self::handle_score_transitions(
    previous_state,
    peer_id,
    info,
    &mut to_ban_peers,
    &mut to_unban_peers,
    &mut self.events,
    &self.log,
);
```

`previous_state` will already be set to Disconnected, and because `handle_score_transitions` only changes a peer's connection status when the score state has changed, the peer remains connected. Then in the heartbeat code, because we only disconnect healthy peers when we have too many peers, these peers never get disconnected. I'm not sure how often this scenario would occur in practice, but it might be better to adjust the logic to account for cases where the score state implies a connection status different from the current connection status.
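
For illustration only, one possible shape of that adjustment. The enums and the `reconcile` function below are hypothetical stand-ins rather than the actual Lighthouse types; they just show the idea of also acting when the score state disagrees with the current connection status, not only when it changes:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum ScoreState {
    Healthy,
    Disconnected,
    Banned,
}

#[derive(Clone, Copy, PartialEq, Debug)]
enum ConnectionStatus {
    Connected,
    Disconnecting,
    Disconnected,
}

/// Hypothetical variant of `handle_score_transitions`: act not only when the
/// score state changed, but also when the current score state disagrees with
/// the peer's connection status (e.g. we accepted a peer that was already
/// below the disconnection threshold).
fn reconcile(
    previous_state: ScoreState,
    current_state: ScoreState,
    status: ConnectionStatus,
) -> ConnectionStatus {
    let state_changed = previous_state != current_state;
    let mismatch =
        current_state != ScoreState::Healthy && status == ConnectionStatus::Connected;
    if state_changed || mismatch {
        match current_state {
            ScoreState::Healthy => ConnectionStatus::Connected,
            ScoreState::Disconnected | ScoreState::Banned => ConnectionStatus::Disconnecting,
        }
    } else {
        status
    }
}

fn main() {
    // The scenario described above: the peer connects while its score is
    // already below -20, so previous and current score state are both
    // Disconnected and no state change is observed.
    let status = reconcile(
        ScoreState::Disconnected,
        ScoreState::Disconnected,
        ConnectionStatus::Connected,
    );
    // With the extra mismatch check the peer is scheduled for disconnection
    // instead of silently staying connected.
    assert_eq!(status, ConnectionStatus::Disconnecting);
}
```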

Co-authored-by: Kevin Lu <[email protected]>
kevlu93 and kevlu93 committed May 31, 2021
1 parent 0b4707a commit 263f46b
Showing 5 changed files with 368 additions and 9 deletions.
326 changes: 317 additions & 9 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
@@ -60,6 +60,9 @@ const PEER_EXCESS_FACTOR: f32 = 0.1;
/// them in lighthouse.
const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1;

/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections.
const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.1;

/// The main struct that handles peer's reputation and connection status.
pub struct PeerManager<TSpec: EthSpec> {
/// Storage of network globals to access the `PeerDB`.
@@ -835,7 +838,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// NOTE: This is experimental and will likely be adjusted
fn update_peer_scores(&mut self) {
/* Check how long have peers been in this state and update their reputations if needed */

let mut to_ban_peers = Vec::new();
let mut to_unban_peers = Vec::new();

@@ -910,12 +912,16 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// The Peer manager's heartbeat maintains the peer count and maintains peer reputations.
///
/// It will request discovery queries if the peer count has not reached the desired number of
/// peers.
/// overall peers, as well as the desired number of outbound-only peers.
///
/// NOTE: Discovery will only add a new query if one isn't already queued.
fn heartbeat(&mut self) {
let peer_count = self.network_globals.connected_or_dialing_peers();
if peer_count < self.target_peers {
let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
let min_outbound_only_target =
(self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize;

if peer_count < self.target_peers || outbound_only_peer_count < min_outbound_only_target {
// If we need more peers, queue a discovery lookup.
if self.discovery.started {
debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
@@ -931,19 +937,28 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

let connected_peer_count = self.network_globals.connected_peers();
if connected_peer_count > self.target_peers {
//remove excess peers with the worst scores, but keep subnet peers
for (peer_id, _) in self
// Remove excess peers with the worst scores, but keep subnet peers.
// Must also ensure that the outbound-only peer count does not go below the minimum threshold.
outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
let mut n_outbound_removed = 0;
for (peer_id, info) in self
.network_globals
.peers
.read()
.worst_connected_peers()
.iter()
.filter(|(_, info)| !info.has_future_duty())
.take(connected_peer_count - self.target_peers)
//we only need to disconnect peers with healthy scores, since the others got already
//disconnected in update_peer_scores
.filter(|(_, info)| info.score_state() == ScoreState::Healthy)
{
if disconnecting_peers.len() == connected_peer_count - self.target_peers {
break;
}
if info.is_outbound_only() {
if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed {
n_outbound_removed += 1;
} else {
continue;
}
}
disconnecting_peers.push(**peer_id);
}
}
@@ -1045,3 +1060,296 @@ enum ConnectingType {
multiaddr: Multiaddr,
},
}

#[cfg(test)]
mod tests {
use super::*;
use crate::discovery::enr::build_enr;
use crate::discovery::enr_ext::CombinedKeyExt;
use crate::rpc::methods::MetaData;
use crate::Enr;
use discv5::enr::CombinedKey;
use slog::{o, Drain};
use std::net::UdpSocket;
use types::{EnrForkId, MinimalEthSpec};

type E = MinimalEthSpec;

pub fn unused_port() -> u16 {
let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket");
let local_addr = socket.local_addr().expect("should read udp socket");
local_addr.port()
}

pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();

if enabled {
slog::Logger::root(drain.filter_level(level).fuse(), o!())
} else {
slog::Logger::root(drain.filter(|_| false).fuse(), o!())
}
}

async fn build_peer_manager(target: usize) -> PeerManager<E> {
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let config = NetworkConfig {
discovery_port: unused_port(),
target_peers: target,
..Default::default()
};
let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap();
let enr: Enr = build_enr::<E>(&enr_key, &config, EnrForkId::default()).unwrap();
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new(
enr,
9000,
9000,
MetaData {
seq_number: 0,
attnets: Default::default(),
},
vec![],
&log,
);
PeerManager::new(&keypair, &config, Arc::new(globals), &log)
.await
.unwrap()
}

#[tokio::test]
async fn test_peer_manager_disconnects_correctly_during_heartbeat() {
let mut peer_manager = build_peer_manager(3).await;

// Create 5 peers to connect to.
// 2 will be outbound-only, and have the lowest score.
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let outbound_only_peer1 = PeerId::random();
let outbound_only_peer2 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap());

// Set the outbound-only peers to have the lowest score.
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&outbound_only_peer1)
.unwrap()
.add_to_score(-1.0);

peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&outbound_only_peer2)
.unwrap()
.add_to_score(-2.0);

// Check initial connected peers.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5);

peer_manager.heartbeat();

// Check that we disconnected from two peers.
// Check that one outbound-only peer was removed because it had the worst score
// and that we did not disconnect the other outbound peer due to the minimum outbound quota.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
assert!(peer_manager
.network_globals
.peers
.read()
.is_connected(&outbound_only_peer1));
assert!(!peer_manager
.network_globals
.peers
.read()
.is_connected(&outbound_only_peer2));

peer_manager.heartbeat();

// Check that if we are at target number of peers, we do not disconnect any.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
}

#[tokio::test]
async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() {
let mut peer_manager = build_peer_manager(20).await;

// Connect to 20 ingoing-only peers.
for _i in 0..19 {
let peer = PeerId::random();
peer_manager.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap());
}

// Connect an outbound-only peer.
// Give it the lowest score so that it is evaluated first in the disconnect list iterator.
let outbound_only_peer = PeerId::random();
peer_manager.connect_ingoing(&outbound_only_peer, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer))
.unwrap()
.add_to_score(-1.0);
// After heartbeat, we will have removed one peer.
// Having less outbound-only peers than minimum won't cause panic when the outbound-only peer is being considered for disconnection.
peer_manager.heartbeat();
assert_eq!(
peer_manager.network_globals.connected_or_dialing_peers(),
20
);
}

#[tokio::test]
async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() {
let mut peer_manager = build_peer_manager(3).await;

// Create 3 peers to connect to.
let peer0 = PeerId::random();
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());

// Connect to two peers that are on the threshold of being disconnected.
peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
// Update the gossipsub scores to induce connection downgrade
// during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection.
// If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected,
// then handle_state_transitions will not change the connection status to disconnected because the score state has not changed.
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);

peer_manager.heartbeat();

assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1);
}

#[tokio::test]
async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() {
let mut peer_manager = build_peer_manager(3).await;

// Create 4 peers to connect to.
// One pair will be unhealthy inbound only and outbound only peers.
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());

// Connect to two peers that are on the threshold of being disconnected.
peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager.heartbeat();
// Tests that when we are over the target peer limit, after disconnecting two unhealthy peers,
// the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target).
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2);
}

#[tokio::test]
async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() {
let mut peer_manager = build_peer_manager(3).await;

// Create 5 peers to connect to.
// One will be unhealthy inbound only and outbound only peers.
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();

peer_manager.connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap());
peer_manager.connect_outgoing(&outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
// Have one peer be on the verge of disconnection.
peer_manager.connect_ingoing(&inbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap());
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);

peer_manager.heartbeat();
// Tests that when we are over the target peer limit, after disconnecting an unhealthy peer,
// the number of connected peers updates and we will not remove too many peers.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
}
}
10 changes: 10 additions & 0 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
@@ -182,6 +182,11 @@ impl<T: EthSpec> PeerInfo<T> {
matches!(self.connection_status, Disconnected { .. })
}

/// Checks if the peer is outbound-only
pub fn is_outbound_only(&self) -> bool {
matches!(self.connection_status, Connected {n_in, n_out} if n_in == 0 && n_out > 0)
}

/// Returns the number of connections with this peer.
pub fn connections(&self) -> (u8, u8) {
match self.connection_status {
@@ -306,6 +311,11 @@ impl<T: EthSpec> PeerInfo<T> {
self.score.test_add(score)
}
}

#[cfg(test)]
pub fn set_gossipsub_score(&mut self, score: f64) {
self.score.set_gossipsub_score(score);
}
}

#[derive(Clone, Debug, Serialize)]