feat(node_framework): Support shutdown hooks + more #2293

Merged: 13 commits, Jun 25, 2024
1 change: 1 addition & 0 deletions Cargo.lock

24 changes: 19 additions & 5 deletions core/lib/queued_job_processor/src/lib.rs
@@ -5,7 +5,7 @@ use std::{

use anyhow::Context as _;
pub use async_trait::async_trait;
use tokio::{sync::watch, task::JoinHandle, time::sleep};
use tokio::{sync::watch, task::JoinHandle};
use vise::{Buckets, Counter, Histogram, LabeledFamily, Metrics};
use zksync_utils::panic_extractor::try_extract_panic_message;

@@ -57,7 +57,7 @@ pub trait JobProcessor: Sync + Send {
/// To process a batch, pass `Some(batch_size)`.
async fn run(
self,
stop_receiver: watch::Receiver<bool>,
mut stop_receiver: watch::Receiver<bool>,
mut iterations_left: Option<usize>,
) -> anyhow::Result<()>
where
@@ -86,15 +86,18 @@ pub trait JobProcessor: Sync + Send {
);
let task = self.process_job(&job_id, job, started_at).await;

self.wait_for_task(job_id, started_at, task)
self.wait_for_task(job_id, started_at, task, &mut stop_receiver)
.await
.context("wait_for_task")?;
} else if iterations_left.is_some() {
tracing::info!("No more jobs to process. Server can stop now.");
return Ok(());
} else {
tracing::trace!("Backing off for {} ms", backoff);
sleep(Duration::from_millis(backoff)).await;
// Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this.
tokio::time::timeout(Duration::from_millis(backoff), stop_receiver.changed())
.await
.ok();
backoff = (backoff * Self::BACKOFF_MULTIPLIER).min(Self::MAX_BACKOFF_MS);
}
}
@@ -108,6 +111,7 @@ pub trait JobProcessor: Sync + Send {
job_id: Self::JobId,
started_at: Instant,
task: JoinHandle<anyhow::Result<Self::JobArtifacts>>,
stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<()> {
let attempts = self.get_job_attempts(&job_id).await?;
let max_attempts = self.max_attempts();
@@ -130,7 +134,17 @@
if task.is_finished() {
break task.await;
}
sleep(Duration::from_millis(Self::POLLING_INTERVAL_MS)).await;
if tokio::time::timeout(
Duration::from_millis(Self::POLLING_INTERVAL_MS),
stop_receiver.changed(),
)
.await
.is_ok()
{
// Stop signal received, return early.
// Exit will be processed/reported by the main loop.
return Ok(());
}
};
let error_message = match result {
Ok(Ok(data)) => {
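
The change above swaps the fixed `sleep` for a wait that the stop signal can cut short. A minimal sketch of the pattern, with illustrative names not taken from the PR: the backoff races a timeout against `stop_receiver.changed()`, so an elapsed timeout just means the loop should back off again, while a completed `changed()` means a stop was requested.

```rust
use std::time::Duration;
use tokio::sync::watch;

/// Waits up to `backoff_ms`; returns `true` if the stop signal arrived first.
async fn backoff_or_stop(stop_receiver: &mut watch::Receiver<bool>, backoff_ms: u64) -> bool {
    // `Ok(..)`: `changed()` resolved (stop requested or sender dropped).
    // `Err(..)`: the timeout elapsed, which is the normal backoff path.
    tokio::time::timeout(Duration::from_millis(backoff_ms), stop_receiver.changed())
        .await
        .is_ok()
}
```
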
1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
@@ -47,6 +47,7 @@ zksync_reorg_detector.workspace = true
zksync_vm_runner.workspace = true
zksync_node_db_pruner.workspace = true

pin-project-lite.workspace = true
tracing.workspace = true
thiserror.workspace = true
async-trait.workspace = true
@@ -118,34 +118,27 @@ impl WiringLayer for MetadataCalculatorLayer {
metadata_calculator.tree_reader(),
)))?;

let metadata_calculator_task = Box::new(MetadataCalculatorTask {
metadata_calculator,
context.add_task(Box::new(metadata_calculator));

context.add_shutdown_hook("rocksdb_terminaton", async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
});
context.add_task(metadata_calculator_task);

Ok(())
}
}

#[derive(Debug)]
pub struct MetadataCalculatorTask {
metadata_calculator: MetadataCalculator,
}

#[async_trait::async_trait]
impl Task for MetadataCalculatorTask {
impl Task for MetadataCalculator {
fn id(&self) -> TaskId {
"metadata_calculator".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
let result = self.metadata_calculator.run(stop_receiver.0).await;

// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")?;
result
(*self).run(stop_receiver.0).await
}
}

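
This layer now registers the RocksDB termination wait as a shutdown hook and implements `Task` directly on the component instead of going through the old `MetadataCalculatorTask` wrapper. A hedged sketch of the wrapper-free pattern, written as if inside the framework crate like the diff above; `MyComponent` and its inherent `run` method are hypothetical, while `Task`, `TaskId`, and `StopReceiver` are the framework types used in the diff:

```rust
#[derive(Debug)]
struct MyComponent;

impl MyComponent {
    // Hypothetical inherent entry point, analogous to `MetadataCalculator::run`.
    async fn run(self, mut stop_receiver: tokio::sync::watch::Receiver<bool>) -> anyhow::Result<()> {
        stop_receiver.changed().await.ok();
        Ok(())
    }
}

#[async_trait::async_trait]
impl Task for MyComponent {
    fn id(&self) -> TaskId {
        "my_component".into()
    }

    async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
        // `*self` moves the component out of the box; no wrapper struct is needed,
        // and the inherent `run` drives the task until the stop signal fires.
        (*self).run(stop_receiver.0).await
    }
}
```
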
@@ -51,7 +51,9 @@ impl UnconstrainedTask for SigintHandlerTask {

// Wait for either SIGINT or stop signal.
tokio::select! {
_ = sigint_receiver => {},
_ = sigint_receiver => {
tracing::info!("Received SIGINT signal");
},
_ = stop_receiver.0.changed() => {},
};

@@ -91,6 +91,13 @@ impl WiringLayer for StateKeeperLayer {
sealer,
storage_factory: Arc::new(storage_factory),
}));

context.add_shutdown_hook("rocksdb_terminaton", async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
});
Ok(())
}
}
@@ -119,14 +126,7 @@ impl Task for StateKeeperTask {
self.sealer,
self.storage_factory,
);
let result = state_keeper.run().await;

// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.unwrap();

result
state_keeper.run().await
}
}

@@ -250,7 +250,10 @@ impl Task for ApiTaskGarbageCollector {
// We can ignore the stop signal here, since we're tied to the main API task through the channel:
// it'll either get dropped if API cannot be built or will send something through the channel.
// The tasks it sends are aware of the stop receiver themselves.
let tasks = self.task_receiver.await?;
let Ok(tasks) = self.task_receiver.await else {
// API cannot be built, so there are no tasks to wait for.
return Ok(());
};
let _ = futures::future::join_all(tasks).await;
Ok(())
}
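
The `let Ok(..) = .. else` rewrite above relies on a property of `tokio::sync::oneshot`: if the sender is dropped without sending (here, because the API could not be built), the receiver resolves with an error instead of hanging. A tiny self-contained sketch of that behavior:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<Vec<u32>>();
    // Dropping the sender corresponds to the API task never sending its handles.
    drop(tx);
    // The receiver errors out rather than blocking, so the collector can return Ok(()).
    assert!(rx.await.is_err());
}
```
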
10 changes: 9 additions & 1 deletion core/node/node_framework/src/precondition.rs
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{fmt, sync::Arc};

use tokio::sync::Barrier;

@@ -31,3 +31,11 @@ impl dyn Precondition {
}
}
}

impl fmt::Debug for dyn Precondition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Precondition")
.field("name", &self.id())
.finish()
}
}
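
A small usage sketch for the manual `Debug` impl above (the function and log line are illustrative): trait objects cannot use `#[derive(Debug)]`, so this impl is what lets `dyn Precondition` values appear readably in logs and error messages, formatted via their `id()`.

```rust
fn log_waiting(precondition: &dyn Precondition) {
    // Prints e.g. `waiting for Precondition { name: "..." }`, using the precondition's id.
    tracing::debug!("waiting for {:?}", precondition);
}
```
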
28 changes: 26 additions & 2 deletions core/node/node_framework/src/service/context.rs
@@ -1,9 +1,11 @@
use std::any::type_name;
use std::{any::type_name, future::Future};

use futures::FutureExt as _;

use crate::{
precondition::Precondition,
resource::{Resource, ResourceId, StoredResource},
service::ZkStackService,
service::{named_future::NamedFuture, ZkStackService},
task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
wiring_layer::WiringError,
};
@@ -95,6 +97,28 @@ impl<'a> ServiceContext<'a> {
self
}

/// Adds a future to be invoked after node shutdown.
/// May be used to perform cleanup tasks.
///
/// The future is guaranteed to only be polled after all the node tasks are stopped or timed out.
/// All the futures will be awaited sequentially.
pub fn add_shutdown_hook(
&mut self,
name: &'static str,
hook: impl Future<Output = anyhow::Result<()>> + Send + 'static,
) -> &mut Self {
tracing::info!(
"Layer {} has added a new shutdown hook: {}",
self.layer,
name
);
self.service
.runnables
.shutdown_hooks
.push(NamedFuture::new(hook.boxed(), name.into()));
self
}

/// Attempts to retrieve the resource with the specified name.
/// Internally the resources are stored as [`std::any::Any`], and this method does the downcasting
/// on behalf of the caller.
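
A usage sketch for the new method (the hook name and cleanup body are illustrative; the RocksDB termination hooks in the layers above are the real uses in this PR): any wiring layer can queue cleanup that is only polled once every task has stopped or timed out.

```rust
fn wire_cleanup(context: &mut ServiceContext<'_>) {
    context.add_shutdown_hook("flush_example_cache", async {
        // Hooks are awaited sequentially after shutdown; blocking work is moved to a
        // dedicated thread so the shutdown future itself is not blocked.
        tokio::task::spawn_blocking(|| { /* flush buffers, close files, ... */ })
            .await
            .map_err(|err| anyhow::anyhow!("cleanup task panicked: {err}"))
    });
}
```
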
20 changes: 17 additions & 3 deletions core/node/node_framework/src/service/error.rs
@@ -1,4 +1,18 @@
use crate::wiring_layer::WiringError;
use crate::{task::TaskId, wiring_layer::WiringError};

#[derive(Debug, thiserror::Error)]
pub enum TaskError {
#[error("Task {0} failed: {1}")]
TaskFailed(TaskId, anyhow::Error),
#[error("Task {0} panicked: {1}")]
TaskPanicked(TaskId, String),
#[error("Shutdown for task {0} timed out")]
TaskShutdownTimedOut(TaskId),
#[error("Shutdown hook {0} failed: {1}")]
ShutdownHookFailed(TaskId, anyhow::Error),
#[error("Shutdown hook {0} timed out")]
ShutdownHookTimedOut(TaskId),
}

#[derive(Debug, thiserror::Error)]
pub enum ZkStackServiceError {
@@ -8,6 +22,6 @@ pub enum ZkStackServiceError {
NoTasks,
#[error("One or more wiring layers failed to initialize: {0:?}")]
Wiring(Vec<(String, WiringError)>),
#[error(transparent)]
Task(#[from] anyhow::Error),
#[error("One or more tasks failed: {0:?}")]
Task(Vec<TaskError>),
}
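
How the new variants fit together is easiest to see with a small aggregation sketch (not the actual `ZkStackService` shutdown code, whose internals are outside this diff): each finished task maps to at most one `TaskError`, and any non-empty collection surfaces as `ZkStackServiceError::Task`.

```rust
fn summarize(
    outcomes: Vec<(TaskId, Result<anyhow::Result<()>, String>)>,
) -> Result<(), ZkStackServiceError> {
    let errors: Vec<TaskError> = outcomes
        .into_iter()
        .filter_map(|(id, outcome)| match outcome {
            // Task completed cleanly: nothing to report.
            Ok(Ok(())) => None,
            // Task returned an error: keep it together with the task id.
            Ok(Err(err)) => Some(TaskError::TaskFailed(id, err)),
            // Task panicked: the extracted panic message is kept as a string.
            Err(panic_message) => Some(TaskError::TaskPanicked(id, panic_message)),
        })
        .collect();
    if errors.is_empty() {
        Ok(())
    } else {
        Err(ZkStackServiceError::Task(errors))
    }
}
```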