Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
popzxc committed Jun 21, 2024
1 parent af32a34 commit 5472ebc
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 37 deletions.
2 changes: 1 addition & 1 deletion core/lib/queued_job_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub trait JobProcessor: Sync + Send {
stop_receiver.changed(),
)
.await
.is_err()
.is_ok()
{
// Stop signal received, return early.
// Exit will be processed/reported by the main loop.
Expand Down
55 changes: 26 additions & 29 deletions core/node/node_framework/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use self::{context::ServiceContext, error::ZkStackServiceError, stop_receive
use crate::{
resource::{ResourceId, StoredResource},
service::runnables::TaskReprs,
task::TaskId,
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -180,20 +181,7 @@ impl ZkStackService {
// We will also collect the errors from the remaining tasks, hence a vector.
let mut errors = Vec::new();
let task_name = tasks_names.swap_remove(resolved_idx);
match resolved {
Ok(Ok(())) => {
tracing::info!("Task {task_name} finished");
}
Ok(Err(err)) => {
tracing::error!("Task {task_name} failed: {err}");
errors.push(TaskError::TaskFailed(task_name, err));
}
Err(panic_err) => {
let panic_msg = try_extract_panic_message(panic_err);
tracing::error!("Task {task_name}: {panic_msg}");
errors.push(TaskError::TaskPanicked(task_name, panic_msg));
}
};
handle_task_exit(resolved, task_name, &mut errors);
tracing::info!("One of the task has exited, shutting down the node");

// Collect names for remaining tasks for reporting purposes.
Expand All @@ -211,22 +199,10 @@ impl ZkStackService {
.block_on(futures::future::join_all(remaining_tasks_with_timeout));

// Report the results of the remaining tasks.
for (name, result) in remaining_tasks_names
.into_iter()
.zip(execution_results.into_iter())
{
for (name, result) in remaining_tasks_names.into_iter().zip(execution_results) {
match result {
Ok(Ok(Ok(()))) => {
tracing::info!("Task {name} finished");
}
Ok(Ok(Err(err))) => {
tracing::error!("Task {name} failed: {err}");
errors.push(TaskError::TaskFailed(name, err));
}
Ok(Err(err)) => {
let panic_msg = try_extract_panic_message(err);
tracing::error!("Task {name} panicked: {panic_msg}");
errors.push(TaskError::TaskPanicked(name, panic_msg));
Ok(resolved) => {
handle_task_exit(resolved, name, &mut errors);
}
Err(_) => {
tracing::error!("Task {name} timed out");
Expand Down Expand Up @@ -265,6 +241,27 @@ impl ZkStackService {
}
}

fn handle_task_exit(
task_result: Result<anyhow::Result<()>, tokio::task::JoinError>,
task_name: TaskId,
errors: &mut Vec<TaskError>,
) {
match task_result {
Ok(Ok(())) => {
tracing::info!("Task {task_name} finished");
}
Ok(Err(err)) => {
tracing::error!("Task {task_name} failed: {err}");
errors.push(TaskError::TaskFailed(task_name, err));
}
Err(panic_err) => {
let panic_msg = try_extract_panic_message(panic_err);
tracing::error!("Task {task_name} panicked: {panic_msg}");
errors.push(TaskError::TaskPanicked(task_name, panic_msg));
}
};
}

fn oneshot_runner_task(
oneshot_tasks: Vec<NamedBoxFuture<anyhow::Result<()>>>,
mut stop_receiver: StopReceiver,
Expand Down
15 changes: 8 additions & 7 deletions core/node/node_framework/src/service/named_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ pin_project! {
}
}

impl<T, F> NamedFuture<F>
impl<F> NamedFuture<F>
where
F: Future<Output = T>,
F: Future,
{
/// Creates a new future with the name tag attached.
pub fn new(inner: F, name: TaskId) -> Self {
Expand All @@ -31,18 +31,19 @@ where
}
}

impl<T, F> Future for NamedFuture<F>
impl<F> Future for NamedFuture<F>
where
F: Future<Output = T>,
F: Future,
{
type Output = T;
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().inner.poll(cx)
tracing::info_span!("NamedFuture", name = %self.name)
.in_scope(|| self.project().inner.poll(cx))
}
}

impl<T> fmt::Debug for NamedFuture<T> {
impl<F> fmt::Debug for NamedFuture<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NamedFuture")
.field("name", &self.name)
Expand Down

0 comments on commit 5472ebc

Please sign in to comment.