Skip to content

Commit

Permalink
feat: replace wrap_with_spawn with wrap_with_spawn_named to provide t…
Browse files Browse the repository at this point in the history
…ask name
  • Loading branch information
matthias-wright committed May 27, 2024
1 parent 555b88e commit 6246382
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 30 deletions.
7 changes: 4 additions & 3 deletions core/archive/src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ struct ArchiveInner<C: Collection> {

impl<C: Collection> BuildGraph for Archive<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::new().with_infallible(
Self::new.with_event_handler("start", insertion_task::<C>.wrap_with_spawn()),
)
fdi::DependencyGraph::new().with_infallible(Self::new.with_event_handler(
"start",
insertion_task::<C>.wrap_with_spawn_named("ARCHIVE"),
))
}
}

Expand Down
8 changes: 5 additions & 3 deletions core/blockstore-server/src/blockstore_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ impl<C: Collection> BlockstoreServer<C> {

impl<C: Collection> BuildGraph for BlockstoreServer<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::default()
.with(Self::init.with_event_handler("start", Self::start.wrap_with_spawn()))
fdi::DependencyGraph::default().with(Self::init.with_event_handler(
"start",
Self::start.wrap_with_spawn_named("BLOCKSTORE-SERVER"),
))
}
}

Expand Down Expand Up @@ -165,7 +167,7 @@ impl<C: Collection> BlockstoreServerInner<C> {
Some("Counter for number of blockstore requests handled by this node")
);
},
"BLOCKSTORE-SERVER: HANDLE-REQUEST"
"BLOCKSTORE-SERVER: handle request"
);
} else {
self.num_responses.fetch_sub(1, Ordering::Release);
Expand Down
11 changes: 7 additions & 4 deletions core/dack-aggregator/src/dack_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ impl<C: Collection> DeliveryAcknowledgmentAggregator<C> {

impl<C: Collection> BuildGraph for DeliveryAcknowledgmentAggregator<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::default()
.with(Self::init.with_event_handler("start", Self::start.wrap_with_spawn()))
fdi::DependencyGraph::default().with(Self::init.with_event_handler(
"start",
Self::start.wrap_with_spawn_named("DACK_AGGREGATOR"),
))
}
}

Expand Down Expand Up @@ -151,14 +153,15 @@ impl AggregatorInner {
metadata: metadata.remove(&service_id),
};
let submit_tx = self.submit_tx.clone();
tokio::spawn(async move {
spawn!(async move {
if let Err(e) = submit_tx
.run(update)
.await
{
error!("Failed to submit DACK to signer: {e:?}");
}
});
},
"DACK_AGGREGATOR: submit tx");
}

increment_counter_by!(
Expand Down
15 changes: 11 additions & 4 deletions core/handshake/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,19 @@ impl<C: Collection> Handshake<C> {
if let Some(https) = this.config.https.clone() {
let https_router = router.clone();
let handle = run.handle.clone();
tokio::spawn(async move { spawn_https_server(https_router, https, handle).await });
spawn!(
async move { spawn_https_server(https_router, https, handle).await },
"HANDSHAKE: start optional http server"
);
}

// Start HTTP server.
let waiter2 = waiter.clone();
let http_addr = this.config.http_address;
tokio::spawn(async move { spawn_http_server(http_addr, router, waiter2).await });
spawn!(
async move { spawn_http_server(http_addr, router, waiter2).await },
"HANDSHAKE: start http server"
);

// Shutdown the handle.
waiter.wait_for_shutdown().await;
Expand All @@ -110,8 +116,9 @@ impl<C: Collection> Handshake<C> {

impl<C: Collection> BuildGraph for Handshake<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::new()
.with_infallible(Self::new.with_event_handler("start", Self::start.wrap_with_spawn()))
fdi::DependencyGraph::new().with_infallible(
Self::new.with_event_handler("start", Self::start.wrap_with_spawn_named("HANDSHAKE")),
)
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/interfaces/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ macro_rules! partial {
#[macro_export]
macro_rules! spawn {
($future:expr, $name:expr, crucial($waiter:expr)) => {
tokio::task::Builder::new().name(concat!($name,"#WAITER")).spawn(async move{
tokio::task::Builder::new().name(&format!("{}#WAITER", $name)).spawn(async move{
let handle = tokio::task::Builder::new().name($name).spawn($future).expect("Tokio task created outside of tokio runtime");

if let Err(e) = handle.await {
tracing::error!("Crucial task:{} had a panic: {:?} \n Signaling to shutdown the rest of the node",$name, e);
tracing::error!("Crucial task:{} had a panic: {:?} \n Signaling to shutdown the rest of the node", $name, e);
$crate::ShutdownWaiter::trigger_shutdown(&$waiter);
}
}).expect("Tokio task created outside of tokio runtime")
Expand Down
7 changes: 5 additions & 2 deletions core/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,18 @@ impl<C: Collection> ContainedNode<C> {
pub fn spawn(&self) -> JoinHandle<Result<()>> {
let provider = self.provider.clone();

let waiter = self.shutdown.waiter();
self.runtime.as_ref().unwrap().spawn_blocking(move || {
let graph = C::build_graph();
let mut provider = provider.get_local_provider();

// Set tokio as the spawner of fdi async works.
provider
.get_mut::<fdi::Executor>()
.set_spawn_cb(|fut, _name| {
tokio::spawn(fut);
.set_spawn_cb(move |fut, name| {
let name = name.expect("Name must be provided");
let waiter = waiter.clone();
spawn!(fut, &name, crucial(waiter));
});

// Init all of the components and dependencies.
Expand Down
10 changes: 6 additions & 4 deletions core/pinger/src/pinger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ impl<C: Collection> PingerInterface<C> for Pinger<C> {}

impl<C: Collection> BuildGraph for Pinger<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::new()
.with(Self::new.with_event_handler("start", Self::start.wrap_with_spawn()))
fdi::DependencyGraph::new().with(
Self::new.with_event_handler("start", Self::start.wrap_with_spawn_named("PINGER")),
)
}
}

Expand Down Expand Up @@ -203,12 +204,13 @@ impl<C: Collection> PingerInner<C> {
} else {
pending_req.insert((peer_index, id), Instant::now());
let tx = timeout_tx.clone();
tokio::spawn(async move {
spawn!(async move {
tokio::time::sleep(TIMEOUT).await;
// We ignore the sending error because it can happen that the
// pinger is shutdown while there are still pending timeout tasks.
let _ = tx.send((peer_index, id)).await;
});
},
"PINGER: request timeout");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/pool/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ where
impl<C: Collection> BuildGraph for PoolProvider<C, QuinnMuxer> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::new()
.with(Self::init.with_event_handler("start", Self::start.wrap_with_spawn()))
.with(Self::init.with_event_handler("start", Self::start.wrap_with_spawn_named("POOL")))
}
}

Expand Down
10 changes: 8 additions & 2 deletions core/rep-collector/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,14 @@ impl<C: Collection> ReputationAggregator<C> {

impl<C: Collection> BuildGraph for ReputationAggregator<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::new()
.with(Self::new.with_event_handler("start", Self::start.bounded().wrap_with_spawn()))
fdi::DependencyGraph::new().with(
Self::new.with_event_handler(
"start",
Self::start
.bounded()
.wrap_with_spawn_named("REP-AGGREGATOR"),
),
)
}
}

Expand Down
5 changes: 3 additions & 2 deletions core/resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ impl<C: Collection> Resolver<C> {

impl<C: Collection> BuildGraph for Resolver<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::default()
.with(Self::init.with_event_handler("start", Self::start.wrap_with_spawn()))
fdi::DependencyGraph::default().with(
Self::init.with_event_handler("start", Self::start.wrap_with_spawn_named("RESOLVER")),
)
}
}

Expand Down
7 changes: 6 additions & 1 deletion core/test-utils/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ impl<C: Collection> BuildGraph for MockConsensus<C> {
.with_infallible(|config: fdi::Ref<C::ConfigProviderInterface>| {
MockConsensusGroup::new(config.get::<Self>())
})
.with_infallible(Self::new.with_event_handler("start", Self::start.wrap_with_spawn()))
.with_infallible(
Self::new.with_event_handler(
"start",
Self::start.wrap_with_spawn_named("MOCK-CONSENSUS"),
),
)
}
}

Expand Down
5 changes: 3 additions & 2 deletions core/topology/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ impl<C: Collection> Topology<C> {

impl<C: Collection> BuildGraph for Topology<C> {
fn build_graph() -> fdi::DependencyGraph {
fdi::DependencyGraph::default()
.with(Self::init.with_event_handler("start", Self::start.wrap_with_spawn()))
fdi::DependencyGraph::default().with(
Self::init.with_event_handler("start", Self::start.wrap_with_spawn_named("TOPOLOGY")),
)
}
}

Expand Down

0 comments on commit 6246382

Please sign in to comment.