From ce2cb5c309865381b123c3d806d4fac9467629a7 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Tue, 8 Nov 2022 17:46:03 +0100 Subject: [PATCH 01/28] Initial prototype commit --- Cargo.toml | 4 +- core/Cargo.toml | 11 ++- core/src/lib.rs | 40 +++++++++ examples/chat-tokio.rs | 23 ++--- examples/chat.rs | 2 +- swarm/Cargo.toml | 4 + swarm/src/connection/pool.rs | 169 +++++++++++++++++++++++++++-------- swarm/src/lib.rs | 113 ++++++++++++++++------- 8 files changed, 271 insertions(+), 95 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e0086fe5e01..3958f277edb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,8 +75,8 @@ secp256k1 = ["libp2p-core/secp256k1"] rsa = ["libp2p-core/rsa"] ecdsa = ["libp2p-core/ecdsa"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] -tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio"] -async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std"] +tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-swarm/tokio", "libp2p-core/tokio"] +async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-swarm/async-std", "libp2p-core/async-std"] [dependencies] bytes = "1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 07cec58c002..54fd3492803 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -38,6 +38,9 @@ unsigned-varint = "0.7" void = "1" zeroize = "1" serde = { version = "1", optional = true, features = ["derive"] } +# TODO make optional before commit +tokio = { version = "1.15", features = ["rt"], optional = true } +async-std = { version = "1.6.2", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false, optional = true} @@ -56,10 +59,12 @@ serde_json = "1.0" prost-build = "0.11" [features] -secp256k1 = [ "libsecp256k1" ] -ecdsa = [ "p256" ] -rsa = [ "dep:ring" ] +secp256k1 = ["libsecp256k1"] +ecdsa = ["p256"] +rsa = ["dep:ring"] serde = ["multihash/serde-codec", "dep:serde"] +tokio = ["dep:tokio"] +async-std = ["dep:async-std"] [[bench]] name = "peer_id" diff --git a/core/src/lib.rs b/core/src/lib.rs index c7b9aa6068c..22c9e567a1a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -51,6 +51,7 @@ mod peer_record_proto { include!(concat!(env!("OUT_DIR"), "/peer_record_proto.rs")); } +use futures::executor::ThreadPool; /// Multi-address re-export. pub use multiaddr; pub type Negotiated = multistream_select::Negotiated; @@ -102,3 +103,42 @@ impl + Send>>)> Executor for F { self(f) } } + +impl Executor for ThreadPool { + fn exec(&self, future: Pin + Send>>) { + self.spawn_ok(future) + } +} + +#[cfg(feature = "tokio")] +#[derive(Clone, Copy, Debug, Default)] +pub enum TokioExecutor<'a> { + #[default] + Empty, + Given(&'a tokio::runtime::Runtime), +} + +#[cfg(feature = "tokio")] +impl<'a> Executor for TokioExecutor<'a> { + fn exec(&self, future: Pin + Send>>) { + match self { + Self::Given(runtime) => { + let _ = runtime.spawn(future); + } + Self::Empty => { + let _ = tokio::spawn(future); + } + } + } +} + +#[cfg(feature = "async-std")] +#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct AsyncStdExecutor; + +#[cfg(feature = "async-std")] +impl Executor for AsyncStdExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = async_std::task::spawn(future); + } +} diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index d2e5b63a357..1b2b06d3424 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -39,7 +39,7 @@ use libp2p::{ TokioMdns, }, mplex, noise, - swarm::{SwarmBuilder, SwarmEvent}, + swarm::SwarmEvent, tcp, Multiaddr, NetworkBehaviour, PeerId, Transport, }; use std::error::Error; @@ -97,23 +97,12 @@ async fn main() -> Result<(), Box> { } // Create a Swarm to manage peers and events. - let mut swarm = { - let mdns = TokioMdns::new(Default::default())?; - let mut behaviour = MyBehaviour { - floodsub: Floodsub::new(peer_id), - mdns, - }; - - behaviour.floodsub.subscribe(floodsub_topic.clone()); - - SwarmBuilder::new(transport, behaviour, peer_id) - // We want the connection background tasks to be spawned - // onto the tokio runtime. - .executor(Box::new(|fut| { - tokio::spawn(fut); - })) - .build() + let mdns = TokioMdns::new(Default::default())?; + let behaviour = MyBehaviour { + floodsub: Floodsub::new(peer_id), + mdns, }; + let mut swarm = libp2p_swarm::TokioSwarm::new(transport, behaviour, peer_id); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { diff --git a/examples/chat.rs b/examples/chat.rs index e5368b49e80..fa504066e9c 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box> { }; behaviour.floodsub.subscribe(floodsub_topic.clone()); - Swarm::new(transport, behaviour, local_peer_id) + Swarm::<_, libp2p_core::AsyncStdExecutor>::new(transport, behaviour, local_peer_id) }; // Reach out to another node if specified diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 75dd0968b16..bb849a298e8 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -30,6 +30,10 @@ env_logger = "0.9" libp2p = { path = "..", features = ["full"] } quickcheck = { package = "quickcheck-ext", path = "../misc/quickcheck-ext" } +[features] +tokio = ["libp2p-core/tokio"] +async-std = ["libp2p-core/async-std"] + # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index cc6a9bbd816..2f2a2c5afd9 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -31,6 +31,7 @@ use crate::{ }; use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; +use futures::executor::ThreadPool; use futures::prelude::*; use futures::{ channel::{mpsc, oneshot}, @@ -54,10 +55,44 @@ use void::Void; mod concurrent_dial; mod task; +enum ExecSwitch +where + TExecutor: Executor, +{ + Executor(TExecutor), + LocalSpawn(FuturesUnordered + Send>>>), +} + +impl ExecSwitch +where + TExecutor: Executor, +{ + // advance the local queue + #[inline] + fn advance_local(&mut self, cx: &mut Context) { + match self { + ExecSwitch::Executor(_) => {} + ExecSwitch::LocalSpawn(local) => { + while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {} + } + } + } + + #[inline] + fn spawn(&mut self, task: BoxFuture<'static, ()>) { + match self { + Self::Executor(executor) => executor.exec(task), + Self::LocalSpawn(local) => local.push(task), + } + } +} + /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool +pub struct Pool where TTrans: Transport, + TExecutor: Executor, + THandler: IntoConnectionHandler, { local_id: PeerId, @@ -93,14 +128,9 @@ where /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, - /// The executor to use for running the background tasks. If `None`, - /// the tasks are kept in `local_spawns` instead and polled on the - /// current thread when the [`Pool`] is polled for new events. - executor: Option>, - - /// If no `executor` is configured, tasks are kept in this set and - /// polled on the current thread when the [`Pool`] is polled for new events. - local_spawns: FuturesUnordered + Send>>>, + /// The executor to use for running the background tasks. Can either be an advanced executor + /// or a local queue. + executor: ExecSwitch, /// Sender distributed to pending tasks for reporting events back /// to the pool. @@ -191,7 +221,9 @@ impl PendingConnection { } } -impl fmt::Debug for Pool { +impl fmt::Debug + for Pool +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") .field("counters", &self.counters) @@ -288,17 +320,22 @@ where }, } -impl Pool +impl Pool where THandler: IntoConnectionHandler, TTrans: Transport, + TExecutor: Executor, { /// Creates a new empty `Pool`. - pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { + pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); let (established_connection_events_tx, established_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); + let executor = match config.executor { + Some(exec) => ExecSwitch::Executor(exec), + None => ExecSwitch::LocalSpawn(Default::default()), + }; Pool { local_id, counters: ConnectionCounters::new(limits), @@ -309,8 +346,7 @@ where dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, - executor: config.executor, - local_spawns: FuturesUnordered::new(), + executor, pending_connection_events_tx, pending_connection_events_rx, established_connection_events_tx, @@ -399,17 +435,14 @@ where } fn spawn(&mut self, task: BoxFuture<'static, ()>) { - if let Some(executor) = &mut self.executor { - executor.exec(task); - } else { - self.local_spawns.push(task); - } + self.executor.spawn(task) } } -impl Pool +impl Pool where THandler: IntoConnectionHandler, + TExecutor: Executor, TTrans: Transport + 'static, TTrans::Output: Send + 'static, TTrans::Error: Send + 'static, @@ -821,7 +854,7 @@ where } // Advance the tasks in `local_spawns`. - while let Poll::Ready(Some(())) = self.local_spawns.poll_next_unpin(cx) {} + self.executor.advance_local(cx); Poll::Pending } @@ -1050,9 +1083,9 @@ impl ConnectionLimits { /// /// The default configuration specifies no dedicated task executor, a /// task event buffer size of 32, and a task command buffer size of 7. -pub struct PoolConfig { +pub struct PoolConfig { /// Executor to use to spawn tasks. - pub executor: Option>, + pub executor: Option, /// Size of the task command buffer (per task). pub task_command_buffer_size: usize, @@ -1073,10 +1106,10 @@ pub struct PoolConfig { max_negotiating_inbound_streams: usize, } -impl Default for PoolConfig { - fn default() -> Self { +macro_rules! impl_pool_config_default { + ($executor:ident) => { PoolConfig { - executor: None, + $executor, task_event_buffer_size: 32, task_command_buffer_size: 7, // Set to a default of 8 based on frequency of dialer connections @@ -1084,24 +1117,51 @@ impl Default for PoolConfig { substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, } + }; +} + +impl Default for PoolConfig { + fn default() -> Self { + let executor = ThreadPool::new().ok(); + impl_pool_config_default!(executor) } } -impl PoolConfig { - /// Configures the executor to use for spawning connection background tasks. - pub fn with_executor(mut self, e: Box) -> Self { - self.executor = Some(e); - self +#[cfg(feature = "tokio")] +impl<'a> Default for PoolConfig> { + fn default() -> Self { + let executor = Some(libp2p_core::TokioExecutor::default()); + impl_pool_config_default!(executor) } +} - /// Configures the executor to use for spawning connection background tasks, - /// only if no executor has already been configured. - pub fn or_else_with_executor(mut self, f: F) -> Self - where - F: FnOnce() -> Option>, - { - self.executor = self.executor.or_else(f); - self +#[cfg(feature = "async-std")] +impl Default for PoolConfig { + fn default() -> Self { + let executor = Some(libp2p_core::AsyncStdExecutor::default()); + impl_pool_config_default!(executor) + } +} + +impl PoolConfig { + /// Configures the executor to use for spawning connection background tasks. + pub fn with_executor(self, executor: NExecutor) -> PoolConfig { + let PoolConfig { + task_command_buffer_size, + task_event_buffer_size, + dial_concurrency_factor, + substream_upgrade_protocol_override, + max_negotiating_inbound_streams, + .. + } = self; + PoolConfig { + executor: Some(executor), + task_command_buffer_size, + task_event_buffer_size, + dial_concurrency_factor, + substream_upgrade_protocol_override, + max_negotiating_inbound_streams, + } } /// Sets the maximum number of events sent to a connection's background task @@ -1151,6 +1211,37 @@ impl PoolConfig { } } +impl PoolConfig +where + TExecutor: Executor + Send + 'static, +{ + /// Configures the executor to use for spawning connection background tasks, + /// only if no executor has already been configured. + pub fn or_else_with_executor(self, f: F) -> PoolConfig> + where + F: FnOnce() -> Option>, + { + let PoolConfig { + task_command_buffer_size, + task_event_buffer_size, + dial_concurrency_factor, + substream_upgrade_protocol_override, + max_negotiating_inbound_streams, + .. + } = self; + let executor: Option> = + self.executor.map_or_else(f, |e| Some(Box::new(e))); + PoolConfig { + executor, + task_command_buffer_size, + task_event_buffer_size, + dial_concurrency_factor, + substream_upgrade_protocol_override, + max_negotiating_inbound_streams, + } + } +} + trait EntryExt<'a, K, V> { fn expect_occupied(self, msg: &'static str) -> hash_map::OccupiedEntry<'a, K, V>; } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 6fbff5db6b4..50867877614 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -86,7 +86,7 @@ use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; use connection::IncomingInfo; use dial_opts::{DialOpts, PeerCondition}; use either::Either; -use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; +use futures::{prelude::*, stream::FusedStream}; use libp2p_core::connection::ConnectionId; use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ @@ -258,15 +258,16 @@ pub enum SwarmEvent { /// /// Note: Needs to be polled via `` in order to make /// progress. -pub struct Swarm +pub struct Swarm where TBehaviour: NetworkBehaviour, + TExecutor: Executor, { /// [`Transport`] for dialing remote peers and listening for incoming connection. transport: transport::Boxed<(PeerId, StreamMuxerBox)>, /// The nodes currently active. - pool: Pool, transport::Boxed<(PeerId, StreamMuxerBox)>>, + pool: Pool, transport::Boxed<(PeerId, StreamMuxerBox)>, TExecutor>, /// The local peer ID. local_peer_id: PeerId, @@ -300,11 +301,24 @@ where pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent)>, } -impl Unpin for Swarm where TBehaviour: NetworkBehaviour {} +#[cfg(feature = "tokio")] +pub type TokioSwarm<'a, TBehaviour> = Swarm>; -impl Swarm +#[cfg(feature = "async-std")] +pub type AsyncStdSwarm = Swarm; + +impl Unpin for Swarm where TBehaviour: NetworkBehaviour, + TExecutor: Executor, +{ +} + +impl Swarm +where + TBehaviour: NetworkBehaviour, + TExecutor: Executor, + PoolConfig: Default, { /// Builds a new `Swarm`. pub fn new( @@ -314,7 +328,13 @@ where ) -> Self { SwarmBuilder::new(transport, behaviour, local_peer_id).build() } +} +impl Swarm +where + TBehaviour: NetworkBehaviour, + TExecutor: Executor, +{ /// Returns information about the connections underlying the [`Swarm`]. pub fn network_info(&self) -> NetworkInfo { let num_peers = self.pool.num_peers(); @@ -1045,7 +1065,12 @@ where } } PendingNotifyHandler::Any(ids) => { - match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) { + match notify_any::<_, _, TBehaviour, TExecutor>( + ids, + &mut this.pool, + event, + cx, + ) { None => continue, Some((event, ids)) => { let handler = PendingNotifyHandler::Any(ids); @@ -1154,9 +1179,9 @@ fn notify_one( /// /// Returns `None` if either all connections are closing or the event /// was successfully sent to a handler, in either case the event is consumed. -fn notify_any( +fn notify_any( ids: SmallVec<[ConnectionId; 10]>, - pool: &mut Pool, + pool: &mut Pool, event: THandlerInEvent, cx: &mut Context<'_>, ) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)> @@ -1164,6 +1189,7 @@ where TTrans: Transport, TTrans::Error: Send + 'static, TBehaviour: NetworkBehaviour, + TExecutor: Executor, THandler: IntoConnectionHandler, THandler::Handler: ConnectionHandler< InEvent = THandlerInEvent, @@ -1205,9 +1231,10 @@ where /// /// Note: This stream is infinite and it is guaranteed that /// [`Stream::poll_next`] will never return `Poll::Ready(None)`. -impl Stream for Swarm +impl Stream for Swarm where TBehaviour: NetworkBehaviour, + TExecutor: Executor, { type Item = SwarmEvent, THandlerErr>; @@ -1217,9 +1244,10 @@ where } /// The stream of swarm events never terminates, so we can implement fused for it. -impl FusedStream for Swarm +impl FusedStream for Swarm where TBehaviour: NetworkBehaviour, + TExecutor: Executor, { fn is_terminated(&self) -> bool { false @@ -1258,17 +1286,29 @@ impl<'a> PollParameters for SwarmPollParameters<'a> { } /// A [`SwarmBuilder`] provides an API for configuring and constructing a [`Swarm`]. -pub struct SwarmBuilder { +pub struct SwarmBuilder +where + TExecutor: Executor, +{ local_peer_id: PeerId, transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, - pool_config: PoolConfig, + pool_config: PoolConfig, connection_limits: ConnectionLimits, } -impl SwarmBuilder +#[cfg(feature = "tokio")] +pub type TokioSwarmBuilder<'a, TBehaviour> = + SwarmBuilder>; + +#[cfg(feature = "async-std")] +pub type AsyncStdSwarmBuilder = SwarmBuilder; + +impl SwarmBuilder where TBehaviour: NetworkBehaviour, + TExecutor: Executor, + PoolConfig: Default, { /// Creates a new `SwarmBuilder` from the given transport, behaviour and /// local peer ID. The `Swarm` with its underlying `Network` is obtained @@ -1286,15 +1326,36 @@ where connection_limits: Default::default(), } } +} +impl SwarmBuilder +where + TBehaviour: NetworkBehaviour, + TExecutor: Executor, +{ /// Configures the `Executor` to use for spawning background tasks. /// /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). - pub fn executor(mut self, e: Box) -> Self { - self.pool_config = self.pool_config.with_executor(e); - self + pub fn executor(self, executor: NExecutor) -> SwarmBuilder + where + NExecutor: Executor, + { + let Self { + local_peer_id, + transport, + behaviour, + pool_config, + connection_limits, + } = self; + SwarmBuilder { + local_peer_id, + transport, + behaviour, + pool_config: pool_config.with_executor(executor), + connection_limits, + } } /// Configures the number of events from the [`NetworkBehaviour`] in @@ -1381,7 +1442,7 @@ where } /// Builds a `Swarm` with the current configuration. - pub fn build(mut self) -> Swarm { + pub fn build(mut self) -> Swarm { let supported_protocols = self .behaviour .new_handler() @@ -1391,25 +1452,10 @@ where .map(|info| info.protocol_name().to_vec()) .collect(); - // If no executor has been explicitly configured, try to set up a thread pool. - let pool_config = - self.pool_config.or_else_with_executor(|| { - match ThreadPoolBuilder::new() - .name_prefix("libp2p-swarm-task-") - .create() - { - Ok(tp) => Some(Box::new(move |f| tp.spawn_ok(f))), - Err(err) => { - log::warn!("Failed to create executor thread pool: {:?}", err); - None - } - } - }); - Swarm { local_peer_id: self.local_peer_id, transport: self.transport, - pool: Pool::new(self.local_peer_id, pool_config, self.connection_limits), + pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits), behaviour: self.behaviour, supported_protocols, listened_addrs: HashMap::new(), @@ -1565,6 +1611,7 @@ mod tests { use super::*; use crate::test::{CallTraceBehaviour, MockBehaviour}; use futures::executor::block_on; + use futures::executor::ThreadPool; use futures::future::poll_fn; use futures::future::Either; use futures::{executor, future, ready}; From b69dc357f0bfbb67392b06428f51937c04d3f185 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Wed, 9 Nov 2022 10:00:32 +0100 Subject: [PATCH 02/28] Update swarm/src/connection/pool.rs Co-authored-by: Thomas Eizinger --- swarm/src/connection/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 2f2a2c5afd9..64813659470 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -128,7 +128,7 @@ where /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, - /// The executor to use for running the background tasks. Can either be an advanced executor + /// The executor to use for running the background tasks. Can either be a global executor /// or a local queue. executor: ExecSwitch, From 12e0a32b43a2fb0784da727a20c44da04fb0a61c Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Wed, 9 Nov 2022 10:24:24 +0100 Subject: [PATCH 03/28] Moving tokio and async std executors --- Cargo.toml | 4 ++-- core/Cargo.toml | 5 ----- core/src/lib.rs | 32 --------------------------- swarm/Cargo.toml | 6 +++-- swarm/src/connection/pool.rs | 1 + swarm/src/connection/pool/executor.rs | 25 +++++++++++++++++++++ 6 files changed, 32 insertions(+), 41 deletions(-) create mode 100644 swarm/src/connection/pool/executor.rs diff --git a/Cargo.toml b/Cargo.toml index 3958f277edb..f920ff4d910 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,8 +75,8 @@ secp256k1 = ["libp2p-core/secp256k1"] rsa = ["libp2p-core/rsa"] ecdsa = ["libp2p-core/ecdsa"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] -tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-swarm/tokio", "libp2p-core/tokio"] -async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-swarm/async-std", "libp2p-core/async-std"] +tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-swarm/tokio"] +async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-swarm/async-std"] [dependencies] bytes = "1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 54fd3492803..bc2972c019b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -38,9 +38,6 @@ unsigned-varint = "0.7" void = "1" zeroize = "1" serde = { version = "1", optional = true, features = ["derive"] } -# TODO make optional before commit -tokio = { version = "1.15", features = ["rt"], optional = true } -async-std = { version = "1.6.2", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false, optional = true} @@ -63,8 +60,6 @@ secp256k1 = ["libsecp256k1"] ecdsa = ["p256"] rsa = ["dep:ring"] serde = ["multihash/serde-codec", "dep:serde"] -tokio = ["dep:tokio"] -async-std = ["dep:async-std"] [[bench]] name = "peer_id" diff --git a/core/src/lib.rs b/core/src/lib.rs index 22c9e567a1a..5012ecb4e21 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -110,35 +110,3 @@ impl Executor for ThreadPool { } } -#[cfg(feature = "tokio")] -#[derive(Clone, Copy, Debug, Default)] -pub enum TokioExecutor<'a> { - #[default] - Empty, - Given(&'a tokio::runtime::Runtime), -} - -#[cfg(feature = "tokio")] -impl<'a> Executor for TokioExecutor<'a> { - fn exec(&self, future: Pin + Send>>) { - match self { - Self::Given(runtime) => { - let _ = runtime.spawn(future); - } - Self::Empty => { - let _ = tokio::spawn(future); - } - } - } -} - -#[cfg(feature = "async-std")] -#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct AsyncStdExecutor; - -#[cfg(feature = "async-std")] -impl Executor for AsyncStdExecutor { - fn exec(&self, future: Pin + Send>>) { - let _ = async_std::task::spawn(future); - } -} diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index bb849a298e8..24c58110ac9 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -23,6 +23,8 @@ rand = "0.8" smallvec = "1.6.1" thiserror = "1.0" void = "1" +tokio = { version = "1.15", features = ["rt"], optional = true } +async-std = { version = "1.6.2", optional = true } [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } @@ -31,8 +33,8 @@ libp2p = { path = "..", features = ["full"] } quickcheck = { package = "quickcheck-ext", path = "../misc/quickcheck-ext" } [features] -tokio = ["libp2p-core/tokio"] -async-std = ["libp2p-core/async-std"] +tokio = ["dep:tokio"] +async-std = ["dep:async-std"] # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 64813659470..8ba06715c8a 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -54,6 +54,7 @@ use void::Void; mod concurrent_dial; mod task; +pub mod executor; enum ExecSwitch where diff --git a/swarm/src/connection/pool/executor.rs b/swarm/src/connection/pool/executor.rs new file mode 100644 index 00000000000..556e08d86d0 --- /dev/null +++ b/swarm/src/connection/pool/executor.rs @@ -0,0 +1,25 @@ +use std::{pin::Pin, future::Future}; +use libp2p_core::Executor; + + +#[cfg(feature = "tokio")] +#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct TokioExecutor; + +#[cfg(feature = "tokio")] +impl Executor for TokioExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = tokio::spawn(future); + } +} + +#[cfg(feature = "async-std")] +#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct AsyncStdExecutor; + +#[cfg(feature = "async-std")] +impl Executor for AsyncStdExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = async_std::task::spawn(future); + } +} From ad2ad9bf162a3b0b7b1f64a1fd57f088bb6e609f Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Wed, 9 Nov 2022 13:30:11 +0100 Subject: [PATCH 04/28] Implement other changes --- core/src/lib.rs | 1 - examples/chat-tokio.rs | 2 +- examples/chat.rs | 2 +- examples/file-sharing.rs | 5 +- protocols/dcutr/examples/dcutr.rs | 11 +- protocols/rendezvous/tests/harness.rs | 6 +- swarm/src/connection/pool.rs | 117 +++++++----------- swarm/src/connection/pool/executor.rs | 3 +- swarm/src/lib.rs | 170 ++++++++++++++------------ 9 files changed, 146 insertions(+), 171 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 5012ecb4e21..ffcdac33515 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -109,4 +109,3 @@ impl Executor for ThreadPool { self.spawn_ok(future) } } - diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 1b2b06d3424..736819b6109 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -102,7 +102,7 @@ async fn main() -> Result<(), Box> { floodsub: Floodsub::new(peer_id), mdns, }; - let mut swarm = libp2p_swarm::TokioSwarm::new(transport, behaviour, peer_id); + let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { diff --git a/examples/chat.rs b/examples/chat.rs index fa504066e9c..717545411aa 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box> { }; behaviour.floodsub.subscribe(floodsub_topic.clone()); - Swarm::<_, libp2p_core::AsyncStdExecutor>::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Reach out to another node if specified diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 4e3cad3f187..f8e00a108f4 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -251,7 +251,7 @@ mod network { // Build the Swarm, connecting the lower layer transport logic with the // higher layer network behaviour logic. - let swarm = SwarmBuilder::new( + let swarm = Swarm::with_threadpool_executor( libp2p::development_transport(id_keys).await?, ComposedBehaviour { kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), @@ -262,8 +262,7 @@ mod network { ), }, peer_id, - ) - .build(); + ); let (command_sender, command_receiver) = mpsc::channel(0); let (event_sender, event_receiver) = mpsc::channel(0); diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index b239702d300..ed737bf3b59 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use clap::Parser; -use futures::executor::block_on; +use futures::executor::{block_on, ThreadPool}; use futures::future::FutureExt; use futures::stream::StreamExt; use libp2p::core::multiaddr::{Multiaddr, Protocol}; @@ -34,6 +34,7 @@ use libp2p::tcp; use libp2p::Transport; use libp2p::{dcutr, ping}; use libp2p::{identity, NetworkBehaviour, PeerId}; +use libp2p_core::Executor; use log::info; use std::convert::TryInto; use std::error::Error; @@ -155,7 +156,13 @@ fn main() -> Result<(), Box> { dcutr: dcutr::behaviour::Behaviour::new(), }; - let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id) + + let executor: Option> = match ThreadPool::new() { + Ok(tp) => Some(Box::new(tp)), + Err(_) => None, + }; + + let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id, executor) .dial_concurrency_factor(10_u8.try_into().unwrap()) .build(); diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index cad3a087afb..fa1094cef3d 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -53,11 +53,7 @@ where .timeout(Duration::from_secs(5)) .boxed(); - SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id) - .executor(Box::new(|future| { - let _ = tokio::spawn(future); - })) - .build() + Swarm::with_tokio_executor(transport, behaviour_fn(peer_id, identity), peer_id) } fn get_rand_memory_address() -> Multiaddr { diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8ba06715c8a..50209110d2c 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -31,7 +31,7 @@ use crate::{ }; use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; -use futures::executor::ThreadPool; +use futures::executor::ThreadPoolBuilder; use futures::prelude::*; use futures::{ channel::{mpsc, oneshot}, @@ -53,21 +53,16 @@ use std::{ use void::Void; mod concurrent_dial; -mod task; +#[cfg(any(feature = "tokio", feature = "async-std"))] pub mod executor; +mod task; -enum ExecSwitch -where - TExecutor: Executor, -{ - Executor(TExecutor), +enum ExecSwitch { + Executor(Box), LocalSpawn(FuturesUnordered + Send>>>), } -impl ExecSwitch -where - TExecutor: Executor, -{ +impl ExecSwitch { // advance the local queue #[inline] fn advance_local(&mut self, cx: &mut Context) { @@ -89,10 +84,9 @@ where } /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool +pub struct Pool where TTrans: Transport, - TExecutor: Executor, THandler: IntoConnectionHandler, { local_id: PeerId, @@ -131,7 +125,7 @@ where /// The executor to use for running the background tasks. Can either be a global executor /// or a local queue. - executor: ExecSwitch, + executor: ExecSwitch, /// Sender distributed to pending tasks for reporting events back /// to the pool. @@ -222,9 +216,7 @@ impl PendingConnection { } } -impl fmt::Debug - for Pool -{ +impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") .field("counters", &self.counters) @@ -321,14 +313,13 @@ where }, } -impl Pool +impl Pool where THandler: IntoConnectionHandler, TTrans: Transport, - TExecutor: Executor, { /// Creates a new empty `Pool`. - pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { + pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); let (established_connection_events_tx, established_connection_events_rx) = @@ -440,10 +431,9 @@ where } } -impl Pool +impl Pool where THandler: IntoConnectionHandler, - TExecutor: Executor, TTrans: Transport + 'static, TTrans::Output: Send + 'static, TTrans::Error: Send + 'static, @@ -1084,9 +1074,9 @@ impl ConnectionLimits { /// /// The default configuration specifies no dedicated task executor, a /// task event buffer size of 32, and a task command buffer size of 7. -pub struct PoolConfig { +pub struct PoolConfig { /// Executor to use to spawn tasks. - pub executor: Option, + pub executor: Option>, /// Size of the task command buffer (per task). pub task_command_buffer_size: usize, @@ -1107,62 +1097,42 @@ pub struct PoolConfig { max_negotiating_inbound_streams: usize, } -macro_rules! impl_pool_config_default { - ($executor:ident) => { - PoolConfig { - $executor, +impl Default for PoolConfig { + fn default() -> Self { + let executor: Option> = match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") + .create() + { + Ok(pool) => Some(Box::new(pool)), + Err(_) => None, + }; + Self { + executor, task_event_buffer_size: 32, task_command_buffer_size: 7, - // Set to a default of 8 based on frequency of dialer connections dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, } - }; -} - -impl Default for PoolConfig { - fn default() -> Self { - let executor = ThreadPool::new().ok(); - impl_pool_config_default!(executor) - } -} - -#[cfg(feature = "tokio")] -impl<'a> Default for PoolConfig> { - fn default() -> Self { - let executor = Some(libp2p_core::TokioExecutor::default()); - impl_pool_config_default!(executor) } } -#[cfg(feature = "async-std")] -impl Default for PoolConfig { - fn default() -> Self { - let executor = Some(libp2p_core::AsyncStdExecutor::default()); - impl_pool_config_default!(executor) +impl PoolConfig { + pub fn new(executor: Option>) -> Self { + Self { + executor, + task_command_buffer_size: 32, + task_event_buffer_size: 7, + dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), + substream_upgrade_protocol_override: None, + max_negotiating_inbound_streams: 128, + } } -} -impl PoolConfig { /// Configures the executor to use for spawning connection background tasks. - pub fn with_executor(self, executor: NExecutor) -> PoolConfig { - let PoolConfig { - task_command_buffer_size, - task_event_buffer_size, - dial_concurrency_factor, - substream_upgrade_protocol_override, - max_negotiating_inbound_streams, - .. - } = self; - PoolConfig { - executor: Some(executor), - task_command_buffer_size, - task_event_buffer_size, - dial_concurrency_factor, - substream_upgrade_protocol_override, - max_negotiating_inbound_streams, - } + pub fn with_executor(mut self, executor: Box) -> Self { + self.executor = Some(executor); + self } /// Sets the maximum number of events sent to a connection's background task @@ -1210,15 +1180,11 @@ impl PoolConfig { self.max_negotiating_inbound_streams = v; self } -} -impl PoolConfig -where - TExecutor: Executor + Send + 'static, -{ /// Configures the executor to use for spawning connection background tasks, /// only if no executor has already been configured. - pub fn or_else_with_executor(self, f: F) -> PoolConfig> + #[allow(dead_code)] // TODO: Can we just remove this? + pub fn or_else_with_executor(self, f: F) -> Self where F: FnOnce() -> Option>, { @@ -1230,8 +1196,7 @@ where max_negotiating_inbound_streams, .. } = self; - let executor: Option> = - self.executor.map_or_else(f, |e| Some(Box::new(e))); + let executor: Option> = self.executor.map_or_else(f, |e| Some(e)); PoolConfig { executor, task_command_buffer_size, diff --git a/swarm/src/connection/pool/executor.rs b/swarm/src/connection/pool/executor.rs index 556e08d86d0..6e955fadbdc 100644 --- a/swarm/src/connection/pool/executor.rs +++ b/swarm/src/connection/pool/executor.rs @@ -1,6 +1,5 @@ -use std::{pin::Pin, future::Future}; use libp2p_core::Executor; - +use std::{future::Future, pin::Pin}; #[cfg(feature = "tokio")] #[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 50867877614..4d5c7dd7dc8 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -75,6 +75,7 @@ pub use connection::{ ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; +use futures::executor::ThreadPoolBuilder; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, @@ -258,16 +259,15 @@ pub enum SwarmEvent { /// /// Note: Needs to be polled via `` in order to make /// progress. -pub struct Swarm +pub struct Swarm where TBehaviour: NetworkBehaviour, - TExecutor: Executor, { /// [`Transport`] for dialing remote peers and listening for incoming connection. transport: transport::Boxed<(PeerId, StreamMuxerBox)>, /// The nodes currently active. - pool: Pool, transport::Boxed<(PeerId, StreamMuxerBox)>, TExecutor>, + pool: Pool, transport::Boxed<(PeerId, StreamMuxerBox)>>, /// The local peer ID. local_peer_id: PeerId, @@ -301,40 +301,88 @@ where pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent)>, } -#[cfg(feature = "tokio")] -pub type TokioSwarm<'a, TBehaviour> = Swarm>; +impl Unpin for Swarm where TBehaviour: NetworkBehaviour {} -#[cfg(feature = "async-std")] -pub type AsyncStdSwarm = Swarm; - -impl Unpin for Swarm -where - TBehaviour: NetworkBehaviour, - TExecutor: Executor, -{ -} - -impl Swarm +impl Swarm where TBehaviour: NetworkBehaviour, - TExecutor: Executor, - PoolConfig: Default, { /// Builds a new `Swarm`. + #[deprecated( + since = "0.50", + note = "This constructor is considered ambiguous regarding the executor. Use `Swarm::with_threadpool_executor` for the same behaviour." + )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::new(transport, behaviour, local_peer_id).build() + Self::with_threadpool_executor(transport, behaviour, local_peer_id) + } + + /// Builds a new `Swarm` with a provided executor. + pub fn with_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + executor: Box, + ) -> Self { + SwarmBuilder::new(transport, behaviour, local_peer_id, Some(executor)).build() + } + + /// Builds a new `Swarm` with a tokio executor. + #[cfg(feature = "tokio")] + pub fn with_tokio_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + SwarmBuilder::new( + transport, + behaviour, + local_peer_id, + Some(Box::new(crate::connection::pool::executor::TokioExecutor)), + ) + .build() + } + + /// Builds a new `Swarm` with an async-std executor. + #[cfg(feature = "async-std")] + pub fn with_async_std_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + SwarmBuilder::new( + transport, + behaviour, + local_peer_id, + Some(Box::new( + crate::connection::pool::executor::AsyncStdExecutor, + )), + ) + .build() + } + + /// Builds a new `Swarm` with a threadpool executor. + pub fn with_threadpool_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + let executor: Option> = match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") + .create() + { + Ok(tp) => Some(Box::new(tp)), + Err(err) => { + log::warn!("Failed to create executor thread pool: {:?}", err); + None + } + }; + SwarmBuilder::new(transport, behaviour, local_peer_id, executor).build() } -} -impl Swarm -where - TBehaviour: NetworkBehaviour, - TExecutor: Executor, -{ /// Returns information about the connections underlying the [`Swarm`]. pub fn network_info(&self) -> NetworkInfo { let num_peers = self.pool.num_peers(); @@ -1065,12 +1113,7 @@ where } } PendingNotifyHandler::Any(ids) => { - match notify_any::<_, _, TBehaviour, TExecutor>( - ids, - &mut this.pool, - event, - cx, - ) { + match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) { None => continue, Some((event, ids)) => { let handler = PendingNotifyHandler::Any(ids); @@ -1179,9 +1222,9 @@ fn notify_one( /// /// Returns `None` if either all connections are closing or the event /// was successfully sent to a handler, in either case the event is consumed. -fn notify_any( +fn notify_any( ids: SmallVec<[ConnectionId; 10]>, - pool: &mut Pool, + pool: &mut Pool, event: THandlerInEvent, cx: &mut Context<'_>, ) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)> @@ -1189,7 +1232,6 @@ where TTrans: Transport, TTrans::Error: Send + 'static, TBehaviour: NetworkBehaviour, - TExecutor: Executor, THandler: IntoConnectionHandler, THandler::Handler: ConnectionHandler< InEvent = THandlerInEvent, @@ -1231,10 +1273,9 @@ where /// /// Note: This stream is infinite and it is guaranteed that /// [`Stream::poll_next`] will never return `Poll::Ready(None)`. -impl Stream for Swarm +impl Stream for Swarm where TBehaviour: NetworkBehaviour, - TExecutor: Executor, { type Item = SwarmEvent, THandlerErr>; @@ -1244,10 +1285,9 @@ where } /// The stream of swarm events never terminates, so we can implement fused for it. -impl FusedStream for Swarm +impl FusedStream for Swarm where TBehaviour: NetworkBehaviour, - TExecutor: Executor, { fn is_terminated(&self) -> bool { false @@ -1286,29 +1326,17 @@ impl<'a> PollParameters for SwarmPollParameters<'a> { } /// A [`SwarmBuilder`] provides an API for configuring and constructing a [`Swarm`]. -pub struct SwarmBuilder -where - TExecutor: Executor, -{ +pub struct SwarmBuilder { local_peer_id: PeerId, transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, - pool_config: PoolConfig, + pool_config: PoolConfig, connection_limits: ConnectionLimits, } -#[cfg(feature = "tokio")] -pub type TokioSwarmBuilder<'a, TBehaviour> = - SwarmBuilder>; - -#[cfg(feature = "async-std")] -pub type AsyncStdSwarmBuilder = SwarmBuilder; - -impl SwarmBuilder +impl SwarmBuilder where TBehaviour: NetworkBehaviour, - TExecutor: Executor, - PoolConfig: Default, { /// Creates a new `SwarmBuilder` from the given transport, behaviour and /// local peer ID. The `Swarm` with its underlying `Network` is obtained @@ -1317,45 +1345,25 @@ where transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, + executor: Option>, ) -> Self { SwarmBuilder { local_peer_id, transport, behaviour, - pool_config: Default::default(), + pool_config: PoolConfig::new(executor), connection_limits: Default::default(), } } -} -impl SwarmBuilder -where - TBehaviour: NetworkBehaviour, - TExecutor: Executor, -{ /// Configures the `Executor` to use for spawning background tasks. /// /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). - pub fn executor(self, executor: NExecutor) -> SwarmBuilder - where - NExecutor: Executor, - { - let Self { - local_peer_id, - transport, - behaviour, - pool_config, - connection_limits, - } = self; - SwarmBuilder { - local_peer_id, - transport, - behaviour, - pool_config: pool_config.with_executor(executor), - connection_limits, - } + pub fn executor(mut self, executor: Box) -> Self { + self.pool_config = self.pool_config.with_executor(executor); + self } /// Configures the number of events from the [`NetworkBehaviour`] in @@ -1442,7 +1450,9 @@ where } /// Builds a `Swarm` with the current configuration. - pub fn build(mut self) -> Swarm { + /// + /// If no executor was given, no executor will be set. + pub fn build(mut self) -> Swarm { let supported_protocols = self .behaviour .new_handler() @@ -1648,7 +1658,7 @@ mod tests { .multiplex(yamux::YamuxConfig::default()) .boxed(); let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - SwarmBuilder::new(transport, behaviour, local_public_key.into()) + SwarmBuilder::new(transport, behaviour, local_public_key.into(), None) } fn swarms_connected( From 92da6a249a8ee6a695083474ede8177fd5ba9a10 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Wed, 9 Nov 2022 13:41:18 +0100 Subject: [PATCH 05/28] Formatted --- protocols/dcutr/examples/dcutr.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index ed737bf3b59..5d4a4e9dc87 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -156,7 +156,6 @@ fn main() -> Result<(), Box> { dcutr: dcutr::behaviour::Behaviour::new(), }; - let executor: Option> = match ThreadPool::new() { Ok(tp) => Some(Box::new(tp)), Err(_) => None, From e7253d15bf071c1e9f0052f68e978fade5ee6930 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Thu, 10 Nov 2022 12:58:35 +0100 Subject: [PATCH 06/28] Update swarm/src/connection/pool.rs Co-authored-by: Thomas Eizinger --- swarm/src/connection/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 50209110d2c..addad47d11c 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -123,7 +123,7 @@ where /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, - /// The executor to use for running the background tasks. Can either be a global executor + /// The executor to use for running connection tasks. Can either be a global executor /// or a local queue. executor: ExecSwitch, From 3a24540201419186da6d2acd4c80c71db779aabe Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Thu, 10 Nov 2022 14:15:19 +0100 Subject: [PATCH 07/28] Update swarm/src/lib.rs Co-authored-by: Thomas Eizinger --- swarm/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 4d5c7dd7dc8..ff2b30edee3 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -337,13 +337,12 @@ where behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::new( + SwarmBuilder::with_executor( transport, behaviour, local_peer_id, - Some(Box::new(crate::connection::pool::executor::TokioExecutor)), + Box::new(crate::connection::pool::executor::TokioExecutor), ) - .build() } /// Builds a new `Swarm` with an async-std executor. From 98dc7ce6c0ea5d32231f75dfba619acd9e2b5317 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Thu, 10 Nov 2022 14:16:23 +0100 Subject: [PATCH 08/28] Implemented changes --- core/Cargo.toml | 8 +++---- examples/chat.rs | 2 +- swarm/src/connection/pool.rs | 46 ------------------------------------ swarm/src/lib.rs | 34 ++++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 51 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index bc2972c019b..bff2f086aaa 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -56,10 +56,10 @@ serde_json = "1.0" prost-build = "0.11" [features] -secp256k1 = ["libsecp256k1"] -ecdsa = ["p256"] -rsa = ["dep:ring"] -serde = ["multihash/serde-codec", "dep:serde"] +secp256k1 = [ "libsecp256k1" ] +ecdsa = [ "p256" ] +rsa = [ "dep:ring" ] +serde = [ "multihash/serde-codec", "dep:serde" ] [[bench]] name = "peer_id" diff --git a/examples/chat.rs b/examples/chat.rs index 717545411aa..2b43526ae6b 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box> { }; behaviour.floodsub.subscribe(floodsub_topic.clone()); - Swarm::with_async_std_executor(transport, behaviour, local_peer_id) + Swarm::with_threadpool_executor(transport, behaviour, local_peer_id) }; // Reach out to another node if specified diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index addad47d11c..c078383b9f8 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -1097,26 +1097,6 @@ pub struct PoolConfig { max_negotiating_inbound_streams: usize, } -impl Default for PoolConfig { - fn default() -> Self { - let executor: Option> = match ThreadPoolBuilder::new() - .name_prefix("libp2p-swarm-task-") - .create() - { - Ok(pool) => Some(Box::new(pool)), - Err(_) => None, - }; - Self { - executor, - task_event_buffer_size: 32, - task_command_buffer_size: 7, - dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), - substream_upgrade_protocol_override: None, - max_negotiating_inbound_streams: 128, - } - } -} - impl PoolConfig { pub fn new(executor: Option>) -> Self { Self { @@ -1180,32 +1160,6 @@ impl PoolConfig { self.max_negotiating_inbound_streams = v; self } - - /// Configures the executor to use for spawning connection background tasks, - /// only if no executor has already been configured. - #[allow(dead_code)] // TODO: Can we just remove this? - pub fn or_else_with_executor(self, f: F) -> Self - where - F: FnOnce() -> Option>, - { - let PoolConfig { - task_command_buffer_size, - task_event_buffer_size, - dial_concurrency_factor, - substream_upgrade_protocol_override, - max_negotiating_inbound_streams, - .. - } = self; - let executor: Option> = self.executor.map_or_else(f, |e| Some(e)); - PoolConfig { - executor, - task_command_buffer_size, - task_event_buffer_size, - dial_concurrency_factor, - substream_upgrade_protocol_override, - max_negotiating_inbound_streams, - } - } } trait EntryExt<'a, K, V> { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 4d5c7dd7dc8..0362442c7c8 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1356,6 +1356,40 @@ where } } + /// Creates a new `SwarmBuilder` from the given transport, behaviour, local peer ID and + /// executor. The `Swarm` with its underlying `Network` is obtained via + /// [`SwarmBuilder::build`]. + pub fn with_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + executor: Box, + ) -> Self { + Self { + local_peer_id, + transport, + behaviour, + pool_config: PoolConfig::new(Some(executor)), + connection_limits: Default::default(), + } + } + + /// Creates a new `SwarmBuilder` from the given transport, behaviour and local peer ID. The + /// `Swarm` with its underlying `Network` is obtained via [`SwarmBuilder::build`]. + pub fn without_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + Self { + local_peer_id, + transport, + behaviour, + pool_config: PoolConfig::new(None), + connection_limits: Default::default(), + } + } + /// Configures the `Executor` to use for spawning background tasks. /// /// By default, unless another executor has been configured, From 8fd5e5623fb59eefc80636ead5251c0384c3ab67 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Thu, 10 Nov 2022 14:42:30 +0100 Subject: [PATCH 09/28] Fix tests --- protocols/rendezvous/tests/harness.rs | 2 +- swarm/src/connection/pool.rs | 1 - swarm/src/lib.rs | 27 ++++++++++++++++++--------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index fa1094cef3d..523f34c76db 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -28,7 +28,7 @@ use libp2p::core::upgrade::SelectUpgrade; use libp2p::core::{identity, Multiaddr, PeerId, Transport}; use libp2p::mplex::MplexConfig; use libp2p::noise::NoiseAuthenticated; -use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::yamux::YamuxConfig; use std::fmt::Debug; use std::time::Duration; diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index c078383b9f8..7b8f1068e37 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -31,7 +31,6 @@ use crate::{ }; use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; -use futures::executor::ThreadPoolBuilder; use futures::prelude::*; use futures::{ channel::{mpsc, oneshot}, diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a1480e86568..83df7812631 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -327,7 +327,7 @@ where local_peer_id: PeerId, executor: Box, ) -> Self { - SwarmBuilder::new(transport, behaviour, local_peer_id, Some(executor)).build() + SwarmBuilder::with_executor(transport, behaviour, local_peer_id, executor).build() } /// Builds a new `Swarm` with a tokio executor. @@ -343,6 +343,7 @@ where local_peer_id, Box::new(crate::connection::pool::executor::TokioExecutor), ) + .build() } /// Builds a new `Swarm` with an async-std executor. @@ -352,13 +353,11 @@ where behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::new( + SwarmBuilder::with_executor( transport, behaviour, local_peer_id, - Some(Box::new( - crate::connection::pool::executor::AsyncStdExecutor, - )), + Box::new(crate::connection::pool::executor::AsyncStdExecutor), ) .build() } @@ -369,17 +368,19 @@ where behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - let executor: Option> = match ThreadPoolBuilder::new() + let builder = match ThreadPoolBuilder::new() .name_prefix("libp2p-swarm-task-") .create() { - Ok(tp) => Some(Box::new(tp)), + Ok(tp) => { + SwarmBuilder::with_executor(transport, behaviour, local_peer_id, Box::new(tp)) + } Err(err) => { log::warn!("Failed to create executor thread pool: {:?}", err); - None + SwarmBuilder::without_executor(transport, behaviour, local_peer_id) } }; - SwarmBuilder::new(transport, behaviour, local_peer_id, executor).build() + builder.build() } /// Returns information about the connections underlying the [`Swarm`]. @@ -1340,6 +1341,10 @@ where /// Creates a new `SwarmBuilder` from the given transport, behaviour and /// local peer ID. The `Swarm` with its underlying `Network` is obtained /// via [`SwarmBuilder::build`]. + #[deprecated( + since = "0.50", + note = "Unnecessary since replaced by `SwarmBuilder::with_executor` and `SwarmBuilder::without_executor`." + )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, @@ -1394,6 +1399,10 @@ where /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). + #[deprecated( + since = "0.50", + note = "Deprecated since executor creation now happens in `SwarmBuilder::with_executor`." + )] pub fn executor(mut self, executor: Box) -> Self { self.pool_config = self.pool_config.with_executor(executor); self From 5381a2c77bdb19736d07d27c13beb50a50d1d45d Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Thu, 10 Nov 2022 14:54:53 +0100 Subject: [PATCH 10/28] Corrected some clippy issues --- swarm/src/connection/pool.rs | 3 ++- swarm/src/lib.rs | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 7b8f1068e37..c6e9fdda57c 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -1185,9 +1185,10 @@ mod tests { fn exec(&self, _: Pin + Send>>) {} } + // TODO: This test has to be redesigned. #[test] fn set_executor() { - PoolConfig::default() + PoolConfig::new(None) .with_executor(Box::new(Dummy)) .with_executor(Box::new(|f| { async_std::task::spawn(f); diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 83df7812631..e4ceec5b1d1 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -309,7 +309,7 @@ where { /// Builds a new `Swarm`. #[deprecated( - since = "0.50", + since = "0.50.0", note = "This constructor is considered ambiguous regarding the executor. Use `Swarm::with_threadpool_executor` for the same behaviour." )] pub fn new( @@ -1342,7 +1342,7 @@ where /// local peer ID. The `Swarm` with its underlying `Network` is obtained /// via [`SwarmBuilder::build`]. #[deprecated( - since = "0.50", + since = "0.50.0", note = "Unnecessary since replaced by `SwarmBuilder::with_executor` and `SwarmBuilder::without_executor`." )] pub fn new( @@ -1400,7 +1400,7 @@ where /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). #[deprecated( - since = "0.50", + since = "0.50.0", note = "Deprecated since executor creation now happens in `SwarmBuilder::with_executor`." )] pub fn executor(mut self, executor: Box) -> Self { @@ -1663,7 +1663,6 @@ mod tests { use super::*; use crate::test::{CallTraceBehaviour, MockBehaviour}; use futures::executor::block_on; - use futures::executor::ThreadPool; use futures::future::poll_fn; use futures::future::Either; use futures::{executor, future, ready}; From dc5f1b7a3294133ba5bf9811b70f8d1b9379c801 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Thu, 10 Nov 2022 23:22:21 +0100 Subject: [PATCH 11/28] Update core/Cargo.toml Co-authored-by: Thomas Eizinger --- core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index bff2f086aaa..07cec58c002 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -59,7 +59,7 @@ prost-build = "0.11" secp256k1 = [ "libsecp256k1" ] ecdsa = [ "p256" ] rsa = [ "dep:ring" ] -serde = [ "multihash/serde-codec", "dep:serde" ] +serde = ["multihash/serde-codec", "dep:serde"] [[bench]] name = "peer_id" From 5365389529a135c90e2243a1439e0033d520b9ae Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Fri, 11 Nov 2022 12:52:36 +0100 Subject: [PATCH 12/28] Implemented suggested changes --- protocols/dcutr/examples/dcutr.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index 5d4a4e9dc87..9d68b9a254c 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -34,7 +34,6 @@ use libp2p::tcp; use libp2p::Transport; use libp2p::{dcutr, ping}; use libp2p::{identity, NetworkBehaviour, PeerId}; -use libp2p_core::Executor; use log::info; use std::convert::TryInto; use std::error::Error; @@ -156,12 +155,10 @@ fn main() -> Result<(), Box> { dcutr: dcutr::behaviour::Behaviour::new(), }; - let executor: Option> = match ThreadPool::new() { - Ok(tp) => Some(Box::new(tp)), - Err(_) => None, - }; - - let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id, executor) + let mut swarm = match ThreadPool::new() { + Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, Box::new(tp)), + Err(_) => SwarmBuilder::without_executor(transport, behaviour, local_peer_id), + } .dial_concurrency_factor(10_u8.try_into().unwrap()) .build(); From e055d2ab5b7f844720631bd7f0510978c4e046f5 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Sat, 12 Nov 2022 18:24:55 +0100 Subject: [PATCH 13/28] Apply suggestions from code review Co-authored-by: Thomas Eizinger --- swarm/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index e4ceec5b1d1..7311bd02c89 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1343,7 +1343,7 @@ where /// via [`SwarmBuilder::build`]. #[deprecated( since = "0.50.0", - note = "Unnecessary since replaced by `SwarmBuilder::with_executor` and `SwarmBuilder::without_executor`." + note = "Use `SwarmBuilder::with_executor` or `SwarmBuilder::without_executor` instead." )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, @@ -1401,7 +1401,7 @@ where /// [`ThreadPool`](futures::executor::ThreadPool). #[deprecated( since = "0.50.0", - note = "Deprecated since executor creation now happens in `SwarmBuilder::with_executor`." + note = "Use `SwarmBuilder::with_executor` instead." )] pub fn executor(mut self, executor: Box) -> Self { self.pool_config = self.pool_config.with_executor(executor); @@ -1493,7 +1493,6 @@ where /// Builds a `Swarm` with the current configuration. /// - /// If no executor was given, no executor will be set. pub fn build(mut self) -> Swarm { let supported_protocols = self .behaviour From 5feb2e5fba3c444a2b6744bc9bc34b7426ac8230 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sat, 12 Nov 2022 22:48:31 +0100 Subject: [PATCH 14/28] Implemented requested changes --- swarm/src/lib.rs | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 7311bd02c89..a39a3e300cb 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -75,7 +75,6 @@ pub use connection::{ ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; -use futures::executor::ThreadPoolBuilder; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, @@ -87,7 +86,7 @@ use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; use connection::IncomingInfo; use dial_opts::{DialOpts, PeerCondition}; use either::Either; -use futures::{prelude::*, stream::FusedStream}; +use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::connection::ConnectionId; use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ @@ -383,6 +382,21 @@ where builder.build() } + /// Builds a new `Swarm` without an executor, instead using the current task. + /// + /// ## ⚠️ Performance warning + /// All connections will be polled on the current task, thus quite bad performance + /// characteristics should be expected. Whenever possible use an executor and + /// [`Swarm::with_executor`]. + pub fn without_executor( + transport: transport::Boxed<(PeerId, StreamMuxerBox)>, + behaviour: TBehaviour, + local_peer_id: PeerId, + ) -> Self { + SwarmBuilder::without_executor(transport, behaviour, local_peer_id) + .build() + } + /// Returns information about the connections underlying the [`Swarm`]. pub fn network_info(&self) -> NetworkInfo { let num_peers = self.pool.num_peers(); @@ -1349,8 +1363,12 @@ where transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, - executor: Option>, ) -> Self { + let executor: Option> = match ThreadPoolBuilder::new().name_prefix("libp2p-swarm-task-") + .create().ok() { + Some(tp) => Some(Box::new(tp)), + None => None, + }; SwarmBuilder { local_peer_id, transport, @@ -1380,6 +1398,11 @@ where /// Creates a new `SwarmBuilder` from the given transport, behaviour and local peer ID. The /// `Swarm` with its underlying `Network` is obtained via [`SwarmBuilder::build`]. + /// + /// ## ⚠️ Performance warning + /// All connections will be polled on the current task, thus quite bad performance + /// characteristics should be expected. Whenever possible use an executor and + /// [`SwarmBuilder::with_executor`]. pub fn without_executor( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, @@ -1698,7 +1721,7 @@ mod tests { .multiplex(yamux::YamuxConfig::default()) .boxed(); let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - SwarmBuilder::new(transport, behaviour, local_public_key.into(), None) + SwarmBuilder::new(transport, behaviour, local_public_key.into()) } fn swarms_connected( From e6761c6cc6fda251f6ede6f215fb011f9a98f3d3 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 13:52:42 +0100 Subject: [PATCH 15/28] Use new api --- examples/distributed-key-value-store.rs | 2 +- examples/file-sharing.rs | 2 +- examples/gossipsub-chat.rs | 2 +- examples/ipfs-kad.rs | 2 +- examples/ipfs-private.rs | 2 +- examples/mdns-passive-discovery.rs | 2 +- examples/ping.rs | 2 +- misc/metrics/examples/metrics/main.rs | 2 +- misc/multistream-select/tests/transport.rs | 4 ++-- protocols/autonat/examples/autonat_client.rs | 2 +- protocols/autonat/examples/autonat_server.rs | 2 +- protocols/autonat/tests/test_client.rs | 2 +- protocols/autonat/tests/test_server.rs | 2 +- protocols/dcutr/tests/lib.rs | 4 ++-- protocols/gossipsub/tests/smoke.rs | 2 +- protocols/identify/examples/identify.rs | 2 +- protocols/identify/src/behaviour.rs | 12 ++++++------ protocols/kad/src/behaviour/test.rs | 2 +- protocols/mdns/tests/use-async-std.rs | 2 +- protocols/mdns/tests/use-tokio.rs | 2 +- protocols/ping/tests/ping.rs | 12 ++++++------ protocols/relay/examples/relay_v2.rs | 2 +- protocols/relay/tests/v2.rs | 4 ++-- protocols/rendezvous/examples/discover.rs | 6 +++--- protocols/rendezvous/examples/register.rs | 6 +++--- .../rendezvous/examples/register_with_identify.rs | 6 +++--- protocols/rendezvous/examples/rendezvous_point.rs | 6 +++--- protocols/request-response/tests/ping.rs | 14 +++++++------- swarm/src/lib.rs | 6 +++++- transports/tls/tests/smoke.rs | 2 +- 30 files changed, 61 insertions(+), 57 deletions(-) diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 3323cc91b96..4bed3a4e99c 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -99,7 +99,7 @@ async fn main() -> Result<(), Box> { let kademlia = Kademlia::new(local_peer_id, store); let mdns = Mdns::new(MdnsConfig::default())?; let behaviour = MyBehaviour { kademlia, mdns }; - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Read full lines from stdin diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 1c800a6ce2b..8f0183749e9 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -219,7 +219,7 @@ mod network { ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; - use libp2p::swarm::{ConnectionHandlerUpgrErr, SwarmBuilder, SwarmEvent}; + use libp2p::swarm::{ConnectionHandlerUpgrErr, SwarmEvent}; use libp2p::{NetworkBehaviour, Swarm}; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index e293b25b5eb..b445294f865 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -109,7 +109,7 @@ async fn main() -> Result<(), Box> { let mut swarm = { let mdns = Mdns::new(MdnsConfig::default())?; let behaviour = MyBehaviour { gossipsub, mdns }; - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Read full lines from stdin diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 659cd49b607..2c370472bec 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -68,7 +68,7 @@ async fn main() -> Result<(), Box> { behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone()); } - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Order Kademlia to search for a peer. diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 1f6aec90a01..f7159555e97 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -204,7 +204,7 @@ async fn main() -> Result<(), Box> { println!("Subscribing to {gossipsub_topic:?}"); behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap(); - Swarm::new(transport, behaviour, local_peer_id) + Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; // Reach out to other nodes if specified diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index 8231d888dcc..477c9766391 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm that establishes connections through the given transport. // Note that the MDNS behaviour itself will not actually inititiate any connections, // as it only uses UDP. - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; loop { diff --git a/examples/ping.rs b/examples/ping.rs index 5d7443b8cfc..5a26bd4a91c 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { let transport = libp2p::development_transport(local_key).await?; - let mut swarm = Swarm::new(transport, Behaviour::default(), local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, Behaviour::default(), local_peer_id); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. diff --git a/misc/metrics/examples/metrics/main.rs b/misc/metrics/examples/metrics/main.rs index 0307e32dc1e..e1477987967 100644 --- a/misc/metrics/examples/metrics/main.rs +++ b/misc/metrics/examples/metrics/main.rs @@ -70,7 +70,7 @@ fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); info!("Local peer id: {:?}", local_peer_id); - let mut swarm = Swarm::new( + let mut swarm = Swarm::without_executor( block_on(libp2p::development_transport(local_key))?, Behaviour::default(), local_peer_id, diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index bf5dd247b40..4c1a822b0c6 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -61,8 +61,8 @@ fn transport_upgrade() { let listen_addr = Multiaddr::from(Protocol::Memory(random::())); - let mut dialer = Swarm::new(dialer_transport, dummy::Behaviour, dialer_id); - let mut listener = Swarm::new(listener_transport, dummy::Behaviour, listener_id); + let mut dialer = Swarm::with_async_std_executor(dialer_transport, dummy::Behaviour, dialer_id); + let mut listener = Swarm::with_async_std_executor(listener_transport, dummy::Behaviour, listener_id); listener.listen_on(listen_addr).unwrap(); let (addr_sender, addr_receiver) = oneshot::channel(); diff --git a/protocols/autonat/examples/autonat_client.rs b/protocols/autonat/examples/autonat_client.rs index c90a5c55060..b711f2117cb 100644 --- a/protocols/autonat/examples/autonat_client.rs +++ b/protocols/autonat/examples/autonat_client.rs @@ -67,7 +67,7 @@ async fn main() -> Result<(), Box> { let behaviour = Behaviour::new(local_key.public()); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); swarm.listen_on( Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) diff --git a/protocols/autonat/examples/autonat_server.rs b/protocols/autonat/examples/autonat_server.rs index 7bb79383710..84e50a6a5ae 100644 --- a/protocols/autonat/examples/autonat_server.rs +++ b/protocols/autonat/examples/autonat_server.rs @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box> { let behaviour = Behaviour::new(local_key.public()); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); swarm.listen_on( Multiaddr::empty() .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 420bcf99829..5e23304e3f2 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -40,7 +40,7 @@ async fn init_swarm(config: Config) -> Swarm { let local_id = PeerId::from_public_key(&keypair.public()); let transport = development_transport(keypair).await.unwrap(); let behaviour = Behaviour::new(local_id, config); - Swarm::new(transport, behaviour, local_id) + Swarm::with_async_std_executor(transport, behaviour, local_id) } async fn spawn_server(kill: oneshot::Receiver<()>) -> (PeerId, Multiaddr) { diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index b45ae7ecafc..3035a6d8d9e 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -39,7 +39,7 @@ async fn init_swarm(config: Config) -> Swarm { let local_id = PeerId::from_public_key(&keypair.public()); let transport = development_transport(keypair).await.unwrap(); let behaviour = Behaviour::new(local_id, config); - Swarm::new(transport, behaviour, local_id) + Swarm::with_async_std_executor(transport, behaviour, local_id) } async fn init_server(config: Option) -> (Swarm, PeerId, Multiaddr) { diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 64aca18596b..7db524d70f0 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -99,7 +99,7 @@ fn build_relay() -> Swarm { let transport = build_transport(MemoryTransport::default().boxed(), local_public_key); - Swarm::new( + Swarm::with_threadpool_executor( transport, relay::Relay::new( local_peer_id, @@ -123,7 +123,7 @@ fn build_client() -> Swarm { local_public_key, ); - Swarm::new( + Swarm::with_threadpool_executor( transport, Client { relay: behaviour, diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 43ad944dccb..db99179b07f 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -171,7 +171,7 @@ fn build_node() -> (Multiaddr, Swarm) { .build() .unwrap(); let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id), config).unwrap(); - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::without_executor(transport, behaviour, peer_id); let port = 1 + random::(); let mut addr: Multiaddr = Protocol::Memory(port).into(); diff --git a/protocols/identify/examples/identify.rs b/protocols/identify/examples/identify.rs index b02eb1c9ebf..6f5fb2a1427 100644 --- a/protocols/identify/examples/identify.rs +++ b/protocols/identify/examples/identify.rs @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { local_key.public(), )); - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id); // Tell the swarm to listen on all interfaces and a random, OS-assigned // port. diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 92279765d5a..2215fde0ee8 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -584,7 +584,7 @@ mod tests { let protocol = Behaviour::new( Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -593,7 +593,7 @@ mod tests { let protocol = Behaviour::new( Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -661,7 +661,7 @@ mod tests { let (mut swarm1, pubkey1) = { let (pubkey, transport) = transport(); let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone())); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -670,7 +670,7 @@ mod tests { let protocol = Behaviour::new( Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id()); + let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()); (swarm, pubkey) }; @@ -742,7 +742,7 @@ mod tests { .with_initial_delay(Duration::from_secs(10)), ); - Swarm::new(transport, protocol, pubkey.to_peer_id()) + Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) }; let mut swarm2 = { @@ -751,7 +751,7 @@ mod tests { Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()), ); - Swarm::new(transport, protocol, pubkey.to_peer_id()) + Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id()) }; let swarm1_peer_id = *swarm1.local_peer_id(); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index c61ffaf158f..47da12904bb 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -66,7 +66,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let store = MemoryStore::new(local_id); let behaviour = Kademlia::with_config(local_id, store, cfg); - let mut swarm = Swarm::new(transport, behaviour, local_id); + let mut swarm = Swarm::without_executor(transport, behaviour, local_id); let address: Multiaddr = Protocol::Memory(random::()).into(); swarm.listen_on(address.clone()).unwrap(); diff --git a/protocols/mdns/tests/use-async-std.rs b/protocols/mdns/tests/use-async-std.rs index 2ddb36355be..3774179fefa 100644 --- a/protocols/mdns/tests/use-async-std.rs +++ b/protocols/mdns/tests/use-async-std.rs @@ -62,7 +62,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box> let peer_id = PeerId::from(id_keys.public()); let transport = libp2p::development_transport(id_keys).await?; let behaviour = Mdns::new(config)?; - let mut swarm = Swarm::new(transport, behaviour, peer_id); + let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(swarm) } diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index 830557d3f00..dfd2d7a08c8 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -58,7 +58,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box(1); @@ -128,10 +128,10 @@ fn max_failures() { .with_max_failures(max_failures.into()); let (peer1_id, trans) = mk_transport(muxer); - let mut swarm1 = Swarm::new(trans, Behaviour::new(cfg.clone()), peer1_id); + let mut swarm1 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id); let (peer2_id, trans) = mk_transport(muxer); - let mut swarm2 = Swarm::new(trans, Behaviour::new(cfg), peer2_id); + let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg), peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); @@ -198,10 +198,10 @@ fn max_failures() { #[test] fn unsupported_doesnt_fail() { let (peer1_id, trans) = mk_transport(MuxerChoice::Mplex); - let mut swarm1 = Swarm::new(trans, keep_alive::Behaviour, peer1_id); + let mut swarm1 = Swarm::with_async_std_executor(trans, keep_alive::Behaviour, peer1_id); let (peer2_id, trans) = mk_transport(MuxerChoice::Mplex); - let mut swarm2 = Swarm::new(trans, Behaviour::default(), peer2_id); + let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::default(), peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); diff --git a/protocols/relay/examples/relay_v2.rs b/protocols/relay/examples/relay_v2.rs index 95027937d12..e73ba48662c 100644 --- a/protocols/relay/examples/relay_v2.rs +++ b/protocols/relay/examples/relay_v2.rs @@ -66,7 +66,7 @@ fn main() -> Result<(), Box> { )), }; - let mut swarm = Swarm::new(transport, behaviour, local_peer_id); + let mut swarm = Swarm::without_executor(transport, behaviour, local_peer_id); // Listen on all interfaces let listen_addr = Multiaddr::empty() diff --git a/protocols/relay/tests/v2.rs b/protocols/relay/tests/v2.rs index e377a9249f5..60a8ce33401 100644 --- a/protocols/relay/tests/v2.rs +++ b/protocols/relay/tests/v2.rs @@ -291,7 +291,7 @@ fn build_relay() -> Swarm { let transport = upgrade_transport(MemoryTransport::default().boxed(), local_public_key); - Swarm::new( + Swarm::with_threadpool_executor( transport, Relay { ping: ping::Behaviour::new(ping::Config::new()), @@ -318,7 +318,7 @@ fn build_client() -> Swarm { local_public_key, ); - Swarm::new( + Swarm::with_threadpool_executor( transport, Client { ping: ping::Behaviour::new(ping::Config::new()), diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs index c14e114ee88..78a87a98e5a 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/protocols/rendezvous/examples/discover.rs @@ -25,7 +25,7 @@ use libp2p::multiaddr::Protocol; use libp2p::ping; use libp2p::swarm::{keep_alive, SwarmEvent}; use libp2p::Swarm; -use libp2p::{development_transport, rendezvous, Multiaddr}; +use libp2p::{tokio_development_transport, rendezvous, Multiaddr}; use std::time::Duration; use void::Void; @@ -41,8 +41,8 @@ async fn main() { .parse() .unwrap(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), diff --git a/protocols/rendezvous/examples/register.rs b/protocols/rendezvous/examples/register.rs index 3fbfa02785d..72144fdb797 100644 --- a/protocols/rendezvous/examples/register.rs +++ b/protocols/rendezvous/examples/register.rs @@ -23,7 +23,7 @@ use libp2p::core::identity; use libp2p::core::PeerId; use libp2p::ping; use libp2p::swarm::{Swarm, SwarmEvent}; -use libp2p::{development_transport, rendezvous}; +use libp2p::{tokio_development_transport, rendezvous}; use libp2p::{Multiaddr, NetworkBehaviour}; use libp2p_swarm::AddressScore; use std::time::Duration; @@ -39,8 +39,8 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs index f12a1a6ed98..7fef3e3ae6d 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -24,7 +24,7 @@ use libp2p::core::PeerId; use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, Swarm, SwarmEvent}; -use libp2p::{development_transport, rendezvous}; +use libp2p::{tokio_development_transport, rendezvous}; use libp2p::{Multiaddr, NetworkBehaviour}; use std::time::Duration; use void::Void; @@ -40,8 +40,8 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs index 980a3a6fd5d..15c27af1fb9 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -25,7 +25,7 @@ use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, Swarm, SwarmEvent}; use libp2p::NetworkBehaviour; -use libp2p::{development_transport, rendezvous}; +use libp2p::{tokio_development_transport, rendezvous}; use void::Void; /// Examples for the rendezvous protocol: @@ -44,8 +44,8 @@ async fn main() { let key = identity::ed25519::SecretKey::from_bytes(bytes).expect("we always pass 32 bytes"); let identity = identity::Keypair::Ed25519(key.into()); - let mut swarm = Swarm::new( - development_transport(identity.clone()).await.unwrap(), + let mut swarm = Swarm::with_tokio_executor( + tokio_development_transport(identity.clone()).unwrap(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 77f8efd8fec..e97b725d4af 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -48,7 +48,7 @@ fn is_response_outbound() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let request_id1 = swarm1 .behaviour_mut() @@ -87,11 +87,11 @@ fn ping_protocol() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); @@ -176,11 +176,11 @@ fn emits_inbound_connection_closed_failure() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); swarm1.listen_on(addr).unwrap(); @@ -245,11 +245,11 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { let (peer1_id, trans) = mk_transport(); let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); + let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); + let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); swarm1.listen_on(addr).unwrap(); diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a39a3e300cb..ae3f8a913c0 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1684,6 +1684,7 @@ fn p2p_addr(peer: Option, addr: Multiaddr) -> Result SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), Box::new(tp)), + None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()), + } } fn swarms_connected( diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index 1def8717e01..d30753b8fb5 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -65,7 +65,7 @@ fn make_swarm() -> Swarm { .multiplex(libp2p::yamux::YamuxConfig::default()) .boxed(); - Swarm::new( + Swarm::without_executor( transport, keep_alive::Behaviour, identity.public().to_peer_id(), From 20c5a09cb987ac37e369f33e3b9bbcc598b21705 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 13:55:14 +0100 Subject: [PATCH 16/28] Update swarm/src/lib.rs Co-authored-by: Thomas Eizinger --- swarm/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index ae3f8a913c0..18c760af4b4 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1515,7 +1515,6 @@ where } /// Builds a `Swarm` with the current configuration. - /// pub fn build(mut self) -> Swarm { let supported_protocols = self .behaviour From eb7836d88ae70f2bd3e645fa23d816853d19cd57 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 13:56:50 +0100 Subject: [PATCH 17/28] Update swarm/src/lib.rs Co-authored-by: Thomas Eizinger --- swarm/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 18c760af4b4..599097d6cce 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -309,7 +309,7 @@ where /// Builds a new `Swarm`. #[deprecated( since = "0.50.0", - note = "This constructor is considered ambiguous regarding the executor. Use `Swarm::with_threadpool_executor` for the same behaviour." + note = "This constructor is considered ambiguous regarding the executor. Use one of the new, executor-specific constructors or `Swarm::with_threadpool_executor` for the same behaviour." )] pub fn new( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, From 361ec51f4be65c08163f8d0ad72d46d28c3eaebe Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 13:57:08 +0100 Subject: [PATCH 18/28] Update swarm/src/lib.rs Co-authored-by: Thomas Eizinger --- swarm/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 599097d6cce..81aba2e30f2 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -336,13 +336,12 @@ where behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::with_executor( + Self::with_executor( transport, behaviour, local_peer_id, Box::new(crate::connection::pool::executor::TokioExecutor), ) - .build() } /// Builds a new `Swarm` with an async-std executor. From da955ba0ba94fd46cd1fbfaeafbc8392b430863e Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 14:51:33 +0100 Subject: [PATCH 19/28] Implemented more ergonomic api --- misc/multistream-select/tests/transport.rs | 6 ++- protocols/dcutr/examples/dcutr.rs | 6 +-- protocols/ping/tests/ping.rs | 6 ++- protocols/rendezvous/examples/discover.rs | 2 +- protocols/rendezvous/examples/register.rs | 2 +- .../examples/register_with_identify.rs | 2 +- .../rendezvous/examples/rendezvous_point.rs | 2 +- swarm/src/lib.rs | 41 +++++++++---------- 8 files changed, 35 insertions(+), 32 deletions(-) diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index 4c1a822b0c6..a66d20eadd5 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -61,8 +61,10 @@ fn transport_upgrade() { let listen_addr = Multiaddr::from(Protocol::Memory(random::())); - let mut dialer = Swarm::with_async_std_executor(dialer_transport, dummy::Behaviour, dialer_id); - let mut listener = Swarm::with_async_std_executor(listener_transport, dummy::Behaviour, listener_id); + let mut dialer = + Swarm::with_async_std_executor(dialer_transport, dummy::Behaviour, dialer_id); + let mut listener = + Swarm::with_async_std_executor(listener_transport, dummy::Behaviour, listener_id); listener.listen_on(listen_addr).unwrap(); let (addr_sender, addr_receiver) = oneshot::channel(); diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index 9d68b9a254c..82144d611af 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -156,11 +156,11 @@ fn main() -> Result<(), Box> { }; let mut swarm = match ThreadPool::new() { - Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, Box::new(tp)), + Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp), Err(_) => SwarmBuilder::without_executor(transport, behaviour, local_peer_id), } - .dial_concurrency_factor(10_u8.try_into().unwrap()) - .build(); + .dial_concurrency_factor(10_u8.try_into().unwrap()) + .build(); swarm .listen_on( diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 8ffb78f2b54..abcfb69b90c 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -44,7 +44,8 @@ fn ping_pong() { let cfg = ping::Config::new().with_interval(Duration::from_millis(10)); let (peer1_id, trans) = mk_transport(muxer); - let mut swarm1 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id); + let mut swarm1 = + Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id); let (peer2_id, trans) = mk_transport(muxer); let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg), peer2_id); @@ -128,7 +129,8 @@ fn max_failures() { .with_max_failures(max_failures.into()); let (peer1_id, trans) = mk_transport(muxer); - let mut swarm1 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id); + let mut swarm1 = + Swarm::with_async_std_executor(trans, Behaviour::new(cfg.clone()), peer1_id); let (peer2_id, trans) = mk_transport(muxer); let mut swarm2 = Swarm::with_async_std_executor(trans, Behaviour::new(cfg), peer2_id); diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs index 78a87a98e5a..40f5e7f39a4 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/protocols/rendezvous/examples/discover.rs @@ -25,7 +25,7 @@ use libp2p::multiaddr::Protocol; use libp2p::ping; use libp2p::swarm::{keep_alive, SwarmEvent}; use libp2p::Swarm; -use libp2p::{tokio_development_transport, rendezvous, Multiaddr}; +use libp2p::{rendezvous, tokio_development_transport, Multiaddr}; use std::time::Duration; use void::Void; diff --git a/protocols/rendezvous/examples/register.rs b/protocols/rendezvous/examples/register.rs index 72144fdb797..0624d2a8147 100644 --- a/protocols/rendezvous/examples/register.rs +++ b/protocols/rendezvous/examples/register.rs @@ -23,7 +23,7 @@ use libp2p::core::identity; use libp2p::core::PeerId; use libp2p::ping; use libp2p::swarm::{Swarm, SwarmEvent}; -use libp2p::{tokio_development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use libp2p::{Multiaddr, NetworkBehaviour}; use libp2p_swarm::AddressScore; use std::time::Duration; diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs index 7fef3e3ae6d..8c240535c8f 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -24,7 +24,7 @@ use libp2p::core::PeerId; use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, Swarm, SwarmEvent}; -use libp2p::{tokio_development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use libp2p::{Multiaddr, NetworkBehaviour}; use std::time::Duration; use void::Void; diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs index 15c27af1fb9..2592da0a1e3 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -25,7 +25,7 @@ use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, Swarm, SwarmEvent}; use libp2p::NetworkBehaviour; -use libp2p::{tokio_development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use void::Void; /// Examples for the rendezvous protocol: diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 81aba2e30f2..f8cd12594e0 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -324,7 +324,7 @@ where transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, - executor: Box, + executor: impl Executor + Send + 'static, ) -> Self { SwarmBuilder::with_executor(transport, behaviour, local_peer_id, executor).build() } @@ -340,7 +340,7 @@ where transport, behaviour, local_peer_id, - Box::new(crate::connection::pool::executor::TokioExecutor), + crate::connection::pool::executor::TokioExecutor, ) } @@ -355,7 +355,7 @@ where transport, behaviour, local_peer_id, - Box::new(crate::connection::pool::executor::AsyncStdExecutor), + crate::connection::pool::executor::AsyncStdExecutor, ) .build() } @@ -370,9 +370,7 @@ where .name_prefix("libp2p-swarm-task-") .create() { - Ok(tp) => { - SwarmBuilder::with_executor(transport, behaviour, local_peer_id, Box::new(tp)) - } + Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp), Err(err) => { log::warn!("Failed to create executor thread pool: {:?}", err); SwarmBuilder::without_executor(transport, behaviour, local_peer_id) @@ -392,8 +390,7 @@ where behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::without_executor(transport, behaviour, local_peer_id) - .build() + SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build() } /// Returns information about the connections underlying the [`Swarm`]. @@ -1363,11 +1360,14 @@ where behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - let executor: Option> = match ThreadPoolBuilder::new().name_prefix("libp2p-swarm-task-") - .create().ok() { - Some(tp) => Some(Box::new(tp)), - None => None, - }; + let executor: Option> = match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") + .create() + .ok() + { + Some(tp) => Some(Box::new(tp)), + None => None, + }; SwarmBuilder { local_peer_id, transport, @@ -1384,13 +1384,13 @@ where transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, local_peer_id: PeerId, - executor: Box, + executor: impl Executor + Send + 'static, ) -> Self { Self { local_peer_id, transport, behaviour, - pool_config: PoolConfig::new(Some(executor)), + pool_config: PoolConfig::new(Some(Box::new(executor))), connection_limits: Default::default(), } } @@ -1421,10 +1421,7 @@ where /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). - #[deprecated( - since = "0.50.0", - note = "Use `SwarmBuilder::with_executor` instead." - )] + #[deprecated(since = "0.50.0", note = "Use `SwarmBuilder::with_executor` instead.")] pub fn executor(mut self, executor: Box) -> Self { self.pool_config = self.pool_config.with_executor(executor); self @@ -1682,8 +1679,8 @@ fn p2p_addr(peer: Option, addr: Multiaddr) -> Result SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), Box::new(tp)), + Some(tp) => { + SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), tp) + } None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()), } } From 7d8cbe343e252e6f8e001fca20132a5f71e25469 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 15:05:38 +0100 Subject: [PATCH 20/28] Move executor --- core/src/lib.rs | 27 --------------- swarm/src/connection/pool.rs | 2 -- swarm/src/connection/pool/executor.rs | 24 -------------- swarm/src/executor.rs | 48 +++++++++++++++++++++++++++ swarm/src/lib.rs | 11 +++--- 5 files changed, 54 insertions(+), 58 deletions(-) delete mode 100644 swarm/src/connection/pool/executor.rs create mode 100644 swarm/src/executor.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index ffcdac33515..5a3640a0b07 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -51,7 +51,6 @@ mod peer_record_proto { include!(concat!(env!("OUT_DIR"), "/peer_record_proto.rs")); } -use futures::executor::ThreadPool; /// Multi-address re-export. pub use multiaddr; pub type Negotiated = multistream_select::Negotiated; @@ -83,29 +82,3 @@ pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, U #[derive(thiserror::Error, Debug)] #[error(transparent)] pub struct DecodeError(prost::DecodeError); - -use std::{future::Future, pin::Pin}; - -/// Implemented on objects that can run a `Future` in the background. -/// -/// > **Note**: While it may be tempting to implement this trait on types such as -/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is -/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically -/// > be used as fallback by libp2p. The `Executor` trait should therefore only be -/// > about running `Future`s in the background. -pub trait Executor { - /// Run the given future in the background until it ends. - fn exec(&self, future: Pin + Send>>); -} - -impl + Send>>)> Executor for F { - fn exec(&self, f: Pin + Send>>) { - self(f) - } -} - -impl Executor for ThreadPool { - fn exec(&self, future: Pin + Send>>) { - self.spawn_ok(future) - } -} diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index c6e9fdda57c..ef8bd6942ce 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -52,8 +52,6 @@ use std::{ use void::Void; mod concurrent_dial; -#[cfg(any(feature = "tokio", feature = "async-std"))] -pub mod executor; mod task; enum ExecSwitch { diff --git a/swarm/src/connection/pool/executor.rs b/swarm/src/connection/pool/executor.rs deleted file mode 100644 index 6e955fadbdc..00000000000 --- a/swarm/src/connection/pool/executor.rs +++ /dev/null @@ -1,24 +0,0 @@ -use libp2p_core::Executor; -use std::{future::Future, pin::Pin}; - -#[cfg(feature = "tokio")] -#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct TokioExecutor; - -#[cfg(feature = "tokio")] -impl Executor for TokioExecutor { - fn exec(&self, future: Pin + Send>>) { - let _ = tokio::spawn(future); - } -} - -#[cfg(feature = "async-std")] -#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct AsyncStdExecutor; - -#[cfg(feature = "async-std")] -impl Executor for AsyncStdExecutor { - fn exec(&self, future: Pin + Send>>) { - let _ = async_std::task::spawn(future); - } -} diff --git a/swarm/src/executor.rs b/swarm/src/executor.rs new file mode 100644 index 00000000000..ca8337b6400 --- /dev/null +++ b/swarm/src/executor.rs @@ -0,0 +1,48 @@ +use futures::executor::ThreadPool; +use std::{future::Future, pin::Pin}; + +/// Implemented on objects that can run a `Future` in the background. +/// +/// > **Note**: While it may be tempting to implement this trait on types such as +/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is +/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically +/// > be used as fallback by libp2p. The `Executor` trait should therefore only be +/// > about running `Future`s in the background. +pub trait Executor { + /// Run the given future in the background until it ends. + fn exec(&self, future: Pin + Send>>); +} + +impl + Send>>)> Executor for F { + fn exec(&self, f: Pin + Send>>) { + self(f) + } +} + +impl Executor for ThreadPool { + fn exec(&self, future: Pin + Send>>) { + self.spawn_ok(future) + } +} + +#[cfg(feature = "tokio")] +#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct TokioExecutor; + +#[cfg(feature = "tokio")] +impl Executor for TokioExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = tokio::spawn(future); + } +} + +#[cfg(feature = "async-std")] +#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct AsyncStdExecutor; + +#[cfg(feature = "async-std")] +impl Executor for AsyncStdExecutor { + fn exec(&self, future: Pin + Send>>) { + let _ = async_std::task::spawn(future); + } +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f8cd12594e0..286b9b43a0a 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -64,6 +64,7 @@ mod upgrade; pub mod behaviour; pub mod dial_opts; pub mod dummy; +pub mod executor; pub mod handler; pub mod keep_alive; @@ -86,6 +87,7 @@ use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; use connection::IncomingInfo; use dial_opts::{DialOpts, PeerCondition}; use either::Either; +use executor::Executor; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::connection::ConnectionId; use libp2p_core::muxing::SubstreamBox; @@ -96,7 +98,7 @@ use libp2p_core::{ muxing::StreamMuxerBox, transport::{self, ListenerId, TransportError, TransportEvent}, upgrade::ProtocolName, - Endpoint, Executor, Multiaddr, Negotiated, PeerId, Transport, + Endpoint, Multiaddr, Negotiated, PeerId, Transport, }; use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; @@ -340,7 +342,7 @@ where transport, behaviour, local_peer_id, - crate::connection::pool::executor::TokioExecutor, + crate::executor::TokioExecutor, ) } @@ -351,13 +353,12 @@ where behaviour: TBehaviour, local_peer_id: PeerId, ) -> Self { - SwarmBuilder::with_executor( + Self::with_executor( transport, behaviour, local_peer_id, - crate::connection::pool::executor::AsyncStdExecutor, + crate::executor::AsyncStdExecutor, ) - .build() } /// Builds a new `Swarm` with a threadpool executor. From f0493ed1cb1073e950fb2468d4cd9b40e12d7286 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 15:17:23 +0100 Subject: [PATCH 21/28] Correct bad merge --- examples/file-sharing.rs | 3 +-- swarm/Cargo.toml | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index e0b549bd973..f2a7b489f38 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -2,8 +2,7 @@ // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), -// to deal in the Softwa -without restriction, including without limitation +// to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 7b997069a28..a6f9a91a5ca 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -29,6 +29,8 @@ async-std = { version = "1.6.2", optional = true } [features] macros = ["dep:libp2p-swarm-derive"] +tokio = ["dep:tokio"] +async-std = ["dep:async-std"] [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } @@ -48,10 +50,6 @@ void = "1" name = "swarm_derive" required-features = ["macros"] -[features] -tokio = ["dep:tokio"] -async-std = ["dep:async-std"] - # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] From 91676a94bf84bf255538b260039b65578904e6dc Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 15:18:57 +0100 Subject: [PATCH 22/28] Do fmt --- examples/file-sharing.rs | 4 +--- protocols/rendezvous/examples/register_with_identify.rs | 2 +- protocols/rendezvous/examples/rendezvous_point.rs | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index f2a7b489f38..620ce6cd5d9 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -219,9 +219,7 @@ mod network { ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; - use libp2p::swarm::{ - ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent, - }; + use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs index ab7740fedb3..06844be1281 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -25,7 +25,7 @@ use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::Multiaddr; -use libp2p::{tokio_development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use std::time::Duration; use void::Void; diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs index 7dc45172f10..1e98f73d6da 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -24,7 +24,7 @@ use libp2p::core::PeerId; use libp2p::identify; use libp2p::ping; use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{tokio_development_transport, rendezvous}; +use libp2p::{rendezvous, tokio_development_transport}; use void::Void; /// Examples for the rendezvous protocol: From d99b802e4f04768a4d34a63dad2ba22bd7a3a9ea Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 15:44:05 +0100 Subject: [PATCH 23/28] Added changelog entries --- core/CHANGELOG.md | 3 +++ swarm/CHANGELOG.md | 61 ++++++++++++++++++++++++++++++++++++++++++++++ swarm/src/lib.rs | 6 ++--- 3 files changed, 67 insertions(+), 3 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index d7059cab98f..a3d9b9f4e38 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -4,8 +4,11 @@ - Hide `prost::Error` from public API in `FromEnvelopeError::InvalidPeerRecord` and `signed_envelope::DecodingError`. See [PR 3058]. +- Moved `Executor` to `libp2p-swarm`. See [PR 3097]. + [PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031 [PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058 +[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 # 0.37.0 diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index f66828db64a..cea3c7de386 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -5,9 +5,70 @@ - Export `NetworkBehaviour` derive as `libp2p_swarm::NetworkBehaviour`. This follows the convention of other popular libraries. `serde` for example exports the `Serialize` trait and macro as `serde::Serialize`. See [PR 3055]. + - Feature-gate `NetworkBehaviour` macro behind `macros` feature flag. See [PR 3055]. +- Made Swarm executor aware. See [PR 3097] for details. + + Supported executors: + - Tokio + + Previously + ```rust + let swarm = SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|fut| { + tokio::spawn(fut); + })) + .build(); + ``` + Now + ```rust + let swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id); + ``` + - Async Std + + Previously + ```rust + let swarm = SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|fut| { + async_std::task::spawn(fut); + })) + .build(); + ``` + Now + ```rust + let swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); + ``` + - ThreadPool (see [Issue 3107]) + + In most cases ThreadPool can be replaced by executors or spawning on the local task. + + Previously + ```rust + let swarm = Swarm::new(transport, behaviour, peer_id); + ``` + + Now + ``` + let swarm = Swarm::with_threadpool_executor(transport, behaviour, peer_id); + ``` + - Without + + Spawns the tasks on the current task, this results in bad performance so try to use an executor if possible. Previously this was just a fallback for ThreadPool. + + New + ```rust + let swarm = Swarm::without_executor(transport, behaviour, peer_id); + ``` + + Deprecated APIs: + - `Swarm::new` + - `SwarmBuilder::new` + - `SwarmBuilder::executor` + [PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055 +[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097 +[Issue 3107]: https://github.com/libp2p/rust-libp2p/issues/3107 # 0.40.1 diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index ab4a2beb94f..0b8f6ac24dd 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -331,7 +331,7 @@ where { /// Builds a new `Swarm`. #[deprecated( - since = "0.50.0", + since = "0.41.0", note = "This constructor is considered ambiguous regarding the executor. Use one of the new, executor-specific constructors or `Swarm::with_threadpool_executor` for the same behaviour." )] pub fn new( @@ -1374,7 +1374,7 @@ where /// local peer ID. The `Swarm` with its underlying `Network` is obtained /// via [`SwarmBuilder::build`]. #[deprecated( - since = "0.50.0", + since = "0.41.0", note = "Use `SwarmBuilder::with_executor` or `SwarmBuilder::without_executor` instead." )] pub fn new( @@ -1443,7 +1443,7 @@ where /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a /// [`ThreadPool`](futures::executor::ThreadPool). - #[deprecated(since = "0.50.0", note = "Use `SwarmBuilder::with_executor` instead.")] + #[deprecated(since = "0.41.0", note = "Use `SwarmBuilder::with_executor` instead.")] pub fn executor(mut self, executor: Box) -> Self { self.pool_config = self.pool_config.with_executor(executor); self From 69fa67185683ec633f6cc66dcbf0412a950a0c60 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 23:30:39 +0100 Subject: [PATCH 24/28] Apply suggestions from code review Co-authored-by: Thomas Eizinger --- core/CHANGELOG.md | 2 +- swarm/CHANGELOG.md | 6 +++--- swarm/src/connection/pool.rs | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index a3d9b9f4e38..c3831a3b3a9 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -4,7 +4,7 @@ - Hide `prost::Error` from public API in `FromEnvelopeError::InvalidPeerRecord` and `signed_envelope::DecodingError`. See [PR 3058]. -- Moved `Executor` to `libp2p-swarm`. See [PR 3097]. +- Move `Executor` to `libp2p-swarm`. See [PR 3097]. [PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031 [PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058 diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index cea3c7de386..f04da3da77b 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -8,7 +8,7 @@ - Feature-gate `NetworkBehaviour` macro behind `macros` feature flag. See [PR 3055]. -- Made Swarm executor aware. See [PR 3097] for details. +- Make executor in Swarm constructor explicit. See [PR 3097]. Supported executors: - Tokio @@ -49,12 +49,12 @@ ``` Now - ``` + ```rust let swarm = Swarm::with_threadpool_executor(transport, behaviour, peer_id); ``` - Without - Spawns the tasks on the current task, this results in bad performance so try to use an executor if possible. Previously this was just a fallback for ThreadPool. + Spawns the tasks on the current task, this may result in bad performance so try to use an executor where possible. Previously this was just a fallback when no executor was specified and constructing a `ThreadPool` failed. New ```rust diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index ef8bd6942ce..caf422b716d 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -60,7 +60,6 @@ enum ExecSwitch { } impl ExecSwitch { - // advance the local queue #[inline] fn advance_local(&mut self, cx: &mut Context) { match self { From f27fdf8afdb644874c309ef10144903ac78fd7f5 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Sun, 13 Nov 2022 23:40:36 +0100 Subject: [PATCH 25/28] Implement small changes --- swarm/src/connection/pool.rs | 10 ---------- swarm/src/lib.rs | 4 ++-- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index caf422b716d..ee41ad3ea16 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -1181,14 +1181,4 @@ mod tests { impl Executor for Dummy { fn exec(&self, _: Pin + Send>>) {} } - - // TODO: This test has to be redesigned. - #[test] - fn set_executor() { - PoolConfig::new(None) - .with_executor(Box::new(Dummy)) - .with_executor(Box::new(|f| { - async_std::task::spawn(f); - })); - } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0b8f6ac24dd..8660ab8b17a 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -64,7 +64,7 @@ mod upgrade; pub mod behaviour; pub mod dial_opts; pub mod dummy; -pub mod executor; +mod executor; pub mod handler; pub mod keep_alive; @@ -95,6 +95,7 @@ pub use connection::{ ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; +pub use executor::Executor; pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr, IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler, @@ -108,7 +109,6 @@ use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent}; use connection::IncomingInfo; use dial_opts::{DialOpts, PeerCondition}; use either::Either; -use executor::Executor; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; use libp2p_core::connection::ConnectionId; use libp2p_core::muxing::SubstreamBox; From f61686d549955763c15289f72394519ed0c35971 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Mon, 14 Nov 2022 14:09:31 +0100 Subject: [PATCH 26/28] Apply suggestions from code review Co-authored-by: Max Inden --- swarm/src/executor.rs | 2 +- swarm/src/lib.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/swarm/src/executor.rs b/swarm/src/executor.rs index ca8337b6400..d5bbe9e4d00 100644 --- a/swarm/src/executor.rs +++ b/swarm/src/executor.rs @@ -7,7 +7,7 @@ use std::{future::Future, pin::Pin}; /// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is /// > optional, and that `FuturesUnordered` (or a similar struct) will automatically /// > be used as fallback by libp2p. The `Executor` trait should therefore only be -/// > about running `Future`s in the background. +/// > about running `Future`s on a separate task. pub trait Executor { /// Run the given future in the background until it ends. fn exec(&self, future: Pin + Send>>); diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 8660ab8b17a..6fc1b8707c3 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1399,7 +1399,7 @@ where } } - /// Creates a new `SwarmBuilder` from the given transport, behaviour, local peer ID and + /// Creates a new [`SwarmBuilder`] from the given transport, behaviour, local peer ID and /// executor. The `Swarm` with its underlying `Network` is obtained via /// [`SwarmBuilder::build`]. pub fn with_executor( @@ -1417,7 +1417,7 @@ where } } - /// Creates a new `SwarmBuilder` from the given transport, behaviour and local peer ID. The + /// Creates a new [`SwarmBuilder`] from the given transport, behaviour and local peer ID. The /// `Swarm` with its underlying `Network` is obtained via [`SwarmBuilder::build`]. /// /// ## ⚠️ Performance warning From 30c50200a15bc532b76b7825695ee5d564d01119 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Mon, 14 Nov 2022 14:15:32 +0100 Subject: [PATCH 27/28] Addressed Max's concerns. --- swarm/src/connection/pool.rs | 1 - swarm/src/executor.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index ee41ad3ea16..5c2c069e11d 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -840,7 +840,6 @@ where } } - // Advance the tasks in `local_spawns`. self.executor.advance_local(cx); Poll::Pending diff --git a/swarm/src/executor.rs b/swarm/src/executor.rs index d5bbe9e4d00..7799d141d49 100644 --- a/swarm/src/executor.rs +++ b/swarm/src/executor.rs @@ -26,7 +26,7 @@ impl Executor for ThreadPool { } #[cfg(feature = "tokio")] -#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Default, Debug, Clone, Copy)] pub(crate) struct TokioExecutor; #[cfg(feature = "tokio")] @@ -37,7 +37,7 @@ impl Executor for TokioExecutor { } #[cfg(feature = "async-std")] -#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Default, Debug, Clone, Copy)] pub(crate) struct AsyncStdExecutor; #[cfg(feature = "async-std")] From 2a87f5b69dbd9e4ab7015dc376eb1e0f0a3f5539 Mon Sep 17 00:00:00 2001 From: umgefahren <55623006+umgefahren@users.noreply.github.com> Date: Mon, 14 Nov 2022 14:44:36 +0100 Subject: [PATCH 28/28] Removed deprecated APIs --- swarm/src/connection/pool.rs | 6 ---- swarm/src/lib.rs | 53 ------------------------------------ 2 files changed, 59 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 5c2c069e11d..cb716c8d2ba 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -1104,12 +1104,6 @@ impl PoolConfig { } } - /// Configures the executor to use for spawning connection background tasks. - pub fn with_executor(mut self, executor: Box) -> Self { - self.executor = Some(executor); - self - } - /// Sets the maximum number of events sent to a connection's background task /// that may be buffered, if the task cannot keep up with their consumption and /// delivery to the connection handler. diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 6fc1b8707c3..c13896f304d 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -329,19 +329,6 @@ impl Swarm where TBehaviour: NetworkBehaviour, { - /// Builds a new `Swarm`. - #[deprecated( - since = "0.41.0", - note = "This constructor is considered ambiguous regarding the executor. Use one of the new, executor-specific constructors or `Swarm::with_threadpool_executor` for the same behaviour." - )] - pub fn new( - transport: transport::Boxed<(PeerId, StreamMuxerBox)>, - behaviour: TBehaviour, - local_peer_id: PeerId, - ) -> Self { - Self::with_threadpool_executor(transport, behaviour, local_peer_id) - } - /// Builds a new `Swarm` with a provided executor. pub fn with_executor( transport: transport::Boxed<(PeerId, StreamMuxerBox)>, @@ -1370,35 +1357,6 @@ impl SwarmBuilder where TBehaviour: NetworkBehaviour, { - /// Creates a new `SwarmBuilder` from the given transport, behaviour and - /// local peer ID. The `Swarm` with its underlying `Network` is obtained - /// via [`SwarmBuilder::build`]. - #[deprecated( - since = "0.41.0", - note = "Use `SwarmBuilder::with_executor` or `SwarmBuilder::without_executor` instead." - )] - pub fn new( - transport: transport::Boxed<(PeerId, StreamMuxerBox)>, - behaviour: TBehaviour, - local_peer_id: PeerId, - ) -> Self { - let executor: Option> = match ThreadPoolBuilder::new() - .name_prefix("libp2p-swarm-task-") - .create() - .ok() - { - Some(tp) => Some(Box::new(tp)), - None => None, - }; - SwarmBuilder { - local_peer_id, - transport, - behaviour, - pool_config: PoolConfig::new(executor), - connection_limits: Default::default(), - } - } - /// Creates a new [`SwarmBuilder`] from the given transport, behaviour, local peer ID and /// executor. The `Swarm` with its underlying `Network` is obtained via /// [`SwarmBuilder::build`]. @@ -1438,17 +1396,6 @@ where } } - /// Configures the `Executor` to use for spawning background tasks. - /// - /// By default, unless another executor has been configured, - /// [`SwarmBuilder::build`] will try to set up a - /// [`ThreadPool`](futures::executor::ThreadPool). - #[deprecated(since = "0.41.0", note = "Use `SwarmBuilder::with_executor` instead.")] - pub fn executor(mut self, executor: Box) -> Self { - self.pool_config = self.pool_config.with_executor(executor); - self - } - /// Configures the number of events from the [`NetworkBehaviour`] in /// destination to the [`ConnectionHandler`] that can be buffered before /// the [`Swarm`] has to wait. An individual buffer with this number of