Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor peer store #2964

Merged
merged 5 commits into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct NetworkState {
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
local_private_key: secio::SecioKeyPair,
local_peer_id: PeerId,
bootnodes: Vec<Multiaddr>,
pub(crate) bootnodes: Vec<Multiaddr>,
pub(crate) config: NetworkConfig,
pub(crate) active: AtomicBool,
/// Node supported protocols
Expand Down Expand Up @@ -856,7 +856,8 @@ impl<T: ExitHandler> NetworkService<T> {
.yamux_config(yamux_config)
.forever(true)
.max_connection_number(1024)
.set_send_buffer_size(config.max_send_buffer());
.set_send_buffer_size(config.max_send_buffer())
.timeout(Duration::from_secs(5));

#[cfg(target_os = "linux")]
let p2p_service = {
Expand Down
2 changes: 1 addition & 1 deletion network/src/peer_store/addr_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl AddrManager {
// TODO remove this after fix the network tests.
let is_test_ip = ip.is_unspecified() || ip.is_loopback();
if (is_test_ip || is_unique_ip)
&& !addr_info.is_terrible(now_ms)
&& addr_info.is_connectable(now_ms)
&& filter(&addr_info)
{
addr_infos.push(addr_info);
Expand Down
13 changes: 13 additions & 0 deletions network/src/peer_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
//! Peer store manager
//!
//! This module implements a locally managed node information set, which is used for
//! booting into the network when the node is started, real-time update detection/timing
//! saving at runtime, and saving data when stopping
//!
//! The validity and screening speed of the data set are very important to the entire network,
//! and the address information collected on the network cannot be blindly trusted

pub mod addr_manager;
pub mod ban_list;
mod peer_store_db;
Expand All @@ -14,6 +22,11 @@ pub use peer_store_impl::PeerStore;
pub(crate) const ADDR_COUNT_LIMIT: usize = 16384;
/// Consider we never seen a peer if peer's last_connected_at beyond this timeout
const ADDR_TIMEOUT_MS: u64 = 7 * 24 * 3600 * 1000;
/// The timeout that peer's address should be added to the feeler list again
pub(crate) const ADDR_TRY_TIMEOUT_MS: u64 = 3 * 24 * 3600 * 1000;
/// When obtaining the list of selectable nodes for identify,
/// the node that has just been disconnected needs to be excluded
pub(crate) const DIAL_INTERVAL: u64 = 15 * 1000;
const ADDR_MAX_RETRIES: u32 = 3;
const ADDR_MAX_FAILURES: u32 = 10;

Expand Down
169 changes: 107 additions & 62 deletions network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ use crate::{
ban_list::BanList,
types::{ip_to_network, AddrInfo, BannedAddr, PeerInfo},
Behaviour, Multiaddr, PeerScoreConfig, ReportResult, Status, ADDR_COUNT_LIMIT,
ADDR_TIMEOUT_MS,
ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS, DIAL_INTERVAL,
},
PeerId, SessionType,
};
use ipnetwork::IpNetwork;
use rand::prelude::IteratorRandom;
use std::collections::{hash_map::Entry, HashMap};

/// Peer store
///
/// | -- choose to identify --| --- choose to feeler --- | -- delete -- |
/// | 1 | 2 | 3 | 4 | 5 | 6 | 7 | More than seven days |
#[derive(Default)]
pub struct PeerStore {
addr_manager: AddrManager,
Expand Down Expand Up @@ -43,12 +47,12 @@ impl PeerStore {
{
Entry::Occupied(mut entry) => {
let mut peer = entry.get_mut();
peer.connected_addr = addr.clone();
peer.connected_addr = addr;
peer.last_connected_at_ms = now_ms;
peer.session_type = session_type;
}
Entry::Vacant(entry) => {
let peer = PeerInfo::new(addr.clone(), session_type, now_ms);
let peer = PeerInfo::new(addr, session_type, now_ms);
entry.insert(peer);
}
}
Expand All @@ -57,6 +61,9 @@ impl PeerStore {
/// Add discovered peer address
/// this method will assume peer and addr is untrust since we have not connected to it.
pub fn add_addr(&mut self, addr: Multiaddr) -> Result<()> {
if self.ban_list.is_addr_banned(&addr) {
driftluo marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
}
self.check_purge()?;
let score = self.score_config.default_score;
self.addr_manager.add(AddrInfo::new(addr, 0, score));
Expand All @@ -65,6 +72,9 @@ impl PeerStore {

/// Add outbound peer address
pub fn add_outbound_addr(&mut self, addr: Multiaddr) {
if self.ban_list.is_addr_banned(&addr) {
return;
}
let score = self.score_config.default_score;
self.addr_manager
.add(AddrInfo::new(addr, faketime::unix_time_as_millis(), score));
Expand Down Expand Up @@ -111,55 +121,63 @@ impl PeerStore {
}
}

/// Get peers for outbound connection, this method randomly return non-connected peer addrs
/// Get peers for outbound connection, this method randomly return recently connected peer addrs
pub fn fetch_addrs_to_attempt(&mut self, count: usize) -> Vec<AddrInfo> {
// Get info:
// 1. Not already connected
// 2. Connected within 3 days

let now_ms = faketime::unix_time_as_millis();
let ban_list = &self.ban_list;
let peers = &self.peers;
let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
// get addrs that can attempt.
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
!ban_list.is_addr_banned(&peer_addr.addr)
&& extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
&& !peer_addr.tried_in_last_minute(now_ms)
extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
&& peer_addr.connected(|t| {
t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL)
})
})
}

/// Get peers for feeler connection, this method randomly return peer addrs that we never
/// connected to.
pub fn fetch_addrs_to_feeler(&mut self, count: usize) -> Vec<AddrInfo> {
// Get info:
// 1. Not already connected
// 2. Not already tried in a minute
// 3. Not connected within 3 days

let now_ms = faketime::unix_time_as_millis();
let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS;
// get expired or never successed addrs.
let ban_list = &self.ban_list;
let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
let peers = &self.peers;
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
!ban_list.is_addr_banned(&peer_addr.addr)
&& extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
&& !peer_addr.tried_in_last_minute(now_ms)
&& !peer_addr.had_connected(addr_expired_ms)
&& !peer_addr.connected(|t| t > addr_expired_ms)
})
}

/// Return valid addrs that success connected, used for discovery.
pub fn fetch_random_addrs(&mut self, count: usize) -> Vec<AddrInfo> {
// Get info:
// 1. Already connected or Connected within 7 days

let now_ms = faketime::unix_time_as_millis();
let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS;
let ban_list = &self.ban_list;
let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
let peers = &self.peers;
// get success connected addrs.
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
!ban_list.is_addr_banned(&peer_addr.addr)
&& (extract_peer_id(&peer_addr.addr)
.map(|peer_id| peers.contains_key(&peer_id))
.unwrap_or_default()
|| peer_addr.had_connected(addr_expired_ms))
extract_peer_id(&peer_addr.addr)
.map(|peer_id| peers.contains_key(&peer_id))
.unwrap_or_default()
|| peer_addr.connected(|t| t > addr_expired_ms)
})
}

Expand All @@ -169,6 +187,7 @@ impl PeerStore {
let network = ip_to_network(addr.ip());
self.ban_network(network, timeout_ms, ban_reason)
}
self.addr_manager.remove(addr);
}

pub(crate) fn ban_network(&mut self, network: IpNetwork, timeout_ms: u64, ban_reason: String) {
Expand Down Expand Up @@ -208,51 +227,77 @@ impl PeerStore {
if self.addr_manager.count() < ADDR_COUNT_LIMIT {
return Ok(());
}

// Evicting invalid data in the peer store is a relatively rare operation
// There are certain cleanup strategies here:
// 1. Group current data according to network segment
// 2. Sort according to the amount of data in the same network segment
// 3. Prioritize cleaning on the same network segment
// 1. First evict the nodes that have reached the eviction condition
// 2. If the first step is unsuccessful, enter the network segment grouping mode
// 2.1. Group current data according to network segment
// 2.2. Sort according to the amount of data in the same network segment
// 2.3. In the network segment with more than 4 peer, randomly evict 2 peer

let now_ms = faketime::unix_time_as_millis();
let candidate_peers: Vec<_> = {
// find candidate peers by network group
let mut peers_by_network_group: HashMap<Group, Vec<_>> = HashMap::default();
for addr in self.addr_manager.addrs_iter() {
peers_by_network_group
.entry((&addr.addr).into())
.or_default()
.push(addr);
}
let len = peers_by_network_group.len();
let mut peers = peers_by_network_group
.drain()
.map(|(_, v)| v)
.collect::<Vec<Vec<_>>>();

peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len()));
let ban_score = self.score_config.ban_score;
let candidate_peers: Vec<_> = self
.addr_manager
.addrs_iter()
.filter_map(|addr| {
if !addr.is_connectable(now_ms) {
Some(addr.addr.clone())
} else {
None
}
})
.collect();

peers
.into_iter()
.take(len / 2)
.flatten()
.filter_map(move |addr| {
if addr.is_terrible(now_ms) || addr.score <= ban_score {
Some(addr.addr.clone())
} else {
None
}
})
.collect()
};
for key in candidate_peers.iter() {
self.addr_manager.remove(&key);
}

if candidate_peers.is_empty() {
return Err(PeerStoreError::EvictionFailed.into());
}
let candidate_peers: Vec<_> = {
let mut peers_by_network_group: HashMap<Group, Vec<_>> = HashMap::default();
for addr in self.addr_manager.addrs_iter() {
peers_by_network_group
.entry((&addr.addr).into())
.or_default()
.push(addr);
}
let len = peers_by_network_group.len();
let mut peers = peers_by_network_group
.drain()
.map(|(_, v)| v)
.collect::<Vec<Vec<_>>>();

for key in candidate_peers {
self.addr_manager.remove(&key);
peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len()));

peers
.into_iter()
.take(len / 2)
.flat_map(move |addrs| {
if addrs.len() > 4 {
Some(
addrs
.iter()
.choose_multiple(&mut rand::thread_rng(), 2)
.into_iter()
.map(|addr| addr.addr.clone())
.collect::<Vec<Multiaddr>>(),
)
} else {
None
}
})
.flatten()
.collect()
};

for key in candidate_peers.iter() {
self.addr_manager.remove(&key);
}

if candidate_peers.is_empty() {
return Err(PeerStoreError::EvictionFailed.into());
}
}
Ok(())
}
Expand Down
20 changes: 10 additions & 10 deletions network/src/peer_store/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,33 +61,33 @@ impl AddrInfo {
}
}

/// Whether already connected
pub fn had_connected(&self, expires_ms: u64) -> bool {
self.last_connected_at_ms > expires_ms
/// Connection information
pub fn connected<F: FnOnce(u64) -> bool>(&self, f: F) -> bool {
f(self.last_connected_at_ms)
}

/// Whether already try dail within a minute
pub fn tried_in_last_minute(&self, now_ms: u64) -> bool {
self.last_tried_at_ms >= now_ms.saturating_sub(60_000)
}

/// Whether terrible peer
pub fn is_terrible(&self, now_ms: u64) -> bool {
/// Whether connectable peer
pub fn is_connectable(&self, now_ms: u64) -> bool {
// do not remove addr tried in last minute
if self.tried_in_last_minute(now_ms) {
return false;
return true;
}
// we give up if never connect to this addr
if self.last_connected_at_ms == 0 && self.attempts_count >= ADDR_MAX_RETRIES {
return true;
return false;
}
// consider addr is terrible if failed too many times
// consider addr is not connectable if failed too many times
if now_ms.saturating_sub(self.last_connected_at_ms) > ADDR_TIMEOUT_MS
&& (self.attempts_count >= ADDR_MAX_FAILURES)
{
return true;
return false;
}
false
true
}

/// Try dail count
Expand Down
Loading