From 2a760fa40c3e15d601c25f2cd98bce0f7ed9e3d5 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Fri, 21 Jun 2024 16:34:39 +0400 Subject: [PATCH] Review fixes --- core/lib/queued_job_processor/src/lib.rs | 2 +- core/node/node_framework/src/service/mod.rs | 55 +++++++++---------- .../src/service/named_future.rs | 18 +++--- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/core/lib/queued_job_processor/src/lib.rs b/core/lib/queued_job_processor/src/lib.rs index 702fd557187b..a5a4fa39fcae 100644 --- a/core/lib/queued_job_processor/src/lib.rs +++ b/core/lib/queued_job_processor/src/lib.rs @@ -139,7 +139,7 @@ pub trait JobProcessor: Sync + Send { stop_receiver.changed(), ) .await - .is_err() + .is_ok() { // Stop signal received, return early. // Exit will be processed/reported by the main loop. diff --git a/core/node/node_framework/src/service/mod.rs b/core/node/node_framework/src/service/mod.rs index 4acb18d14e48..57035a048d86 100644 --- a/core/node/node_framework/src/service/mod.rs +++ b/core/node/node_framework/src/service/mod.rs @@ -12,6 +12,7 @@ pub use self::{context::ServiceContext, error::ZkStackServiceError, stop_receive use crate::{ resource::{ResourceId, StoredResource}, service::runnables::TaskReprs, + task::TaskId, wiring_layer::{WiringError, WiringLayer}, }; @@ -180,20 +181,7 @@ impl ZkStackService { // We will also collect the errors from the remaining tasks, hence a vector. let mut errors = Vec::new(); let task_name = tasks_names.swap_remove(resolved_idx); - match resolved { - Ok(Ok(())) => { - tracing::info!("Task {task_name} finished"); - } - Ok(Err(err)) => { - tracing::error!("Task {task_name} failed: {err}"); - errors.push(TaskError::TaskFailed(task_name, err)); - } - Err(panic_err) => { - let panic_msg = try_extract_panic_message(panic_err); - tracing::error!("Task {task_name}: {panic_msg}"); - errors.push(TaskError::TaskPanicked(task_name, panic_msg)); - } - }; + handle_task_exit(resolved, task_name, &mut errors); tracing::info!("One of the task has exited, shutting down the node"); // Collect names for remaining tasks for reporting purposes. @@ -211,22 +199,10 @@ impl ZkStackService { .block_on(futures::future::join_all(remaining_tasks_with_timeout)); // Report the results of the remaining tasks. - for (name, result) in remaining_tasks_names - .into_iter() - .zip(execution_results.into_iter()) - { + for (name, result) in remaining_tasks_names.into_iter().zip(execution_results) { match result { - Ok(Ok(Ok(()))) => { - tracing::info!("Task {name} finished"); - } - Ok(Ok(Err(err))) => { - tracing::error!("Task {name} failed: {err}"); - errors.push(TaskError::TaskFailed(name, err)); - } - Ok(Err(err)) => { - let panic_msg = try_extract_panic_message(err); - tracing::error!("Task {name} panicked: {panic_msg}"); - errors.push(TaskError::TaskPanicked(name, panic_msg)); + Ok(resolved) => { + handle_task_exit(resolved, name, &mut errors); } Err(_) => { tracing::error!("Task {name} timed out"); @@ -265,6 +241,27 @@ impl ZkStackService { } } +fn handle_task_exit( + task_result: Result, tokio::task::JoinError>, + task_name: TaskId, + errors: &mut Vec, +) { + match task_result { + Ok(Ok(())) => { + tracing::info!("Task {task_name} finished"); + } + Ok(Err(err)) => { + tracing::error!("Task {task_name} failed: {err}"); + errors.push(TaskError::TaskFailed(task_name, err)); + } + Err(panic_err) => { + let panic_msg = try_extract_panic_message(panic_err); + tracing::error!("Task {task_name} panicked: {panic_msg}"); + errors.push(TaskError::TaskPanicked(task_name, panic_msg)); + } + }; +} + fn oneshot_runner_task( oneshot_tasks: Vec>>, mut stop_receiver: StopReceiver, diff --git a/core/node/node_framework/src/service/named_future.rs b/core/node/node_framework/src/service/named_future.rs index b6fdb905f8d0..6c168c4a013c 100644 --- a/core/node/node_framework/src/service/named_future.rs +++ b/core/node/node_framework/src/service/named_future.rs @@ -1,6 +1,7 @@ use std::{fmt, future::Future, pin::Pin, task}; use pin_project_lite::pin_project; +use tracing::{instrument::Instrumented, Instrument}; use crate::task::TaskId; @@ -8,17 +9,18 @@ pin_project! { /// Implements a future with the name tag attached. pub struct NamedFuture { #[pin] - inner: F, + inner: Instrumented, name: TaskId, } } -impl NamedFuture +impl NamedFuture where - F: Future, + F: Future, { /// Creates a new future with the name tag attached. pub fn new(inner: F, name: TaskId) -> Self { + let inner = inner.instrument(tracing::info_span!("task", name = %name)); Self { inner, name } } @@ -26,23 +28,23 @@ where self.name.clone() } - pub fn into_inner(self) -> F { + pub fn into_inner(self) -> Instrumented { self.inner } } -impl Future for NamedFuture +impl Future for NamedFuture where - F: Future, + F: Future, { - type Output = T; + type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { self.project().inner.poll(cx) } } -impl fmt::Debug for NamedFuture { +impl fmt::Debug for NamedFuture { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("NamedFuture") .field("name", &self.name)