From 6fc98dbaedb7d32e31ce01bb3fa5b51e245ae96f Mon Sep 17 00:00:00 2001 From: santos227 Date: Thu, 13 Jan 2022 18:07:07 +0100 Subject: [PATCH] {core,swarm}/: Allow configuring dial concurrency factor per dial (#2404) Enable a `NetworkBehaviour` or a user via `Swarm::dial` to override the dial concurrency factor per dial. This is especially relevant in the case of libp2p-autonat where one wants to probe addresses in sequence to reduce the amount of work a remote peer can force onto the local node. To enable the above, this commit also: - Introduces `libp2p_core::DialOpts` mirroring `libp2p_swarm::DialOpts`. Passed as an argument to `Network::dial`. - Removes `Peer::dial` in favor of `Network::dial`. - Simplifies `Swarm::dial_with_handler`. The introduction of `libp2p_core::DialOpts` will be useful beyond this feature, e.g. for https://github.com/libp2p/rust-libp2p/pull/2363. In the long run I would like to move and merge `libp2p_core::Network` and `libp2p_core::Pool` into `libp2p_swarm::Swarm` thus deduplicating `libp2p_core::DialOpts` and `libp2p_swarm::DialOpts`. Fixes #2385. --- core/CHANGELOG.md | 10 + core/src/connection/pool.rs | 8 +- core/src/lib.rs | 2 +- core/src/network.rs | 206 +++++++++++++++------ core/src/network/peer.rs | 47 +---- core/tests/concurrent_dialing.rs | 10 +- core/tests/connection_limits.rs | 22 ++- core/tests/network_dial_error.rs | 19 +- misc/multistream-select/tests/transport.rs | 2 +- swarm/CHANGELOG.md | 4 + swarm/src/dial_opts.rs | 19 ++ swarm/src/lib.rs | 73 ++++---- 12 files changed, 267 insertions(+), 155 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index e66d7501..b7db3b4f 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -11,12 +11,22 @@ - Add `ConnectedPoint::is_relayed` (see [PR 2392]). +- Enable overriding _dial concurrency factor_ per dial via + `DialOpts::override_dial_concurrency_factor`. + + - Introduces `libp2p_core::DialOpts` mirroring `libp2p_swarm::DialOpts`. + Passed as an argument to `Network::dial`. + - Removes `Peer::dial` in favor of `Network::dial`. + + See [PR 2404]. + - Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408]) [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352 [PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392 +[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 [PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408 # 0.30.1 [2021-11-16] diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 59d41908..85ac1b8a 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -535,6 +535,7 @@ where addresses: impl Iterator + Send + 'static, peer: Option, handler: THandler, + dial_concurrency_factor_override: Option, ) -> Result> where TTrans: Clone + Send, @@ -544,7 +545,12 @@ where return Err(DialError::ConnectionLimit { limit, handler }); }; - let dial = ConcurrentDial::new(transport, peer, addresses, self.dial_concurrency_factor); + let dial = ConcurrentDial::new( + transport, + peer, + addresses, + dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor), + ); let connection_id = self.next_connection_id(); diff --git a/core/src/lib.rs b/core/src/lib.rs index 594fb668..055374ba 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -72,7 +72,7 @@ pub use identity::PublicKey; pub use multiaddr::Multiaddr; pub use multihash; pub use muxing::StreamMuxer; -pub use network::Network; +pub use network::{DialOpts, Network}; pub use peer_id::PeerId; pub use peer_record::PeerRecord; pub use signed_envelope::SignedEnvelope; diff --git a/core/src/network.rs b/core/src/network.rs index 00946bc9..63a07573 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -36,6 +36,7 @@ use crate::{ transport::{Transport, TransportError}, Executor, Multiaddr, PeerId, }; +use either::Either; use std::{ convert::TryFrom as _, error, fmt, @@ -43,6 +44,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use thiserror::Error; /// Implementation of `Stream` that handles the nodes. pub struct Network @@ -185,16 +187,15 @@ where &self.local_peer_id } - /// Dials a [`Multiaddr`] that may or may not encapsulate a - /// specific expected remote peer ID. + /// Dial a known or unknown peer. /// /// The given `handler` will be used to create the /// [`Connection`](crate::connection::Connection) upon success and the /// connection ID is returned. pub fn dial( &mut self, - address: &Multiaddr, handler: THandler, + opts: impl Into, ) -> Result> where TTrans: Transport + Send, @@ -203,50 +204,54 @@ where TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, { - // If the address ultimately encapsulates an expected peer ID, dial that peer - // such that any mismatch is detected. We do not "pop off" the `P2p` protocol - // from the address, because it may be used by the `Transport`, i.e. `P2p` - // is a protocol component that can influence any transport, like `libp2p-dns`. - if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() { - if let Ok(peer) = PeerId::try_from(ma) { - return self.dial_peer(DialingOpts { - peer, - addresses: std::iter::once(address.clone()), - handler, - }); + let opts = opts.into(); + + let (peer_id, addresses, dial_concurrency_factor_override) = match opts.0 { + // Dial a known peer. + Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses { + peer_id, + addresses, + dial_concurrency_factor_override, + }) => ( + Some(peer_id), + Either::Left(addresses.into_iter()), + dial_concurrency_factor_override, + ), + // Dial an unknown peer. + Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { address }) => { + // If the address ultimately encapsulates an expected peer ID, dial that peer + // such that any mismatch is detected. We do not "pop off" the `P2p` protocol + // from the address, because it may be used by the `Transport`, i.e. `P2p` + // is a protocol component that can influence any transport, like `libp2p-dns`. + let peer_id = match address + .iter() + .last() + .and_then(|p| { + if let multiaddr::Protocol::P2p(ma) = p { + Some(PeerId::try_from(ma)) + } else { + None + } + }) + .transpose() + { + Ok(peer_id) => peer_id, + Err(_) => return Err(DialError::InvalidPeerId { handler }), + }; + + (peer_id, Either::Right(std::iter::once(address)), None) } - } + }; self.pool.add_outgoing( self.transport().clone(), - std::iter::once(address.clone()), - None, + addresses, + peer_id, handler, + dial_concurrency_factor_override, ) } - /// Initiates a connection attempt to a known peer. - fn dial_peer( - &mut self, - opts: DialingOpts, - ) -> Result> - where - I: Iterator + Send + 'static, - TTrans: Transport + Send, - TTrans::Output: Send + 'static, - TTrans::Dial: Send + 'static, - TTrans::Error: Send + 'static, - { - let id = self.pool.add_outgoing( - self.transport().clone(), - opts.addresses, - Some(opts.peer), - opts.handler, - )?; - - Ok(id) - } - /// Returns information about the state of the `Network`. pub fn info(&self) -> NetworkInfo { let num_peers = self.pool.num_peers(); @@ -463,14 +468,6 @@ where } } -/// Options for a dialing attempt (i.e. repeated connection attempt -/// via a list of address) to a peer. -struct DialingOpts { - peer: PeerId, - handler: THandler, - addresses: I, -} - /// Information about the network obtained by [`Network::info()`]. #[derive(Clone, Debug)] pub struct NetworkInfo { @@ -560,7 +557,7 @@ impl NetworkConfig { } /// Possible (synchronous) errors when dialing a peer. -#[derive(Clone)] +#[derive(Debug, Clone, Error)] pub enum DialError { /// The dialing attempt is rejected because of a connection limit. ConnectionLimit { @@ -568,23 +565,114 @@ pub enum DialError { handler: THandler, }, /// The dialing attempt is rejected because the peer being dialed is the local peer. - LocalPeerId { handler: THandler }, + LocalPeerId { + handler: THandler, + }, + InvalidPeerId { + handler: THandler, + }, } -impl fmt::Debug for DialError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self { - DialError::ConnectionLimit { limit, handler: _ } => f - .debug_struct("DialError::ConnectionLimit") - .field("limit", limit) - .finish(), - DialError::LocalPeerId { handler: _ } => { - f.debug_struct("DialError::LocalPeerId").finish() - } +/// Options to configure a dial to a known or unknown peer. +/// +/// Used in [`Network::dial`]. +/// +/// To construct use either of: +/// +/// - [`DialOpts::peer_id`] dialing a known peer +/// +/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer +#[derive(Debug, Clone, PartialEq)] +pub struct DialOpts(pub(super) Opts); + +impl DialOpts { + /// Dial a known peer. + pub fn peer_id(peer_id: PeerId) -> WithPeerId { + WithPeerId { peer_id } + } + + /// Dial an unknown peer. + pub fn unknown_peer_id() -> WithoutPeerId { + WithoutPeerId {} + } +} + +impl From for DialOpts { + fn from(address: Multiaddr) -> Self { + DialOpts::unknown_peer_id().address(address).build() + } +} + +/// Internal options type. +/// +/// Not to be constructed manually. Use either of the below instead: +/// +/// - [`DialOpts::peer_id`] dialing a known peer +/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer +#[derive(Debug, Clone, PartialEq)] +pub(super) enum Opts { + WithPeerIdWithAddresses(WithPeerIdWithAddresses), + WithoutPeerIdWithAddress(WithoutPeerIdWithAddress), +} + +#[derive(Debug, Clone, PartialEq)] +pub struct WithPeerId { + pub(crate) peer_id: PeerId, +} + +impl WithPeerId { + /// Specify a set of addresses to be used to dial the known peer. + pub fn addresses(self, addresses: Vec) -> WithPeerIdWithAddresses { + WithPeerIdWithAddresses { + peer_id: self.peer_id, + addresses, + dial_concurrency_factor_override: Default::default(), } } } +#[derive(Debug, Clone, PartialEq)] +pub struct WithPeerIdWithAddresses { + pub(crate) peer_id: PeerId, + pub(crate) addresses: Vec, + pub(crate) dial_concurrency_factor_override: Option, +} + +impl WithPeerIdWithAddresses { + /// Override [`NetworkConfig::with_dial_concurrency_factor`]. + pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self { + self.dial_concurrency_factor_override = Some(factor); + self + } + + /// Build the final [`DialOpts`]. + pub fn build(self) -> DialOpts { + DialOpts(Opts::WithPeerIdWithAddresses(self)) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct WithoutPeerId {} + +impl WithoutPeerId { + /// Specify a single address to dial the unknown peer. + pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress { + WithoutPeerIdWithAddress { address } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct WithoutPeerIdWithAddress { + pub(crate) address: Multiaddr, +} + +impl WithoutPeerIdWithAddress { + /// Build the final [`DialOpts`]. + pub fn build(self) -> DialOpts { + DialOpts(Opts::WithoutPeerIdWithAddress(self)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index 3a286c56..c797dae5 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -18,13 +18,13 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::{DialError, DialingOpts, Network}; +use super::Network; use crate::{ connection::{ handler::THandlerInEvent, pool::Pool, ConnectionHandler, ConnectionId, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, PendingConnection, }, - Multiaddr, PeerId, Transport, + PeerId, Transport, }; use std::{collections::VecDeque, error, fmt}; @@ -95,7 +95,10 @@ where } fn disconnected(network: &'a mut Network, peer_id: PeerId) -> Self { - Peer::Disconnected(DisconnectedPeer { network, peer_id }) + Peer::Disconnected(DisconnectedPeer { + _network: network, + peer_id, + }) } fn connected(network: &'a mut Network, peer_id: PeerId) -> Self { @@ -149,38 +152,6 @@ where matches!(self, Peer::Disconnected(..)) } - /// Initiates a new dialing attempt to this peer using the given addresses. - /// - /// The connection ID of the first connection attempt, i.e. to `address`, - /// is returned, together with a [`DialingPeer`] for further use. The - /// `remaining` addresses are tried in order in subsequent connection - /// attempts in the context of the same dialing attempt, if the connection - /// attempt to the first address fails. - pub fn dial( - self, - addresses: I, - handler: THandler, - ) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError> - where - I: IntoIterator, - I::IntoIter: Send + 'static, - { - let (peer_id, network) = match self { - Peer::Connected(p) => (p.peer_id, p.network), - Peer::Dialing(p) => (p.peer_id, p.network), - Peer::Disconnected(p) => (p.peer_id, p.network), - Peer::Local => return Err(DialError::LocalPeerId { handler }), - }; - - let id = network.dial_peer(DialingOpts { - peer: peer_id, - handler, - addresses: addresses.into_iter(), - })?; - - Ok((id, DialingPeer { network, peer_id })) - } - /// Converts the peer into a `ConnectedPeer`, if an established connection exists. /// /// Succeeds if the there is at least one established connection to the peer. @@ -294,7 +265,7 @@ where pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> { self.network.disconnect(&self.peer_id); DisconnectedPeer { - network: self.network, + _network: self.network, peer_id: self.peer_id, } } @@ -354,7 +325,7 @@ where pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> { self.network.disconnect(&self.peer_id); DisconnectedPeer { - network: self.network, + _network: self.network, peer_id: self.peer_id, } } @@ -432,7 +403,7 @@ where THandler: IntoConnectionHandler, { peer_id: PeerId, - network: &'a mut Network, + _network: &'a mut Network, } impl<'a, TTrans, THandler> fmt::Debug for DisconnectedPeer<'a, TTrans, THandler> diff --git a/core/tests/concurrent_dialing.rs b/core/tests/concurrent_dialing.rs index 08b1b578..e708be91 100644 --- a/core/tests/concurrent_dialing.rs +++ b/core/tests/concurrent_dialing.rs @@ -26,7 +26,7 @@ use futures::ready; use libp2p_core::{ multiaddr::Protocol, network::{NetworkConfig, NetworkEvent}, - ConnectedPoint, + ConnectedPoint, DialOpts, }; use quickcheck::*; use rand07::Rng; @@ -73,8 +73,12 @@ fn concurrent_dialing() { // Have network 2 dial network 1 and wait for network 1 to receive the incoming // connections. network_2 - .peer(*network_1.local_peer_id()) - .dial(network_1_listen_addresses.clone(), TestHandler()) + .dial( + TestHandler(), + DialOpts::peer_id(*network_1.local_peer_id()) + .addresses(network_1_listen_addresses.clone().into()) + .build(), + ) .unwrap(); let mut network_1_incoming_connections = Vec::new(); for i in 0..concurrency_factor.0.get() { diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs index bee42b53..0d0cb388 100644 --- a/core/tests/connection_limits.rs +++ b/core/tests/connection_limits.rs @@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr}; use libp2p_core::{ connection::PendingConnectionError, network::{ConnectionLimits, DialError, NetworkConfig, NetworkEvent}, - PeerId, + DialOpts, PeerId, }; use quickcheck::*; use std::task::Poll; @@ -46,15 +46,23 @@ fn max_outgoing() { let target = PeerId::random(); for _ in 0..outgoing_limit { network - .peer(target.clone()) - .dial(vec![addr.clone()], TestHandler()) + .dial( + TestHandler(), + DialOpts::peer_id(target) + .addresses(vec![addr.clone()]) + .build(), + ) .ok() .expect("Unexpected connection limit."); } match network - .peer(target.clone()) - .dial(vec![addr.clone()], TestHandler()) + .dial( + TestHandler(), + DialOpts::peer_id(target) + .addresses(vec![addr.clone()]) + .build(), + ) .expect_err("Unexpected dialing success.") { DialError::ConnectionLimit { limit, handler: _ } => { @@ -122,7 +130,7 @@ fn max_established_incoming() { // Spawn and block on the dialer. async_std::task::block_on({ let mut n = 0; - let _ = network2.dial(&listen_addr, TestHandler()).unwrap(); + let _ = network2.dial(TestHandler(), listen_addr.clone()).unwrap(); let mut expected_closed = false; let mut network_1_established = false; @@ -188,7 +196,7 @@ fn max_established_incoming() { if n <= limit { // Dial again until the limit is exceeded. n += 1; - network2.dial(&listen_addr, TestHandler()).unwrap(); + network2.dial(TestHandler(), listen_addr.clone()).unwrap(); if n == limit { // The the next dialing attempt exceeds the limit, this diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 827db92e..330061bb 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -22,6 +22,7 @@ mod util; use futures::prelude::*; use libp2p_core::multiaddr::multiaddr; +use libp2p_core::DialOpts; use libp2p_core::{ connection::PendingConnectionError, multiaddr::Protocol, @@ -50,8 +51,12 @@ fn deny_incoming_connec() { })); swarm2 - .peer(swarm1.local_peer_id().clone()) - .dial(vec![address.clone()], TestHandler()) + .dial( + TestHandler(), + DialOpts::peer_id(*swarm1.local_peer_id()) + .addresses(vec![address.clone()]) + .build(), + ) .unwrap(); async_std::task::block_on(future::poll_fn(|cx| -> Poll> { @@ -106,7 +111,7 @@ fn dial_self() { _ => panic!("Was expecting the listen address to be reported"), })); - swarm.dial(&local_address, TestHandler()).unwrap(); + swarm.dial(TestHandler(), local_address.clone()).unwrap(); let mut got_dial_err = false; let mut got_inc_err = false; @@ -174,8 +179,12 @@ fn multiple_addresses_err() { addresses.shuffle(&mut rand::thread_rng()); swarm - .peer(target.clone()) - .dial(addresses.clone(), TestHandler()) + .dial( + TestHandler(), + DialOpts::peer_id(target.clone()) + .addresses(addresses.clone()) + .build(), + ) .unwrap(); async_std::task::block_on(future::poll_fn(|cx| -> Poll> { diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index 1c48af37..9fa197e1 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -75,7 +75,7 @@ fn transport_upgrade() { let client = async move { let addr = addr_receiver.await.unwrap(); - dialer.dial(&addr, TestHandler()).unwrap(); + dialer.dial(TestHandler(), addr).unwrap(); futures::future::poll_fn(move |cx| loop { match ready!(dialer.poll(cx)) { NetworkEvent::ConnectionEstablished { .. } => return Poll::Ready(()), diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 49f42931..a97ea499 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -14,12 +14,16 @@ - Implement `swarm::NetworkBehaviour` on `either::Either` (see [PR 2370]). +- Enable overriding _dial concurrency factor_ per dial via + `DialOpts::override_dial_concurrency_factor`. See [PR 2404]. + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 [PR 2370]: https://github.com/libp2p/rust-libp2p/pull/2370 [PR 2375]: https://github.com/libp2p/rust-libp2p/pull/2375 [PR 2378]: https://github.com/libp2p/rust-libp2p/pull/2378 +[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 # 0.32.0 [2021-11-16] diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index ae41be17..e98092a7 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -20,6 +20,7 @@ // DEALINGS IN THE SOFTWARE. use libp2p_core::{Multiaddr, PeerId}; +use std::num::NonZeroU8; /// Options to configure a dial to a known or unknown peer. /// @@ -50,6 +51,7 @@ impl DialOpts { WithPeerId { peer_id, condition: Default::default(), + dial_concurrency_factor_override: Default::default(), } } @@ -106,6 +108,7 @@ pub(super) enum Opts { pub struct WithPeerId { pub(crate) peer_id: PeerId, pub(crate) condition: PeerCondition, + pub(crate) dial_concurrency_factor_override: Option, } impl WithPeerId { @@ -115,6 +118,13 @@ impl WithPeerId { self } + /// Override + /// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor). + pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self { + self.dial_concurrency_factor_override = Some(factor); + self + } + /// Specify a set of addresses to be used to dial the known peer. pub fn addresses(self, addresses: Vec) -> WithPeerIdWithAddresses { WithPeerIdWithAddresses { @@ -122,6 +132,7 @@ impl WithPeerId { condition: self.condition, addresses, extend_addresses_through_behaviour: false, + dial_concurrency_factor_override: self.dial_concurrency_factor_override, } } @@ -140,6 +151,7 @@ pub struct WithPeerIdWithAddresses { pub(crate) condition: PeerCondition, pub(crate) addresses: Vec, pub(crate) extend_addresses_through_behaviour: bool, + pub(crate) dial_concurrency_factor_override: Option, } impl WithPeerIdWithAddresses { @@ -156,6 +168,13 @@ impl WithPeerIdWithAddresses { self } + /// Override + /// [`NetworkConfig::with_dial_concurrency_factor`](libp2p_core::network::NetworkConfig::with_dial_concurrency_factor). + pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self { + self.dial_concurrency_factor_override = Some(factor); + self + } + /// Build the final [`DialOpts`]. pub fn build(self) -> DialOpts { DialOpts(Opts::WithPeerIdWithAddresses(self)) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 8bebe3f0..97043240 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -359,15 +359,20 @@ where fn dial_with_handler( &mut self, - opts: DialOpts, + swarm_dial_opts: DialOpts, handler: ::ProtocolsHandler, ) -> Result<(), DialError> { - match opts.0 { + let core_dial_opts = match swarm_dial_opts.0 { // Dial a known peer. - dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { peer_id, condition }) + dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { + peer_id, + condition, + dial_concurrency_factor_override, + }) | dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses { peer_id, condition, + dial_concurrency_factor_override, .. }) => { // Check [`PeerCondition`] if provided. @@ -396,7 +401,7 @@ where // Retrieve the addresses to dial. let addresses = { - let mut addresses = match opts.0 { + let mut addresses = match swarm_dial_opts.0 { dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { .. }) => { self.behaviour.addresses_of_peer(&peer_id) } @@ -433,47 +438,33 @@ where addresses }; - let handler = handler - .into_node_handler_builder() - .with_substream_upgrade_protocol_override( - self.substream_upgrade_protocol_override, - ); + let mut opts = libp2p_core::DialOpts::peer_id(peer_id).addresses(addresses); - match self.network.peer(peer_id).dial(addresses, handler) { - Ok(_connection_id) => Ok(()), - Err(error) => { - let (error, handler) = DialError::from_network_dial_error(error); - self.behaviour.inject_dial_failure( - Some(peer_id), - handler.into_protocols_handler(), - &error, - ); - return Err(error); - } + if let Some(f) = dial_concurrency_factor_override { + opts = opts.override_dial_concurrency_factor(f); } + + opts.build() } // Dial an unknown peer. dial_opts::Opts::WithoutPeerIdWithAddress(dial_opts::WithoutPeerIdWithAddress { address, - }) => { - let handler = handler - .into_node_handler_builder() - .with_substream_upgrade_protocol_override( - self.substream_upgrade_protocol_override, - ); + }) => libp2p_core::DialOpts::unknown_peer_id() + .address(address) + .build(), + }; - match self.network.dial(&address, handler).map(|_id| ()) { - Ok(_connection_id) => Ok(()), - Err(error) => { - let (error, handler) = DialError::from_network_dial_error(error); - self.behaviour.inject_dial_failure( - None, - handler.into_protocols_handler(), - &error, - ); - return Err(error); - } - } + let handler = handler + .into_node_handler_builder() + .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override); + + match self.network.dial(handler, core_dial_opts).map(|_id| ()) { + Ok(_connection_id) => Ok(()), + Err(error) => { + let (error, handler) = DialError::from_network_dial_error(error); + self.behaviour + .inject_dial_failure(None, handler.into_protocols_handler(), &error); + return Err(error); } } } @@ -1334,8 +1325,9 @@ pub enum DialError { DialPeerConditionFalse(dial_opts::PeerCondition), /// Pending connection attempt has been aborted. Aborted, - /// The peer identity obtained on the connection did not - /// match the one that was expected or is otherwise invalid. + /// The provided peer identity is invalid or the peer identity obtained on + /// the connection did not match the one that was expected or is otherwise + /// invalid. InvalidPeerId, /// An I/O error occurred on the connection. ConnectionIo(io::Error), @@ -1350,6 +1342,7 @@ impl DialError { (DialError::ConnectionLimit(limit), handler) } network::DialError::LocalPeerId { handler } => (DialError::LocalPeerId, handler), + network::DialError::InvalidPeerId { handler } => (DialError::InvalidPeerId, handler), } } }