feat(node_framework): Support shutdown hooks + more (#2293)
## What ❔

- Adds support for shutdown hooks. These are executed sequentially after all
  the tasks have either completed or been dropped (see the usage sketch right
  after this list).
  - Note: I didn't spend much time designing this and went with a "well
    enough" approach, as there are plenty of other things to do right now. We
    can revisit the design later, unless there are critical issues here.
  - One known caveat: hooks are not deduplicated, so two tasks can add the
    same hook and it will be executed twice. Not sure if that's an issue,
    given that the second execution would be a no-op.
- Moves waiting for RocksDB termination from the state keeper and metadata
  calculator tasks to shutdown hooks.
- Increases the number of logs we emit.
- Adds task names to many service logs where they were missing.
- Collects all the errors that occur in the framework.
- Improves handling of stop signals in the queued job processor.

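As a usage illustration, here is a minimal sketch mirroring the RocksDB termination hook added in this diff (the `zksync_storage::RocksDB` import path is assumed from the surrounding code):

```rust
use anyhow::Context as _;
use zksync_storage::RocksDB;

// Inside a `WiringLayer::wire` implementation, where `context` is the mutable
// `ServiceContext` handed to the layer. Hooks are not deduplicated: if the
// same hook is registered twice, it will run twice (harmlessly in this case,
// since the second run is a no-op).
context.add_shutdown_hook("rocksdb_terminaton", async {
    // Polled only after all tasks have stopped or timed out; hooks are
    // awaited sequentially.
    tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
        .await
        .context("failed terminating RocksDB instances")
});
```
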
## Why ❔

- Performing the shutdown routine in the task itself is deadlock-prone.
- The added logs would have helped identify the issues we've already
encountered.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
popzxc authored Jun 25, 2024
1 parent 627aab9 commit 2b2c790
Showing 14 changed files with 308 additions and 122 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


24 changes: 19 additions & 5 deletions core/lib/queued_job_processor/src/lib.rs
@@ -5,7 +5,7 @@ use std::{

use anyhow::Context as _;
pub use async_trait::async_trait;
use tokio::{sync::watch, task::JoinHandle, time::sleep};
use tokio::{sync::watch, task::JoinHandle};
use vise::{Buckets, Counter, Histogram, LabeledFamily, Metrics};
use zksync_utils::panic_extractor::try_extract_panic_message;

@@ -57,7 +57,7 @@ pub trait JobProcessor: Sync + Send {
/// To process a batch, pass `Some(batch_size)`.
async fn run(
self,
stop_receiver: watch::Receiver<bool>,
mut stop_receiver: watch::Receiver<bool>,
mut iterations_left: Option<usize>,
) -> anyhow::Result<()>
where
@@ -86,15 +86,18 @@ pub trait JobProcessor: Sync + Send {
);
let task = self.process_job(&job_id, job, started_at).await;

self.wait_for_task(job_id, started_at, task)
self.wait_for_task(job_id, started_at, task, &mut stop_receiver)
.await
.context("wait_for_task")?;
} else if iterations_left.is_some() {
tracing::info!("No more jobs to process. Server can stop now.");
return Ok(());
} else {
tracing::trace!("Backing off for {} ms", backoff);
sleep(Duration::from_millis(backoff)).await;
// Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this.
tokio::time::timeout(Duration::from_millis(backoff), stop_receiver.changed())
.await
.ok();
backoff = (backoff * Self::BACKOFF_MULTIPLIER).min(Self::MAX_BACKOFF_MS);
}
}
@@ -108,6 +111,7 @@ pub trait JobProcessor: Sync + Send {
job_id: Self::JobId,
started_at: Instant,
task: JoinHandle<anyhow::Result<Self::JobArtifacts>>,
stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<()> {
let attempts = self.get_job_attempts(&job_id).await?;
let max_attempts = self.max_attempts();
@@ -130,7 +134,17 @@ pub trait JobProcessor: Sync + Send {
if task.is_finished() {
break task.await;
}
sleep(Duration::from_millis(Self::POLLING_INTERVAL_MS)).await;
if tokio::time::timeout(
Duration::from_millis(Self::POLLING_INTERVAL_MS),
stop_receiver.changed(),
)
.await
.is_ok()
{
// Stop signal received, return early.
// Exit will be processed/reported by the main loop.
return Ok(());
}
};
let error_message = match result {
Ok(Ok(data)) => {
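
Both changes in this file replace a plain `sleep` with a timeout on `stop_receiver.changed()`, so a pending backoff or polling wait is cut short as soon as a stop signal arrives. A minimal, self-contained sketch of the pattern (function and variable names here are illustrative, not part of the crate):

```rust
use std::time::Duration;

use tokio::sync::watch;

/// Returns `true` if a stop signal arrived during the backoff, `false` if the
/// backoff simply elapsed (the normal "keep polling" case).
async fn backoff_or_stop(stop_receiver: &mut watch::Receiver<bool>, backoff_ms: u64) -> bool {
    // `timeout` resolves to `Ok(_)` when `changed()` completes first (stop
    // requested or sender dropped) and to `Err(Elapsed)` when the timer fires.
    tokio::time::timeout(Duration::from_millis(backoff_ms), stop_receiver.changed())
        .await
        .is_ok()
}

#[tokio::main]
async fn main() {
    let (stop_sender, mut stop_receiver) = watch::channel(false);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        stop_sender.send(true).ok();
    });
    // Completes in ~50 ms instead of the full 10 s backoff.
    assert!(backoff_or_stop(&mut stop_receiver, 10_000).await);
    println!("stop signal observed during backoff");
}
```
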
1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
@@ -47,6 +47,7 @@ zksync_reorg_detector.workspace = true
zksync_vm_runner.workspace = true
zksync_node_db_pruner.workspace = true

pin-project-lite.workspace = true
tracing.workspace = true
thiserror.workspace = true
async-trait.workspace = true
@@ -118,34 +118,27 @@ impl WiringLayer for MetadataCalculatorLayer {
metadata_calculator.tree_reader(),
)))?;

let metadata_calculator_task = Box::new(MetadataCalculatorTask {
metadata_calculator,
context.add_task(Box::new(metadata_calculator));

context.add_shutdown_hook("rocksdb_terminaton", async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
});
context.add_task(metadata_calculator_task);

Ok(())
}
}

#[derive(Debug)]
pub struct MetadataCalculatorTask {
metadata_calculator: MetadataCalculator,
}

#[async_trait::async_trait]
impl Task for MetadataCalculatorTask {
impl Task for MetadataCalculator {
fn id(&self) -> TaskId {
"metadata_calculator".into()
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
let result = self.metadata_calculator.run(stop_receiver.0).await;

// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")?;
result
(*self).run(stop_receiver.0).await
}
}

@@ -51,7 +51,9 @@ impl UnconstrainedTask for SigintHandlerTask {

// Wait for either SIGINT or stop signal.
tokio::select! {
_ = sigint_receiver => {},
_ = sigint_receiver => {
tracing::info!("Received SIGINT signal");
},
_ = stop_receiver.0.changed() => {},
};

@@ -91,6 +91,13 @@ impl WiringLayer for StateKeeperLayer {
sealer,
storage_factory: Arc::new(storage_factory),
}));

context.add_shutdown_hook("rocksdb_terminaton", async {
// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.context("failed terminating RocksDB instances")
});
Ok(())
}
}
@@ -119,14 +126,7 @@ impl Task for StateKeeperTask {
self.sealer,
self.storage_factory,
);
let result = state_keeper.run().await;

// Wait for all the instances of RocksDB to be destroyed.
tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination)
.await
.unwrap();

result
state_keeper.run().await
}
}

@@ -250,7 +250,10 @@ impl Task for ApiTaskGarbageCollector {
// We can ignore the stop signal here, since we're tied to the main API task through the channel:
// it'll either get dropped if API cannot be built or will send something through the channel.
// The tasks it sends are aware of the stop receiver themselves.
let tasks = self.task_receiver.await?;
let Ok(tasks) = self.task_receiver.await else {
// API cannot be built, so there are no tasks to wait for.
return Ok(());
};
let _ = futures::future::join_all(tasks).await;
Ok(())
}
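
The `let ... else` above handles the case where the sender half of the oneshot channel is dropped before anything is sent. A small self-contained sketch of the same pattern (types and names simplified; not the actual API server code):

```rust
use tokio::sync::oneshot;

// If the sender is dropped without sending (e.g. the API server could not be
// built), awaiting the receiver yields `Err`, and there is nothing to join.
async fn wait_for_tasks(task_receiver: oneshot::Receiver<Vec<String>>) -> anyhow::Result<()> {
    let Ok(tasks) = task_receiver.await else {
        return Ok(());
    };
    println!("joining {} API tasks", tasks.len());
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (sender, receiver) = oneshot::channel::<Vec<String>>();
    drop(sender); // Simulate the "API cannot be built" path.
    wait_for_tasks(receiver).await
}
```
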
10 changes: 9 additions & 1 deletion core/node/node_framework/src/precondition.rs
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{fmt, sync::Arc};

use tokio::sync::Barrier;

@@ -31,3 +31,11 @@ impl dyn Precondition {
}
}
}

impl fmt::Debug for dyn Precondition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Precondition")
.field("name", &self.id())
.finish()
}
}
28 changes: 26 additions & 2 deletions core/node/node_framework/src/service/context.rs
@@ -1,9 +1,11 @@
use std::any::type_name;
use std::{any::type_name, future::Future};

use futures::FutureExt as _;

use crate::{
precondition::Precondition,
resource::{Resource, ResourceId, StoredResource},
service::ZkStackService,
service::{named_future::NamedFuture, ZkStackService},
task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
wiring_layer::WiringError,
};
@@ -95,6 +97,28 @@ impl<'a> ServiceContext<'a> {
self
}

/// Adds a future to be invoked after node shutdown.
/// May be used to perform cleanup tasks.
///
/// The future is guaranteed to only be polled after all the node tasks are stopped or timed out.
/// All the futures will be awaited sequentially.
pub fn add_shutdown_hook(
&mut self,
name: &'static str,
hook: impl Future<Output = anyhow::Result<()>> + Send + 'static,
) -> &mut Self {
tracing::info!(
"Layer {} has added a new shutdown hook: {}",
self.layer,
name
);
self.service
.runnables
.shutdown_hooks
.push(NamedFuture::new(hook.boxed(), name.into()));
self
}

/// Attempts to retrieve the resource with the specified name.
/// Internally the resources are stored as [`std::any::Any`], and this method does the downcasting
/// on behalf of the caller.
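
For reference, the semantics documented on `add_shutdown_hook` above (hooks polled only after all tasks stop, then awaited sequentially), together with the timeout and failure variants introduced in `error.rs` below, suggest a runner roughly like the following. This is an illustrative sketch only, not the service's actual implementation:

```rust
use std::{future::Future, pin::Pin, time::Duration};

type NamedHook = (
    &'static str,
    Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>,
);

// Hooks run strictly one after another; a failure or timeout of one hook is
// recorded but does not prevent the remaining hooks from running.
async fn run_shutdown_hooks(hooks: Vec<NamedHook>, hook_timeout: Duration) -> Vec<String> {
    let mut errors = Vec::new();
    for (name, hook) in hooks {
        match tokio::time::timeout(hook_timeout, hook).await {
            Ok(Ok(())) => tracing::info!("Shutdown hook {} completed", name),
            Ok(Err(err)) => errors.push(format!("Shutdown hook {} failed: {:#}", name, err)),
            Err(_) => errors.push(format!("Shutdown hook {} timed out", name)),
        }
    }
    errors
}
```
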
20 changes: 17 additions & 3 deletions core/node/node_framework/src/service/error.rs
@@ -1,4 +1,18 @@
use crate::wiring_layer::WiringError;
use crate::{task::TaskId, wiring_layer::WiringError};

#[derive(Debug, thiserror::Error)]
pub enum TaskError {
#[error("Task {0} failed: {1}")]
TaskFailed(TaskId, anyhow::Error),
#[error("Task {0} panicked: {1}")]
TaskPanicked(TaskId, String),
#[error("Shutdown for task {0} timed out")]
TaskShutdownTimedOut(TaskId),
#[error("Shutdown hook {0} failed: {1}")]
ShutdownHookFailed(TaskId, anyhow::Error),
#[error("Shutdown hook {0} timed out")]
ShutdownHookTimedOut(TaskId),
}

#[derive(Debug, thiserror::Error)]
pub enum ZkStackServiceError {
@@ -8,6 +22,6 @@ pub enum ZkStackServiceError {
NoTasks,
#[error("One or more wiring layers failed to initialize: {0:?}")]
Wiring(Vec<(String, WiringError)>),
#[error(transparent)]
Task(#[from] anyhow::Error),
#[error("One or more tasks failed: {0:?}")]
Task(Vec<TaskError>),
}
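
With failures now collected into `ZkStackServiceError::Task(Vec<TaskError>)`, a caller can report every task and hook error rather than only the first one. A hypothetical caller-side sketch (the `report_result` helper and the omitted imports from the framework crate are placeholders; only the enum variants come from this diff):

```rust
fn report_result(result: Result<(), ZkStackServiceError>) {
    match result {
        Ok(()) => tracing::info!("Node shut down cleanly"),
        Err(ZkStackServiceError::Task(task_errors)) => {
            // Each `TaskError` carries the offending task or hook name via `TaskId`.
            for err in &task_errors {
                tracing::error!("{}", err);
            }
        }
        Err(other) => tracing::error!("Node failed: {}", other),
    }
}
```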