From 0585f3cdf8f15b9edc983530b00a20942c4ab643 Mon Sep 17 00:00:00 2001 From: sistemd Date: Mon, 22 Jan 2024 12:28:52 +0100 Subject: [PATCH 1/5] max inbound peer connections limit --- crates/p2p/src/lib.rs | 23 +++- crates/p2p/src/main_loop.rs | 45 ++++++- crates/p2p/src/tests.rs | 189 +++++++++++++++++++++++++-- crates/pathfinder/src/p2p_network.rs | 9 +- 4 files changed, 245 insertions(+), 21 deletions(-) diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 20d96b4018..953194e890 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -44,6 +44,7 @@ pub fn new( keypair: Keypair, peers: Arc>, periodic_cfg: PeriodicTaskConfig, + limits_cfg: LimitsConfig, chain_id: ChainId, ) -> (Client, EventReceiver, MainLoop) { let local_peer_id = keypair.public().to_peer_id(); @@ -78,6 +79,7 @@ pub fn new( event_sender, peers, periodic_cfg, + limits_cfg, chain_id, ), ) @@ -86,10 +88,29 @@ pub fn new( #[derive(Copy, Clone, Debug)] pub struct PeriodicTaskConfig { pub bootstrap: BootstrapConfig, +} + +#[derive(Copy, Clone, Debug)] +pub struct LimitsConfig { /// A direct (not relayed) peer can only connect once in this period. pub direct_connection_timeout: Duration, /// A relayed peer can only connect once in this period. pub relay_connection_timeout: Duration, + /// Maximum number of direct (non-relayed) peers. + pub max_inbound_direct_peers: usize, + /// Maximum number of relayed peers. + pub max_inbound_relay_peers: usize, +} + +impl Default for LimitsConfig { + fn default() -> Self { + Self { + direct_connection_timeout: Duration::from_secs(30), + relay_connection_timeout: Duration::from_secs(10), + max_inbound_direct_peers: 35, + max_inbound_relay_peers: 15, + } + } } #[derive(Copy, Clone, Debug)] @@ -105,8 +126,6 @@ impl Default for PeriodicTaskConfig { start_offset: Duration::from_secs(5), period: Duration::from_secs(10 * 60), }, - direct_connection_timeout: Duration::from_secs(30), - relay_connection_timeout: Duration::from_secs(10), } } } diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index 0792b2abaa..7763602207 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -21,17 +21,18 @@ use pathfinder_common::ChainId; use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::time::Duration; -use crate::behaviour; use crate::peers; use crate::recent_peers::RecentPeers; #[cfg(test)] use crate::test_utils; +use crate::{behaviour, LimitsConfig}; use crate::{ BootstrapConfig, Command, EmptyResultSender, Event, PeriodicTaskConfig, TestCommand, TestEvent, }; pub struct MainLoop { bootstrap_cfg: BootstrapConfig, + limits_cfg: LimitsConfig, swarm: libp2p::swarm::Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, @@ -42,6 +43,10 @@ pub struct MainLoop { recent_direct_peers: RecentPeers, /// Recent peers that have connected over a relay. recent_relay_peers: RecentPeers, + /// Number of peers that have inbound connections open to us directly (not over a relay). + inbound_direct_peers: usize, + /// Number of peers that have inbound connections open to us over a relay. + inbound_relay_peers: usize, pending_dials: HashMap, pending_sync_requests: PendingRequests, // TODO there's no sync status message anymore so we have to: @@ -89,12 +94,16 @@ impl MainLoop { event_sender: mpsc::Sender, peers: Arc>, periodic_cfg: PeriodicTaskConfig, + limits_cfg: LimitsConfig, chain_id: ChainId, ) -> Self { Self { bootstrap_cfg: periodic_cfg.bootstrap, - recent_direct_peers: RecentPeers::new(periodic_cfg.direct_connection_timeout), - recent_relay_peers: RecentPeers::new(periodic_cfg.relay_connection_timeout), + limits_cfg, + recent_direct_peers: RecentPeers::new(limits_cfg.direct_connection_timeout), + recent_relay_peers: RecentPeers::new(limits_cfg.relay_connection_timeout), + inbound_direct_peers: 0, + inbound_relay_peers: 0, swarm, command_receiver, event_sender, @@ -209,13 +218,37 @@ impl MainLoop { return; } - // Prevent the peer from reconnecting too quickly. - - // Different connection timeouts apply to direct peers and peers connecting over a relay. + // Is the peer connecting over a relay? let is_relay = endpoint .get_remote_address() .iter() .any(|p| p == Protocol::P2pCircuit); + + // Limit the number of inbound peer connections. Different limits apply to direct peers + // and peers connecting over a relay. + if is_relay { + if self.inbound_relay_peers >= self.limits_cfg.max_inbound_relay_peers { + tracing::debug!(%peer_id, "Too many inbound relay peers, closing"); + if let Err(e) = self.disconnect(peer_id).await { + tracing::debug!(%e, "Failed to disconnect peer"); + } + return; + } + self.inbound_relay_peers += 1; + } else { + if self.inbound_direct_peers >= self.limits_cfg.max_inbound_direct_peers { + tracing::debug!(%peer_id, "Too many inbound direct peers, closing"); + if let Err(e) = self.disconnect(peer_id).await { + tracing::debug!(%e, "Failed to disconnect peer"); + } + return; + } + self.inbound_direct_peers += 1; + } + + // Prevent the peer from reconnecting too quickly. + + // Different connection timeouts apply to direct peers and peers connecting over a relay. let recent_peers = if is_relay { &mut self.recent_relay_peers } else { diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index a099614147..1b12104720 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -21,7 +21,9 @@ use rstest::rstest; use tokio::sync::RwLock; use tokio::task::JoinHandle; -use crate::{BootstrapConfig, Event, EventReceiver, Peers, PeriodicTaskConfig, TestEvent}; +use crate::{ + BootstrapConfig, Event, EventReceiver, LimitsConfig, Peers, PeriodicTaskConfig, TestEvent, +}; #[allow(dead_code)] #[derive(Debug)] @@ -36,13 +38,18 @@ struct TestPeer { impl TestPeer { #[must_use] - pub fn new(periodic_cfg: PeriodicTaskConfig, keypair: Keypair) -> Self { + pub fn new( + periodic_cfg: PeriodicTaskConfig, + limits_cfg: LimitsConfig, + keypair: Keypair, + ) -> Self { let peer_id = keypair.public().to_peer_id(); let peers: Arc> = Default::default(); let (client, event_receiver, main_loop) = crate::new( keypair.clone(), peers.clone(), periodic_cfg, + limits_cfg, ChainId::GOERLI_TESTNET, ); let main_loop_jh = tokio::spawn(main_loop.run()); @@ -94,7 +101,11 @@ impl TestPeer { impl Default for TestPeer { fn default() -> Self { - Self::new(Default::default(), Keypair::generate_ed25519()) + Self::new( + Default::default(), + Default::default(), + Keypair::generate_ed25519(), + ) } } @@ -233,12 +244,15 @@ async fn periodic_bootstrap() { period: Duration::from_millis(500), start_offset: Duration::from_secs(1), }, + }; + let limits_cfg = LimitsConfig { direct_connection_timeout: Duration::from_millis(500), relay_connection_timeout: Duration::from_millis(500), + ..Default::default() }; - let mut boot = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); - let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); - let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); + let mut boot = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); + let mut peer1 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); + let mut peer2 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); let mut boot_addr = boot.start_listening().await.unwrap(); boot_addr.push(Protocol::P2p(boot.peer_id)); @@ -297,12 +311,15 @@ async fn reconnect_too_quickly() { // Bootstrapping can cause redials, so set the offset to a high value. start_offset: Duration::from_secs(10), }, + }; + let limits_cfg = LimitsConfig { direct_connection_timeout: CONNECTION_TIMEOUT, relay_connection_timeout: Duration::from_millis(500), + ..Default::default() }; - let mut peer1 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); - let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); + let mut peer1 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); + let mut peer2 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); let addr2 = peer2.start_listening().await.unwrap(); tracing::info!(%peer2.peer_id, %addr2); @@ -404,13 +421,16 @@ async fn duplicate_connection() { // Bootstrapping can cause redials, so set the offset to a high value. start_offset: Duration::from_secs(10), }, + }; + let limits_cfg = LimitsConfig { direct_connection_timeout: CONNECTION_TIMEOUT, relay_connection_timeout: Duration::from_millis(500), + ..Default::default() }; let keypair = Keypair::generate_ed25519(); - let mut peer1 = TestPeer::new(periodic_cfg, keypair.clone()); - let mut peer1_copy = TestPeer::new(periodic_cfg, keypair); - let mut peer2 = TestPeer::new(periodic_cfg, Keypair::generate_ed25519()); + let mut peer1 = TestPeer::new(periodic_cfg, limits_cfg, keypair.clone()); + let mut peer1_copy = TestPeer::new(periodic_cfg, limits_cfg, keypair); + let mut peer2 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); let addr2 = peer2.start_listening().await.unwrap(); tracing::info!(%peer2.peer_id, %addr2); @@ -469,6 +489,153 @@ async fn duplicate_connection() { assert!(peer1_copy.connected().await.is_empty()); } +/// Test that each peer accepts at most one connection from any other peer, and duplicate +/// connections are closed. +#[test_log::test(tokio::test)] +async fn max_inbound_connections() { + const CONNECTION_TIMEOUT: Duration = Duration::from_millis(50); + let periodic_cfg = PeriodicTaskConfig { + bootstrap: BootstrapConfig { + period: Duration::from_millis(500), + // Bootstrapping can cause redials, so set the offset to a high value. + start_offset: Duration::from_secs(10), + }, + }; + let limits_cfg = LimitsConfig { + direct_connection_timeout: CONNECTION_TIMEOUT, + relay_connection_timeout: Duration::from_millis(500), + max_inbound_direct_peers: 2, + ..Default::default() + }; + let mut peer1 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); + let mut peer2 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); + let mut peer3 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); + let mut peer4 = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); + + let addr1 = peer1.start_listening().await.unwrap(); + tracing::info!(%peer1.peer_id, %addr1); + let addr4 = peer4.start_listening().await.unwrap(); + tracing::info!(%peer4.peer_id, %addr4); + + // Open the connection. + peer2 + .client + .dial(peer1.peer_id, addr1.clone()) + .await + .unwrap(); + + wait_for_event(&mut peer1.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer2.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + wait_for_event(&mut peer2.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + // Ensure that the connection timeout has passed, so this is not the reason why the connection + // would be closed. + tokio::time::sleep(CONNECTION_TIMEOUT).await; + + // Open another inbound connection to the peer. Since the limit is 2, this is allowed. + peer3 + .client + .dial(peer1.peer_id, addr1.clone()) + .await + .unwrap(); + + wait_for_event(&mut peer1.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer3.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + wait_for_event(&mut peer3.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + // Ensure that the connection timeout has passed, so this is not the reason why the connection + // would be closed. + tokio::time::sleep(CONNECTION_TIMEOUT).await; + + // Open another inbound connection to the peer. Since the limit is 2, and there are already 2 + // inbound connections, this is not allowed. + peer4 + .client + .dial(peer1.peer_id, addr1.clone()) + .await + .unwrap(); + + wait_for_event(&mut peer4.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + wait_for_event(&mut peer4.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == peer1.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + wait_for_event(&mut peer1.event_receiver, |event| match event { + Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == peer4.peer_id => { + Some(()) + } + _ => None, + }) + .await; + + assert!(peer4.connected().await.is_empty()); + + // The restriction does not apply to inbound connections, so peer 1 can still open a connection + // to peer 4. + + let peer4_id = peer4.peer_id; + let mut peer_1_connection_established = + filter_events(peer1.event_receiver, move |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer4_id => { + Some(()) + } + _ => None, + }); + + let peer1_id = peer1.peer_id; + let mut peer_4_connection_established = + filter_events(peer4.event_receiver, move |event| match event { + Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1_id => { + Some(()) + } + _ => None, + }); + + peer1 + .client + .dial(peer4.peer_id, dbg!(addr4.clone())) + .await + .unwrap(); + + peer_1_connection_established.recv().await; + peer_4_connection_established.recv().await; +} + #[rstest] #[case::server_to_client(server_to_client().await)] #[case::client_to_server(client_to_server().await)] diff --git a/crates/pathfinder/src/p2p_network.rs b/crates/pathfinder/src/p2p_network.rs index 7dcdf1f9a3..1e86014cc0 100644 --- a/crates/pathfinder/src/p2p_network.rs +++ b/crates/pathfinder/src/p2p_network.rs @@ -48,8 +48,13 @@ pub async fn start(context: P2PContext) -> anyhow::Result { tracing::info!(%peer_id, "🖧 Starting P2P"); let peers: Arc> = Arc::new(RwLock::new(Default::default())); - let (p2p_client, mut p2p_events, p2p_main_loop) = - p2p::new(keypair, peers.clone(), Default::default(), chain_id); + let (p2p_client, mut p2p_events, p2p_main_loop) = p2p::new( + keypair, + peers.clone(), + Default::default(), + Default::default(), + chain_id, + ); let mut main_loop_handle = { let span = tracing::info_span!("behaviour"); From 13d3138c2d69062e587f63cc35d5d59836b4166e Mon Sep 17 00:00:00 2001 From: sistemd Date: Mon, 22 Jan 2024 14:56:12 +0100 Subject: [PATCH 2/5] typo Co-authored-by: KOVACS Krisztian --- crates/p2p/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index 1b12104720..c414ad2955 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -605,7 +605,7 @@ async fn max_inbound_connections() { assert!(peer4.connected().await.is_empty()); - // The restriction does not apply to inbound connections, so peer 1 can still open a connection + // The restriction does not apply to outbound connections, so peer 1 can still open a connection // to peer 4. let peer4_id = peer4.peer_id; From 37e16b1c575b516a8ff4f580d740c9701ebccdc9 Mon Sep 17 00:00:00 2001 From: sistemd Date: Mon, 22 Jan 2024 18:27:42 +0100 Subject: [PATCH 3/5] wip refactor --- crates/p2p/src/behaviour.rs | 272 +++++++++++++++++++++++++++++++---- crates/p2p/src/lib.rs | 3 +- crates/p2p/src/main_loop.rs | 132 ++--------------- crates/p2p/src/test_utils.rs | 2 +- crates/p2p/src/tests.rs | 1 + 5 files changed, 258 insertions(+), 152 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 462e68cbc8..8066271b52 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -1,18 +1,28 @@ use std::collections::hash_map::DefaultHasher; +use std::collections::HashSet; use std::hash::{Hash, Hasher}; +use std::net::IpAddr; use std::time::Duration; +use crate::recent_peers::RecentPeers; use crate::sync::codec; -use libp2p::autonat; +use crate::LimitsConfig; +use anyhow::anyhow; +use libp2p::core::Endpoint; use libp2p::dcutr; use libp2p::gossipsub::{self, IdentTopic, MessageAuthenticity, MessageId}; use libp2p::identify; use libp2p::identity; use libp2p::kad::{self, store::MemoryStore}; +use libp2p::multiaddr::Protocol; use libp2p::ping; use libp2p::relay; -use libp2p::swarm::NetworkBehaviour; +use libp2p::swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, +}; use libp2p::StreamProtocol; +use libp2p::{autonat, Multiaddr, PeerId}; use p2p_proto::block::{ BlockBodiesRequest, BlockBodiesResponse, BlockHeadersRequest, BlockHeadersResponse, }; @@ -20,34 +30,187 @@ use p2p_proto::event::{EventsRequest, EventsResponse}; use p2p_proto::receipt::{ReceiptsRequest, ReceiptsResponse}; use p2p_proto::transaction::{TransactionsRequest, TransactionsResponse}; use pathfinder_common::ChainId; +use std::task; + +pub const IDENTIFY_PROTOCOL_NAME: &str = "/starknet/id/1.0.0"; + +pub fn kademlia_protocol_name(chain_id: ChainId) -> String { + format!("/starknet/kad/{}/1.0.0", chain_id.to_hex_str()) +} + +pub struct Behaviour { + limits: LimitsConfig, + /// Recent peers that have connected to us directly (not over a relay). + /// + /// The distinction is important because different limits apply to direct and relayed peers. + recent_inbound_direct_peers: RecentPeers, + /// Recent peers that have connected to us over a relay. + recent_inbound_relay_peers: RecentPeers, + /// Number of peers that have inbound connections open to us directly (not over a relay). + inbound_direct_peers: usize, + /// Number of peers that have inbound connections open to us over a relay. + inbound_relay_peers: usize, + /// Peers connected to our node. Used for closing duplicate connections. + connected_peers: HashSet, + inner: Inner, +} #[derive(NetworkBehaviour)] #[behaviour(to_swarm = "Event", event_process = false)] -pub struct Behaviour { +pub struct Inner { relay: relay::client::Behaviour, autonat: autonat::Behaviour, dcutr: dcutr::Behaviour, ping: ping::Behaviour, identify: identify::Behaviour, - pub kademlia: kad::Behaviour, - pub gossipsub: gossipsub::Behaviour, - pub headers_sync: p2p_stream::Behaviour, - pub bodies_sync: p2p_stream::Behaviour, - pub transactions_sync: p2p_stream::Behaviour, - pub receipts_sync: p2p_stream::Behaviour, - pub events_sync: p2p_stream::Behaviour, + kademlia: kad::Behaviour, + gossipsub: gossipsub::Behaviour, + headers_sync: p2p_stream::Behaviour, + bodies_sync: p2p_stream::Behaviour, + transactions_sync: p2p_stream::Behaviour, + receipts_sync: p2p_stream::Behaviour, + events_sync: p2p_stream::Behaviour, } -pub const IDENTIFY_PROTOCOL_NAME: &str = "/starknet/id/1.0.0"; +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = ::ConnectionHandler; + type ToSwarm = ::ToSwarm; -pub fn kademlia_protocol_name(chain_id: ChainId) -> String { - format!("/starknet/kad/{}/1.0.0", chain_id.to_hex_str()) + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.on_connection_established(peer)?; + self.inner.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.on_connection_established(peer)?; + self.inner + .handle_established_outbound_connection(connection_id, peer, addr, role_override) + } + + fn on_swarm_event(&mut self, event: FromSwarm<'_>) { + self.inner.on_swarm_event(event) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.inner + .on_connection_handler_event(peer_id, connection_id, event) + } + + fn poll( + &mut self, + cx: &mut task::Context<'_>, + ) -> task::Poll>> { + self.inner.poll(cx) + } + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + // Extract the IP address of the peer from his multiaddr. + let peer_ip = remote_addr.iter().find_map(|p| match p { + Protocol::Ip4(ip) => Some(IpAddr::V4(ip)), + Protocol::Ip6(ip) => Some(IpAddr::V6(ip)), + _ => None, + }); + + // If the peer has no IP address, disconnect. + let Some(peer_ip) = peer_ip else { + tracing::debug!(%connection_id, "Disconnected peer without IP"); + return Err(ConnectionDenied::new(anyhow!("peer without IP"))); + }; + + // Prevent the peer from reconnecting too quickly. + + // Is the peer connecting over a relay? + let is_relay = remote_addr.iter().any(|p| p == Protocol::P2pCircuit); + + // Different connection timeouts apply to direct peers and peers connecting over a relay. + let recent_peers = if is_relay { + &mut self.recent_inbound_relay_peers + } else { + &mut self.recent_inbound_direct_peers + }; + + // If the peer is in the recent peers set, this means he is attempting to + // reconnect too quickly. Close the connection. + if recent_peers.contains(&peer_ip) { + tracing::debug!(%connection_id, "Peer attempted to reconnect too quickly, closing"); + return Err(ConnectionDenied::new(anyhow!("reconnect too quickly"))); + } else { + // Otherwise, add the peer to the recent peers set. + recent_peers.insert(peer_ip); + } + + // Limit the number of inbound peer connections. Different limits apply to direct peers + // and peers connecting over a relay. + if is_relay { + if self.inbound_relay_peers >= self.limits.max_inbound_relay_peers { + tracing::debug!(%connection_id, "Too many inbound relay peers, closing"); + return Err(ConnectionDenied::new(anyhow!( + "too many inbound relay peers" + ))); + } + self.inbound_relay_peers += 1; + } else { + if self.inbound_direct_peers >= self.limits.max_inbound_direct_peers { + tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); + return Err(ConnectionDenied::new(anyhow!( + "too many inbound direct peers" + ))); + } + self.inbound_direct_peers += 1; + } + + self.inner + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) + } + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + self.inner.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } } impl Behaviour { pub fn new( identity: &identity::Keypair, chain_id: ChainId, + limits: LimitsConfig, ) -> (Self, relay::client::Transport) { const PROVIDER_PUBLICATION_INTERVAL: Duration = Duration::from_secs(600); @@ -94,21 +257,32 @@ impl Behaviour { ( Self { - relay, - autonat: autonat::Behaviour::new(peer_id, Default::default()), - dcutr: dcutr::Behaviour::new(peer_id), - ping: ping::Behaviour::new(ping::Config::new()), - identify: identify::Behaviour::new( - identify::Config::new(IDENTIFY_PROTOCOL_NAME.to_string(), identity.public()) + limits, + recent_inbound_direct_peers: RecentPeers::new(limits.direct_connection_timeout), + recent_inbound_relay_peers: RecentPeers::new(limits.relay_connection_timeout), + inbound_direct_peers: Default::default(), + inbound_relay_peers: Default::default(), + connected_peers: Default::default(), + inner: Inner { + relay, + autonat: autonat::Behaviour::new(peer_id, Default::default()), + dcutr: dcutr::Behaviour::new(peer_id), + ping: ping::Behaviour::new(ping::Config::new()), + identify: identify::Behaviour::new( + identify::Config::new( + IDENTIFY_PROTOCOL_NAME.to_string(), + identity.public(), + ) .with_agent_version(format!("pathfinder/{}", env!("CARGO_PKG_VERSION"))), - ), - kademlia, - gossipsub, - headers_sync, - bodies_sync, - transactions_sync, - receipts_sync, - events_sync, + ), + kademlia, + gossipsub, + headers_sync, + bodies_sync, + transactions_sync, + receipts_sync, + events_sync, + }, }, relay_transport, ) @@ -116,19 +290,57 @@ impl Behaviour { pub fn provide_capability(&mut self, capability: &str) -> anyhow::Result<()> { let key = string_to_key(capability); - self.kademlia.start_providing(key)?; + self.inner.kademlia.start_providing(key)?; Ok(()) } pub fn get_capability_providers(&mut self, capability: &str) -> kad::QueryId { let key = string_to_key(capability); - self.kademlia.get_providers(key) + self.inner.kademlia.get_providers(key) } pub fn subscribe_topic(&mut self, topic: &IdentTopic) -> anyhow::Result<()> { - self.gossipsub.subscribe(topic)?; + self.inner.gossipsub.subscribe(topic)?; Ok(()) } + + fn on_connection_established(&mut self, peer_id: PeerId) -> Result<(), ConnectionDenied> { + // Only allow one connection per peer. + if self.connected_peers.contains(&peer_id) { + tracing::debug!(%peer_id, "Peer already connected, closing"); + return Err(ConnectionDenied::new(anyhow!("duplicate connection"))); + } + self.connected_peers.insert(peer_id.clone()); + Ok(()) + } + + pub fn kademlia_mut(&mut self) -> &mut kad::Behaviour { + &mut self.inner.kademlia + } + + pub fn gossipsub_mut(&mut self) -> &mut gossipsub::Behaviour { + &mut self.inner.gossipsub + } + + pub fn headers_sync_mut(&mut self) -> &mut p2p_stream::Behaviour { + &mut self.inner.headers_sync + } + + pub fn bodies_sync_mut(&mut self) -> &mut p2p_stream::Behaviour { + &mut self.inner.bodies_sync + } + + pub fn transactions_sync_mut(&mut self) -> &mut p2p_stream::Behaviour { + &mut self.inner.transactions_sync + } + + pub fn receipts_sync_mut(&mut self) -> &mut p2p_stream::Behaviour { + &mut self.inner.receipts_sync + } + + pub fn events_sync_mut(&mut self) -> &mut p2p_stream::Behaviour { + &mut self.inner.events_sync + } } fn request_response_behavior() -> p2p_stream::Behaviour diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 953194e890..9488b9171a 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -49,7 +49,7 @@ pub fn new( ) -> (Client, EventReceiver, MainLoop) { let local_peer_id = keypair.public().to_peer_id(); - let (behaviour, relay_transport) = behaviour::Behaviour::new(&keypair, chain_id); + let (behaviour, relay_transport) = behaviour::Behaviour::new(&keypair, chain_id, limits_cfg); let swarm = Swarm::new( transport::create(&keypair, relay_transport), @@ -79,7 +79,6 @@ pub fn new( event_sender, peers, periodic_cfg, - limits_cfg, chain_id, ), ) diff --git a/crates/p2p/src/main_loop.rs b/crates/p2p/src/main_loop.rs index 7763602207..e77d014940 100644 --- a/crates/p2p/src/main_loop.rs +++ b/crates/p2p/src/main_loop.rs @@ -1,16 +1,15 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; -use std::net::IpAddr; use std::sync::Arc; use futures::{channel::mpsc::Receiver as ResponseReceiver, StreamExt}; use libp2p::gossipsub::{self, IdentTopic}; +use libp2p::identify; use libp2p::kad::{self, BootstrapError, BootstrapOk, QueryId, QueryResult}; use libp2p::multiaddr::Protocol; use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::SwarmEvent; use libp2p::PeerId; -use libp2p::{identify, Multiaddr}; use p2p_proto::block::{BlockBodiesResponse, BlockHeadersResponse}; use p2p_proto::event::EventsResponse; use p2p_proto::receipt::ReceiptsResponse; @@ -21,32 +20,20 @@ use pathfinder_common::ChainId; use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::time::Duration; +use crate::behaviour; use crate::peers; -use crate::recent_peers::RecentPeers; #[cfg(test)] use crate::test_utils; -use crate::{behaviour, LimitsConfig}; use crate::{ BootstrapConfig, Command, EmptyResultSender, Event, PeriodicTaskConfig, TestCommand, TestEvent, }; pub struct MainLoop { bootstrap_cfg: BootstrapConfig, - limits_cfg: LimitsConfig, swarm: libp2p::swarm::Swarm, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, peers: Arc>, - /// Recent peers that have connected directly (not over a relay). - /// - /// The distinction is important because different limits apply to direct and relayed peers. - recent_direct_peers: RecentPeers, - /// Recent peers that have connected over a relay. - recent_relay_peers: RecentPeers, - /// Number of peers that have inbound connections open to us directly (not over a relay). - inbound_direct_peers: usize, - /// Number of peers that have inbound connections open to us over a relay. - inbound_relay_peers: usize, pending_dials: HashMap, pending_sync_requests: PendingRequests, // TODO there's no sync status message anymore so we have to: @@ -94,16 +81,10 @@ impl MainLoop { event_sender: mpsc::Sender, peers: Arc>, periodic_cfg: PeriodicTaskConfig, - limits_cfg: LimitsConfig, chain_id: ChainId, ) -> Self { Self { bootstrap_cfg: periodic_cfg.bootstrap, - limits_cfg, - recent_direct_peers: RecentPeers::new(limits_cfg.direct_connection_timeout), - recent_relay_peers: RecentPeers::new(limits_cfg.relay_connection_timeout), - inbound_direct_peers: 0, - inbound_relay_peers: 0, swarm, command_receiver, event_sender, @@ -146,7 +127,7 @@ impl MainLoop { tracing::info!(%num_peers, %num_established_connections, %num_pending_connections, "Network status") } _ = peer_status_interval_tick => { - let dht = self.swarm.behaviour_mut().kademlia + let dht = self.swarm.behaviour_mut().kademlia_mut() .kbuckets() // Cannot .into_iter() a KBucketRef, hence the inner collect followed by flat_map .map(|kbucket_ref| { @@ -169,7 +150,7 @@ impl MainLoop { } _ = bootstrap_interval_tick => { tracing::debug!("Doing periodical bootstrap"); - _ = self.swarm.behaviour_mut().kademlia.bootstrap(); + _ = self.swarm.behaviour_mut().kademlia_mut().bootstrap(); } command = self.command_receiver.recv() => { match command { @@ -188,87 +169,8 @@ impl MainLoop { // Connection management // =========================== SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - .. + peer_id, endpoint, .. } => { - // Extract the IP address of the peer from his multiaddr. - let peer_ip = get_ip(endpoint.get_remote_address()); - - // If the peer has no IP address, disconnect. - let Some(peer_ip) = peer_ip else { - if let Err(e) = self.disconnect(peer_id).await { - tracing::debug!(reason=%e, %peer_id, "Failed to disconnect peer without IP"); - } else { - tracing::debug!(%peer_id, "Disconnected peer without IP"); - } - return; - }; - - if endpoint.is_listener() { - // This is an incoming connection. - - // Only allow one connection per peer. - if num_established.get() > 1 { - tracing::debug!(%peer_id, "Peer has more than one connection, closing"); - if let Err(e) = self.disconnect(peer_id).await { - tracing::debug!(%e, "Failed to disconnect peer"); - } - return; - } - - // Is the peer connecting over a relay? - let is_relay = endpoint - .get_remote_address() - .iter() - .any(|p| p == Protocol::P2pCircuit); - - // Limit the number of inbound peer connections. Different limits apply to direct peers - // and peers connecting over a relay. - if is_relay { - if self.inbound_relay_peers >= self.limits_cfg.max_inbound_relay_peers { - tracing::debug!(%peer_id, "Too many inbound relay peers, closing"); - if let Err(e) = self.disconnect(peer_id).await { - tracing::debug!(%e, "Failed to disconnect peer"); - } - return; - } - self.inbound_relay_peers += 1; - } else { - if self.inbound_direct_peers >= self.limits_cfg.max_inbound_direct_peers { - tracing::debug!(%peer_id, "Too many inbound direct peers, closing"); - if let Err(e) = self.disconnect(peer_id).await { - tracing::debug!(%e, "Failed to disconnect peer"); - } - return; - } - self.inbound_direct_peers += 1; - } - - // Prevent the peer from reconnecting too quickly. - - // Different connection timeouts apply to direct peers and peers connecting over a relay. - let recent_peers = if is_relay { - &mut self.recent_relay_peers - } else { - &mut self.recent_direct_peers - }; - - // If the peer is in the recent peers set, this means he is attempting to - // reconnect too quickly. Close the connection. - if recent_peers.contains(&peer_ip) { - tracing::debug!(%peer_id, "Peer attempted to reconnect too quickly, closing"); - if let Err(e) = self.disconnect(peer_id).await { - tracing::debug!(%e, "Failed to disconnect peer"); - } - return; - } else { - // Otherwise, add the peer to the recent peers set. - recent_peers.insert(peer_ip); - } - } - self.peers.write().await.peer_connected(&peer_id); if endpoint.is_dialer() { @@ -367,7 +269,7 @@ impl MainLoop { for addr in &listen_addrs { self.swarm .behaviour_mut() - .kademlia + .kademlia_mut() .add_address(&peer_id, addr.clone()); } @@ -804,7 +706,7 @@ impl MainLoop { { self.swarm .behaviour_mut() - .kademlia + .kademlia_mut() .add_address(&peer_id, addr.clone()); match self.swarm.dial( // Dial a known peer with a given address only if it's not connected yet @@ -863,7 +765,7 @@ impl MainLoop { let request_id = self .swarm .behaviour_mut() - .headers_sync + .headers_sync_mut() .send_request(&peer_id, request); self.pending_sync_requests .headers @@ -879,7 +781,7 @@ impl MainLoop { let request_id = self .swarm .behaviour_mut() - .bodies_sync + .bodies_sync_mut() .send_request(&peer_id, request); self.pending_sync_requests.bodies.insert(request_id, sender); } @@ -893,7 +795,7 @@ impl MainLoop { let request_id = self .swarm .behaviour_mut() - .transactions_sync + .transactions_sync_mut() .send_request(&peer_id, request); self.pending_sync_requests .transactions @@ -909,7 +811,7 @@ impl MainLoop { let request_id = self .swarm .behaviour_mut() - .receipts_sync + .receipts_sync_mut() .send_request(&peer_id, request); self.pending_sync_requests .receipts @@ -925,7 +827,7 @@ impl MainLoop { let request_id = self .swarm .behaviour_mut() - .events_sync + .events_sync_mut() .send_request(&peer_id, request); self.pending_sync_requests.events.insert(request_id, sender); } @@ -947,7 +849,7 @@ impl MainLoop { let message_id = self .swarm .behaviour_mut() - .gossipsub + .gossipsub_mut() .publish(topic, data) .map_err(|e| anyhow::anyhow!("Gossipsub publish failed: {}", e))?; tracing::debug!(?message_id, "Data published"); @@ -1001,14 +903,6 @@ impl MainLoop { } } -fn get_ip(addr: &Multiaddr) -> Option { - addr.iter().find_map(|p| match p { - Protocol::Ip4(ip) => Some(IpAddr::V4(ip)), - Protocol::Ip6(ip) => Some(IpAddr::V6(ip)), - _ => None, - }) -} - /// No-op outside tests async fn send_test_event(_event_sender: &mpsc::Sender, _event: TestEvent) { #[cfg(test)] diff --git a/crates/p2p/src/test_utils.rs b/crates/p2p/src/test_utils.rs index 9dc4c30065..40741a30f7 100644 --- a/crates/p2p/src/test_utils.rs +++ b/crates/p2p/src/test_utils.rs @@ -63,7 +63,7 @@ pub(super) async fn handle_command( match command { TestCommand::GetPeersFromDHT(sender) => { let peers = behavior - .kademlia + .kademlia_mut() .kbuckets() // Cannot .into_iter() a KBucketRef, hence the inner collect followed by flat_map .map(|kbucket_ref| { diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index c414ad2955..774208b68a 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -487,6 +487,7 @@ async fn duplicate_connection() { .await; assert!(peer1_copy.connected().await.is_empty()); + assert!(peer1.connected().await.contains(&peer2.peer_id)); } /// Test that each peer accepts at most one connection from any other peer, and duplicate From 5b7217eb06df2546b5a20b3ba23e5fc0c043409c Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 23 Jan 2024 11:25:48 +0100 Subject: [PATCH 4/5] close connections before protocol negotiation when possible --- crates/p2p/src/behaviour.rs | 55 ++++++++++++++++++++++++++++++---- crates/p2p/src/tests.rs | 59 +++++-------------------------------- 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 8066271b52..92730d6e9e 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -18,8 +18,8 @@ use libp2p::multiaddr::Protocol; use libp2p::ping; use libp2p::relay; use libp2p::swarm::{ - ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, - THandlerOutEvent, ToSwarm, + ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p::StreamProtocol; use libp2p::{autonat, Multiaddr, PeerId}; @@ -84,6 +84,30 @@ impl NetworkBehaviour for Behaviour { remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { self.on_connection_established(peer)?; + + // Is the peer connecting over a relay? + let is_relay = remote_addr.iter().any(|p| p == Protocol::P2pCircuit); + + // Limit the number of inbound peer connections. Different limits apply to direct peers + // and peers connecting over a relay. + if is_relay { + if self.inbound_relay_peers >= self.limits.max_inbound_relay_peers { + tracing::debug!(%connection_id, "Too many inbound relay peers, closing"); + return Err(ConnectionDenied::new(anyhow!( + "too many inbound relay peers" + ))); + } + self.inbound_relay_peers += 1; + } else { + if self.inbound_direct_peers >= self.limits.max_inbound_direct_peers { + tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); + return Err(ConnectionDenied::new(anyhow!( + "too many inbound direct peers" + ))); + } + self.inbound_direct_peers += 1; + } + self.inner.handle_established_inbound_connection( connection_id, peer, @@ -105,6 +129,20 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm<'_>) { + if let FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, endpoint, .. + }) = event + { + // Is this an inbound connection? + if endpoint.is_listener() { + if endpoint.is_relayed() { + self.inbound_relay_peers -= 1; + } else { + self.inbound_direct_peers -= 1; + } + } + self.connected_peers.remove(&peer_id); + } self.inner.on_swarm_event(event) } @@ -144,11 +182,11 @@ impl NetworkBehaviour for Behaviour { return Err(ConnectionDenied::new(anyhow!("peer without IP"))); }; - // Prevent the peer from reconnecting too quickly. - // Is the peer connecting over a relay? let is_relay = remote_addr.iter().any(|p| p == Protocol::P2pCircuit); + // Prevent the peer from reconnecting too quickly. + // // Different connection timeouts apply to direct peers and peers connecting over a relay. let recent_peers = if is_relay { &mut self.recent_inbound_relay_peers @@ -168,6 +206,13 @@ impl NetworkBehaviour for Behaviour { // Limit the number of inbound peer connections. Different limits apply to direct peers // and peers connecting over a relay. + // + // This same check happens when the connection is established, but we are also checking + // here because it allows us to avoid potentially expensive protocol negotiation with the + // peer if there are already too many inbound connections. + // + // The check must be repeated when the connection is established due to race conditions, + // since multiple peers may be attempting to connect at the same time. if is_relay { if self.inbound_relay_peers >= self.limits.max_inbound_relay_peers { tracing::debug!(%connection_id, "Too many inbound relay peers, closing"); @@ -175,7 +220,6 @@ impl NetworkBehaviour for Behaviour { "too many inbound relay peers" ))); } - self.inbound_relay_peers += 1; } else { if self.inbound_direct_peers >= self.limits.max_inbound_direct_peers { tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); @@ -183,7 +227,6 @@ impl NetworkBehaviour for Behaviour { "too many inbound direct peers" ))); } - self.inbound_direct_peers += 1; } self.inner diff --git a/crates/p2p/src/tests.rs b/crates/p2p/src/tests.rs index 774208b68a..749f96588c 100644 --- a/crates/p2p/src/tests.rs +++ b/crates/p2p/src/tests.rs @@ -246,8 +246,8 @@ async fn periodic_bootstrap() { }, }; let limits_cfg = LimitsConfig { - direct_connection_timeout: Duration::from_millis(500), - relay_connection_timeout: Duration::from_millis(500), + direct_connection_timeout: Duration::from_millis(50), + relay_connection_timeout: Duration::from_millis(50), ..Default::default() }; let mut boot = TestPeer::new(periodic_cfg, limits_cfg, Keypair::generate_ed25519()); @@ -369,28 +369,13 @@ async fn reconnect_too_quickly() { .await; // Attempt to immediately reconnect. - peer1 - .client - .dial(peer2.peer_id, addr2.clone()) - .await - .unwrap(); - - // The peer gets disconnected without completing the connection establishment handler. - wait_for_event(&mut peer1.event_receiver, |event| match event { - Event::Test(TestEvent::ConnectionClosed { remote }) if remote == peer2.peer_id => Some(()), - _ => None, - }) - .await; - - wait_for_event(&mut peer2.event_receiver, |event| match event { - Event::Test(TestEvent::ConnectionClosed { remote }) if remote == peer1.peer_id => Some(()), - _ => None, - }) - .await; + let result = peer1.client.dial(peer2.peer_id, addr2.clone()).await; + assert!(result.is_err()); // Attempt to reconnect after the timeout. tokio::time::sleep(CONNECTION_TIMEOUT).await; - peer1.client.dial(peer2.peer_id, addr2).await.unwrap(); + let result = peer1.client.dial(peer2.peer_id, addr2).await; + assert!(result.is_ok()); // The connection is established. wait_for_event(&mut peer1.event_receiver, |event| match event { @@ -574,36 +559,8 @@ async fn max_inbound_connections() { // Open another inbound connection to the peer. Since the limit is 2, and there are already 2 // inbound connections, this is not allowed. - peer4 - .client - .dial(peer1.peer_id, addr1.clone()) - .await - .unwrap(); - - wait_for_event(&mut peer4.event_receiver, |event| match event { - Event::Test(TestEvent::ConnectionEstablished { remote, .. }) if remote == peer1.peer_id => { - Some(()) - } - _ => None, - }) - .await; - - wait_for_event(&mut peer4.event_receiver, |event| match event { - Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == peer1.peer_id => { - Some(()) - } - _ => None, - }) - .await; - - wait_for_event(&mut peer1.event_receiver, |event| match event { - Event::Test(TestEvent::ConnectionClosed { remote, .. }) if remote == peer4.peer_id => { - Some(()) - } - _ => None, - }) - .await; - + let result = peer4.client.dial(peer1.peer_id, addr1.clone()).await; + assert!(result.is_err()); assert!(peer4.connected().await.is_empty()); // The restriction does not apply to outbound connections, so peer 1 can still open a connection From 0c24cc03859f1f8377be36ccd26c161d0398c004 Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 23 Jan 2024 23:07:09 +0100 Subject: [PATCH 5/5] clippy --- crates/p2p/src/behaviour.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 92730d6e9e..def8fc1628 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -220,13 +220,11 @@ impl NetworkBehaviour for Behaviour { "too many inbound relay peers" ))); } - } else { - if self.inbound_direct_peers >= self.limits.max_inbound_direct_peers { - tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); - return Err(ConnectionDenied::new(anyhow!( - "too many inbound direct peers" - ))); - } + } else if self.inbound_direct_peers >= self.limits.max_inbound_direct_peers { + tracing::debug!(%connection_id, "Too many inbound direct peers, closing"); + return Err(ConnectionDenied::new(anyhow!( + "too many inbound direct peers" + ))); } self.inner @@ -353,7 +351,7 @@ impl Behaviour { tracing::debug!(%peer_id, "Peer already connected, closing"); return Err(ConnectionDenied::new(anyhow!("duplicate connection"))); } - self.connected_peers.insert(peer_id.clone()); + self.connected_peers.insert(peer_id); Ok(()) }