From 9421b3da16e72e529b3cbc5767aec6db1d23d961 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Sat, 28 Dec 2024 15:02:00 -0500 Subject: [PATCH] chore: use async-rt in place of rt utils --- Cargo.lock | 13 ++ Cargo.toml | 2 + src/lib.rs | 25 ++-- src/p2p/mod.rs | 7 +- src/rt.rs | 359 ------------------------------------------------- 5 files changed, 28 insertions(+), 378 deletions(-) delete mode 100644 src/rt.rs diff --git a/Cargo.lock b/Cargo.lock index 23ca7b9e4..e345bf627 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,6 +396,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "async-rt" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7ec56063c15a7e614415d6daf5fd9ae0a29d6b8c024addd886521843b0bfda2" +dependencies = [ + "futures", + "parking_lot", + "tokio", + "wasm-bindgen-futures", +] + [[package]] name = "async-signal" version = "0.2.10" @@ -4485,6 +4497,7 @@ name = "rust-ipfs" version = "0.14.1" dependencies = [ "anyhow", + "async-rt", "async-stream", "async-trait", "asynchronous-codec", diff --git a/Cargo.toml b/Cargo.toml index cd0c45bde..44bb6e70e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ test_js_interop = [] [workspace.dependencies] anyhow = "1.0.94" +async-rt = "0.1.2" async-stream = { version = "0.3.6" } async-trait = { version = "0.1.82" } asynchronous-codec = "0.7.0" @@ -94,6 +95,7 @@ zeroize = "1.8.1" [dependencies] anyhow.workspace = true +async-rt.workspace = true async-stream.workspace = true async-trait.workspace = true asynchronous-codec.workspace = true diff --git a/src/lib.rs b/src/lib.rs index 541785e8a..0a97a749a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,7 +32,6 @@ pub mod p2p; pub mod path; pub mod refs; pub mod repo; -pub(crate) mod rt; mod task; pub mod unixfs; @@ -66,19 +65,20 @@ use repo::{ BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin, }; -use rt::{AbortableJoinHandle, Executor, ExecutorSwitch}; use tracing::Span; use tracing_futures::Instrument; use unixfs::UnixfsGet; use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs}; +pub use self::p2p::gossipsub::SubscriptionStream; use self::{ dag::IpldDag, ipns::Ipns, p2p::{create_swarm, TSwarm}, repo::Repo, }; +use async_rt::AbortableJoinHandle; use ipld_core::cid::Cid; use ipld_core::ipld::Ipld; use std::borrow::Borrow; @@ -91,8 +91,6 @@ use std::{ time::Duration, }; -pub use self::p2p::gossipsub::SubscriptionStream; - pub use self::{ error::Error, p2p::BehaviourEvent, @@ -342,7 +340,6 @@ pub struct Ipfs { record_key_validator: HashMap anyhow::Result + Sync + Send>>, _guard: AbortableJoinHandle<()>, _gc_guard: AbortableJoinHandle<()>, - executor: ExecutorSwitch, } impl std::fmt::Debug for Ipfs { @@ -916,8 +913,6 @@ impl + Send> UninitializedIpfs { .. } = self; - let executor = ExecutorSwitch; - let keys = keys.unwrap_or(Keypair::generate_ed25519()); let root_span = Option::take(&mut options.span) @@ -999,7 +994,6 @@ impl + Send> UninitializedIpfs { record_key_validator, _guard, _gc_guard, - executor, }; //Note: If `All` or `Pinned` are used, we would have to auto adjust the amount of @@ -1043,7 +1037,6 @@ impl + Send> UninitializedIpfs { let swarm = create_swarm( &keys, &options, - executor, &ipfs.repo, exec_span, (custom_behaviour, custom_transport), @@ -1054,7 +1047,7 @@ impl + Send> UninitializedIpfs { } = options; let gc_handle = gc_config.map(|config| { - executor.spawn_abortable({ + async_rt::task::spawn_abortable({ let repo = ipfs.repo.clone(); async move { let GCConfig { duration, trigger } = config; @@ -1160,7 +1153,7 @@ impl + Send> UninitializedIpfs { } } - ipfs._guard.replace(executor.spawn_abortable({ + let main_handle = async_rt::task::spawn_abortable({ async move { //Note: For now this is not configurable as its meant for internal testing purposes but may change in the future let as_fut = false; @@ -1174,8 +1167,12 @@ impl + Send> UninitializedIpfs { fut.await } .instrument(swarm_span) - })); - ipfs._gc_guard.replace(gc_handle); + }); + + unsafe { + ipfs._guard.replace(main_handle); + ipfs._gc_guard.replace(gc_handle); + } Ok(ipfs) } } @@ -2547,7 +2544,7 @@ impl Ipfs { self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?; let fut = rx.await??; - self.executor.dispatch(async move { + async_rt::task::dispatch(async move { if let Err(e) = fut.await.map_err(|e| anyhow!(e)) { tracing::error!(error = %e, "failed to bootstrap"); } diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 0eb091bc8..b83c37394 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -1,7 +1,6 @@ //! P2P handling for IPFS nodes. use crate::error::Error; use crate::repo::Repo; -use crate::rt::{Executor, ExecutorSwitch}; use crate::{IpfsOptions, TTransportFn}; use std::convert::TryInto; use std::num::{NonZeroU8, NonZeroUsize}; @@ -227,7 +226,6 @@ impl Default for SwarmConfig { pub(crate) fn create_swarm( keypair: &Keypair, options: &IpfsOptions, - executor: ExecutorSwitch, repo: &Repo, span: Span, (custom, custom_transport): (Option, Option), @@ -256,7 +254,7 @@ where transport, behaviour, peer_id, - libp2p::swarm::Config::with_executor(SpannedExecutor { executor, span }) + libp2p::swarm::Config::with_executor(SpannedExecutor { span }) .with_notify_handler_buffer_size(swarm_config.notify_handler_buffer_size) .with_per_connection_event_buffer_size(swarm_config.connection_event_buffer_size) .with_dial_concurrency_factor(swarm_config.dial_concurrency_factor) @@ -268,7 +266,6 @@ where } struct SpannedExecutor { - executor: ExecutorSwitch, span: Span, } @@ -278,6 +275,6 @@ impl libp2p::swarm::Executor for SpannedExecutor { future: std::pin::Pin + 'static + Send>>, ) { use tracing_futures::Instrument; - self.executor.dispatch(future.instrument(self.span.clone())); + async_rt::task::dispatch(future.instrument(self.span.clone())); } } diff --git a/src/rt.rs b/src/rt.rs deleted file mode 100644 index a25b3dc6c..000000000 --- a/src/rt.rs +++ /dev/null @@ -1,359 +0,0 @@ -#[allow(unused_imports)] -use futures::future::Abortable; -use std::fmt::{Debug, Formatter}; - -use futures::future::{AbortHandle, Aborted}; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -pub struct JoinHandle { - inner: InnerJoinHandle, -} - -impl Debug for JoinHandle { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("JoinHandle").finish() - } -} - -enum InnerJoinHandle { - #[cfg(not(target_arch = "wasm32"))] - TokioHandle(tokio::task::JoinHandle), - #[allow(dead_code)] - CustomHandle { - inner: Option>>, - handle: AbortHandle, - }, - Empty, -} - -impl Default for InnerJoinHandle { - fn default() -> Self { - Self::Empty - } -} - -impl JoinHandle { - pub(crate) fn empty() -> Self { - JoinHandle { - inner: InnerJoinHandle::Empty, - } - } -} - -impl JoinHandle { - #[allow(dead_code)] - pub fn abort(&self) { - match self.inner { - #[cfg(not(target_arch = "wasm32"))] - InnerJoinHandle::TokioHandle(ref handle) => handle.abort(), - InnerJoinHandle::CustomHandle { ref handle, .. } => handle.abort(), - InnerJoinHandle::Empty => {} - } - } - - #[allow(dead_code)] - pub fn is_finished(&self) -> bool { - match self.inner { - #[cfg(not(target_arch = "wasm32"))] - InnerJoinHandle::TokioHandle(ref handle) => handle.is_finished(), - InnerJoinHandle::CustomHandle { - ref handle, - ref inner, - } => handle.is_aborted() || inner.is_none(), - InnerJoinHandle::Empty => true, - } - } - - #[allow(dead_code)] - pub(crate) fn replace(&mut self, mut handle: JoinHandle) { - self.inner = std::mem::take(&mut handle.inner); - } - - #[allow(dead_code)] - pub(crate) fn replace_mut(&mut self, handle: &mut JoinHandle) { - self.inner = std::mem::take(&mut handle.inner); - } -} - -impl Future for JoinHandle { - type Output = std::io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let inner = &mut self.inner; - match inner { - #[cfg(not(target_arch = "wasm32"))] - InnerJoinHandle::TokioHandle(handle) => { - let fut = futures::ready!(Pin::new(handle).poll(cx)); - - match fut { - Ok(val) => Poll::Ready(Ok(val)), - Err(e) => { - let e = std::io::Error::other(e); - Poll::Ready(Err(e)) - } - } - } - InnerJoinHandle::CustomHandle { inner, .. } => { - let Some(this) = inner.as_mut() else { - unreachable!("cannot poll completed future"); - }; - - let fut = futures::ready!(Pin::new(this).poll(cx)); - inner.take(); - - match fut { - Ok(Ok(val)) => Poll::Ready(Ok(val)), - Ok(Err(e)) => { - let e = std::io::Error::other(e); - Poll::Ready(Err(e)) - } - Err(e) => { - let e = std::io::Error::other(e); - Poll::Ready(Err(e)) - } - } - } - InnerJoinHandle::Empty => { - Poll::Ready(Err(std::io::Error::from(std::io::ErrorKind::Other))) - } - } - } -} - -#[derive(Clone)] -pub struct AbortableJoinHandle { - handle: Arc>, -} - -impl Debug for AbortableJoinHandle { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AbortableJoinHandle").finish() - } -} - -impl From> for AbortableJoinHandle { - fn from(handle: JoinHandle) -> Self { - AbortableJoinHandle { - handle: Arc::new(InnerHandle { - inner: parking_lot::Mutex::new(handle), - }), - } - } -} - -impl AbortableJoinHandle { - pub(crate) fn empty() -> Self { - Self { - handle: Arc::new(InnerHandle { - inner: parking_lot::Mutex::new(JoinHandle::empty()), - }), - } - } -} - -impl AbortableJoinHandle { - #[allow(dead_code)] - pub fn abort(&self) { - self.handle.inner.lock().abort(); - } - - #[allow(dead_code)] - pub fn is_finished(&self) -> bool { - self.handle.inner.lock().is_finished() - } - - pub(crate) fn replace(&mut self, inner: AbortableJoinHandle) { - let current_handle = &mut *self.handle.inner.lock(); - let inner_handle = &mut *inner.handle.inner.lock(); - current_handle.replace_mut(inner_handle); - } -} - -struct InnerHandle { - pub inner: parking_lot::Mutex>, -} - -impl Drop for InnerHandle { - fn drop(&mut self) { - self.inner.lock().abort(); - } -} - -impl Future for AbortableJoinHandle { - type Output = std::io::Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let inner = &mut *self.handle.inner.lock(); - Pin::new(inner).poll(cx).map_err(std::io::Error::other) - } -} - -pub trait Executor { - /// Spawns a new asynchronous task in the background, returning an Future ['JoinHandle'] for it. - fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static; - - /// Spawns a new asynchronous task in the background, returning an abortable handle that will cancel the task - /// once the handle is dropped. - /// - /// Note: This function is used if the task is expected to run until the handle is dropped. It is recommended to use - /// [`Executor::spawn`] or [`Executor::dispatch`] otherwise. - fn spawn_abortable(&self, future: F) -> AbortableJoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let handle = self.spawn(future); - handle.into() - } - - /// Spawns a new asynchronous task in the background without an handle. - /// Basically the same as [`Executor::spawn`]. - fn dispatch(&self, future: F) - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawn(future); - } -} - -#[cfg(not(target_arch = "wasm32"))] -#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq)] -pub struct TokioExecutor; - -#[cfg(not(target_arch = "wasm32"))] -impl Executor for TokioExecutor { - fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let handle = tokio::task::spawn(future); - let inner = InnerJoinHandle::TokioHandle(handle); - JoinHandle { inner } - } -} - -#[cfg(target_arch = "wasm32")] -#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq)] -pub struct WasmExecutor; - -#[cfg(target_arch = "wasm32")] -impl Executor for WasmExecutor { - fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let future = Abortable::new(future, abort_registration); - let (tx, rx) = futures::channel::oneshot::channel(); - let fut = async { - let val = future.await; - _ = tx.send(val); - }; - - wasm_bindgen_futures::spawn_local(fut); - let inner = InnerJoinHandle::CustomHandle { - inner: Some(rx), - handle: abort_handle, - }; - JoinHandle { inner } - } -} - -#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq)] -pub struct ExecutorSwitch; - -impl Executor for ExecutorSwitch { - fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - #[cfg(not(target_arch = "wasm32"))] - let executor = TokioExecutor; - #[cfg(target_arch = "wasm32")] - let executor = WasmExecutor; - - executor.spawn(future) - } -} - -#[cfg(not(target_arch = "wasm32"))] -#[tokio::test] -async fn default_abortable_task() { - let executor = ExecutorSwitch; - - let (tx, rx) = futures::channel::oneshot::channel::<()>(); - - let handle = executor.spawn_abortable(async { - futures_timer::Delay::new(std::time::Duration::from_secs(5)).await; - let _ = tx.send(()); - unreachable!(); - }); - - drop(handle); - let result = rx.await; - assert!(result.is_err()); -} - -#[test] -fn custom_abortable_task() { - use futures::future::Abortable; - struct FuturesExecutor { - pool: futures::executor::ThreadPool, - } - - impl Default for FuturesExecutor { - fn default() -> Self { - Self { - pool: futures::executor::ThreadPool::new().unwrap(), - } - } - } - - impl Executor for FuturesExecutor { - fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let future = Abortable::new(future, abort_registration); - let (tx, rx) = futures::channel::oneshot::channel(); - let fut = async { - let val = future.await; - let _ = tx.send(val); - }; - - self.pool.spawn_ok(fut); - let inner = InnerJoinHandle::CustomHandle { - inner: Some(rx), - handle: abort_handle, - }; - - JoinHandle { inner } - } - } - - futures::executor::block_on(async move { - let executor = FuturesExecutor::default(); - - let (tx, rx) = futures::channel::oneshot::channel::<()>(); - - let handle = executor.spawn_abortable(async { - futures_timer::Delay::new(std::time::Duration::from_secs(5)).await; - let _ = tx.send(()); - unreachable!(); - }); - - drop(handle); - let result = rx.await; - assert!(result.is_err()); - }); -}