From b136f859513bf5aae3b94ea963ba1d942b9c8116 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jul 2024 19:06:32 +0300 Subject: [PATCH 01/13] transport/limits: Add connection limits config Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 60 +++++++++++++++++++++++++++++++++ src/transport/manager/mod.rs | 1 + 2 files changed, 61 insertions(+) create mode 100644 src/transport/manager/limits.rs diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs new file mode 100644 index 00000000..ef02f499 --- /dev/null +++ b/src/transport/manager/limits.rs @@ -0,0 +1,60 @@ +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Limits for the transport manager. + +/// Configuration for the connection limits. +#[derive(Debug, Clone, Default)] +pub struct ConnectionLimitsConfig { + /// Maximum number of connections that can be established. + max_connections: Option, + /// Maximum number of incoming connections that can be established. + max_incoming_connections: Option, + /// Maximum number of outgoing connections that can be established. + max_outgoing_connections: Option, + /// Maximum number of connections that can be established per peer. + max_connections_per_peer: Option, +} + +impl ConnectionLimitsConfig { + /// Configures the maximum number of connections that can be established. + pub fn max_connections(mut self, limit: Option) -> Self { + self.max_connections = limit; + self + } + + /// Configures the maximum number of incoming connections that can be established. + pub fn max_incoming_connections(mut self, limit: Option) -> Self { + self.max_incoming_connections = limit; + self + } + + /// Configures the maximum number of outgoing connections that can be established. + pub fn max_outgoing_connections(mut self, limit: Option) -> Self { + self.max_outgoing_connections = limit; + self + } + + /// Configures the maximum number of connections that can be established per peer. + pub fn max_connections_per_peer(mut self, limit: Option) -> Self { + self.max_connections_per_peer = limit; + self + } +} diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index a7248811..110f03bf 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -57,6 +57,7 @@ pub use handle::{TransportHandle, TransportManagerHandle}; pub use types::SupportedTransport; mod address; +pub mod limits; mod types; pub(crate) mod handle; From 6dd3bd4eb2f65f7781d8d79b34ab5618955eb873 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jul 2024 19:32:44 +0300 Subject: [PATCH 02/13] transport/limits: Implement connection limits Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 114 ++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 6 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index ef02f499..a3839b6d 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -20,6 +20,10 @@ //! Limits for the transport manager. +use crate::{transport::Endpoint, types::ConnectionId, PeerId}; + +use std::collections::{HashMap, HashSet}; + /// Configuration for the connection limits. #[derive(Debug, Clone, Default)] pub struct ConnectionLimitsConfig { @@ -34,12 +38,6 @@ pub struct ConnectionLimitsConfig { } impl ConnectionLimitsConfig { - /// Configures the maximum number of connections that can be established. - pub fn max_connections(mut self, limit: Option) -> Self { - self.max_connections = limit; - self - } - /// Configures the maximum number of incoming connections that can be established. pub fn max_incoming_connections(mut self, limit: Option) -> Self { self.max_incoming_connections = limit; @@ -57,4 +55,108 @@ impl ConnectionLimitsConfig { self.max_connections_per_peer = limit; self } + + /// Configures the maximum number of connections that can be established. + pub fn max_connections(mut self, limit: Option) -> Self { + self.max_connections = limit; + self + } +} + +/// Error type for connection limits. +enum ConnectionLimitsError { + /// Maximum number of incoming connections exceeded. + MaxIncomingConnectionsExceeded, + /// Maximum number of outgoing connections exceeded. + MaxOutgoingConnectionsExceeded, + /// Maximum number of connections per peer exceeded. + MaxConnectionsPerPeerExceeded, + /// Maximum number of connections exceeded. + MaxConnectionsExceeded, +} + +/// Connection limits. +#[derive(Debug, Clone)] +pub struct ConnectionLimits { + /// Configuration for the connection limits. + config: ConnectionLimitsConfig, + + /// Established incoming connections. + incoming_connections: HashSet, + /// Established outgoing connections. + outgoing_connections: HashSet, + /// Maximum number of connections that can be established per peer. + connections_per_peer: HashMap>, +} + +impl ConnectionLimits { + /// Called when a new connection is established. + pub fn on_connection_established( + &mut self, + peer: PeerId, + connection_id: ConnectionId, + is_listener: bool, + ) -> Result<(), ConnectionLimitsError> { + // incoming + // outgoing + // per peer + // total + + // Check connection limits. + if is_listener { + if let Some(max_incoming_connections) = self.config.max_incoming_connections { + if self.incoming_connections.len() >= max_incoming_connections { + return Err(ConnectionLimitsError::MaxIncomingConnectionsExceeded); + } + } + } else { + if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { + if self.outgoing_connections.len() >= max_outgoing_connections { + return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded); + } + } + } + + if let Some(max_connections_per_peer) = self.config.max_connections_per_peer { + if let Some(connections) = self.connections_per_peer.get(&peer) { + if connections.len() >= max_connections_per_peer { + return Err(ConnectionLimitsError::MaxConnectionsPerPeerExceeded); + } + } + } + + if let Some(max_connections) = self.config.max_connections { + if self.incoming_connections.len() + self.outgoing_connections.len() >= max_connections + { + return Err(ConnectionLimitsError::MaxConnectionsExceeded); + } + } + + // Keep track of the connection. + if is_listener { + self.incoming_connections.insert(connection_id); + } else { + self.outgoing_connections.insert(connection_id); + } + self.connections_per_peer.entry(peer).or_default().insert(connection_id); + + Ok(()) + } + + /// Called when a connection is closed. + pub fn on_connection_closed( + &mut self, + peer: PeerId, + connection_id: ConnectionId, + is_listener: bool, + ) { + if is_listener { + self.incoming_connections.remove(&connection_id); + } else { + self.outgoing_connections.remove(&connection_id); + } + if let Some(connections) = self.connections_per_peer.get_mut(&peer) { + connections.remove(&connection_id); + } + } } From 2755189882a2303ce39610d4f03d083174ca9b4d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jul 2024 19:38:53 +0300 Subject: [PATCH 03/13] transport/limits: Preallocate and track connections only on limits Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 53 +++++++++++++++++---------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index a3839b6d..6ef220f3 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -20,15 +20,13 @@ //! Limits for the transport manager. -use crate::{transport::Endpoint, types::ConnectionId, PeerId}; +use crate::{types::ConnectionId, PeerId}; use std::collections::{HashMap, HashSet}; /// Configuration for the connection limits. #[derive(Debug, Clone, Default)] pub struct ConnectionLimitsConfig { - /// Maximum number of connections that can be established. - max_connections: Option, /// Maximum number of incoming connections that can be established. max_incoming_connections: Option, /// Maximum number of outgoing connections that can be established. @@ -55,12 +53,6 @@ impl ConnectionLimitsConfig { self.max_connections_per_peer = limit; self } - - /// Configures the maximum number of connections that can be established. - pub fn max_connections(mut self, limit: Option) -> Self { - self.max_connections = limit; - self - } } /// Error type for connection limits. @@ -71,8 +63,6 @@ enum ConnectionLimitsError { MaxOutgoingConnectionsExceeded, /// Maximum number of connections per peer exceeded. MaxConnectionsPerPeerExceeded, - /// Maximum number of connections exceeded. - MaxConnectionsExceeded, } /// Connection limits. @@ -90,6 +80,19 @@ pub struct ConnectionLimits { } impl ConnectionLimits { + /// Creates a new connection limits instance. + pub fn new(config: ConnectionLimitsConfig) -> Self { + let max_incoming_connections = config.max_incoming_connections.unwrap_or(0); + let max_outgoing_connections = config.max_outgoing_connections.unwrap_or(0); + + Self { + config, + incoming_connections: HashSet::with_capacity(max_incoming_connections), + outgoing_connections: HashSet::with_capacity(max_outgoing_connections), + connections_per_peer: HashMap::new(), + } + } + /// Called when a new connection is established. pub fn on_connection_established( &mut self, @@ -97,11 +100,6 @@ impl ConnectionLimits { connection_id: ConnectionId, is_listener: bool, ) -> Result<(), ConnectionLimitsError> { - // incoming - // outgoing - // per peer - // total - // Check connection limits. if is_listener { if let Some(max_incoming_connections) = self.config.max_incoming_connections { @@ -125,20 +123,23 @@ impl ConnectionLimits { } } - if let Some(max_connections) = self.config.max_connections { - if self.incoming_connections.len() + self.outgoing_connections.len() >= max_connections - { - return Err(ConnectionLimitsError::MaxConnectionsExceeded); - } - } - // Keep track of the connection. if is_listener { - self.incoming_connections.insert(connection_id); + if self.config.max_incoming_connections.is_some() { + self.incoming_connections.insert(connection_id); + } } else { - self.outgoing_connections.insert(connection_id); + if self.config.max_outgoing_connections.is_some() { + self.outgoing_connections.insert(connection_id); + } + } + + if let Some(max_connections_per_peer) = self.config.max_connections_per_peer { + self.connections_per_peer + .entry(peer) + .or_insert_with(|| HashSet::with_capacity(max_connections_per_peer)) + .insert(connection_id); } - self.connections_per_peer.entry(peer).or_default().insert(connection_id); Ok(()) } From 6cbca627906dc4da5cd0d0c3686f2a62aacb8a99 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Jul 2024 19:59:54 +0300 Subject: [PATCH 04/13] limits/tests: Check limits work properly Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 116 +++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 15 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 6ef220f3..d1733558 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -22,7 +22,7 @@ use crate::{types::ConnectionId, PeerId}; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; /// Configuration for the connection limits. #[derive(Debug, Clone, Default)] @@ -56,7 +56,8 @@ impl ConnectionLimitsConfig { } /// Error type for connection limits. -enum ConnectionLimitsError { +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionLimitsError { /// Maximum number of incoming connections exceeded. MaxIncomingConnectionsExceeded, /// Maximum number of outgoing connections exceeded. @@ -145,19 +146,104 @@ impl ConnectionLimits { } /// Called when a connection is closed. - pub fn on_connection_closed( - &mut self, - peer: PeerId, - connection_id: ConnectionId, - is_listener: bool, - ) { - if is_listener { - self.incoming_connections.remove(&connection_id); - } else { - self.outgoing_connections.remove(&connection_id); - } - if let Some(connections) = self.connections_per_peer.get_mut(&peer) { - connections.remove(&connection_id); + pub fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId) { + self.incoming_connections.remove(&connection_id); + self.outgoing_connections.remove(&connection_id); + + if let Entry::Occupied(mut entry) = self.connections_per_peer.entry(peer) { + entry.get_mut().remove(&connection_id); + + if entry.get().is_empty() { + entry.remove(); + } } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{types::ConnectionId, PeerId}; + + #[test] + fn connection_limits() { + let config = ConnectionLimitsConfig::default() + .max_incoming_connections(Some(3)) + .max_outgoing_connections(Some(2)) + .max_connections_per_peer(Some(2)); + let mut limits = ConnectionLimits::new(config); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + let connection_id_in_1 = ConnectionId::random(); + let connection_id_in_2 = ConnectionId::random(); + let connection_id_out_1 = ConnectionId::random(); + let connection_id_out_2 = ConnectionId::random(); + let connection_id_in_3 = ConnectionId::random(); + let connection_id_out_3 = ConnectionId::random(); + + // Establish incoming connection. + assert!(limits + .on_connection_established(peer_a.clone(), connection_id_in_1, true) + .is_ok()); + assert_eq!(limits.incoming_connections.len(), 1); + + assert!(limits + .on_connection_established(peer_b.clone(), connection_id_in_2, true) + .is_ok()); + assert_eq!(limits.incoming_connections.len(), 2); + + assert!(limits + .on_connection_established(peer_c.clone(), connection_id_in_3, true) + .is_ok()); + assert_eq!(limits.incoming_connections.len(), 3); + + assert_eq!( + limits + .on_connection_established(PeerId::random(), ConnectionId::random(), true) + .unwrap_err(), + ConnectionLimitsError::MaxIncomingConnectionsExceeded + ); + assert_eq!(limits.incoming_connections.len(), 3); + + // Establish outgoing connection. + assert!(limits + .on_connection_established(peer_a.clone(), connection_id_out_1, false) + .is_ok()); + assert_eq!(limits.incoming_connections.len(), 3); + assert_eq!(limits.outgoing_connections.len(), 1); + + assert!(limits + .on_connection_established(peer_b.clone(), connection_id_out_2, false) + .is_ok()); + assert_eq!(limits.incoming_connections.len(), 3); + assert_eq!(limits.outgoing_connections.len(), 2); + + assert_eq!( + limits + .on_connection_established(peer_c.clone(), connection_id_out_3, false) + .unwrap_err(), + ConnectionLimitsError::MaxOutgoingConnectionsExceeded + ); + + // Close connections with peer a. + limits.on_connection_closed(peer_a.clone(), connection_id_in_1); + assert_eq!(limits.incoming_connections.len(), 2); + assert_eq!(limits.outgoing_connections.len(), 2); + assert_eq!(limits.connections_per_peer.len(), 3); + + limits.on_connection_closed(peer_a.clone(), connection_id_out_1); + assert_eq!(limits.incoming_connections.len(), 2); + assert_eq!(limits.outgoing_connections.len(), 1); + assert_eq!(limits.connections_per_peer.len(), 2); + + // We have room for another incoming connection, however the limit for peer b is reached. + assert_eq!( + limits + .on_connection_established(peer_b.clone(), connection_id_in_3, false) + .unwrap_err(), + ConnectionLimitsError::MaxConnectionsPerPeerExceeded + ); + } +} From 50cd290e10280d9091e00eca38249b4ae454195a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jul 2024 16:11:10 +0300 Subject: [PATCH 05/13] transport/limits: Max peer connection is presumed 2 in litep2p Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 38 +-------------------------------- 1 file changed, 1 insertion(+), 37 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index d1733558..9eb9ed7f 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -22,7 +22,7 @@ use crate::{types::ConnectionId, PeerId}; -use std::collections::{hash_map::Entry, HashMap, HashSet}; +use std::collections::HashSet; /// Configuration for the connection limits. #[derive(Debug, Clone, Default)] @@ -31,8 +31,6 @@ pub struct ConnectionLimitsConfig { max_incoming_connections: Option, /// Maximum number of outgoing connections that can be established. max_outgoing_connections: Option, - /// Maximum number of connections that can be established per peer. - max_connections_per_peer: Option, } impl ConnectionLimitsConfig { @@ -47,12 +45,6 @@ impl ConnectionLimitsConfig { self.max_outgoing_connections = limit; self } - - /// Configures the maximum number of connections that can be established per peer. - pub fn max_connections_per_peer(mut self, limit: Option) -> Self { - self.max_connections_per_peer = limit; - self - } } /// Error type for connection limits. @@ -62,8 +54,6 @@ pub enum ConnectionLimitsError { MaxIncomingConnectionsExceeded, /// Maximum number of outgoing connections exceeded. MaxOutgoingConnectionsExceeded, - /// Maximum number of connections per peer exceeded. - MaxConnectionsPerPeerExceeded, } /// Connection limits. @@ -76,8 +66,6 @@ pub struct ConnectionLimits { incoming_connections: HashSet, /// Established outgoing connections. outgoing_connections: HashSet, - /// Maximum number of connections that can be established per peer. - connections_per_peer: HashMap>, } impl ConnectionLimits { @@ -90,7 +78,6 @@ impl ConnectionLimits { config, incoming_connections: HashSet::with_capacity(max_incoming_connections), outgoing_connections: HashSet::with_capacity(max_outgoing_connections), - connections_per_peer: HashMap::new(), } } @@ -116,14 +103,6 @@ impl ConnectionLimits { } } - if let Some(max_connections_per_peer) = self.config.max_connections_per_peer { - if let Some(connections) = self.connections_per_peer.get(&peer) { - if connections.len() >= max_connections_per_peer { - return Err(ConnectionLimitsError::MaxConnectionsPerPeerExceeded); - } - } - } - // Keep track of the connection. if is_listener { if self.config.max_incoming_connections.is_some() { @@ -135,13 +114,6 @@ impl ConnectionLimits { } } - if let Some(max_connections_per_peer) = self.config.max_connections_per_peer { - self.connections_per_peer - .entry(peer) - .or_insert_with(|| HashSet::with_capacity(max_connections_per_peer)) - .insert(connection_id); - } - Ok(()) } @@ -149,14 +121,6 @@ impl ConnectionLimits { pub fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId) { self.incoming_connections.remove(&connection_id); self.outgoing_connections.remove(&connection_id); - - if let Entry::Occupied(mut entry) = self.connections_per_peer.entry(peer) { - entry.get_mut().remove(&connection_id); - - if entry.get().is_empty() { - entry.remove(); - } - } } } From 206442ca174e3c1f5ff5604adbcccf6d65fb0edb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jul 2024 16:13:32 +0300 Subject: [PATCH 06/13] limits/tests: Remove peerid notion Signed-off-by: Alexandru Vasile --- src/transport/manager/limits.rs | 53 ++++++++------------------------- 1 file changed, 12 insertions(+), 41 deletions(-) diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 9eb9ed7f..5e76c19a 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -84,7 +84,6 @@ impl ConnectionLimits { /// Called when a new connection is established. pub fn on_connection_established( &mut self, - peer: PeerId, connection_id: ConnectionId, is_listener: bool, ) -> Result<(), ConnectionLimitsError> { @@ -118,7 +117,7 @@ impl ConnectionLimits { } /// Called when a connection is closed. - pub fn on_connection_closed(&mut self, peer: PeerId, connection_id: ConnectionId) { + pub fn on_connection_closed(&mut self, connection_id: ConnectionId) { self.incoming_connections.remove(&connection_id); self.outgoing_connections.remove(&connection_id); } @@ -127,19 +126,15 @@ impl ConnectionLimits { #[cfg(test)] mod tests { use super::*; - use crate::{types::ConnectionId, PeerId}; + use crate::types::ConnectionId; #[test] fn connection_limits() { let config = ConnectionLimitsConfig::default() .max_incoming_connections(Some(3)) - .max_outgoing_connections(Some(2)) - .max_connections_per_peer(Some(2)); + .max_outgoing_connections(Some(2)); let mut limits = ConnectionLimits::new(config); - let peer_a = PeerId::random(); - let peer_b = PeerId::random(); - let peer_c = PeerId::random(); let connection_id_in_1 = ConnectionId::random(); let connection_id_in_2 = ConnectionId::random(); let connection_id_out_1 = ConnectionId::random(); @@ -148,66 +143,42 @@ mod tests { let connection_id_out_3 = ConnectionId::random(); // Establish incoming connection. - assert!(limits - .on_connection_established(peer_a.clone(), connection_id_in_1, true) - .is_ok()); + assert!(limits.on_connection_established(connection_id_in_1, true).is_ok()); assert_eq!(limits.incoming_connections.len(), 1); - assert!(limits - .on_connection_established(peer_b.clone(), connection_id_in_2, true) - .is_ok()); + assert!(limits.on_connection_established(connection_id_in_2, true).is_ok()); assert_eq!(limits.incoming_connections.len(), 2); - assert!(limits - .on_connection_established(peer_c.clone(), connection_id_in_3, true) - .is_ok()); + assert!(limits.on_connection_established(connection_id_in_3, true).is_ok()); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!( - limits - .on_connection_established(PeerId::random(), ConnectionId::random(), true) - .unwrap_err(), + limits.on_connection_established(ConnectionId::random(), true).unwrap_err(), ConnectionLimitsError::MaxIncomingConnectionsExceeded ); assert_eq!(limits.incoming_connections.len(), 3); // Establish outgoing connection. - assert!(limits - .on_connection_established(peer_a.clone(), connection_id_out_1, false) - .is_ok()); + assert!(limits.on_connection_established(connection_id_out_1, false).is_ok()); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!(limits.outgoing_connections.len(), 1); - assert!(limits - .on_connection_established(peer_b.clone(), connection_id_out_2, false) - .is_ok()); + assert!(limits.on_connection_established(connection_id_out_2, false).is_ok()); assert_eq!(limits.incoming_connections.len(), 3); assert_eq!(limits.outgoing_connections.len(), 2); assert_eq!( - limits - .on_connection_established(peer_c.clone(), connection_id_out_3, false) - .unwrap_err(), + limits.on_connection_established(connection_id_out_3, false).unwrap_err(), ConnectionLimitsError::MaxOutgoingConnectionsExceeded ); // Close connections with peer a. - limits.on_connection_closed(peer_a.clone(), connection_id_in_1); + limits.on_connection_closed(connection_id_in_1); assert_eq!(limits.incoming_connections.len(), 2); assert_eq!(limits.outgoing_connections.len(), 2); - assert_eq!(limits.connections_per_peer.len(), 3); - limits.on_connection_closed(peer_a.clone(), connection_id_out_1); + limits.on_connection_closed(connection_id_out_1); assert_eq!(limits.incoming_connections.len(), 2); assert_eq!(limits.outgoing_connections.len(), 1); - assert_eq!(limits.connections_per_peer.len(), 2); - - // We have room for another incoming connection, however the limit for peer b is reached. - assert_eq!( - limits - .on_connection_established(peer_b.clone(), connection_id_in_3, false) - .unwrap_err(), - ConnectionLimitsError::MaxConnectionsPerPeerExceeded - ); } } From a439cc735b3d8064ec8f56c7f181cbd793f6adcd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jul 2024 16:35:25 +0300 Subject: [PATCH 07/13] transport: Allow litep2p config to configure transport limits Signed-off-by: Alexandru Vasile --- src/config.rs | 20 +++++++++++++++++--- src/lib.rs | 1 + src/transport/manager/limits.rs | 2 +- src/transport/manager/mod.rs | 24 +++++++++++++++++++++++- 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/config.rs b/src/config.rs index 570be398..75620a59 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,9 +29,9 @@ use crate::{ notification, request_response, UserProtocol, }, transport::{ - quic::config::Config as QuicConfig, tcp::config::Config as TcpConfig, - webrtc::config::Config as WebRtcConfig, websocket::config::Config as WebSocketConfig, - MAX_PARALLEL_DIALS, + manager::limits::ConnectionLimitsConfig, quic::config::Config as QuicConfig, + tcp::config::Config as TcpConfig, webrtc::config::Config as WebRtcConfig, + websocket::config::Config as WebSocketConfig, MAX_PARALLEL_DIALS, }, types::protocol::ProtocolName, PeerId, @@ -109,6 +109,9 @@ pub struct ConfigBuilder { /// Maximum number of parallel dial attempts. max_parallel_dials: usize, + + /// Connection limits config. + connection_limits: ConnectionLimitsConfig, } impl Default for ConfigBuilder { @@ -137,6 +140,7 @@ impl ConfigBuilder { notification_protocols: HashMap::new(), request_response_protocols: HashMap::new(), known_addresses: Vec::new(), + connection_limits: ConnectionLimitsConfig::default(), } } @@ -243,6 +247,12 @@ impl ConfigBuilder { self } + /// Set connection limits configuration. + pub fn with_connection_limits(mut self, config: ConnectionLimitsConfig) -> Self { + self.connection_limits = config; + self + } + /// Build [`Litep2pConfig`]. pub fn build(mut self) -> Litep2pConfig { let keypair = match self.keypair { @@ -267,6 +277,7 @@ impl ConfigBuilder { notification_protocols: self.notification_protocols, request_response_protocols: self.request_response_protocols, known_addresses: self.known_addresses, + connection_limits: self.connection_limits, } } } @@ -320,4 +331,7 @@ pub struct Litep2pConfig { /// Known addresses. pub(crate) known_addresses: Vec<(PeerId, Vec)>, + + /// Connection limits config. + pub(crate) connection_limits: ConnectionLimitsConfig, } diff --git a/src/lib.rs b/src/lib.rs index dfa67b97..ef75a3b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -143,6 +143,7 @@ impl Litep2p { supported_transports, bandwidth_sink.clone(), litep2p_config.max_parallel_dials, + litep2p_config.connection_limits, ); // add known addresses to `TransportManager`, if any exist diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 5e76c19a..551cbac9 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -20,7 +20,7 @@ //! Limits for the transport manager. -use crate::{types::ConnectionId, PeerId}; +use crate::types::ConnectionId; use std::collections::HashSet; diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 110f03bf..844627a9 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -76,7 +76,7 @@ const SCORE_CONNECT_SUCCESS: i32 = 100i32; /// Score for a non-working address. const SCORE_CONNECT_FAILURE: i32 = -100i32; -/// TODO: +/// The connection established result. enum ConnectionEstablishedResult { /// Accept connection and inform `Litep2p` about the connection. Accept, @@ -243,6 +243,9 @@ pub struct TransportManager { /// Pending connections. pending_connections: HashMap, + + /// Connection limits. + connection_limits: limits::ConnectionLimits, } impl TransportManager { @@ -253,6 +256,7 @@ impl TransportManager { supported_transports: HashSet, bandwidth_sink: BandwidthSink, max_parallel_dials: usize, + connection_limits_config: limits::ConnectionLimitsConfig, ) -> (Self, TransportManagerHandle) { let local_peer_id = PeerId::from_public_key(&keypair.public().into()); let peers = Arc::new(RwLock::new(HashMap::new())); @@ -285,6 +289,7 @@ impl TransportManager { pending_connections: HashMap::new(), next_substream_id: Arc::new(AtomicUsize::new(0usize)), next_connection_id: Arc::new(AtomicUsize::new(0usize)), + connection_limits: limits::ConnectionLimits::new(connection_limits_config), }, handle, ) @@ -760,6 +765,8 @@ impl TransportManager { peer: PeerId, connection_id: ConnectionId, ) -> crate::Result> { + self.connection_limits.on_connection_closed(connection_id); + let mut peers = self.peers.write(); let Some(context) = peers.get_mut(&peer) else { tracing::warn!( @@ -912,6 +919,21 @@ impl TransportManager { } }; + // Reject the connection if exceeded limits. + if let Err(error) = self + .connection_limits + .on_connection_established(endpoint.connection_id(), endpoint.is_listener()) + { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?endpoint, + ?error, + "connection limit exceeded, rejecting connection", + ); + return Ok(ConnectionEstablishedResult::Reject); + } + let mut peers = self.peers.write(); match peers.get_mut(&peer) { Some(context) => match context.state { From 7be1662ea8731a6244d32d386bbc711db9e5cb39 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 25 Jul 2024 16:52:41 +0300 Subject: [PATCH 08/13] tests: Add connection limits Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 8 ++- src/protocol/mdns.rs | 8 ++- src/protocol/notification/tests/mod.rs | 3 +- src/protocol/request_response/tests.rs | 3 +- src/transport/manager/mod.rs | 76 +++++++++++++++++++++++--- src/transport/tcp/mod.rs | 5 +- 6 files changed, 88 insertions(+), 15 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index a46e59f4..4904fee0 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -897,8 +897,11 @@ mod tests { use super::*; use crate::{ - codec::ProtocolCodec, crypto::ed25519::Keypair, transport::manager::TransportManager, - types::protocol::ProtocolName, BandwidthSink, + codec::ProtocolCodec, + crypto::ed25519::Keypair, + transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, + types::protocol::ProtocolName, + BandwidthSink, }; use tokio::sync::mpsc::channel; @@ -914,6 +917,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = PeerId::random(); diff --git a/src/protocol/mdns.rs b/src/protocol/mdns.rs index 685c34f7..d305a3f7 100644 --- a/src/protocol/mdns.rs +++ b/src/protocol/mdns.rs @@ -334,7 +334,11 @@ impl Mdns { #[cfg(test)] mod tests { use super::*; - use crate::{crypto::ed25519::Keypair, transport::manager::TransportManager, BandwidthSink}; + use crate::{ + crypto::ed25519::Keypair, + transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, + BandwidthSink, + }; use futures::StreamExt; use multiaddr::Protocol; @@ -350,6 +354,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let mdns1 = Mdns::new( @@ -372,6 +377,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let mdns2 = Mdns::new( diff --git a/src/protocol/notification/tests/mod.rs b/src/protocol/notification/tests/mod.rs index 43dd9121..0b275502 100644 --- a/src/protocol/notification/tests/mod.rs +++ b/src/protocol/notification/tests/mod.rs @@ -29,7 +29,7 @@ use crate::{ }, InnerTransportEvent, ProtocolCommand, TransportService, }, - transport::manager::TransportManager, + transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, types::protocol::ProtocolName, BandwidthSink, PeerId, }; @@ -53,6 +53,7 @@ fn make_notification_protocol() -> ( HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = PeerId::random(); diff --git a/src/protocol/request_response/tests.rs b/src/protocol/request_response/tests.rs index 524b3b2d..7c57b4f9 100644 --- a/src/protocol/request_response/tests.rs +++ b/src/protocol/request_response/tests.rs @@ -29,7 +29,7 @@ use crate::{ InnerTransportEvent, TransportService, }, substream::Substream, - transport::manager::TransportManager, + transport::manager::{limits::ConnectionLimitsConfig, TransportManager}, types::{RequestId, SubstreamId}, BandwidthSink, Error, PeerId, ProtocolName, }; @@ -51,6 +51,7 @@ fn protocol() -> ( HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = PeerId::random(); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 844627a9..b7c21099 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -1639,6 +1639,8 @@ impl TransportManager { #[cfg(test)] mod tests { + use limits::ConnectionLimitsConfig; + use super::*; use crate::{ crypto::ed25519::Keypair, executor::DefaultExecutor, transport::dummy::DummyTransport, @@ -1653,8 +1655,13 @@ mod tests { #[cfg(debug_assertions)] fn duplicate_protocol() { let sink = BandwidthSink::new(); - let (mut manager, _handle) = - TransportManager::new(Keypair::generate(), HashSet::new(), sink, 8usize); + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + sink, + 8usize, + ConnectionLimitsConfig::default(), + ); manager.register_protocol( ProtocolName::from("/notif/1"), @@ -1673,8 +1680,13 @@ mod tests { #[cfg(debug_assertions)] fn fallback_protocol_as_duplicate_main_protocol() { let sink = BandwidthSink::new(); - let (mut manager, _handle) = - TransportManager::new(Keypair::generate(), HashSet::new(), sink, 8usize); + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + sink, + 8usize, + ConnectionLimitsConfig::default(), + ); manager.register_protocol( ProtocolName::from("/notif/1"), @@ -1696,8 +1708,13 @@ mod tests { #[cfg(debug_assertions)] fn duplicate_fallback_protocol() { let sink = BandwidthSink::new(); - let (mut manager, _handle) = - TransportManager::new(Keypair::generate(), HashSet::new(), sink, 8usize); + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + sink, + 8usize, + ConnectionLimitsConfig::default(), + ); manager.register_protocol( ProtocolName::from("/notif/1"), @@ -1722,8 +1739,13 @@ mod tests { #[cfg(debug_assertions)] fn duplicate_transport() { let sink = BandwidthSink::new(); - let (mut manager, _handle) = - TransportManager::new(Keypair::generate(), HashSet::new(), sink, 8usize); + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + sink, + 8usize, + ConnectionLimitsConfig::default(), + ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1734,7 +1756,13 @@ mod tests { let keypair = Keypair::generate(); let local_peer_id = PeerId::from_public_key(&keypair.public().into()); let sink = BandwidthSink::new(); - let (mut manager, _handle) = TransportManager::new(keypair, HashSet::new(), sink, 8usize); + let (mut manager, _handle) = TransportManager::new( + keypair, + HashSet::new(), + sink, + 8usize, + ConnectionLimitsConfig::default(), + ); assert!(manager.dial(local_peer_id).await.is_err()); } @@ -1746,6 +1774,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1775,6 +1804,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = PeerId::random(); let dial_address = Multiaddr::empty() @@ -1836,6 +1866,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1866,6 +1897,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1910,6 +1942,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1928,6 +1961,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -1956,6 +1990,7 @@ mod tests { HashSet::from_iter([SupportedTransport::Tcp, SupportedTransport::Quic]), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); // ipv6 @@ -2014,6 +2049,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2080,6 +2116,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2166,6 +2203,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let _handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2250,6 +2288,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2354,6 +2393,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2456,6 +2496,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2562,6 +2603,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); @@ -2690,6 +2732,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager.on_dial_failure(ConnectionId::random()).unwrap(); @@ -2708,6 +2751,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2728,6 +2772,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager.on_connection_closed(PeerId::random(), ConnectionId::random()).unwrap(); } @@ -2745,6 +2790,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager .on_connection_opened( @@ -2768,6 +2814,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2791,6 +2838,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2817,6 +2865,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); manager @@ -2837,6 +2886,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let connection_id = ConnectionId::random(); let peer = PeerId::random(); @@ -2856,6 +2906,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); assert!(manager.next().await.is_none()); @@ -2868,6 +2919,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = { @@ -2915,6 +2967,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = { @@ -2958,6 +3011,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = { @@ -3001,6 +3055,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); // transport doesn't start with ip/dns @@ -3066,6 +3121,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); async fn call_manager(manager: &mut TransportManager, address: Multiaddr) { @@ -3119,6 +3175,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = PeerId::random(); let dial_address = Multiaddr::empty() @@ -3210,6 +3267,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let peer = PeerId::random(); let dial_address = Multiaddr::empty() diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index eb06a863..62cc00d9 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -490,7 +490,9 @@ mod tests { codec::ProtocolCodec, crypto::ed25519::Keypair, executor::DefaultExecutor, - transport::manager::{ProtocolContext, SupportedTransport, TransportManager}, + transport::manager::{ + limits::ConnectionLimitsConfig, ProtocolContext, SupportedTransport, TransportManager, + }, types::protocol::ProtocolName, BandwidthSink, PeerId, }; @@ -683,6 +685,7 @@ mod tests { HashSet::new(), BandwidthSink::new(), 8usize, + ConnectionLimitsConfig::default(), ); let handle = manager.transport_handle(Arc::new(DefaultExecutor {})); manager.register_transport( From e162bc2fd54bc541f695885af75238de3c0d48ac Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jul 2024 12:53:13 +0300 Subject: [PATCH 09/13] transport/limits: Limit dial attempts Signed-off-by: Alexandru Vasile --- src/error.rs | 9 +++++++++ src/transport/manager/limits.rs | 20 ++++++++++++++++++++ src/transport/manager/mod.rs | 14 +++++++++++--- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/error.rs b/src/error.rs index efb8ad6f..05dbbd82 100644 --- a/src/error.rs +++ b/src/error.rs @@ -28,6 +28,7 @@ use crate::{ protocol::Direction, + transport::manager::limits::ConnectionLimitsError, types::{protocol::ProtocolName, ConnectionId, SubstreamId}, PeerId, }; @@ -118,6 +119,8 @@ pub enum Error { ChannelClogged, #[error("Connection doesn't exist: `{0:?}`")] ConnectionDoesntExist(ConnectionId), + #[error("Exceeded connection limits `{0:?}`")] + ConnectionLimit(ConnectionLimitsError), } #[derive(Debug, thiserror::Error)] @@ -243,6 +246,12 @@ impl From for Error { } } +impl From for Error { + fn from(error: ConnectionLimitsError) -> Self { + Error::ConnectionLimit(error) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/transport/manager/limits.rs b/src/transport/manager/limits.rs index 551cbac9..b6838e8e 100644 --- a/src/transport/manager/limits.rs +++ b/src/transport/manager/limits.rs @@ -81,6 +81,26 @@ impl ConnectionLimits { } } + /// Called when dialing an address. + /// + /// Returns the number of outgoing connections permitted to be established. + /// It is guaranteed that at least one connection can be established if the method returns `Ok`. + /// The number of available outgoing connections can influence the maximum parallel dials to a + /// single address. + /// + /// If the maximum number of outgoing connections is not set, `Ok(usize::MAX)` is returned. + pub fn on_dial_address(&mut self) -> Result { + if let Some(max_outgoing_connections) = self.config.max_outgoing_connections { + if self.outgoing_connections.len() >= max_outgoing_connections { + return Err(ConnectionLimitsError::MaxOutgoingConnectionsExceeded); + } + + return Ok(max_outgoing_connections - self.outgoing_connections.len()); + } + + Ok(usize::MAX) + } + /// Called when a new connection is established. pub fn on_connection_established( &mut self, diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index b7c21099..89cd4b2c 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -399,6 +399,12 @@ impl TransportManager { /// /// Returns an error if the peer is unknown or the peer is already connected. pub async fn dial(&mut self, peer: PeerId) -> crate::Result<()> { + // Don't alter the peer state if there's no capacity to dial. + let available_capacity = self.connection_limits.on_dial_address()?; + // The available capacity is the maximum number of connections that can be established, + // so we limit the number of parallel dials to the minimum of these values. + let limit = available_capacity.min(self.max_parallel_dials); + if peer == self.local_peer_id { return Err(Error::TriedToDialSelf); } @@ -441,7 +447,7 @@ impl TransportManager { tracing::debug!( target: LOG_TARGET, ?peer, - "peer is aready being dialed", + "peer is already being dialed", ); peers.insert( @@ -457,7 +463,7 @@ impl TransportManager { } let mut records: HashMap<_, _> = addresses - .take(self.max_parallel_dials) + .take(limit) .into_iter() .map(|record| (record.address().clone(), record)) .collect(); @@ -564,6 +570,8 @@ impl TransportManager { /// /// Returns an error if address it not valid. pub async fn dial_address(&mut self, address: Multiaddr) -> crate::Result<()> { + self.connection_limits.on_dial_address()?; + let mut record = AddressRecord::from_multiaddr(address) .ok_or(Error::AddressError(AddressError::PeerIdMissing))?; @@ -1074,7 +1082,7 @@ impl TransportManager { }); // since an inbound connection was removed, the outbound connection can be - // removed from pendind dials + // removed from pending dials // // all records have the same `ConnectionId` so it doens't matter which of them // is used to remove the pending dial From 53bb9eb2a1e87708a812243569dec83a50d5ff2c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jul 2024 18:17:09 +0300 Subject: [PATCH 10/13] transport/manager/tests: Check inbound limits Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 89 ++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 89cd4b2c..036d61d6 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -77,6 +77,7 @@ const SCORE_CONNECT_SUCCESS: i32 = 100i32; const SCORE_CONNECT_FAILURE: i32 = -100i32; /// The connection established result. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] enum ConnectionEstablishedResult { /// Accept connection and inform `Litep2p` about the connection. Accept, @@ -3355,4 +3356,92 @@ mod tests { state => panic!("invalid peer state: {state:?}"), } } + + #[tokio::test] + async fn manager_limits_incoming_connections() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + BandwidthSink::new(), + 8usize, + ConnectionLimitsConfig::default() + .max_incoming_connections(Some(3)) + .max_outgoing_connections(Some(2)), + ); + // The connection limit is agnostic of the underlying transports. + manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); + + let peer = PeerId::random(); + let second_peer = PeerId::random(); + + let setup_dial_addr = |connection_id: u16| { + let dial_address = Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .with(Protocol::Tcp(8888 + connection_id)) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + )); + let connection_id = ConnectionId::from(connection_id as usize); + + (dial_address, connection_id) + }; + + // Setup addresses. + let (first_addr, first_connection_id) = setup_dial_addr(0); + let (second_addr, second_connection_id) = setup_dial_addr(1); + let (_, third_connection_id) = setup_dial_addr(2); + let (_, remote_connection_id) = setup_dial_addr(3); + + // Peer established the first inbound connection. + let result = manager + .on_connection_established( + peer, + &Endpoint::listener(first_addr.clone(), first_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Accept); + + // The peer is allowed to dial us a second time. + let result = manager + .on_connection_established( + peer, + &Endpoint::listener(first_addr.clone(), second_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Accept); + + // Second peer calls us. + let result = manager + .on_connection_established( + second_peer, + &Endpoint::listener(second_addr.clone(), third_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Accept); + + // Limits of inbound connections are reached. + let result = manager + .on_connection_established( + second_peer, + &Endpoint::listener(second_addr.clone(), remote_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Reject); + + // Close one connection. + let _ = manager.on_connection_closed(peer, first_connection_id).unwrap(); + + // The second peer can establish 2 inbounds now. + let result = manager + .on_connection_established( + second_peer, + &Endpoint::listener(second_addr.clone(), remote_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Accept); + } } From aa6b1a6113b01d763cf87fb828600c4175c5797b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jul 2024 18:47:27 +0300 Subject: [PATCH 11/13] manager/tests: Check outbound connection limits Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 95 ++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 036d61d6..dde1907b 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -3444,4 +3444,99 @@ mod tests { .unwrap(); assert_eq!(result, ConnectionEstablishedResult::Accept); } + + #[tokio::test] + async fn manager_limits_outbound_connections() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let (mut manager, _handle) = TransportManager::new( + Keypair::generate(), + HashSet::new(), + BandwidthSink::new(), + 8usize, + ConnectionLimitsConfig::default() + .max_incoming_connections(Some(3)) + .max_outgoing_connections(Some(2)), + ); + // The connection limit is agnostic of the underlying transports. + manager.register_transport(SupportedTransport::Tcp, Box::new(DummyTransport::new())); + + let peer = PeerId::random(); + let second_peer = PeerId::random(); + let third_peer = PeerId::random(); + + let setup_dial_addr = |peer: PeerId, connection_id: u16| { + let dial_address = Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .with(Protocol::Tcp(8888 + connection_id)) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + )); + let connection_id = ConnectionId::from(connection_id as usize); + + (dial_address, connection_id) + }; + + // Setup addresses. + let (first_addr, first_connection_id) = setup_dial_addr(peer, 0); + let (second_addr, second_connection_id) = setup_dial_addr(second_peer, 1); + let (third_addr, third_connection_id) = setup_dial_addr(third_peer, 2); + + // First dial. + manager.dial_address(first_addr.clone()).await.unwrap(); + + // Second dial. + manager.dial_address(second_addr.clone()).await.unwrap(); + + // Third dial, we have a limit on 2 outbound connections. + manager.dial_address(third_addr.clone()).await.unwrap(); + + let result = manager + .on_connection_established( + peer, + &Endpoint::dialer(first_addr.clone(), first_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Accept); + + let result = manager + .on_connection_established( + second_peer, + &Endpoint::dialer(second_addr.clone(), second_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Accept); + + // We have reached the limit now. + let result = manager + .on_connection_established( + third_peer, + &Endpoint::dialer(third_addr.clone(), third_connection_id), + ) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Reject); + + // While we have 2 outbound connections active, any dials will fail immediately. + // We cannot perform this check for the non negotiated inbound connections yet, + // since the transport will eagerly accept and negotiate them. This requires + // a refactor into the transport manager, to not waste resources on + // negotiating connections that will be rejected. + let result = manager.dial(peer).await.unwrap_err(); + assert!(std::matches!( + result, + Error::ConnectionLimit(limits::ConnectionLimitsError::MaxOutgoingConnectionsExceeded) + )); + let result = manager.dial_address(first_addr.clone()).await.unwrap_err(); + assert!(std::matches!( + result, + Error::ConnectionLimit(limits::ConnectionLimitsError::MaxOutgoingConnectionsExceeded) + )); + + // Close one connection. + let _ = manager.on_connection_closed(peer, first_connection_id).unwrap(); + // We can now dial again. + manager.dial_address(first_addr).await.unwrap(); + } } From 4c7cda693a953548e1aaa594dbe6972a5fac18fa Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 26 Jul 2024 18:51:52 +0300 Subject: [PATCH 12/13] manager/tests: Refactor helper fn for addr Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 45 ++++++++++++++---------------------- 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index dde1907b..e9669cfc 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -1659,6 +1659,19 @@ mod tests { sync::Arc, }; + /// Setup TCP address and connection id. + fn setup_dial_addr(peer: PeerId, connection_id: u16) -> (Multiaddr, ConnectionId) { + let dial_address = Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .with(Protocol::Tcp(8888 + connection_id)) + .with(Protocol::P2p( + Multihash::from_bytes(&peer.to_bytes()).unwrap(), + )); + let connection_id = ConnectionId::from(connection_id as usize); + + (dial_address, connection_id) + } + #[test] #[should_panic] #[cfg(debug_assertions)] @@ -3378,23 +3391,11 @@ mod tests { let peer = PeerId::random(); let second_peer = PeerId::random(); - let setup_dial_addr = |connection_id: u16| { - let dial_address = Multiaddr::empty() - .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) - .with(Protocol::Tcp(8888 + connection_id)) - .with(Protocol::P2p( - Multihash::from_bytes(&peer.to_bytes()).unwrap(), - )); - let connection_id = ConnectionId::from(connection_id as usize); - - (dial_address, connection_id) - }; - // Setup addresses. - let (first_addr, first_connection_id) = setup_dial_addr(0); - let (second_addr, second_connection_id) = setup_dial_addr(1); - let (_, third_connection_id) = setup_dial_addr(2); - let (_, remote_connection_id) = setup_dial_addr(3); + let (first_addr, first_connection_id) = setup_dial_addr(peer, 0); + let (second_addr, second_connection_id) = setup_dial_addr(second_peer, 1); + let (_, third_connection_id) = setup_dial_addr(peer, 2); + let (_, remote_connection_id) = setup_dial_addr(peer, 3); // Peer established the first inbound connection. let result = manager @@ -3467,18 +3468,6 @@ mod tests { let second_peer = PeerId::random(); let third_peer = PeerId::random(); - let setup_dial_addr = |peer: PeerId, connection_id: u16| { - let dial_address = Multiaddr::empty() - .with(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) - .with(Protocol::Tcp(8888 + connection_id)) - .with(Protocol::P2p( - Multihash::from_bytes(&peer.to_bytes()).unwrap(), - )); - let connection_id = ConnectionId::from(connection_id as usize); - - (dial_address, connection_id) - }; - // Setup addresses. let (first_addr, first_connection_id) = setup_dial_addr(peer, 0); let (second_addr, second_connection_id) = setup_dial_addr(second_peer, 1); From 356caac789d2272379bc2431c2c8727f44664ccd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 29 Jul 2024 19:08:45 +0300 Subject: [PATCH 13/13] transport/tests: Check connection establishes after dial atempt Signed-off-by: Alexandru Vasile --- src/transport/manager/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index e9669cfc..e39cd75a 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -3526,6 +3526,11 @@ mod tests { // Close one connection. let _ = manager.on_connection_closed(peer, first_connection_id).unwrap(); // We can now dial again. - manager.dial_address(first_addr).await.unwrap(); + manager.dial_address(first_addr.clone()).await.unwrap(); + + let result = manager + .on_connection_established(peer, &Endpoint::dialer(first_addr, first_connection_id)) + .unwrap(); + assert_eq!(result, ConnectionEstablishedResult::Accept); } }