From 5472ebc4135650258e3a4f699eec7007d674f2bc Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Fri, 21 Jun 2024 16:34:39 +0400 Subject: [PATCH] Review fixes --- core/lib/queued_job_processor/src/lib.rs | 2 +- core/node/node_framework/src/service/mod.rs | 55 +++++++++---------- .../src/service/named_future.rs | 15 ++--- 3 files changed, 35 insertions(+), 37 deletions(-) diff --git a/core/lib/queued_job_processor/src/lib.rs b/core/lib/queued_job_processor/src/lib.rs index 702fd557187b..a5a4fa39fcae 100644 --- a/core/lib/queued_job_processor/src/lib.rs +++ b/core/lib/queued_job_processor/src/lib.rs @@ -139,7 +139,7 @@ pub trait JobProcessor: Sync + Send { stop_receiver.changed(), ) .await - .is_err() + .is_ok() { // Stop signal received, return early. // Exit will be processed/reported by the main loop. diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs index 4acb18d14e48..57035a048d86 100644 --- a/core/node/node_framework/src/service/mod.rs +++ b/core/node/node_framework/src/service/mod.rs @@ -12,6 +12,7 @@ pub use self::{context::ServiceContext, error::ZkStackServiceError, stop_receive use crate::{ resource::{ResourceId, StoredResource}, service::runnables::TaskReprs, + task::TaskId, wiring_layer::{WiringError, WiringLayer}, }; @@ -180,20 +181,7 @@ impl ZkStackService { // We will also collect the errors from the remaining tasks, hence a vector. let mut errors = Vec::new(); let task_name = tasks_names.swap_remove(resolved_idx); - match resolved { - Ok(Ok(())) => { - tracing::info!("Task {task_name} finished"); - } - Ok(Err(err)) => { - tracing::error!("Task {task_name} failed: {err}"); - errors.push(TaskError::TaskFailed(task_name, err)); - } - Err(panic_err) => { - let panic_msg = try_extract_panic_message(panic_err); - tracing::error!("Task {task_name}: {panic_msg}"); - errors.push(TaskError::TaskPanicked(task_name, panic_msg)); - } - }; + handle_task_exit(resolved, task_name, &mut errors); tracing::info!("One of the task has exited, shutting down the node"); // Collect names for remaining tasks for reporting purposes. @@ -211,22 +199,10 @@ impl ZkStackService { .block_on(futures::future::join_all(remaining_tasks_with_timeout)); // Report the results of the remaining tasks. - for (name, result) in remaining_tasks_names - .into_iter() - .zip(execution_results.into_iter()) - { + for (name, result) in remaining_tasks_names.into_iter().zip(execution_results) { match result { - Ok(Ok(Ok(()))) => { - tracing::info!("Task {name} finished"); - } - Ok(Ok(Err(err))) => { - tracing::error!("Task {name} failed: {err}"); - errors.push(TaskError::TaskFailed(name, err)); - } - Ok(Err(err)) => { - let panic_msg = try_extract_panic_message(err); - tracing::error!("Task {name} panicked: {panic_msg}"); - errors.push(TaskError::TaskPanicked(name, panic_msg)); + Ok(resolved) => { + handle_task_exit(resolved, name, &mut errors); } Err(_) => { tracing::error!("Task {name} timed out"); @@ -265,6 +241,27 @@ impl ZkStackService { } } +fn handle_task_exit( + task_result: Result, tokio::task::JoinError>, + task_name: TaskId, + errors: &mut Vec, +) { + match task_result { + Ok(Ok(())) => { + tracing::info!("Task {task_name} finished"); + } + Ok(Err(err)) => { + tracing::error!("Task {task_name} failed: {err}"); + errors.push(TaskError::TaskFailed(task_name, err)); + } + Err(panic_err) => { + let panic_msg = try_extract_panic_message(panic_err); + tracing::error!("Task {task_name} panicked: {panic_msg}"); + errors.push(TaskError::TaskPanicked(task_name, panic_msg)); + } + }; +} + fn oneshot_runner_task( oneshot_tasks: Vec>>, mut stop_receiver: StopReceiver, diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs index b6fdb905f8d0..9aa715b0a74b 100644 --- a/core/node/node_framework/src/service/named_future.rs +++ b/core/node/node_framework/src/service/named_future.rs @@ -13,9 +13,9 @@ pin_project! { } } -impl NamedFuture +impl NamedFuture where - F: Future, + F: Future, { /// Creates a new future with the name tag attached. pub fn new(inner: F, name: TaskId) -> Self { @@ -31,18 +31,19 @@ where } } -impl Future for NamedFuture +impl Future for NamedFuture where - F: Future, + F: Future, { - type Output = T; + type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { - self.project().inner.poll(cx) + tracing::info_span!("NamedFuture", name = %self.name) + .in_scope(|| self.project().inner.poll(cx)) } } -impl fmt::Debug for NamedFuture { +impl fmt::Debug for NamedFuture { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("NamedFuture") .field("name", &self.name)