Skip to content

Commit

Permalink
Change hook type from a function to a future
Browse files Browse the repository at this point in the history
  • Loading branch information
popzxc committed Jun 21, 2024
1 parent bf640fe commit af32a34
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,11 @@ impl WiringLayer for MetadataCalculatorLayer {

context.add_task(Box::new(metadata_calculator));

context.add_shutdown_hook("rocksdb_terminaton", || {
Box::pin(async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
})
context.add_shutdown_hook("rocksdb_terminaton", async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
});

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,11 @@ impl WiringLayer for StateKeeperLayer {
storage_factory: Arc::new(storage_factory),
}));

context.add_shutdown_hook("rocksdb_terminaton", || {
Box::pin(async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
})
context.add_shutdown_hook("rocksdb_terminaton", async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
});
Ok(())
}
Expand Down
16 changes: 9 additions & 7 deletions core/node/node_framework/src/service/context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::any::type_name;
use std::{any::type_name, future::Future};

use futures::FutureExt as _;

use super::runnables::{ShutdownHook, ShutdownHookFn};
use crate::{
precondition::Precondition,
resource::{Resource, ResourceId, StoredResource},
service::ZkStackService,
service::{named_future::NamedFuture, ZkStackService},
task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
wiring_layer::WiringError,
};
Expand Down Expand Up @@ -96,14 +97,15 @@ impl<'a> ServiceContext<'a> {
self
}

/// Adds a function to be invoked after node shutdown.
/// Adds a future to be invoked after node shutdown.
/// May be used to perform cleanup tasks.
///
/// All the collected shutdown hooks will be invoked sequentially after all the node tasks are stopped.
/// The future is guaranteed to only be polled after all the node tasks are stopped or timed out.
/// All the futures will be awaited sequentially.
pub fn add_shutdown_hook(
&mut self,
name: &'static str,
hook: impl ShutdownHookFn,
hook: impl Future<Output = anyhow::Result<()>> + Send + 'static,
) -> &mut Self {
tracing::info!(
"Layer {} has added a new shutdown hook: {}",
Expand All @@ -113,7 +115,7 @@ impl<'a> ServiceContext<'a> {
self.service
.runnables
.shutdown_hooks
.push(ShutdownHook::new(name, hook));
.push(NamedFuture::new(hook.boxed(), name.into()));
self
}

Expand Down
2 changes: 1 addition & 1 deletion core/node/node_framework/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl ZkStackService {
let name = hook.id().clone();
// Limit each shutdown hook to the same timeout as the tasks.
let hook_with_timeout =
async move { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook.invoke()).await };
async move { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, hook).await };
match self.runtime.block_on(hook_with_timeout) {
Ok(Ok(())) => {
tracing::info!("Shutdown hook {name} completed");
Expand Down
11 changes: 9 additions & 2 deletions core/node/node_framework/src/service/named_future.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::{future::Future, pin::Pin, task};
use std::{fmt, future::Future, pin::Pin, task};

use pin_project_lite::pin_project;

use crate::task::TaskId;

pin_project! {
/// Implements a future with the name tag attached.
#[derive(Debug)]
pub struct NamedFuture<F> {
#[pin]
inner: F,
Expand Down Expand Up @@ -42,3 +41,11 @@ where
self.project().inner.poll(cx)
}
}

impl<T> fmt::Debug for NamedFuture<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NamedFuture")
.field("name", &self.name)
.finish_non_exhaustive()
}
}
47 changes: 3 additions & 44 deletions core/node/node_framework/src/service/runnables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,12 @@ use tokio::sync::Barrier;
use super::{named_future::NamedFuture, StopReceiver};
use crate::{
precondition::Precondition,
task::{OneshotTask, Task, TaskId, UnconstrainedOneshotTask, UnconstrainedTask},
task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
};

/// Alias for futures with the name assigned.
pub type NamedBoxFuture<T> = NamedFuture<BoxFuture<'static, T>>;

/// Alias for a shutdown hook function type.
pub trait ShutdownHookFn:
FnOnce() -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
{
}

impl<T> ShutdownHookFn for T where
T: FnOnce() -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync + 'static
{
}

pub struct ShutdownHook {
id: TaskId,
hook: Box<dyn ShutdownHookFn>,
}

impl ShutdownHook {
pub fn new(id: impl Into<TaskId>, hook: impl ShutdownHookFn) -> Self {
Self {
id: id.into(),
hook: Box::new(hook),
}
}

pub fn id(&self) -> &TaskId {
&self.id
}

pub async fn invoke(self) -> anyhow::Result<()> {
(self.hook)().await
}
}

impl fmt::Debug for ShutdownHook {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ShutdownHook")
.field("name", &self.id)
.finish()
}
}

/// A collection of different flavors of tasks.
#[derive(Default)]
pub(super) struct Runnables {
Expand All @@ -67,7 +26,7 @@ pub(super) struct Runnables {
/// Unconstrained oneshot tasks added to the service.
pub(super) unconstrained_oneshot_tasks: Vec<Box<dyn UnconstrainedOneshotTask>>,
/// List of hooks to be invoked after node shutdown.
pub(super) shutdown_hooks: Vec<ShutdownHook>,
pub(super) shutdown_hooks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
}

impl fmt::Debug for Runnables {
Expand All @@ -90,7 +49,7 @@ impl fmt::Debug for Runnables {
pub(super) struct TaskReprs {
pub(super) long_running_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
pub(super) oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
pub(super) shutdown_hooks: Vec<ShutdownHook>,
pub(super) shutdown_hooks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
}

impl fmt::Debug for TaskReprs {
Expand Down

0 comments on commit af32a34

Please sign in to comment.