From 78159f91734426e6cdfdea5a5eb7f5ead13c6ba7 Mon Sep 17 00:00:00 2001 From: driftluo Date: Mon, 23 Aug 2021 14:43:32 +0800 Subject: [PATCH] refactor: refactor peer store --- network/src/network.rs | 5 +- network/src/peer_store/mod.rs | 10 ++ network/src/peer_store/peer_store_impl.rs | 72 ++++----- network/src/protocols/identify/mod.rs | 36 +++-- network/src/protocols/test.rs | 36 +++-- network/src/services/dump_peer_store.rs | 28 ++-- network/src/services/outbound_peer.rs | 149 ++++++++++-------- network/src/services/protocol_type_checker.rs | 24 +-- network/src/tests/peer_store.rs | 48 +++++- 9 files changed, 250 insertions(+), 158 deletions(-) diff --git a/network/src/network.rs b/network/src/network.rs index 1dbd9103b27..9335a37fced 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -75,7 +75,7 @@ pub struct NetworkState { pending_observed_addrs: RwLock>, local_private_key: secio::SecioKeyPair, local_peer_id: PeerId, - bootnodes: Vec, + pub(crate) bootnodes: Vec, pub(crate) config: NetworkConfig, pub(crate) active: AtomicBool, /// Node supported protocols @@ -856,7 +856,8 @@ impl NetworkService { .yamux_config(yamux_config) .forever(true) .max_connection_number(1024) - .set_send_buffer_size(config.max_send_buffer()); + .set_send_buffer_size(config.max_send_buffer()) + .timeout(Duration::from_secs(5)); #[cfg(target_os = "linux")] let p2p_service = { diff --git a/network/src/peer_store/mod.rs b/network/src/peer_store/mod.rs index 8df7012bde4..5d8b09536e4 100644 --- a/network/src/peer_store/mod.rs +++ b/network/src/peer_store/mod.rs @@ -1,4 +1,12 @@ //! Peer store manager +//! +//! This module implements a locally managed node information set, which is used for +//! booting into the network when the node is started, real-time update detection/timing +//! saving at runtime, and saving data when stopping +//! +//! The validity and screening speed of the data set are very important to the entire network, +//! and the address information collected on the network cannot be blindly trusted + pub mod addr_manager; pub mod ban_list; mod peer_store_db; @@ -14,6 +22,8 @@ pub use peer_store_impl::PeerStore; pub(crate) const ADDR_COUNT_LIMIT: usize = 16384; /// Consider we never seen a peer if peer's last_connected_at beyond this timeout const ADDR_TIMEOUT_MS: u64 = 7 * 24 * 3600 * 1000; +/// The timeout that peer's address should be added to the feeler list again +pub(crate) const ADDR_TRY_TIMEOUT_MS: u64 = 3 * 24 * 3600 * 1000; const ADDR_MAX_RETRIES: u32 = 3; const ADDR_MAX_FAILURES: u32 = 10; diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 98d342577c4..3a9a1c5a100 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -1,13 +1,12 @@ use crate::{ errors::{PeerStoreError, Result}, extract_peer_id, multiaddr_to_socketaddr, - network_group::Group, peer_store::{ addr_manager::AddrManager, ban_list::BanList, types::{ip_to_network, AddrInfo, BannedAddr, PeerInfo}, Behaviour, Multiaddr, PeerScoreConfig, ReportResult, Status, ADDR_COUNT_LIMIT, - ADDR_TIMEOUT_MS, + ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, }, PeerId, SessionType, }; @@ -15,6 +14,9 @@ use ipnetwork::IpNetwork; use std::collections::{hash_map::Entry, HashMap}; /// Peer store +/// +/// | -- choose to identify --| --- choose to feeler --- | -- delete -- | +/// | 1 | 2 | 3 | 4 | 5 | 6 | 7 | More than seven days | #[derive(Default)] pub struct PeerStore { addr_manager: AddrManager, @@ -43,12 +45,12 @@ impl PeerStore { { Entry::Occupied(mut entry) => { let mut peer = entry.get_mut(); - peer.connected_addr = addr.clone(); + peer.connected_addr = addr; peer.last_connected_at_ms = now_ms; peer.session_type = session_type; } Entry::Vacant(entry) => { - let peer = PeerInfo::new(addr.clone(), session_type, now_ms); + let peer = PeerInfo::new(addr, session_type, now_ms); entry.insert(peer); } } @@ -111,11 +113,17 @@ impl PeerStore { } } - /// Get peers for outbound connection, this method randomly return non-connected peer addrs + /// Get peers for outbound connection, this method randomly return recently connected peer addrs pub fn fetch_addrs_to_attempt(&mut self, count: usize) -> Vec { + // Get info: + // 1. Not in ban list + // 2. Not already connected + // 3. Connected within 3 days + let now_ms = faketime::unix_time_as_millis(); let ban_list = &self.ban_list; let peers = &self.peers; + let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); // get addrs that can attempt. self.addr_manager .fetch_random(count, |peer_addr: &AddrInfo| { @@ -123,16 +131,21 @@ impl PeerStore { && extract_peer_id(&peer_addr.addr) .map(|peer_id| !peers.contains_key(&peer_id)) .unwrap_or_default() - && !peer_addr.tried_in_last_minute(now_ms) + && peer_addr.had_connected(addr_expired_ms) }) } /// Get peers for feeler connection, this method randomly return peer addrs that we never /// connected to. pub fn fetch_addrs_to_feeler(&mut self, count: usize) -> Vec { + // Get info: + // 1. Not in ban list + // 2. Not already connected + // 3. Not already tried in a minute + // 4. Not connected within 3 days + let now_ms = faketime::unix_time_as_millis(); - let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS; - // get expired or never successed addrs. + let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); let ban_list = &self.ban_list; let peers = &self.peers; self.addr_manager @@ -148,8 +161,12 @@ impl PeerStore { /// Return valid addrs that success connected, used for discovery. pub fn fetch_random_addrs(&mut self, count: usize) -> Vec { + // Get info: + // 1. Not in ban list + // 2. Already connected or Connected within 7 days + let now_ms = faketime::unix_time_as_millis(); - let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS; + let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS); let ban_list = &self.ban_list; let peers = &self.peers; // get success connected addrs. @@ -208,43 +225,18 @@ impl PeerStore { if self.addr_manager.count() < ADDR_COUNT_LIMIT { return Ok(()); } - // Evicting invalid data in the peer store is a relatively rare operation - // There are certain cleanup strategies here: - // 1. Group current data according to network segment - // 2. Sort according to the amount of data in the same network segment - // 3. Prioritize cleaning on the same network segment let now_ms = faketime::unix_time_as_millis(); let candidate_peers: Vec<_> = { - // find candidate peers by network group - let mut peers_by_network_group: HashMap> = HashMap::default(); + // find candidate peers by terrible condition + let ban_score = self.score_config.ban_score; + let mut peers = Vec::default(); for addr in self.addr_manager.addrs_iter() { - peers_by_network_group - .entry((&addr.addr).into()) - .or_default() - .push(addr); + if addr.is_terrible(now_ms) || addr.score <= ban_score { + peers.push(addr.addr.clone()) + } } - let len = peers_by_network_group.len(); - let mut peers = peers_by_network_group - .drain() - .map(|(_, v)| v) - .collect::>>(); - - peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len())); - let ban_score = self.score_config.ban_score; - peers - .into_iter() - .take(len / 2) - .flatten() - .filter_map(move |addr| { - if addr.is_terrible(now_ms) || addr.score <= ban_score { - Some(addr.addr.clone()) - } else { - None - } - }) - .collect() }; if candidate_peers.is_empty() { diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index d316149217f..f593e644ef6 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -12,7 +12,7 @@ use p2p::{ service::{SessionType, TargetProtocol}, traits::ServiceProtocol, utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr}, - ProtocolId, SessionId, + SessionId, }; mod protocol; @@ -62,7 +62,7 @@ pub trait Callback: Clone + Send { // Register open protocol fn register(&self, context: &ProtocolContextMutRef, version: &str); // remove registered identify protocol - fn unregister(&self, id: SessionId, pid: ProtocolId); + fn unregister(&self, context: &ProtocolContextMutRef); /// Received custom message fn received_identify( &mut self, @@ -261,8 +261,7 @@ impl ServiceProtocol for IdentifyProtocol { .remove(&context.session.id) .expect("RemoteInfo must exists"); trace!("IdentifyProtocol disconnected from {:?}", info.peer_id); - self.callback - .unregister(context.session.id, context.proto_id) + self.callback.unregister(&context) } } @@ -374,18 +373,37 @@ impl Callback for IdentifyCallback { .mut_addr_manager() .remove(&context.session.address); } else if context.session.ty.is_outbound() { + // why don't set inbound here? + // because inbound address can't feeler during staying connected + // and if set it to peer store, it will be broadcast to the entire network, + // but this is an unverified address self.network_state.with_peer_store_mut(|peer_store| { peer_store.add_outbound_addr(context.session.address.clone()); }); } } - fn unregister(&self, id: SessionId, pid: ProtocolId) { - self.network_state.with_peer_registry_mut(|reg| { - let _ = reg.get_peer_mut(id).map(|peer| { - peer.protocols.remove(&pid); + fn unregister(&self, context: &ProtocolContextMutRef) { + let version = self + .network_state + .with_peer_registry_mut(|reg| { + reg.get_peer_mut(context.session.id) + .map(|peer| peer.protocols.remove(&context.proto_id)) + }) + .flatten() + .map(|version| version != "2") + .unwrap_or_default(); + + if self.network_state.ckb2021.load(Ordering::SeqCst) && version { + } else if context.session.ty.is_outbound() { + // Due to the filtering strategy of the peer store, if the node is + // disconnected after a long connection is maintained for more than seven days, + // it is possible that the node will be accidentally evicted, so it is necessary + // to reset the information of the node when disconnected. + self.network_state.with_peer_store_mut(|peer_store| { + peer_store.add_outbound_addr(context.session.address.clone()); }); - }); + } } fn identify(&mut self) -> &[u8] { diff --git a/network/src/protocols/test.rs b/network/src/protocols/test.rs index e926ad789e9..01112018773 100644 --- a/network/src/protocols/test.rs +++ b/network/src/protocols/test.rs @@ -357,31 +357,39 @@ fn test_discovery_behavior() { wait_discovery(&node3); - let addr = { + let addrs = { let listen_addr = &node3.listen_addr; - node3 - .network_state - .peer_store - .lock() - .fetch_addrs_to_attempt(2) + let mut locked = node3.network_state.peer_store.lock(); + + let addr = locked + .fetch_addrs_to_feeler(6) .into_iter() .map(|peer| peer.addr) - .find(|addr| { + .flat_map(|addr| { match ( multiaddr_to_socketaddr(&addr), multiaddr_to_socketaddr(listen_addr), ) { - (Some(dis), Some(listen)) => dis.port() != listen.port(), - _ => false, + (Some(dis), Some(listen)) => { + if dis.port() != listen.port() { + Some(addr) + } else { + None + } + } + _ => None, } }) - .unwrap() + .collect::>(); + addr }; - node3.dial_addr( - addr, - TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), - ); + for addr in addrs { + node3.dial_addr( + addr, + TargetProtocol::Single(SupportProtocols::Identify.protocol_id()), + ); + } wait_connect_state(&node1, 2); wait_connect_state(&node2, 2); diff --git a/network/src/services/dump_peer_store.rs b/network/src/services/dump_peer_store.rs index 0fbe9c94936..2db8ec8295f 100644 --- a/network/src/services/dump_peer_store.rs +++ b/network/src/services/dump_peer_store.rs @@ -7,10 +7,11 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::time::Interval; +use tokio::time::{Instant, Interval, MissedTickBehavior}; const DEFAULT_DUMP_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour +/// Save current peer store data regularly pub struct DumpPeerStoreService { network_state: Arc, interval: Option, @@ -48,19 +49,20 @@ impl Future for DumpPeerStoreService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { - self.interval = Some(tokio::time::interval(DEFAULT_DUMP_INTERVAL)); - } - let mut interval = self.interval.take().unwrap(); - loop { - match interval.poll_tick(cx) { - Poll::Ready(_) => { - self.dump_peer_store(); - } - Poll::Pending => { - self.interval = Some(interval); - return Poll::Pending; - } + self.interval = { + let mut interval = tokio::time::interval_at( + Instant::now() + DEFAULT_DUMP_INTERVAL, + DEFAULT_DUMP_INTERVAL, + ); + // The dump peer store service does not need to urgently compensate for the missed wake, + // just delay behavior is enough + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + Some(interval) } } + while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + self.dump_peer_store() + } + Poll::Pending } } diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index d27191fb67e..2a97494f782 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -1,25 +1,29 @@ -use crate::peer_store::types::AddrInfo; use crate::NetworkState; use ckb_logger::trace; use faketime::unix_time_as_millis; use futures::Future; -use p2p::service::ServiceControl; +use p2p::{multiaddr::MultiAddr, service::ServiceControl}; +use rand::prelude::IteratorRandom; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; -use tokio::time::Interval; +use tokio::time::{Interval, MissedTickBehavior}; -const FEELER_CONNECTION_COUNT: usize = 5; +const FEELER_CONNECTION_COUNT: usize = 10; +/// Ensure that the outbound of the current node reaches the expected upper limit as much as possible +/// Periodically detect and verify data in the peer store +/// Keep the whitelist nodes connected as much as possible +/// Periodically detection finds that the observed addresses are all valid pub struct OutboundPeerService { network_state: Arc, p2p_control: ServiceControl, interval: Option, try_connect_interval: Duration, - last_connect: Option, + try_identify_count: u8, } impl OutboundPeerService { @@ -33,23 +37,15 @@ impl OutboundPeerService { p2p_control, interval: None, try_connect_interval, - last_connect: None, + try_identify_count: 0, } } - fn dial_peers(&mut self, is_feeler: bool, count: usize) { + fn dial_feeler(&mut self) { let now_ms = unix_time_as_millis(); let attempt_peers = self.network_state.with_peer_store_mut(|peer_store| { - // take extra 5 peers - // in current implementation fetch peers may return less than count - let extra_count = 5; - let mut paddrs = if is_feeler { - peer_store.fetch_addrs_to_feeler(count + extra_count) - } else { - peer_store.fetch_addrs_to_attempt(count + extra_count) - }; - paddrs.truncate(count as usize); - for paddr in &mut paddrs { + let paddrs = peer_store.fetch_addrs_to_feeler(FEELER_CONNECTION_COUNT); + for paddr in paddrs.iter() { // mark addr as tried if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) { paddr.mark_tried(now_ms); @@ -57,25 +53,67 @@ impl OutboundPeerService { } paddrs }); + trace!( - "count={}, attempt_peers: {:?} is_feeler: {}", - count, + "feeler dial count={}, attempt_peers: {:?}", + attempt_peers.len(), attempt_peers, - is_feeler ); - for paddr in attempt_peers { - let AddrInfo { addr, .. } = paddr; - if is_feeler { - self.network_state.dial_feeler(&self.p2p_control, addr); - } else { - self.network_state.dial_identify(&self.p2p_control, addr); - } + for addr in attempt_peers.into_iter().map(|info| info.addr) { + self.network_state.dial_identify(&self.p2p_control, addr); + } + } + + fn try_dial_peers(&mut self) { + let status = self.network_state.connection_status(); + let count = status + .max_outbound + .saturating_sub(status.non_whitelist_outbound) as usize; + if count == 0 { + self.try_identify_count = 0; + return; + } + self.try_identify_count += 1; + + let peers: Box> = if self.try_identify_count > 3 { + self.try_identify_count = 0; + Box::new( + self.network_state + .bootnodes + .iter() + .choose_multiple(&mut rand::thread_rng(), count) + .into_iter() + .cloned(), + ) + } else { + let now_ms = unix_time_as_millis(); + let attempt_peers = self.network_state.with_peer_store_mut(|peer_store| { + let paddrs = peer_store.fetch_addrs_to_attempt(count); + for paddr in paddrs.iter() { + // mark addr as tried + if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&paddr.addr) { + paddr.mark_tried(now_ms); + } + } + paddrs + }); + + trace!( + "identify dial count={}, attempt_peers: {:?}", + attempt_peers.len(), + attempt_peers, + ); + + Box::new(attempt_peers.into_iter().map(|info| info.addr)) + }; + + for addr in peers { + self.network_state.dial_identify(&self.p2p_control, addr); } } fn try_dial_whitelist(&self) { - // This will never panic because network start has already been checked for addr in self.network_state.config.whitelist_peers() { self.network_state.dial_identify(&self.p2p_control, addr); } @@ -92,43 +130,24 @@ impl Future for OutboundPeerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { - self.interval = Some(tokio::time::interval(Duration::from_secs(1))); - } - let mut interval = self.interval.take().unwrap(); - loop { - match interval.poll_tick(cx) { - Poll::Ready(_) => { - let last_connect = self - .last_connect - .map(|time| time.elapsed()) - .unwrap_or_else(|| Duration::from_secs(std::u64::MAX)); - if last_connect > self.try_connect_interval { - let status = self.network_state.connection_status(); - let new_outbound = status - .max_outbound - .saturating_sub(status.non_whitelist_outbound) - as usize; - if !self.network_state.config.whitelist_only { - if new_outbound > 0 { - // dial peers - self.dial_peers(false, new_outbound); - } else { - // feeler peers - self.dial_peers(true, FEELER_CONNECTION_COUNT); - } - } - // keep whitelist peer on connected - self.try_dial_whitelist(); - // try dial observed addrs - self.try_dial_observed(); - self.last_connect = Some(Instant::now()); - } - } - Poll::Pending => { - self.interval = Some(interval); - return Poll::Pending; - } + self.interval = { + let mut interval = tokio::time::interval(self.try_connect_interval); + // The outbound service does not need to urgently compensate for the missed wake, + // just skip behavior is enough + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + Some(interval) } } + while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + // ensure feeler work at any time + self.dial_feeler(); + // keep outbound peer is enough + self.try_dial_peers(); + // keep whitelist peer on connected + self.try_dial_whitelist(); + // try dial observed addrs + self.try_dial_observed(); + } + Poll::Pending } } diff --git a/network/src/services/protocol_type_checker.rs b/network/src/services/protocol_type_checker.rs index 9c5ff9537de..409f17dc716 100644 --- a/network/src/services/protocol_type_checker.rs +++ b/network/src/services/protocol_type_checker.rs @@ -18,7 +18,7 @@ use std::{ task::{Context, Poll}, time::{Duration, Instant}, }; -use tokio::time::Interval; +use tokio::time::{Interval, MissedTickBehavior}; const TIMEOUT: Duration = Duration::from_secs(60); const CHECK_INTERVAL: Duration = Duration::from_secs(30); @@ -55,6 +55,8 @@ impl std::fmt::Display for ProtocolTypeError { } } +/// Periodically check whether all connections are normally open sync protocol, +/// if not, close the connection pub struct ProtocolTypeCheckerService { network_state: Arc, p2p_control: ServiceControl, @@ -126,17 +128,17 @@ impl Future for ProtocolTypeCheckerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { - self.interval = Some(tokio::time::interval(CHECK_INTERVAL)); - } - let mut interval = self.interval.take().unwrap(); - loop { - match interval.poll_tick(cx) { - Poll::Ready(_) => self.check_protocol_type(), - Poll::Pending => { - self.interval = Some(interval); - return Poll::Pending; - } + self.interval = { + let mut interval = tokio::time::interval(CHECK_INTERVAL); + // The protocol type checker service does not need to urgently compensate for the missed wake, + // just skip behavior is enough + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + Some(interval) } } + while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + self.check_protocol_type(); + } + Poll::Pending } } diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index 845afa66e8b..0d1d006309e 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -2,7 +2,7 @@ use super::random_addr; use crate::{ extract_peer_id, multiaddr::Multiaddr, - peer_store::{PeerStore, Status, ADDR_COUNT_LIMIT}, + peer_store::{PeerStore, Status, ADDR_COUNT_LIMIT, ADDR_TRY_TIMEOUT_MS}, Behaviour, PeerId, SessionType, }; @@ -22,8 +22,9 @@ fn test_add_addr() { assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 0); let addr = random_addr(); peer_store.add_addr(addr).unwrap(); - assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); + assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); // we have not connected yet, so return 0 + assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 0); assert_eq!(peer_store.fetch_random_addrs(2).len(), 0); } @@ -59,6 +60,11 @@ fn test_attempt_ban() { let mut peer_store: PeerStore = Default::default(); let addr = random_addr(); peer_store.add_addr(addr.clone()).unwrap(); + peer_store + .mut_addr_manager() + .get_mut(&addr) + .unwrap() + .last_connected_at_ms = faketime::unix_time_as_millis(); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); peer_store.ban_addr(&addr, 10_000, "no reason".into()); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 0); @@ -70,11 +76,36 @@ fn test_fetch_addrs_to_attempt() { assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); let addr = random_addr(); peer_store.add_addr(addr.clone()).unwrap(); + peer_store + .mut_addr_manager() + .get_mut(&addr) + .unwrap() + .last_connected_at_ms = faketime::unix_time_as_millis(); assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); peer_store.add_connected_peer(addr, SessionType::Outbound); assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); } +#[cfg(not(disable_faketime))] +#[test] +fn test_fetch_addrs_to_attempt_or_feeler() { + let faketime_file = faketime::millis_tempfile(0).expect("create faketime file"); + faketime::enable(&faketime_file); + + faketime::write_millis(&faketime_file, 1).expect("write millis"); + + let mut peer_store: PeerStore = Default::default(); + let addr = random_addr(); + peer_store.add_outbound_addr(addr); + assert_eq!(peer_store.fetch_addrs_to_attempt(2).len(), 1); + assert!(peer_store.fetch_addrs_to_feeler(2).is_empty()); + + faketime::write_millis(&faketime_file, ADDR_TRY_TIMEOUT_MS + 1).expect("write millis"); + + assert!(peer_store.fetch_addrs_to_attempt(2).is_empty()); + assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); +} + #[test] fn test_fetch_addrs_to_attempt_in_last_minutes() { let mut peer_store: PeerStore = Default::default(); @@ -90,6 +121,16 @@ fn test_fetch_addrs_to_attempt_in_last_minutes() { if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&addr) { paddr.mark_tried(now - 60_001); } + assert!(peer_store.fetch_addrs_to_attempt(1).is_empty()); + peer_store + .mut_addr_manager() + .get_mut(&addr) + .unwrap() + .last_connected_at_ms = now; + assert_eq!(peer_store.fetch_addrs_to_attempt(1).len(), 1); + if let Some(paddr) = peer_store.mut_addr_manager().get_mut(&addr) { + paddr.mark_tried(now); + } assert_eq!(peer_store.fetch_addrs_to_attempt(1).len(), 1); } @@ -245,8 +286,7 @@ fn test_eviction() { .unwrap(); peer_store.add_addr(new_peer_addr.clone()).unwrap(); // check addrs - // peer store will evict peers from largest network group to low group - // the two evict_addrs should be evict, other addrs will remain in peer store + // peer store will evict all peers which are invalid assert!(peer_store.mut_addr_manager().get(&new_peer_addr).is_some()); assert!(peer_store.mut_addr_manager().get(&evict_addr_2).is_none()); assert!(peer_store.mut_addr_manager().get(&evict_addr).is_none());