Skip to content

Commit

Permalink
separate timeouts for direct and relayed peers
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Jan 18, 2024
1 parent 4e8ce39 commit 08d0ab2
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 37 deletions.
9 changes: 6 additions & 3 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ pub fn new(
#[derive(Copy, Clone, Debug)]
pub struct PeriodicTaskConfig {
pub bootstrap: BootstrapConfig,
/// A peer can only connect once in this period.
pub connection_timeout: Duration,
/// A direct (not relayed) peer can only connect once in this period.
pub direct_connection_timeout: Duration,
/// A relayed peer can only connect once in this period.
pub relay_connection_timeout: Duration,
}

#[derive(Copy, Clone, Debug)]
Expand All @@ -103,7 +105,8 @@ impl Default for PeriodicTaskConfig {
start_offset: Duration::from_secs(5),
period: Duration::from_secs(10 * 60),
},
connection_timeout: Duration::from_secs(30),
direct_connection_timeout: Duration::from_secs(30),
relay_connection_timeout: Duration::from_secs(10),
}
}
}
Expand Down
46 changes: 34 additions & 12 deletions crates/p2p/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use futures::{channel::mpsc::Receiver as ResponseReceiver, StreamExt};
use libp2p::gossipsub::{self, IdentTopic};
use libp2p::identify;
use libp2p::{identify, Multiaddr};
use libp2p::kad::{self, BootstrapError, BootstrapOk, QueryId, QueryResult};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::dial_opts::DialOpts;
Expand Down Expand Up @@ -36,8 +36,12 @@ pub struct MainLoop {
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
peers: Arc<RwLock<peers::Peers>>,
/// Peers that have connected within the last `connection_timeout` seconds.
recent_peers: RecentPeers,
/// Recent peers that have connected directly (not over a relay).
///
/// The distinction is important because different limits apply to direct and relayed peers.
recent_direct_peers: RecentPeers,
/// Recent peers that have connected over a relay.
recent_relay_peers: RecentPeers,
pending_dials: HashMap<PeerId, EmptyResultSender>,
pending_sync_requests: PendingRequests,
// TODO there's no sync status message anymore so we have to:
Expand Down Expand Up @@ -89,7 +93,8 @@ impl MainLoop {
) -> Self {
Self {
bootstrap_cfg: periodic_cfg.bootstrap,
recent_peers: RecentPeers::new(periodic_cfg.connection_timeout),
recent_direct_peers: RecentPeers::new(periodic_cfg.direct_connection_timeout),
recent_relay_peers: RecentPeers::new(periodic_cfg.relay_connection_timeout),
swarm,
command_receiver,
event_sender,
Expand Down Expand Up @@ -177,11 +182,7 @@ impl MainLoop {
peer_id, endpoint, ..
} => {
// Extract the IP address of the peer from his multiaddr.
let peer_ip = endpoint.get_remote_address().iter().find_map(|p| match p {
Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
_ => None,
});
let peer_ip = get_ip(endpoint.get_remote_address());

// If the peer has no IP address, disconnect.
let Some(peer_ip) = peer_ip else {
Expand All @@ -196,17 +197,26 @@ impl MainLoop {
// If this is an incoming connection, we have to prevent the peer from
// reconnecting too quickly.
if endpoint.is_listener() {
// Is this connection established over a relay node?
let is_relay = endpoint.get_remote_address().iter().any(|p| p == Protocol::P2pCircuit);
// Different rules apply to direct and relayed peers.
let recent_peers = if is_relay {
&mut self.recent_relay_peers
} else {
&mut self.recent_direct_peers
};

// If the peer is in the recent peers set, this means he is attempting to
// reconnect too quickly. Close the connection.
if self.recent_peers.contains(&peer_ip) {
if recent_peers.contains(&peer_ip) {
tracing::debug!(%peer_id, "Peer attempted to reconnect too quickly, closing");
if let Err(e) = self.disconnect(peer_id).await {
tracing::error!(%e, "Failed to disconnect peer");
}
return;
} else {
// Otherwise, add the peer to the recent peers set.
self.recent_peers.insert(peer_id, peer_ip);
recent_peers.insert(peer_ip);
}
}

Expand Down Expand Up @@ -246,12 +256,16 @@ impl MainLoop {
peer_id,
num_established,
connection_id: _, // TODO consider tracking connection IDs for peers
endpoint,
..
} => {
if num_established == 0 {
self.peers.write().await.peer_disconnected(&peer_id);
// Don't keep expired peers in the recent peers set.
self.recent_peers.remove_if_expired(peer_id);
if let Some(peer_ip) = get_ip(endpoint.get_remote_address()) {
self.recent_direct_peers.remove_if_expired(&peer_ip);
self.recent_relay_peers.remove_if_expired(&peer_ip);
}
tracing::debug!(%peer_id, "Fully disconnected from");
send_test_event(
&self.event_sender,
Expand Down Expand Up @@ -944,6 +958,14 @@ impl MainLoop {
}
}

fn get_ip(addr: &Multiaddr) -> Option<IpAddr> {
addr.iter().find_map(|p| match p {
Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
_ => None,
})
}

/// No-op outside tests
async fn send_test_event(_event_sender: &mpsc::Sender<Event>, _event: TestEvent) {
#[cfg(test)]
Expand Down
29 changes: 9 additions & 20 deletions crates/p2p/src/recent_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,50 @@ use std::{
time::{Duration, Instant},
};

use libp2p::PeerId;

/// Set of recently connected peers. Peers are tracked primarily by their IP address, but the
/// peer ID is also stored to allow for removal of peers.
///
/// Peers are removed from the set after a timeout. The actual removal only happens once any
/// of the methods on this type are called.
#[derive(Debug)]
pub struct RecentPeers {
instants: HashMap<IpAddr, Instant>,
ips: HashMap<PeerId, IpAddr>,
peers: HashMap<IpAddr, Instant>,
timeout: Duration,
}

impl RecentPeers {
pub fn new(timeout: Duration) -> Self {
Self {
instants: HashMap::new(),
ips: HashMap::new(),
peers: HashMap::new(),
timeout,
}
}

/// Insert the peer into the recent peers set.
///
/// Panics if the peer is already in the set.
pub fn insert(&mut self, peer_id: PeerId, peer_ip: IpAddr) {
if self.instants.insert(peer_ip, Instant::now()).is_some() {
pub fn insert(&mut self, peer_ip: IpAddr) {
if self.peers.insert(peer_ip, Instant::now()).is_some() {
panic!("peer already in the set, was insert called before contains?");
}
self.ips.insert(peer_id, peer_ip);
}

/// Returns `true` if the peer with the given IP is in the recent peers set.
///
/// Removes the peer from the set if it is expired.
pub fn contains(&mut self, peer_ip: &IpAddr) -> bool {
match self.instants.get(peer_ip) {
match self.peers.get(peer_ip) {
Some(instant) if instant.elapsed() < self.timeout => true,
_ => {
self.instants.remove(peer_ip);
self.peers.remove(peer_ip);
false
}
}
}

/// Removes the peer from the set if it is expired.
pub fn remove_if_expired(&mut self, peer_id: PeerId) {
if let Some(&ip) = self.ips.get(&peer_id) {
// The contains method removes the peer IP if it is expired.
if !self.contains(&ip) {
// The peer IP is removed from the set, so we need to remove the
// peer ID mapping as well.
self.ips.remove(&peer_id);
}
}
pub fn remove_if_expired(&mut self, peer_ip: &IpAddr) {
// The contains method removes the peer IP if it is expired.
self.contains(peer_ip);
}
}
6 changes: 4 additions & 2 deletions crates/p2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ async fn periodic_bootstrap() {
period: Duration::from_millis(500),
start_offset: Duration::from_secs(1),
},
connection_timeout: Duration::from_millis(500),
direct_connection_timeout: Duration::from_millis(500),
relay_connection_timeout: Duration::from_millis(500),
};
let mut boot = TestPeer::new(periodic_cfg);
let mut peer1 = TestPeer::new(periodic_cfg);
Expand Down Expand Up @@ -297,7 +298,8 @@ async fn reconnect_too_quickly() {
// Bootstrapping can cause redials, so set the offset to a high value.
start_offset: Duration::from_secs(3),
},
connection_timeout: CONNECTION_TIMEOUT,
direct_connection_timeout: CONNECTION_TIMEOUT,
relay_connection_timeout: Duration::from_millis(500),
};

let mut peer1 = TestPeer::new(periodic_cfg);
Expand Down

0 comments on commit 08d0ab2

Please sign in to comment.