Skip to content

Commit

Permalink
chore: use async-rt in place of rt utils (#362)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Dec 30, 2024
1 parent 0518b79 commit c6e7186
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 378 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# 0.14.2
- feat: Add reconnect option to address book. [PR 356](https://github.com/dariusc93/rust-ipfs/pull/356)
- chore: use async-rt in place of rt utils. [PR 362](https://github.com/dariusc93/rust-ipfs/pull/362)

# 0.14.1
- fix: remove expect when session failed to get next block.
Expand Down
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ test_js_interop = []

[workspace.dependencies]
anyhow = "1.0.94"
async-rt = "0.1.2"
async-stream = { version = "0.3.6" }
async-trait = { version = "0.1.82" }
asynchronous-codec = "0.7.0"
Expand Down Expand Up @@ -94,6 +95,7 @@ zeroize = "1.8.1"

[dependencies]
anyhow.workspace = true
async-rt.workspace = true
async-stream.workspace = true
async-trait.workspace = true
asynchronous-codec.workspace = true
Expand Down
25 changes: 11 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub mod p2p;
pub mod path;
pub mod refs;
pub mod repo;
pub(crate) mod rt;
mod task;
pub mod unixfs;

Expand Down Expand Up @@ -66,19 +65,20 @@ use repo::{
BlockStore, DataStore, GCConfig, GCTrigger, Lock, RepoFetch, RepoInsertPin, RepoRemovePin,
};

use rt::{AbortableJoinHandle, Executor, ExecutorSwitch};
use tracing::Span;
use tracing_futures::Instrument;

use unixfs::UnixfsGet;
use unixfs::{AddOpt, IpfsUnixfs, UnixfsAdd, UnixfsCat, UnixfsLs};

pub use self::p2p::gossipsub::SubscriptionStream;
use self::{
dag::IpldDag,
ipns::Ipns,
p2p::{create_swarm, TSwarm},
repo::Repo,
};
use async_rt::AbortableJoinHandle;
use ipld_core::cid::Cid;
use ipld_core::ipld::Ipld;
use std::borrow::Borrow;
Expand All @@ -91,8 +91,6 @@ use std::{
time::Duration,
};

pub use self::p2p::gossipsub::SubscriptionStream;

pub use self::{
error::Error,
p2p::BehaviourEvent,
Expand Down Expand Up @@ -342,7 +340,6 @@ pub struct Ipfs {
record_key_validator: HashMap<String, Arc<dyn Fn(&str) -> anyhow::Result<Key> + Sync + Send>>,
_guard: AbortableJoinHandle<()>,
_gc_guard: AbortableJoinHandle<()>,
executor: ExecutorSwitch,
}

impl std::fmt::Debug for Ipfs {
Expand Down Expand Up @@ -916,8 +913,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
..
} = self;

let executor = ExecutorSwitch;

let keys = keys.unwrap_or(Keypair::generate_ed25519());

let root_span = Option::take(&mut options.span)
Expand Down Expand Up @@ -999,7 +994,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
record_key_validator,
_guard,
_gc_guard,
executor,
};

//Note: If `All` or `Pinned` are used, we would have to auto adjust the amount of
Expand Down Expand Up @@ -1043,7 +1037,6 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
let swarm = create_swarm(
&keys,
&options,
executor,
&ipfs.repo,
exec_span,
(custom_behaviour, custom_transport),
Expand All @@ -1054,7 +1047,7 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
} = options;

let gc_handle = gc_config.map(|config| {
executor.spawn_abortable({
async_rt::task::spawn_abortable({
let repo = ipfs.repo.clone();
async move {
let GCConfig { duration, trigger } = config;
Expand Down Expand Up @@ -1160,7 +1153,7 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
}
}

ipfs._guard.replace(executor.spawn_abortable({
let main_handle = async_rt::task::spawn_abortable({
async move {
//Note: For now this is not configurable as its meant for internal testing purposes but may change in the future
let as_fut = false;
Expand All @@ -1174,8 +1167,12 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
fut.await
}
.instrument(swarm_span)
}));
ipfs._gc_guard.replace(gc_handle);
});

unsafe {
ipfs._guard.replace(main_handle);
ipfs._gc_guard.replace(gc_handle);
}
Ok(ipfs)
}
}
Expand Down Expand Up @@ -2547,7 +2544,7 @@ impl Ipfs {
self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;
let fut = rx.await??;

self.executor.dispatch(async move {
async_rt::task::dispatch(async move {
if let Err(e) = fut.await.map_err(|e| anyhow!(e)) {
tracing::error!(error = %e, "failed to bootstrap");
}
Expand Down
7 changes: 2 additions & 5 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! P2P handling for IPFS nodes.
use crate::error::Error;
use crate::repo::Repo;
use crate::rt::{Executor, ExecutorSwitch};
use crate::{IpfsOptions, TTransportFn};
use std::convert::TryInto;
use std::num::{NonZeroU8, NonZeroUsize};
Expand Down Expand Up @@ -227,7 +226,6 @@ impl Default for SwarmConfig {
pub(crate) fn create_swarm<C>(
keypair: &Keypair,
options: &IpfsOptions,
executor: ExecutorSwitch,
repo: &Repo,
span: Span,
(custom, custom_transport): (Option<C>, Option<TTransportFn>),
Expand Down Expand Up @@ -256,7 +254,7 @@ where
transport,
behaviour,
peer_id,
libp2p::swarm::Config::with_executor(SpannedExecutor { executor, span })
libp2p::swarm::Config::with_executor(SpannedExecutor { span })
.with_notify_handler_buffer_size(swarm_config.notify_handler_buffer_size)
.with_per_connection_event_buffer_size(swarm_config.connection_event_buffer_size)
.with_dial_concurrency_factor(swarm_config.dial_concurrency_factor)
Expand All @@ -268,7 +266,6 @@ where
}

struct SpannedExecutor {
executor: ExecutorSwitch,
span: Span,
}

Expand All @@ -278,6 +275,6 @@ impl libp2p::swarm::Executor for SpannedExecutor {
future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'static + Send>>,
) {
use tracing_futures::Instrument;
self.executor.dispatch(future.instrument(self.span.clone()));
async_rt::task::dispatch(future.instrument(self.span.clone()));
}
}
Loading

0 comments on commit c6e7186

Please sign in to comment.