From 33c9162c0e3d2b59463dd7ba5b3f8560a55691aa Mon Sep 17 00:00:00 2001 From: driftluo Date: Mon, 23 Aug 2021 14:43:32 +0800 Subject: [PATCH 1/5] refactor: refactor peer store --- network/src/network.rs | 5 +- network/src/peer_store/mod.rs | 10 ++ network/src/peer_store/peer_store_impl.rs | 72 ++++----- network/src/protocols/identify/mod.rs | 36 +++-- network/src/protocols/tests/mod.rs | 35 ++-- network/src/services/dump_peer_store.rs | 28 ++-- network/src/services/outbound_peer.rs | 149 ++++++++++-------- network/src/services/protocol_type_checker.rs | 24 +-- network/src/tests/peer_store.rs | 48 +++++- 9 files changed, 249 insertions(+), 158 deletions(-) diff --git a/network/src/network.rs b/network/src/network.rs index 1dbd9103b2..9335a37fce 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -75,7 +75,7 @@ pub struct NetworkState { pending_observed_addrs: RwLock>, local_private_key: secio::SecioKeyPair, local_peer_id: PeerId, - bootnodes: Vec, + pub(crate) bootnodes: Vec, pub(crate) config: NetworkConfig, pub(crate) active: AtomicBool, /// Node supported protocols @@ -856,7 +856,8 @@ impl NetworkService { .yamux_config(yamux_config) .forever(true) .max_connection_number(1024) - .set_send_buffer_size(config.max_send_buffer()); + .set_send_buffer_size(config.max_send_buffer()) + .timeout(Duration::from_secs(5)); #[cfg(target_os = "linux")] let p2p_service = { diff --git a/network/src/peer_store/mod.rs b/network/src/peer_store/mod.rs index 8df7012bde..5d8b09536e 100644 --- a/network/src/peer_store/mod.rs +++ b/network/src/peer_store/mod.rs @@ -1,4 +1,12 @@ //! Peer store manager +//! +//! This module implements a locally managed node information set, which is used for +//! booting into the network when the node is started, real-time update detection/timing +//! saving at runtime, and saving data when stopping +//! +//! The validity and screening speed of the data set are very important to the entire network, +//! and the address information collected on the network cannot be blindly trusted + pub mod addr_manager; pub mod ban_list; mod peer_store_db; @@ -14,6 +22,8 @@ pub use peer_store_impl::PeerStore; pub(crate) const ADDR_COUNT_LIMIT: usize = 16384; /// Consider we never seen a peer if peer's last_connected_at beyond this timeout const ADDR_TIMEOUT_MS: u64 = 7 * 24 * 3600 * 1000; +/// The timeout that peer's address should be added to the feeler list again +pub(crate) const ADDR_TRY_TIMEOUT_MS: u64 = 3 * 24 * 3600 * 1000; const ADDR_MAX_RETRIES: u32 = 3; const ADDR_MAX_FAILURES: u32 = 10; diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 98d342577c..3a9a1c5a10 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -1,13 +1,12 @@ use crate::{ errors::{PeerStoreError, Result}, extract_peer_id, multiaddr_to_socketaddr, - network_group::Group, peer_store::{ addr_manager::AddrManager, ban_list::BanList, types::{ip_to_network, AddrInfo, BannedAddr, PeerInfo}, Behaviour, Multiaddr, PeerScoreConfig, ReportResult, Status, ADDR_COUNT_LIMIT, - ADDR_TIMEOUT_MS, + ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, }, PeerId, SessionType, }; @@ -15,6 +14,9 @@ use ipnetwork::IpNetwork; use std::collections::{hash_map::Entry, HashMap}; /// Peer store +/// +/// | -- choose to identify --| --- choose to feeler --- | -- delete -- | +/// | 1 | 2 | 3 | 4 | 5 | 6 | 7 | More than seven days | #[derive(Default)] pub struct PeerStore { addr_manager: AddrManager, @@ -43,12 +45,12 @@ impl PeerStore { { Entry::Occupied(mut entry) => { let mut peer = entry.get_mut(); - peer.connected_addr = addr.clone(); + peer.connected_addr = addr; peer.last_connected_at_ms = now_ms; peer.session_type = session_type; } Entry::Vacant(entry) => { - let peer = PeerInfo::new(addr.clone(), session_type, now_ms); + let peer = PeerInfo::new(addr, session_type, now_ms); entry.insert(peer); } } @@ -111,11 +113,17 @@ impl PeerStore { } } - /// Get peers for outbound connection, this method randomly return non-connected peer addrs + /// Get peers for outbound connection, this method randomly return recently connected peer addrs pub fn fetch_addrs_to_attempt(&mut self, count: usize) -> Vec { + // Get info: + // 1. Not in ban list + // 2. Not already connected + // 3. Connected within 3 days + let now_ms = faketime::unix_time_as_millis(); let ban_list = &self.ban_list; let peers = &self.peers; + let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); // get addrs that can attempt. self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { @@ -123,16 +131,21 @@ impl PeerStore { && extract_peer_id(&peer_addr.addr) .map(|peer_id| !peers.contains_key(&peer_id)) .unwrap_or_default() - && !peer_addr.tried_in_last_minute(now_ms) + && peer_addr.had_connected(addr_expired_ms) }) } /// Get peers for feeler connection, this method randomly return peer addrs that we never /// connected to. pub fn fetch_addrs_to_feeler(&mut self, count: usize) -> Vec { + // Get info: + // 1. Not in ban list + // 2. Not already connected + // 3. Not already tried in a minute + // 4. Not connected within 3 days + let now_ms = faketime::unix_time_as_millis(); - let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS; - // get expired or never successed addrs. + let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); let ban_list = &self.ban_list; let peers = &self.peers; self.addr_manager @@ -148,8 +161,12 @@ impl PeerStore { /// Return valid addrs that success connected, used for discovery. pub fn fetch_random_addrs(&mut self, count: usize) -> Vec { + // Get info: + // 1. Not in ban list + // 2. Already connected or Connected within 7 days + let now_ms = faketime::unix_time_as_millis(); - let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS; + let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS); let ban_list = &self.ban_list; let peers = &self.peers; // get success connected addrs. @@ -208,43 +225,18 @@ impl PeerStore { if self.addr_manager.count() < ADDR_COUNT_LIMIT { return Ok(()); } - // Evicting invalid data in the peer store is a relatively rare operation - // There are certain cleanup strategies here: - // 1. Group current data according to network segment - // 2. Sort according to the amount of data in the same network segment - // 3. Prioritize cleaning on the same network segment let now_ms = faketime::unix_time_as_millis(); let candidate_peers: Vec<_> = { - // find candidate peers by network group - let mut peers_by_network_group: HashMap> = HashMap::default(); + // find candidate peers by terrible condition + let ban_score = self.score_config.ban_score; + let mut peers = Vec::default(); for addr in self.addr_manager.addrs_iter() { - peers_by_network_group - .entry((&addr.addr).into()) - .or_default() - .push(addr); + if addr.is_terrible(now_ms) || addr.score <= ban_score { + peers.push(addr.addr.clone()) + } } - let len = peers_by_network_group.len(); - let mut peers = peers_by_network_group - .drain() - .map(|(_, v)| v) - .collect::>>(); - - peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len())); - let ban_score = self.score_config.ban_score; - peers - .into_iter() - .take(len / 2) - .flatten() - .filter_map(move |addr| { - if addr.is_terrible(now_ms) || addr.score <= ban_score { - Some(addr.addr.clone()) - } else { - None - } - }) - .collect() }; if candidate_peers.is_empty() { diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index d316149217..f593e644ef 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -12,7 +12,7 @@ use p2p::{ service::{SessionType, TargetProtocol}, traits::ServiceProtocol, utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr}, - ProtocolId, SessionId, + SessionId, }; mod protocol; @@ -62,7 +62,7 @@ pub trait Callback: Clone + Send { // Register open protocol fn register(&self, context: &ProtocolContextMutRef, version: &str); // remove registered identify protocol - fn unregister(&self, id: SessionId, pid: ProtocolId); + fn unregister(&self, context: &ProtocolContextMutRef); /// Received custom message fn received_identify( &mut self, @@ -261,8 +261,7 @@ impl ServiceProtocol for IdentifyProtocol { .remove(&context.session.id) .expect("RemoteInfo must exists"); trace!("IdentifyProtocol disconnected from {:?}", info.peer_id); - self.callback - .unregister(context.session.id, context.proto_id) + self.callback.unregister(&context) } } @@ -374,18 +373,37 @@ impl Callback for IdentifyCallback { .mut_addr_manager() .remove(&context.session.address); } else if context.session.ty.is_outbound() { + // why don't set inbound here? + // because inbound address can't feeler during staying connected + // and if set it to peer store, it will be broadcast to the entire network, + // but this is an unverified address self.network_state.with_peer_store_mut(|peer_store| { peer_store.add_outbound_addr(context.session.address.clone()); }); } } - fn unregister(&self, id: SessionId, pid: ProtocolId) { - self.network_state.with_peer_registry_mut(|reg| { - let _ = reg.get_peer_mut(id).map(|peer| { - peer.protocols.remove(&pid); + fn unregister(&self, context: &ProtocolContextMutRef) { + let version = self + .network_state + .with_peer_registry_mut(|reg| { + reg.get_peer_mut(context.session.id) + .map(|peer| peer.protocols.remove(&context.proto_id)) + }) + .flatten() + .map(|version| version != "2") + .unwrap_or_default(); + + if self.network_state.ckb2021.load(Ordering::SeqCst) && version { + } else if context.session.ty.is_outbound() { + // Due to the filtering strategy of the peer store, if the node is + // disconnected after a long connection is maintained for more than seven days, + // it is possible that the node will be accidentally evicted, so it is necessary + // to reset the information of the node when disconnected. + self.network_state.with_peer_store_mut(|peer_store| { + peer_store.add_outbound_addr(context.session.address.clone()); }); - }); + } } fn identify(&mut self) -> &[u8] { diff --git a/network/src/protocols/tests/mod.rs b/network/src/protocols/tests/mod.rs index 89dab86e28..94ae9b983a 100644 --- a/network/src/protocols/tests/mod.rs +++ b/network/src/protocols/tests/mod.rs @@ -359,31 +359,38 @@ fn test_discovery_behavior() { wait_discovery(&node3); - let addr = { + let addrs = { let listen_addr = &node3.listen_addr; - node3 - .network_state - .peer_store - .lock() - .fetch_addrs_to_attempt(2) + let mut locked = node3.network_state.peer_store.lock(); + + locked + .fetch_addrs_to_feeler(6) .into_iter() .map(|peer| peer.addr) - .find(|addr| { + .flat_map(|addr| { match ( multiaddr_to_socketaddr(&addr), multiaddr_to_socketaddr(listen_addr), ) { - (Some(dis), Some(listen)) => dis.port() != listen.port(), - _ => false, + (Some(dis), Some(listen)) => { + if dis.port() != listen.port() { + Some(addr) + } else { + None + } + } + _ => None, } }) - .unwrap() + .collect::>() }; - node3.dial_addr( - addr, - TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), - ); + for addr in addrs { + node3.dial_addr( + addr, + TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), + ); + } wait_connect_state(&node1, 2); wait_connect_state(&node2, 2); diff --git a/network/src/services/dump_peer_store.rs b/network/src/services/dump_peer_store.rs index 0fbe9c9493..2db8ec8295 100644 --- a/network/src/services/dump_peer_store.rs +++ b/network/src/services/dump_peer_store.rs @@ -7,10 +7,11 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::time::Interval; +use tokio::time::{Instant, Interval, MissedTickBehavior}; const DEFAULT_DUMP_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour +/// Save current peer store data regularly pub struct DumpPeerStoreService { network_state: Arc, interval: Option, @@ -48,19 +49,20 @@ impl Future for DumpPeerStoreService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { - self.interval = Some(tokio::time::interval(DEFAULT_DUMP_INTERVAL)); - } - let mut interval = self.interval.take().unwrap(); - loop { - match interval.poll_tick(cx) { - Poll::Ready(_) => { - self.dump_peer_store(); - } - Poll::Pending => { - self.interval = Some(interval); - return Poll::Pending; - } + self.interval = { + let mut interval = tokio::time::interval_at( + Instant::now() + DEFAULT_DUMP_INTERVAL, + DEFAULT_DUMP_INTERVAL, + ); + // The dump peer store service does not need to urgently compensate for the missed wake, + // just delay behavior is enough + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + Some(interval) } } + while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + self.dump_peer_store() + } + Poll::Pending } } diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index d27191fb67..2a97494f78 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -1,25 +1,29 @@ -use crate::peer_store::types::AddrInfo; use crate::NetworkState; use ckb_logger::trace; use faketime::unix_time_as_millis; use futures::Future; -use p2p::service::ServiceControl; +use p2p::{multiaddr::MultiAddr, service::ServiceControl}; +use rand::prelude::IteratorRandom; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; -use tokio::time::Interval; +use tokio::time::{Interval, MissedTickBehavior}; -const FEELER_CONNECTION_COUNT: usize = 5; +const FEELER_CONNECTION_COUNT: usize = 10; +/// Ensure that the outbound of the current node reaches the expected upper limit as much as possible +/// Periodically detect and verify data in the peer store +/// Keep the whitelist nodes connected as much as possible +/// Periodically detection finds that the observed addresses are all valid pub struct OutboundPeerService { network_state: Arc, p2p_control: ServiceControl, interval: Option, try_connect_interval: Duration, - last_connect: Option, + try_identify_count: u8, } impl OutboundPeerService { @@ -33,23 +37,15 @@ impl OutboundPeerService { p2p_control, interval: None, try_connect_interval, - last_connect: None, + try_identify_count: 0, } } - fn dial_peers(&mut self, is_feeler: bool, count: usize) { + fn dial_feeler(&mut self) { let now_ms = unix_time_as_millis(); let attempt_peers = self.network_state.with_peer_store_mut(|peer_store| { - // take extra 5 peers - // in current implementation fetch peers may return less than count - let extra_count = 5; - let mut paddrs = if is_feeler { - peer_store.fetch_addrs_to_feeler(count + extra_count) - } else { - peer_store.fetch_addrs_to_attempt(count + extra_count) - }; - paddrs.truncate(count as usize); - for paddr in &mut paddrs { + let paddrs = peer_store.fetch_addrs_to_feeler(FEELER_CONNECTION_COUNT); + for paddr in paddrs.iter() { // mark addr as tried if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) { paddr.mark_tried(now_ms); @@ -57,25 +53,67 @@ impl OutboundPeerService { } paddrs }); + trace!( - "count={}, attempt_peers: {:?} is_feeler: {}", - count, + "feeler dial count={}, attempt_peers: {:?}", + attempt_peers.len(), attempt_peers, - is_feeler ); - for paddr in attempt_peers { - let AddrInfo { addr, .. } = paddr; - if is_feeler { - self.network_state.dial_feeler(&self.p2p_control, addr); - } else { - self.network_state.dial_identify(&self.p2p_control, addr); - } + for addr in attempt_peers.into_iter().map(|info| info.addr) { + self.network_state.dial_identify(&self.p2p_control, addr); + } + } + + fn try_dial_peers(&mut self) { + let status = self.network_state.connection_status(); + let count = status + .max_outbound + .saturating_sub(status.non_whitelist_outbound) as usize; + if count == 0 { + self.try_identify_count = 0; + return; + } + self.try_identify_count += 1; + + let peers: Box> = if self.try_identify_count > 3 { + self.try_identify_count = 0; + Box::new( + self.network_state + .bootnodes + .iter() + .choose_multiple(&mut rand::thread_rng(), count) + .into_iter() + .cloned(), + ) + } else { + let now_ms = unix_time_as_millis(); + let attempt_peers = self.network_state.with_peer_store_mut(|peer_store| { + let paddrs = peer_store.fetch_addrs_to_attempt(count); + for paddr in paddrs.iter() { + // mark addr as tried + if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) { + paddr.mark_tried(now_ms); + } + } + paddrs + }); + + trace!( + "identify dial count={}, attempt_peers: {:?}", + attempt_peers.len(), + attempt_peers, + ); + + Box::new(attempt_peers.into_iter().map(|info| info.addr)) + }; + + for addr in peers { + self.network_state.dial_identify(&self.p2p_control, addr); } } fn try_dial_whitelist(&self) { - // This will never panic because network start has already been checked for addr in self.network_state.config.whitelist_peers() { self.network_state.dial_identify(&self.p2p_control, addr); } @@ -92,43 +130,24 @@ impl Future for OutboundPeerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { - self.interval = Some(tokio::time::interval(Duration::from_secs(1))); - } - let mut interval = self.interval.take().unwrap(); - loop { - match interval.poll_tick(cx) { - Poll::Ready(_) => { - let last_connect = self - .last_connect - .map(|time| time.elapsed()) - .unwrap_or_else(|| Duration::from_secs(std::u64::MAX)); - if last_connect > self.try_connect_interval { - let status = self.network_state.connection_status(); - let new_outbound = status - .max_outbound - .saturating_sub(status.non_whitelist_outbound) - as usize; - if !self.network_state.config.whitelist_only { - if new_outbound > 0 { - // dial peers - self.dial_peers(false, new_outbound); - } else { - // feeler peers - self.dial_peers(true, FEELER_CONNECTION_COUNT); - } - } - // keep whitelist peer on connected - self.try_dial_whitelist(); - // try dial observed addrs - self.try_dial_observed(); - self.last_connect = Some(Instant::now()); - } - } - Poll::Pending => { - self.interval = Some(interval); - return Poll::Pending; - } + self.interval = { + let mut interval = tokio::time::interval(self.try_connect_interval); + // The outbound service does not need to urgently compensate for the missed wake, + // just skip behavior is enough + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + Some(interval) } } + while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + // ensure feeler work at any time + self.dial_feeler(); + // keep outbound peer is enough + self.try_dial_peers(); + // keep whitelist peer on connected + self.try_dial_whitelist(); + // try dial observed addrs + self.try_dial_observed(); + } + Poll::Pending } } diff --git a/network/src/services/protocol_type_checker.rs b/network/src/services/protocol_type_checker.rs index 9c5ff9537d..409f17dc71 100644 --- a/network/src/services/protocol_type_checker.rs +++ b/network/src/services/protocol_type_checker.rs @@ -18,7 +18,7 @@ use std::{ task::{Context, Poll}, time::{Duration, Instant}, }; -use tokio::time::Interval; +use tokio::time::{Interval, MissedTickBehavior}; const TIMEOUT: Duration = Duration::from_secs(60); const CHECK_INTERVAL: Duration = Duration::from_secs(30); @@ -55,6 +55,8 @@ impl std::fmt::Display for ProtocolTypeError { } } +/// Periodically check whether all connections are normally open sync protocol, +/// if not, close the connection pub struct ProtocolTypeCheckerService { network_state: Arc, p2p_control: ServiceControl, @@ -126,17 +128,17 @@ impl Future for ProtocolTypeCheckerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { - self.interval = Some(tokio::time::interval(CHECK_INTERVAL)); - } - let mut interval = self.interval.take().unwrap(); - loop { - match interval.poll_tick(cx) { - Poll::Ready(_) => self.check_protocol_type(), - Poll::Pending => { - self.interval = Some(interval); - return Poll::Pending; - } + self.interval = { + let mut interval = tokio::time::interval(CHECK_INTERVAL); + // The protocol type checker service does not need to urgently compensate for the missed wake, + // just skip behavior is enough + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + Some(interval) } } + while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + self.check_protocol_type(); + } + Poll::Pending } } diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index 845afa66e8..0d1d006309 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -2,7 +2,7 @@ use super::random_addr; use crate::{ extract_peer_id, multiaddr::Multiaddr, - peer_store::{PeerStore, Status, ADDR_COUNT_LIMIT}, + peer_store::{PeerStore, Status, ADDR_COUNT_LIMIT, ADDR_TRY_TIMEOUT_MS}, Behaviour, PeerId, SessionType, }; @@ -22,8 +22,9 @@ fn test_add_addr() { assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 0); let addr = random_addr(); peer_store.add_addr(addr).unwrap(); - assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); + assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); // we have not connected yet, so return 0 + assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 0); assert_eq!(peer_store.fetch_random_addrs(2).len(), 0); } @@ -59,6 +60,11 @@ fn test_attempt_ban() { let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); peer_store.add_addr(addr.clone()).unwrap(); + peer_store + .mut_addr_manager() + .get_mut(&addr) + .unwrap() + .last_connected_at_ms = faketime::unix_time_as_millis(); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); peer_store.ban_addr(&addr, 10_000, "no reason".into()); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 0); @@ -70,11 +76,36 @@ fn test_fetch_addrs_to_attempt() { assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); let addr = random_addr(); peer_store.add_addr(addr.clone()).unwrap(); + peer_store + .mut_addr_manager() + .get_mut(&addr) + .unwrap() + .last_connected_at_ms = faketime::unix_time_as_millis(); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); peer_store.add_connected_peer(addr, SessionType::Outbound); assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); } +#[cfg(not(disable_faketime))] +#[test] +fn test_fetch_addrs_to_attempt_or_feeler() { + let faketime_file = faketime::millis_tempfile(0).expect("create faketime file"); + faketime::enable(&faketime_file); + + faketime::write_millis(&faketime_file, 1).expect("write millis"); + + let mut peer_store: PeerStore = Default::default(); + let addr = random_addr(); + peer_store.add_outbound_addr(addr); + assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); + assert!(peer_store.fetch_addrs_to_feeler(2).is_empty()); + + faketime::write_millis(&faketime_file, ADDR_TRY_TIMEOUT_MS + 1).expect("write millis"); + + assert!(peer_store.fetch_addrs_to_attempt(2).is_empty()); + assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); +} + #[test] fn test_fetch_addrs_to_attempt_in_last_minutes() { let mut peer_store: PeerStore = Default::default(); @@ -90,6 +121,16 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() { if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&addr) { paddr.mark_tried(now - 60_001); } + assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); + peer_store + .mut_addr_manager() + .get_mut(&addr) + .unwrap() + .last_connected_at_ms = now; + assert_eq!(peer_store.fetch_addrs_to_attempt(1).len(), 1); + if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&addr) { + paddr.mark_tried(now); + } assert_eq!(peer_store.fetch_addrs_to_attempt(1).len(), 1); } @@ -245,8 +286,7 @@ fn test_eviction() { .unwrap(); peer_store.add_addr(new_peer_addr.clone()).unwrap(); // check addrs - // peer store will evict peers from largest network group to low group - // the two evict_addrs should be evict, other addrs will remain in peer store + // peer store will evict all peers which are invalid assert!(peer_store.mut_addr_manager().get(&new_peer_addr).is_some()); assert!(peer_store.mut_addr_manager().get(&evict_addr_2).is_none()); assert!(peer_store.mut_addr_manager().get(&evict_addr).is_none()); From 2e906e63e25828470bd605cacb1ce917cb42ddcc Mon Sep 17 00:00:00 2001 From: driftluo Date: Tue, 24 Aug 2021 10:15:22 +0800 Subject: [PATCH 2/5] chore: remove ban address from peer store --- network/src/peer_store/peer_store_impl.rs | 48 +++++++++++------------ 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 3a9a1c5a10..6ba0d50070 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -60,6 +60,9 @@ impl PeerStore { /// this method will assume peer and addr is untrust since we have not connected to it. pub fn add_addr(&mut self, addr: Multiaddr) -> Result<()> { self.check_purge()?; + if self.ban_list.is_addr_banned(&addr) { + return Ok(()); + } let score = self.score_config.default_score; self.addr_manager.add(AddrInfo::new(addr, 0, score)); Ok(()) @@ -67,6 +70,9 @@ impl PeerStore { /// Add outbound peer address pub fn add_outbound_addr(&mut self, addr: Multiaddr) { + if self.ban_list.is_addr_banned(&addr) { + return; + } let score = self.score_config.default_score; self.addr_manager .add(AddrInfo::new(addr, faketime::unix_time_as_millis(), score)); @@ -116,21 +122,18 @@ impl PeerStore { /// Get peers for outbound connection, this method randomly return recently connected peer addrs pub fn fetch_addrs_to_attempt(&mut self, count: usize) -> Vec { // Get info: - // 1. Not in ban list - // 2. Not already connected - // 3. Connected within 3 days + // 1. Not already connected + // 2. Connected within 3 days let now_ms = faketime::unix_time_as_millis(); - let ban_list = &self.ban_list; let peers = &self.peers; let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); // get addrs that can attempt. self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { - !ban_list.is_addr_banned(&peer_addr.addr) - && extract_peer_id(&peer_addr.addr) - .map(|peer_id| !peers.contains_key(&peer_id)) - .unwrap_or_default() + extract_peer_id(&peer_addr.addr) + .map(|peer_id| !peers.contains_key(&peer_id)) + .unwrap_or_default() && peer_addr.had_connected(addr_expired_ms) }) } @@ -139,21 +142,18 @@ impl PeerStore { /// connected to. pub fn fetch_addrs_to_feeler(&mut self, count: usize) -> Vec { // Get info: - // 1. Not in ban list - // 2. Not already connected - // 3. Not already tried in a minute - // 4. Not connected within 3 days + // 1. Not already connected + // 2. Not already tried in a minute + // 3. Not connected within 3 days let now_ms = faketime::unix_time_as_millis(); let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); - let ban_list = &self.ban_list; let peers = &self.peers; self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { - !ban_list.is_addr_banned(&peer_addr.addr) - && extract_peer_id(&peer_addr.addr) - .map(|peer_id| !peers.contains_key(&peer_id)) - .unwrap_or_default() + extract_peer_id(&peer_addr.addr) + .map(|peer_id| !peers.contains_key(&peer_id)) + .unwrap_or_default() && !peer_addr.tried_in_last_minute(now_ms) && !peer_addr.had_connected(addr_expired_ms) }) @@ -162,21 +162,18 @@ impl PeerStore { /// Return valid addrs that success connected, used for discovery. pub fn fetch_random_addrs(&mut self, count: usize) -> Vec { // Get info: - // 1. Not in ban list - // 2. Already connected or Connected within 7 days + // 1. Already connected or Connected within 7 days let now_ms = faketime::unix_time_as_millis(); let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS); - let ban_list = &self.ban_list; let peers = &self.peers; // get success connected addrs. self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { - !ban_list.is_addr_banned(&peer_addr.addr) - && (extract_peer_id(&peer_addr.addr) - .map(|peer_id| peers.contains_key(&peer_id)) - .unwrap_or_default() - || peer_addr.had_connected(addr_expired_ms)) + extract_peer_id(&peer_addr.addr) + .map(|peer_id| peers.contains_key(&peer_id)) + .unwrap_or_default() + || peer_addr.had_connected(addr_expired_ms) }) } @@ -186,6 +183,7 @@ impl PeerStore { let network = ip_to_network(addr.ip()); self.ban_network(network, timeout_ms, ban_reason) } + self.addr_manager.remove(addr); } pub(crate) fn ban_network(&mut self, network: IpNetwork, timeout_ms: u64, ban_reason: String) { From f3c85c600d95572bd7cc99b903cc7a288130b6f6 Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 25 Aug 2021 09:47:46 +0800 Subject: [PATCH 3/5] feat: evict connectable nodes by network segment --- network/src/peer_store/addr_manager.rs | 2 +- network/src/peer_store/peer_store_impl.rs | 64 +++++++++++++++++++++-- network/src/peer_store/types.rs | 10 ++-- network/src/tests/peer_store.rs | 18 ++++++- 4 files changed, 81 insertions(+), 13 deletions(-) diff --git a/network/src/peer_store/addr_manager.rs b/network/src/peer_store/addr_manager.rs index 0db641737f..f6f5a44070 100644 --- a/network/src/peer_store/addr_manager.rs +++ b/network/src/peer_store/addr_manager.rs @@ -61,7 +61,7 @@ impl AddrManager { // TODO remove this after fix the network tests. let is_test_ip = ip.is_unspecified() || ip.is_loopback(); if (is_test_ip || is_unique_ip) - && !addr_info.is_terrible(now_ms) + && addr_info.is_connectable(now_ms) && filter(&addr_info) { addr_infos.push(addr_info); diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 6ba0d50070..c875567a56 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -1,6 +1,7 @@ use crate::{ errors::{PeerStoreError, Result}, extract_peer_id, multiaddr_to_socketaddr, + network_group::Group, peer_store::{ addr_manager::AddrManager, ban_list::BanList, @@ -11,6 +12,7 @@ use crate::{ PeerId, SessionType, }; use ipnetwork::IpNetwork; +use rand::prelude::IteratorRandom; use std::collections::{hash_map::Entry, HashMap}; /// Peer store @@ -224,25 +226,77 @@ impl PeerStore { return Ok(()); } + // Evicting invalid data in the peer store is a relatively rare operation + // There are certain cleanup strategies here: + // 1. First evict the nodes that have reached the eviction condition + // 2. If the first step is unsuccessful, enter the network segment grouping mode + // 2.1. Group current data according to network segment + // 2.2. Sort according to the amount of data in the same network segment + // 2.3. In the network segment with more than 4 peer, randomly evict 2 peer + let now_ms = faketime::unix_time_as_millis(); let candidate_peers: Vec<_> = { // find candidate peers by terrible condition let ban_score = self.score_config.ban_score; let mut peers = Vec::default(); for addr in self.addr_manager.addrs_iter() { - if addr.is_terrible(now_ms) || addr.score <= ban_score { + if !addr.is_connectable(now_ms) || addr.score <= ban_score { peers.push(addr.addr.clone()) } } peers }; - if candidate_peers.is_empty() { - return Err(PeerStoreError::EvictionFailed.into()); + for key in candidate_peers.iter() { + self.addr_manager.remove(&key); } - for key in candidate_peers { - self.addr_manager.remove(&key); + if candidate_peers.is_empty() { + let candidate_peers: Vec<_> = { + // find candidate peers by terrible condition + let mut peers_by_network_group: HashMap> = HashMap::default(); + for addr in self.addr_manager.addrs_iter() { + peers_by_network_group + .entry((&addr.addr).into()) + .or_default() + .push(addr); + } + let len = peers_by_network_group.len(); + let mut peers = peers_by_network_group + .drain() + .map(|(_, v)| v) + .collect::>>(); + + peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len())); + + peers + .into_iter() + .take(len / 2) + .flat_map(move |addrs| { + if addrs.len() > 4 { + Some( + addrs + .iter() + .choose_multiple(&mut rand::thread_rng(), 2) + .into_iter() + .map(|addr| addr.addr.clone()) + .collect::>(), + ) + } else { + None + } + }) + .flatten() + .collect() + }; + + for key in candidate_peers.iter() { + self.addr_manager.remove(&key); + } + + if candidate_peers.is_empty() { + return Err(PeerStoreError::EvictionFailed.into()); + } } Ok(()) } diff --git a/network/src/peer_store/types.rs b/network/src/peer_store/types.rs index 13d645ab7e..c9f0162747 100644 --- a/network/src/peer_store/types.rs +++ b/network/src/peer_store/types.rs @@ -72,22 +72,22 @@ impl AddrInfo { } /// Whether terrible peer - pub fn is_terrible(&self, now_ms: u64) -> bool { + pub fn is_connectable(&self, now_ms: u64) -> bool { // do not remove addr tried in last minute if self.tried_in_last_minute(now_ms) { - return false; + return true; } // we give up if never connect to this addr if self.last_connected_at_ms == 0 && self.attempts_count >= ADDR_MAX_RETRIES { - return true; + return false; } // consider addr is terrible if failed too many times if now_ms.saturating_sub(self.last_connected_at_ms) > ADDR_TIMEOUT_MS && (self.attempts_count >= ADDR_MAX_FAILURES) { - return true; + return false; } - false + true } /// Try dail count diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index 0d1d006309..218b4292ab 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -271,13 +271,13 @@ fn test_eviction() { paddr.mark_tried(tried_ms); paddr.mark_tried(tried_ms); paddr.mark_tried(tried_ms); - assert!(paddr.is_terrible(now)); + assert!(!paddr.is_connectable(now)); } if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&evict_addr_2) { paddr.mark_tried(tried_ms); paddr.mark_tried(tried_ms); paddr.mark_tried(tried_ms); - assert!(paddr.is_terrible(now)); + assert!(!paddr.is_connectable(now)); } // should evict evict_addr and accept new_peer let new_peer_addr: Multiaddr = @@ -290,4 +290,18 @@ fn test_eviction() { assert!(peer_store.mut_addr_manager().get(&new_peer_addr).is_some()); assert!(peer_store.mut_addr_manager().get(&evict_addr_2).is_none()); assert!(peer_store.mut_addr_manager().get(&evict_addr).is_none()); + + // In the absence of invalid nodes, too many nodes on the same network segment will be automatically evicted + let new_peer_addr: Multiaddr = + format!("/ip4/225.0.0.3/tcp/42/p2p/{}", PeerId::random().to_base58()) + .parse() + .unwrap(); + peer_store.add_addr(new_peer_addr.clone()).unwrap(); + assert!(peer_store.mut_addr_manager().get(&new_peer_addr).is_some()); + let new_peer_addr: Multiaddr = + format!("/ip4/225.0.0.3/tcp/42/p2p/{}", PeerId::random().to_base58()) + .parse() + .unwrap(); + peer_store.add_addr(new_peer_addr.clone()).unwrap(); + assert!(peer_store.mut_addr_manager().get(&new_peer_addr).is_some()); } From 36ed62b16598e6fc40e78c0baba03a9fcc237eb2 Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 25 Aug 2021 11:01:22 +0800 Subject: [PATCH 4/5] feat: remove nodes that have been connected within 15s --- network/src/peer_store/peer_store_impl.rs | 7 ++--- network/src/peer_store/types.rs | 6 ++--- network/src/tests/peer_store.rs | 31 ++++++++++++++++++++++- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index c875567a56..2d4c61af3c 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -136,7 +136,8 @@ impl PeerStore { extract_peer_id(&peer_addr.addr) .map(|peer_id| !peers.contains_key(&peer_id)) .unwrap_or_default() - && peer_addr.had_connected(addr_expired_ms) + && peer_addr + .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(15_000)) }) } @@ -157,7 +158,7 @@ impl PeerStore { .map(|peer_id| !peers.contains_key(&peer_id)) .unwrap_or_default() && !peer_addr.tried_in_last_minute(now_ms) - && !peer_addr.had_connected(addr_expired_ms) + && !peer_addr.connected(|t| t >= addr_expired_ms) }) } @@ -175,7 +176,7 @@ impl PeerStore { extract_peer_id(&peer_addr.addr) .map(|peer_id| peers.contains_key(&peer_id)) .unwrap_or_default() - || peer_addr.had_connected(addr_expired_ms) + || peer_addr.connected(|t| t >= addr_expired_ms) }) } diff --git a/network/src/peer_store/types.rs b/network/src/peer_store/types.rs index c9f0162747..924f70a045 100644 --- a/network/src/peer_store/types.rs +++ b/network/src/peer_store/types.rs @@ -61,9 +61,9 @@ impl AddrInfo { } } - /// Whether already connected - pub fn had_connected(&self, expires_ms: u64) -> bool { - self.last_connected_at_ms > expires_ms + /// Connection information + pub fn connected bool>(&self, f: F) -> bool { + f(self.last_connected_at_ms) } /// Whether already try dail within a minute diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index 218b4292ab..e4b4efc12e 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -55,8 +55,14 @@ fn test_ban_peer() { assert!(peer_store.is_addr_banned(&addr)); } +#[cfg(not(disable_faketime))] #[test] fn test_attempt_ban() { + let faketime_file = faketime::millis_tempfile(0).expect("create faketime file"); + faketime::enable(&faketime_file); + + faketime::write_millis(&faketime_file, 1).expect("write millis"); + let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); peer_store.add_addr(addr.clone()).unwrap(); @@ -65,13 +71,22 @@ fn test_attempt_ban() { .get_mut(&addr) .unwrap() .last_connected_at_ms = faketime::unix_time_as_millis(); + + faketime::write_millis(&faketime_file, 100_000).expect("write millis"); + assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); peer_store.ban_addr(&addr, 10_000, "no reason".into()); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 0); } +#[cfg(not(disable_faketime))] #[test] fn test_fetch_addrs_to_attempt() { + let faketime_file = faketime::millis_tempfile(0).expect("create faketime file"); + faketime::enable(&faketime_file); + + faketime::write_millis(&faketime_file, 1).expect("write millis"); + let mut peer_store: PeerStore = Default::default(); assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); let addr = random_addr(); @@ -81,6 +96,8 @@ fn test_fetch_addrs_to_attempt() { .get_mut(&addr) .unwrap() .last_connected_at_ms = faketime::unix_time_as_millis(); + faketime::write_millis(&faketime_file, 100_000).expect("write millis"); + assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); peer_store.add_connected_peer(addr, SessionType::Outbound); assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); @@ -97,17 +114,27 @@ fn test_fetch_addrs_to_attempt_or_feeler() { let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); peer_store.add_outbound_addr(addr); + + faketime::write_millis(&faketime_file, 100_000).expect("write millis"); + assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); assert!(peer_store.fetch_addrs_to_feeler(2).is_empty()); - faketime::write_millis(&faketime_file, ADDR_TRY_TIMEOUT_MS + 1).expect("write millis"); + faketime::write_millis(&faketime_file, 100_000 + ADDR_TRY_TIMEOUT_MS + 1) + .expect("write millis"); assert!(peer_store.fetch_addrs_to_attempt(2).is_empty()); assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); } +#[cfg(not(disable_faketime))] #[test] fn test_fetch_addrs_to_attempt_in_last_minutes() { + let faketime_file = faketime::millis_tempfile(0).expect("create faketime file"); + faketime::enable(&faketime_file); + + faketime::write_millis(&faketime_file, 100_000).expect("write millis"); + let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); peer_store.add_addr(addr.clone()).unwrap(); @@ -127,6 +154,8 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() { .get_mut(&addr) .unwrap() .last_connected_at_ms = now; + faketime::write_millis(&faketime_file, 200_000).expect("write millis"); + assert_eq!(peer_store.fetch_addrs_to_attempt(1).len(), 1); if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&addr) { paddr.mark_tried(now); From fca5c4d301c3a805f9c31f05be3be53e39bb7d8d Mon Sep 17 00:00:00 2001 From: driftluo Date: Thu, 26 Aug 2021 10:47:45 +0800 Subject: [PATCH 5/5] chore: impl review comments --- network/src/peer_store/mod.rs | 3 ++ network/src/peer_store/peer_store_impl.rs | 34 +++++++++++------------ network/src/peer_store/types.rs | 4 +-- network/src/services/outbound_peer.rs | 4 +-- network/src/tests/peer_store.rs | 8 +++--- 5 files changed, 28 insertions(+), 25 deletions(-) diff --git a/network/src/peer_store/mod.rs b/network/src/peer_store/mod.rs index 5d8b09536e..a93541d3f2 100644 --- a/network/src/peer_store/mod.rs +++ b/network/src/peer_store/mod.rs @@ -24,6 +24,9 @@ pub(crate) const ADDR_COUNT_LIMIT: usize = 16384; const ADDR_TIMEOUT_MS: u64 = 7 * 24 * 3600 * 1000; /// The timeout that peer's address should be added to the feeler list again pub(crate) const ADDR_TRY_TIMEOUT_MS: u64 = 3 * 24 * 3600 * 1000; +/// When obtaining the list of selectable nodes for identify, +/// the node that has just been disconnected needs to be excluded +pub(crate) const DIAL_INTERVAL: u64 = 15 * 1000; const ADDR_MAX_RETRIES: u32 = 3; const ADDR_MAX_FAILURES: u32 = 10; diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 2d4c61af3c..924a2dc6fb 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -7,7 +7,7 @@ use crate::{ ban_list::BanList, types::{ip_to_network, AddrInfo, BannedAddr, PeerInfo}, Behaviour, Multiaddr, PeerScoreConfig, ReportResult, Status, ADDR_COUNT_LIMIT, - ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, + ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, DIAL_INTERVAL, }, PeerId, SessionType, }; @@ -61,10 +61,10 @@ impl PeerStore { /// Add discovered peer address /// this method will assume peer and addr is untrust since we have not connected to it. pub fn add_addr(&mut self, addr: Multiaddr) -> Result<()> { - self.check_purge()?; if self.ban_list.is_addr_banned(&addr) { return Ok(()); } + self.check_purge()?; let score = self.score_config.default_score; self.addr_manager.add(AddrInfo::new(addr, 0, score)); Ok(()) @@ -136,8 +136,9 @@ impl PeerStore { extract_peer_id(&peer_addr.addr) .map(|peer_id| !peers.contains_key(&peer_id)) .unwrap_or_default() - && peer_addr - .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(15_000)) + && peer_addr.connected(|t| { + t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL) + }) }) } @@ -158,7 +159,7 @@ impl PeerStore { .map(|peer_id| !peers.contains_key(&peer_id)) .unwrap_or_default() && !peer_addr.tried_in_last_minute(now_ms) - && !peer_addr.connected(|t| t >= addr_expired_ms) + && !peer_addr.connected(|t| t > addr_expired_ms) }) } @@ -176,7 +177,7 @@ impl PeerStore { extract_peer_id(&peer_addr.addr) .map(|peer_id| peers.contains_key(&peer_id)) .unwrap_or_default() - || peer_addr.connected(|t| t >= addr_expired_ms) + || peer_addr.connected(|t| t > addr_expired_ms) }) } @@ -236,17 +237,17 @@ impl PeerStore { // 2.3. In the network segment with more than 4 peer, randomly evict 2 peer let now_ms = faketime::unix_time_as_millis(); - let candidate_peers: Vec<_> = { - // find candidate peers by terrible condition - let ban_score = self.score_config.ban_score; - let mut peers = Vec::default(); - for addr in self.addr_manager.addrs_iter() { - if !addr.is_connectable(now_ms) || addr.score <= ban_score { - peers.push(addr.addr.clone()) + let candidate_peers: Vec<_> = self + .addr_manager + .addrs_iter() + .filter_map(|addr| { + if !addr.is_connectable(now_ms) { + Some(addr.addr.clone()) + } else { + None } - } - peers - }; + }) + .collect(); for key in candidate_peers.iter() { self.addr_manager.remove(&key); @@ -254,7 +255,6 @@ impl PeerStore { if candidate_peers.is_empty() { let candidate_peers: Vec<_> = { - // find candidate peers by terrible condition let mut peers_by_network_group: HashMap> = HashMap::default(); for addr in self.addr_manager.addrs_iter() { peers_by_network_group diff --git a/network/src/peer_store/types.rs b/network/src/peer_store/types.rs index 924f70a045..407c449e94 100644 --- a/network/src/peer_store/types.rs +++ b/network/src/peer_store/types.rs @@ -71,7 +71,7 @@ impl AddrInfo { self.last_tried_at_ms >= now_ms.saturating_sub(60_000) } - /// Whether terrible peer + /// Whether connectable peer pub fn is_connectable(&self, now_ms: u64) -> bool { // do not remove addr tried in last minute if self.tried_in_last_minute(now_ms) { @@ -81,7 +81,7 @@ impl AddrInfo { if self.last_connected_at_ms == 0 && self.attempts_count >= ADDR_MAX_RETRIES { return false; } - // consider addr is terrible if failed too many times + // consider addr is not connectable if failed too many times if now_ms.saturating_sub(self.last_connected_at_ms) > ADDR_TIMEOUT_MS && (self.attempts_count >= ADDR_MAX_FAILURES) { diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index 2a97494f78..d23d2aec03 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -139,12 +139,12 @@ impl Future for OutboundPeerService { } } while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + // keep whitelist peer on connected + self.try_dial_whitelist(); // ensure feeler work at any time self.dial_feeler(); // keep outbound peer is enough self.try_dial_peers(); - // keep whitelist peer on connected - self.try_dial_whitelist(); // try dial observed addrs self.try_dial_observed(); } diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index e4b4efc12e..a6443f5428 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -70,7 +70,7 @@ fn test_attempt_ban() { .mut_addr_manager() .get_mut(&addr) .unwrap() - .last_connected_at_ms = faketime::unix_time_as_millis(); + .mark_connected(faketime::unix_time_as_millis()); faketime::write_millis(&faketime_file, 100_000).expect("write millis"); @@ -95,7 +95,7 @@ fn test_fetch_addrs_to_attempt() { .mut_addr_manager() .get_mut(&addr) .unwrap() - .last_connected_at_ms = faketime::unix_time_as_millis(); + .mark_connected(faketime::unix_time_as_millis()); faketime::write_millis(&faketime_file, 100_000).expect("write millis"); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); @@ -153,7 +153,7 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() { .mut_addr_manager() .get_mut(&addr) .unwrap() - .last_connected_at_ms = now; + .mark_connected(now); faketime::write_millis(&faketime_file, 200_000).expect("write millis"); assert_eq!(peer_store.fetch_addrs_to_attempt(1).len(), 1); @@ -224,7 +224,7 @@ fn test_fetch_random_addrs() { .mut_addr_manager() .get_mut(&addr3) .unwrap() - .last_connected_at_ms = 0; + .mark_connected(0); assert_eq!(peer_store.fetch_random_addrs(3).len(), 3); peer_store.remove_disconnected_peer(&addr3); assert_eq!(peer_store.fetch_random_addrs(3).len(), 2);