From 2e35733b68106a77f4eff298f841c7c09126a6bd Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 20 Jul 2021 16:14:34 +0530 Subject: [PATCH] Maintain sync committee peers --- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 8 ++ .../eth2_libp2p/src/peer_manager/mod.rs | 73 ++++++++++++++++++- beacon_node/network/src/router/mod.rs | 4 +- .../network/src/subnet_service/tests/mod.rs | 6 +- 4 files changed, 83 insertions(+), 8 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 88d7e294d8c..2f1d77cb012 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -571,6 +571,10 @@ impl Behaviour { .peers .write() .extend_peers_on_subnet(&s.subnet, min_ttl); + if let Subnet::SyncCommittee(sync_subnet) = s.subnet { + self.peer_manager_mut() + .add_sync_subnet(sync_subnet, min_ttl); + } } // Already have target number of peers, no need for subnet discovery let peers_on_subnet = self @@ -1090,6 +1094,10 @@ impl Behaviour { // Peer manager has requested a discovery query for more peers. self.discovery.discover_peers(); } + PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover) => { + // Peer manager has requested a subnet discovery query for more peers. + self.discover_subnet_peers(subnets_to_discover); + } PeerManagerEvent::Ping(peer_id) => { // send a ping request to this peer self.ping(RequestId::Behaviour, peer_id); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index b73e8a20dca..d9ae3a7b663 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -1,11 +1,12 @@ //! Implementation of Lighthouse's peer management system. pub use self::peerdb::*; +use crate::discovery::TARGET_SUBNET_PEERS; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::types::SyncState; -use crate::Subnet; use crate::{error, metrics, Gossipsub}; use crate::{NetworkConfig, NetworkGlobals, PeerId}; +use crate::{Subnet, SubnetDiscovery}; use discv5::Enr; use futures::prelude::*; use futures::Stream; @@ -20,7 +21,7 @@ use std::{ task::{Context, Poll}, time::{Duration, Instant}, }; -use types::EthSpec; +use types::{EthSpec, SyncSubnetId}; pub use libp2p::core::{identity::Keypair, Multiaddr}; @@ -35,7 +36,7 @@ pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionSta pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; use score::{PeerAction, ReportSource, ScoreState}; use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{hash_map::Entry, HashMap}; use std::net::IpAddr; /// The time in seconds between re-status's peers. @@ -79,6 +80,11 @@ pub struct PeerManager { target_peers: usize, /// The maximum number of peers we allow (exceptions for subnet peers) max_peers: usize, + /// A collection of sync committee subnets that we need to stay subscribed to. + /// Sync committee subnets are longer term (256 epochs). Hence, we need to re-run + /// discovery queries for subnet peers if we disconnect from existing sync + /// committee subnet peers. + sync_committee_subnets: HashMap, /// The heartbeat interval to perform routine maintenance. heartbeat: tokio::time::Interval, /// Keeps track of whether the discovery service is enabled or not. @@ -109,6 +115,8 @@ pub enum PeerManagerEvent { UnBanned(PeerId, Vec), /// Request the behaviour to discover more peers. DiscoverPeers, + /// Request the behaviour to discover peers on subnets. + DiscoverSubnetPeers(Vec), } impl PeerManager { @@ -128,6 +136,7 @@ impl PeerManager { outbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_OUTBOUND)), status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)), target_peers: config.target_peers, + sync_committee_subnets: Default::default(), max_peers: (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize, heartbeat, discovery_enabled: !config.disable_discovery, @@ -285,6 +294,21 @@ impl PeerManager { } } + /// Insert the sync subnet into list of long lived sync committee subnets that we need to + /// maintain adequate number of peers for. + pub fn add_sync_subnet(&mut self, subnet_id: SyncSubnetId, min_ttl: Instant) { + match self.sync_committee_subnets.entry(subnet_id) { + Entry::Vacant(_) => { + self.sync_committee_subnets.insert(subnet_id, min_ttl); + } + Entry::Occupied(old) => { + if *old.get() < min_ttl { + self.sync_committee_subnets.insert(subnet_id, min_ttl); + } + } + } + } + /* Notifications from the Swarm */ // A peer is being dialed. @@ -966,6 +990,46 @@ impl PeerManager { Ok(()) } + /// Run discovery query for additional sync committee peers if we fall below `TARGET_PEERS`. + fn maintain_sync_committee_peers(&mut self) { + // Remove expired entries + self.sync_committee_subnets + .retain(|_, v| *v > Instant::now()); + + let subnets_to_discover: Vec = self + .sync_committee_subnets + .iter() + .filter_map(|(k, v)| { + if self + .network_globals + .peers + .read() + .good_peers_on_subnet(Subnet::SyncCommittee(*k)) + .count() + < TARGET_SUBNET_PEERS + { + Some(SubnetDiscovery { + subnet: Subnet::SyncCommittee(*k), + min_ttl: Some(*v), + }) + } else { + None + } + }) + .collect(); + + // request the subnet query from discovery + if !subnets_to_discover.is_empty() { + debug!( + self.log, + "Making subnet queries for maintaining sync committee peers"; + "subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::>() + ); + self.events + .push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover)); + } + } + /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. /// /// It will request discovery queries if the peer count has not reached the desired number of @@ -990,6 +1054,9 @@ impl PeerManager { // Updates peer's scores. self.update_peer_scores(); + // Maintain minimum count for sync committee peers. + self.maintain_sync_committee_peers(); + // Keep a list of peers we are disconnecting let mut disconnecting_peers = Vec::new(); diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 304436f4149..5096a4bdc84 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -248,7 +248,7 @@ impl Router { .on_attester_slashing_gossip(id, peer_id, attester_slashing); } PubsubMessage::SignedContributionAndProof(contribution_and_proof) => { - debug!( + trace!( self.log, "Received sync committee aggregate"; "peer_id" => %peer_id @@ -260,7 +260,7 @@ impl Router { ); } PubsubMessage::SyncCommitteeMessage(sync_committtee_msg) => { - debug!( + trace!( self.log, "Received sync committee signature"; "peer_id" => %peer_id diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 12c355ce920..8af4996fe7b 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -20,7 +20,7 @@ use types::{ SyncSubnetId, ValidatorSubscription, }; -const SLOT_DURATION_MILLIS: u64 = 200; +const SLOT_DURATION_MILLIS: u64 = 400; type TestBeaconChainType = Witness< SystemTimeSlotClock, @@ -478,11 +478,11 @@ mod sync_committee_service { .unwrap(); let subnet_id = subnet_ids.iter().next().unwrap(); - // Note: the unsubscription event takes a full epoch (8 * 0.2 secs = 1.6 secs) + // Note: the unsubscription event takes 2 epochs (8 * 2 * 0.4 secs = 3.2 secs) let events = get_events( &mut sync_committee_service, Some(5), - (MinimalEthSpec::slots_per_epoch() * 3) as u32, + (MinimalEthSpec::slots_per_epoch() * 3) as u32, // Have some buffer time before getting 5 events ) .await; assert_eq!(