Skip to content

Commit

Permalink
refactor: refactor peer store
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Aug 23, 2021
1 parent adce2ba commit 78159f9
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 158 deletions.
5 changes: 3 additions & 2 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct NetworkState {
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
local_private_key: secio::SecioKeyPair,
local_peer_id: PeerId,
bootnodes: Vec<Multiaddr>,
pub(crate) bootnodes: Vec<Multiaddr>,
pub(crate) config: NetworkConfig,
pub(crate) active: AtomicBool,
/// Node supported protocols
Expand Down Expand Up @@ -856,7 +856,8 @@ impl<T: ExitHandler> NetworkService<T> {
.yamux_config(yamux_config)
.forever(true)
.max_connection_number(1024)
.set_send_buffer_size(config.max_send_buffer());
.set_send_buffer_size(config.max_send_buffer())
.timeout(Duration::from_secs(5));

#[cfg(target_os = "linux")]
let p2p_service = {
Expand Down
10 changes: 10 additions & 0 deletions network/src/peer_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
//! Peer store manager
//!
//! This module implements a locally managed set of node information. It is used to
//! bootstrap into the network when the node starts, is updated in real time and saved
//! periodically at runtime, and is saved again when the node stops.
//!
//! The validity and screening speed of the data set are very important to the entire network,
//! and the address information collected on the network cannot be blindly trusted

pub mod addr_manager;
pub mod ban_list;
mod peer_store_db;
Expand All @@ -14,6 +22,8 @@ pub use peer_store_impl::PeerStore;
pub(crate) const ADDR_COUNT_LIMIT: usize = 16384;
/// Consider a peer as never seen if its last_connected_at is older than this timeout
const ADDR_TIMEOUT_MS: u64 = 7 * 24 * 3600 * 1000;
/// The timeout after which a peer's address should be added to the feeler list again
pub(crate) const ADDR_TRY_TIMEOUT_MS: u64 = 3 * 24 * 3600 * 1000;
const ADDR_MAX_RETRIES: u32 = 3;
const ADDR_MAX_FAILURES: u32 = 10;

Expand Down
72 changes: 32 additions & 40 deletions network/src/peer_store/peer_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use crate::{
errors::{PeerStoreError, Result},
extract_peer_id, multiaddr_to_socketaddr,
network_group::Group,
peer_store::{
addr_manager::AddrManager,
ban_list::BanList,
types::{ip_to_network, AddrInfo, BannedAddr, PeerInfo},
Behaviour, Multiaddr, PeerScoreConfig, ReportResult, Status, ADDR_COUNT_LIMIT,
ADDR_TIMEOUT_MS,
ADDR_TIMEOUT_MS, ADDR_TRY_TIMEOUT_MS,
},
PeerId, SessionType,
};
use ipnetwork::IpNetwork;
use std::collections::{hash_map::Entry, HashMap};

/// Peer store
///
/// | -- choose to identify --| --- choose to feeler --- | -- delete -- |
/// | 1 | 2 | 3 | 4 | 5 | 6 | 7 | More than seven days |
#[derive(Default)]
pub struct PeerStore {
addr_manager: AddrManager,
Expand Down Expand Up @@ -43,12 +45,12 @@ impl PeerStore {
{
Entry::Occupied(mut entry) => {
let mut peer = entry.get_mut();
peer.connected_addr = addr.clone();
peer.connected_addr = addr;
peer.last_connected_at_ms = now_ms;
peer.session_type = session_type;
}
Entry::Vacant(entry) => {
let peer = PeerInfo::new(addr.clone(), session_type, now_ms);
let peer = PeerInfo::new(addr, session_type, now_ms);
entry.insert(peer);
}
}
Expand Down Expand Up @@ -111,28 +113,39 @@ impl PeerStore {
}
}

/// Get peers for outbound connection, this method randomly return non-connected peer addrs
/// Get peers for outbound connection; this method randomly returns recently connected peer addrs
pub fn fetch_addrs_to_attempt(&mut self, count: usize) -> Vec<AddrInfo> {
// Get info:
// 1. Not in ban list
// 2. Not already connected
// 3. Connected within 3 days

let now_ms = faketime::unix_time_as_millis();
let ban_list = &self.ban_list;
let peers = &self.peers;
let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
// get addrs that can attempt.
self.addr_manager
.fetch_random(count, |peer_addr: &AddrInfo| {
!ban_list.is_addr_banned(&peer_addr.addr)
&& extract_peer_id(&peer_addr.addr)
.map(|peer_id| !peers.contains_key(&peer_id))
.unwrap_or_default()
&& !peer_addr.tried_in_last_minute(now_ms)
&& peer_addr.had_connected(addr_expired_ms)
})
}

/// Get peers for feeler connection; this method randomly returns peer addrs that we have
/// never connected to.
pub fn fetch_addrs_to_feeler(&mut self, count: usize) -> Vec<AddrInfo> {
// Get info:
// 1. Not in ban list
// 2. Not already connected
// 3. Not already tried in a minute
// 4. Not connected within 3 days

let now_ms = faketime::unix_time_as_millis();
let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS;
// get expired or never successfully connected addrs.
let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS);
let ban_list = &self.ban_list;
let peers = &self.peers;
self.addr_manager
Expand All @@ -148,8 +161,12 @@ impl PeerStore {

/// Return valid addrs that were successfully connected, used for discovery.
pub fn fetch_random_addrs(&mut self, count: usize) -> Vec<AddrInfo> {
// Get info:
// 1. Not in ban list
// 2. Already connected or Connected within 7 days

let now_ms = faketime::unix_time_as_millis();
let addr_expired_ms = now_ms - ADDR_TIMEOUT_MS;
let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS);
let ban_list = &self.ban_list;
let peers = &self.peers;
// get success connected addrs.
Expand Down Expand Up @@ -208,43 +225,18 @@ impl PeerStore {
if self.addr_manager.count() < ADDR_COUNT_LIMIT {
return Ok(());
}
// Evicting invalid data in the peer store is a relatively rare operation
// There are certain cleanup strategies here:
// 1. Group current data according to network segment
// 2. Sort according to the amount of data in the same network segment
// 3. Prioritize cleaning on the same network segment

let now_ms = faketime::unix_time_as_millis();
let candidate_peers: Vec<_> = {
// find candidate peers by network group
let mut peers_by_network_group: HashMap<Group, Vec<_>> = HashMap::default();
// find candidate peers by terrible condition
let ban_score = self.score_config.ban_score;
let mut peers = Vec::default();
for addr in self.addr_manager.addrs_iter() {
peers_by_network_group
.entry((&addr.addr).into())
.or_default()
.push(addr);
if addr.is_terrible(now_ms) || addr.score <= ban_score {
peers.push(addr.addr.clone())
}
}
let len = peers_by_network_group.len();
let mut peers = peers_by_network_group
.drain()
.map(|(_, v)| v)
.collect::<Vec<Vec<_>>>();

peers.sort_unstable_by_key(|k| std::cmp::Reverse(k.len()));
let ban_score = self.score_config.ban_score;

peers
.into_iter()
.take(len / 2)
.flatten()
.filter_map(move |addr| {
if addr.is_terrible(now_ms) || addr.score <= ban_score {
Some(addr.addr.clone())
} else {
None
}
})
.collect()
};

if candidate_peers.is_empty() {
Expand Down
36 changes: 27 additions & 9 deletions network/src/protocols/identify/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use p2p::{
service::{SessionType, TargetProtocol},
traits::ServiceProtocol,
utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
ProtocolId, SessionId,
SessionId,
};

mod protocol;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub trait Callback: Clone + Send {
// Register open protocol
fn register(&self, context: &ProtocolContextMutRef, version: &str);
// remove registered identify protocol
fn unregister(&self, id: SessionId, pid: ProtocolId);
fn unregister(&self, context: &ProtocolContextMutRef);
/// Received custom message
fn received_identify(
&mut self,
Expand Down Expand Up @@ -261,8 +261,7 @@ impl<T: Callback> ServiceProtocol for IdentifyProtocol<T> {
.remove(&context.session.id)
.expect("RemoteInfo must exists");
trace!("IdentifyProtocol disconnected from {:?}", info.peer_id);
self.callback
.unregister(context.session.id, context.proto_id)
self.callback.unregister(&context)
}
}

Expand Down Expand Up @@ -374,18 +373,37 @@ impl Callback for IdentifyCallback {
.mut_addr_manager()
.remove(&context.session.address);
} else if context.session.ty.is_outbound() {
// Why not record inbound addresses here?
// Because an inbound address cannot be feeler-tested while the connection stays open,
// and if it were added to the peer store it would be broadcast to the entire network,
// even though it is an unverified address.
self.network_state.with_peer_store_mut(|peer_store| {
peer_store.add_outbound_addr(context.session.address.clone());
});
}
}

fn unregister(&self, id: SessionId, pid: ProtocolId) {
self.network_state.with_peer_registry_mut(|reg| {
let _ = reg.get_peer_mut(id).map(|peer| {
peer.protocols.remove(&pid);
fn unregister(&self, context: &ProtocolContextMutRef) {
let version = self
.network_state
.with_peer_registry_mut(|reg| {
reg.get_peer_mut(context.session.id)
.map(|peer| peer.protocols.remove(&context.proto_id))
})
.flatten()
.map(|version| version != "2")
.unwrap_or_default();

if self.network_state.ckb2021.load(Ordering::SeqCst) && version {
} else if context.session.ty.is_outbound() {
// Due to the filtering strategy of the peer store, if the node is
// disconnected after a long connection is maintained for more than seven days,
// it is possible that the node will be accidentally evicted, so it is necessary
// to reset the information of the node when disconnected.
self.network_state.with_peer_store_mut(|peer_store| {
peer_store.add_outbound_addr(context.session.address.clone());
});
});
}
}

fn identify(&mut self) -> &[u8] {
Expand Down
36 changes: 22 additions & 14 deletions network/src/protocols/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,31 +357,39 @@ fn test_discovery_behavior() {

wait_discovery(&node3);

let addr = {
let addrs = {
let listen_addr = &node3.listen_addr;
node3
.network_state
.peer_store
.lock()
.fetch_addrs_to_attempt(2)
let mut locked = node3.network_state.peer_store.lock();

let addr = locked
.fetch_addrs_to_feeler(6)
.into_iter()
.map(|peer| peer.addr)
.find(|addr| {
.flat_map(|addr| {
match (
multiaddr_to_socketaddr(&addr),
multiaddr_to_socketaddr(listen_addr),
) {
(Some(dis), Some(listen)) => dis.port() != listen.port(),
_ => false,
(Some(dis), Some(listen)) => {
if dis.port() != listen.port() {
Some(addr)
} else {
None
}
}
_ => None,
}
})
.unwrap()
.collect::<Vec<_>>();
addr
};

node3.dial_addr(
addr,
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
);
for addr in addrs {
node3.dial_addr(
addr,
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
);
}

wait_connect_state(&node1, 2);
wait_connect_state(&node2, 2);
Expand Down
28 changes: 15 additions & 13 deletions network/src/services/dump_peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tokio::time::Interval;
use tokio::time::{Instant, Interval, MissedTickBehavior};

const DEFAULT_DUMP_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour

/// Save current peer store data regularly
pub struct DumpPeerStoreService {
network_state: Arc<NetworkState>,
interval: Option<Interval>,
Expand Down Expand Up @@ -48,19 +49,20 @@ impl Future for DumpPeerStoreService {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.interval.is_none() {
self.interval = Some(tokio::time::interval(DEFAULT_DUMP_INTERVAL));
}
let mut interval = self.interval.take().unwrap();
loop {
match interval.poll_tick(cx) {
Poll::Ready(_) => {
self.dump_peer_store();
}
Poll::Pending => {
self.interval = Some(interval);
return Poll::Pending;
}
self.interval = {
let mut interval = tokio::time::interval_at(
Instant::now() + DEFAULT_DUMP_INTERVAL,
DEFAULT_DUMP_INTERVAL,
);
// The dump peer store service does not need to urgently compensate for missed ticks;
// the `Delay` behavior is enough
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
Some(interval)
}
}
while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() {
self.dump_peer_store()
}
Poll::Pending
}
}
Loading

0 comments on commit 78159f9

Please sign in to comment.