Skip to content

Commit

Permalink
Maintain sync committee peers
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanjay176 committed Jul 20, 2021
1 parent 9f29788 commit 2e35733
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 8 deletions.
8 changes: 8 additions & 0 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,10 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.peers
.write()
.extend_peers_on_subnet(&s.subnet, min_ttl);
if let Subnet::SyncCommittee(sync_subnet) = s.subnet {
self.peer_manager_mut()
.add_sync_subnet(sync_subnet, min_ttl);
}
}
// Already have target number of peers, no need for subnet discovery
let peers_on_subnet = self
Expand Down Expand Up @@ -1090,6 +1094,10 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
// Peer manager has requested a discovery query for more peers.
self.discovery.discover_peers();
}
PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover) => {
// Peer manager has requested a subnet discovery query for more peers.
self.discover_subnet_peers(subnets_to_discover);
}
PeerManagerEvent::Ping(peer_id) => {
// send a ping request to this peer
self.ping(RequestId::Behaviour, peer_id);
Expand Down
73 changes: 70 additions & 3 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Implementation of Lighthouse's peer management system.
pub use self::peerdb::*;
use crate::discovery::TARGET_SUBNET_PEERS;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::types::SyncState;
use crate::Subnet;
use crate::{error, metrics, Gossipsub};
use crate::{NetworkConfig, NetworkGlobals, PeerId};
use crate::{Subnet, SubnetDiscovery};
use discv5::Enr;
use futures::prelude::*;
use futures::Stream;
Expand All @@ -20,7 +21,7 @@ use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use types::EthSpec;
use types::{EthSpec, SyncSubnetId};

pub use libp2p::core::{identity::Keypair, Multiaddr};

Expand All @@ -35,7 +36,7 @@ pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionSta
pub use peer_sync_status::{PeerSyncStatus, SyncInfo};
use score::{PeerAction, ReportSource, ScoreState};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr;

/// The time in seconds between re-status's peers.
Expand Down Expand Up @@ -79,6 +80,11 @@ pub struct PeerManager<TSpec: EthSpec> {
target_peers: usize,
/// The maximum number of peers we allow (exceptions for subnet peers)
max_peers: usize,
/// A collection of sync committee subnets that we need to stay subscribed to.
/// Sync committee subnets are longer term (256 epochs). Hence, we need to re-run
/// discovery queries for subnet peers if we disconnect from existing sync
/// committee subnet peers.
sync_committee_subnets: HashMap<SyncSubnetId, Instant>,
/// The heartbeat interval to perform routine maintenance.
heartbeat: tokio::time::Interval,
/// Keeps track of whether the discovery service is enabled or not.
Expand Down Expand Up @@ -109,6 +115,8 @@ pub enum PeerManagerEvent {
UnBanned(PeerId, Vec<IpAddr>),
/// Request the behaviour to discover more peers.
DiscoverPeers,
/// Request the behaviour to discover peers on subnets.
DiscoverSubnetPeers(Vec<SubnetDiscovery>),
}

impl<TSpec: EthSpec> PeerManager<TSpec> {
Expand All @@ -128,6 +136,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
outbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_OUTBOUND)),
status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)),
target_peers: config.target_peers,
sync_committee_subnets: Default::default(),
max_peers: (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize,
heartbeat,
discovery_enabled: !config.disable_discovery,
Expand Down Expand Up @@ -285,6 +294,21 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}

/// Insert the sync subnet into list of long lived sync committee subnets that we need to
/// maintain adequate number of peers for.
pub fn add_sync_subnet(&mut self, subnet_id: SyncSubnetId, min_ttl: Instant) {
match self.sync_committee_subnets.entry(subnet_id) {
Entry::Vacant(_) => {
self.sync_committee_subnets.insert(subnet_id, min_ttl);
}
Entry::Occupied(old) => {
if *old.get() < min_ttl {
self.sync_committee_subnets.insert(subnet_id, min_ttl);
}
}
}
}

/* Notifications from the Swarm */

// A peer is being dialed.
Expand Down Expand Up @@ -966,6 +990,46 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Ok(())
}

/// Run discovery query for additional sync committee peers if we fall below `TARGET_PEERS`.
fn maintain_sync_committee_peers(&mut self) {
// Remove expired entries
self.sync_committee_subnets
.retain(|_, v| *v > Instant::now());

let subnets_to_discover: Vec<SubnetDiscovery> = self
.sync_committee_subnets
.iter()
.filter_map(|(k, v)| {
if self
.network_globals
.peers
.read()
.good_peers_on_subnet(Subnet::SyncCommittee(*k))
.count()
< TARGET_SUBNET_PEERS
{
Some(SubnetDiscovery {
subnet: Subnet::SyncCommittee(*k),
min_ttl: Some(*v),
})
} else {
None
}
})
.collect();

// request the subnet query from discovery
if !subnets_to_discover.is_empty() {
debug!(
self.log,
"Making subnet queries for maintaining sync committee peers";
"subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>()
);
self.events
.push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover));
}
}

/// The Peer manager's heartbeat maintains the peer count and maintains peer reputations.
///
/// It will request discovery queries if the peer count has not reached the desired number of
Expand All @@ -990,6 +1054,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// Updates peer's scores.
self.update_peer_scores();

// Maintain minimum count for sync committee peers.
self.maintain_sync_committee_peers();

// Keep a list of peers we are disconnecting
let mut disconnecting_peers = Vec::new();

Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl<T: BeaconChainTypes> Router<T> {
.on_attester_slashing_gossip(id, peer_id, attester_slashing);
}
PubsubMessage::SignedContributionAndProof(contribution_and_proof) => {
debug!(
trace!(
self.log,
"Received sync committee aggregate";
"peer_id" => %peer_id
Expand All @@ -260,7 +260,7 @@ impl<T: BeaconChainTypes> Router<T> {
);
}
PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => {
debug!(
trace!(
self.log,
"Received sync committee signature";
"peer_id" => %peer_id
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/network/src/subnet_service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use types::{
SyncSubnetId, ValidatorSubscription,
};

const SLOT_DURATION_MILLIS: u64 = 200;
const SLOT_DURATION_MILLIS: u64 = 400;

type TestBeaconChainType = Witness<
SystemTimeSlotClock,
Expand Down Expand Up @@ -478,11 +478,11 @@ mod sync_committee_service {
.unwrap();
let subnet_id = subnet_ids.iter().next().unwrap();

// Note: the unsubscription event takes a full epoch (8 * 0.2 secs = 1.6 secs)
// Note: the unsubscription event takes 2 epochs (8 * 2 * 0.4 secs = 3.2 secs)
let events = get_events(
&mut sync_committee_service,
Some(5),
(MinimalEthSpec::slots_per_epoch() * 3) as u32,
(MinimalEthSpec::slots_per_epoch() * 3) as u32, // Have some buffer time before getting 5 events
)
.await;
assert_eq!(
Expand Down

0 comments on commit 2e35733

Please sign in to comment.