From 58012f85e18062939aa70d8df42f3d6f46c05dda Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Wed, 15 Sep 2021 00:01:18 +0000
Subject: [PATCH] Shutdown gracefully on panic (#2596)

## Proposed Changes

* Modify the `TaskExecutor` so that it spawns a "monitor" future for each future spawned by `spawn` or `spawn_blocking`. This monitor future awaits the child future's join handle and shuts down the executor if it detects a panic.
* Enable backtraces by default by setting the `RUST_BACKTRACE` environment variable (only if the user hasn't already set it).
* Spawn the `ProductionBeaconNode` on the `TaskExecutor` so that a panic during start-up takes down the whole process. Previously we were using a raw Tokio `spawn`, but I can't see any reason not to use the executor (perhaps someone else can).

## Additional Info

I considered using [`std::panic::set_hook`](https://doc.rust-lang.org/std/panic/fn.set_hook.html) to install a custom panic handler, but it doesn't let us send a shutdown signal: the hook must be an `Fn` closure, so it can't move captured variables (i.e. the shutdown sender) out of its environment. This also prevents it from receiving a `Logger`. Hence I decided to leave the panic handler untouched, but with backtraces turned on by default.

I did a pass through the code base with all the raw Tokio spawn functions disallowed by Clippy, and found only two instances where we bypass the `TaskExecutor`: the HTTP API and `InitializedValidators` in the VC. In both places we use `spawn_blocking` and handle the return value, so I figured that was OK for now.

In terms of performance, the overhead should be minimal: each monitor task just gets parked by the executor until its child resolves.

I've checked that this covers Discv5, as the `TaskExecutor` gets injected into Discv5 here: https://github.com/sigp/lighthouse/blob/f9bba92db3468321b28ddd9010e26b359f88bafe/beacon_node/src/lib.rs#L125-L126
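To make the mechanism concrete, here's a minimal sketch of the monitor pattern in isolation. This is illustrative only: the `ShutdownReason` enum and the tokio mpsc channel here are simplified stand-ins for the real `TaskExecutor` internals (which log via `slog` and send on a `futures` mpsc channel), and it assumes tokio 1.x with the `macros`, `rt-multi-thread` and `sync` features:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum ShutdownReason {
    Failure(&'static str),
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownReason>(1);

    // The child task: panics to simulate a bug.
    let handle = tokio::spawn(async {
        panic!("simulated bug");
    });

    // The monitor task: awaits the child's join handle and converts a panic
    // into a shutdown signal instead of letting it vanish silently.
    tokio::spawn(async move {
        if let Err(join_error) = handle.await {
            if let Ok(panic) = join_error.try_into_panic() {
                let message = panic.downcast_ref::<&str>().copied().unwrap_or("<none>");
                eprintln!("task panicked: {}", message);
                let _ = shutdown_tx.try_send(ShutdownReason::Failure("Panic (fatal error)"));
            }
        }
    });

    // The top level parks until a shutdown reason arrives, then exits cleanly.
    let reason = shutdown_rx.recv().await;
    eprintln!("shutting down: {:?}", reason);
}
```

The real implementation does the same join-and-inspect dance via `JoinError::try_into_panic`, but routes the message through the executor's existing logger and shutdown channel.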
---
 common/task_executor/src/lib.rs | 103 +++++++++++++++-----------------
 lighthouse/src/main.rs          |  65 +++++++++++---------
 2 files changed, 87 insertions(+), 81 deletions(-)

diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs
index 269dba80414..4a509897ea7 100644
--- a/common/task_executor/src/lib.rs
+++ b/common/task_executor/src/lib.rs
@@ -2,7 +2,7 @@ mod metrics;
 
 use futures::channel::mpsc::Sender;
 use futures::prelude::*;
-use slog::{debug, o, trace};
+use slog::{crit, debug, o, trace};
 use std::sync::Weak;
 use tokio::runtime::Runtime;
 
@@ -83,34 +83,56 @@ impl TaskExecutor {
         self.spawn(task.map(|_| ()), name)
     }
 
-    /// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit`. The task is canceled
-    /// when the corresponding exit_future `Signal` is fired/dropped.
+    /// Spawn a task to monitor the completion of another task.
     ///
-    /// This function generates prometheus metrics on number of tasks and task duration.
-    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, name: &'static str) {
-        let exit = self.exit.clone();
+    /// If the other task exits by panicking, then the monitor task will shut down the executor.
+    fn spawn_monitor<R: Send>(
+        &self,
+        task_handle: impl Future<Output = Result<R, tokio::task::JoinError>> + Send + 'static,
+        name: &'static str,
+    ) {
+        let mut shutdown_sender = self.shutdown_sender();
         let log = self.log.clone();
 
-        if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
-            // Task is shutdown before it completes if `exit` receives
-            let int_gauge_1 = int_gauge.clone();
-            let future = future::select(Box::pin(task), exit).then(move |either| {
-                match either {
-                    future::Either::Left(_) => trace!(log, "Async task completed"; "task" => name),
-                    future::Either::Right(_) => {
-                        debug!(log, "Async task shutdown, exit received"; "task" => name)
+        if let Some(runtime) = self.runtime.upgrade() {
+            runtime.spawn(async move {
+                if let Err(join_error) = task_handle.await {
+                    if let Ok(panic) = join_error.try_into_panic() {
+                        let message = panic.downcast_ref::<&str>().unwrap_or(&"<none>");
+
+                        crit!(
+                            log,
+                            "Task panic. This is a bug!";
+                            "task_name" => name,
+                            "message" => message,
+                            "advice" => "Please check above for a backtrace and notify \
+                                         the developers"
+                        );
+                        let _ = shutdown_sender
+                            .try_send(ShutdownReason::Failure("Panic (fatal error)"));
                     }
                 }
-                int_gauge_1.dec();
-                futures::future::ready(())
             });
+        } else {
+            debug!(
+                self.log,
+                "Couldn't spawn monitor task. Runtime shutting down"
+            )
+        }
+    }
 
-            int_gauge.inc();
-            if let Some(runtime) = self.runtime.upgrade() {
-                runtime.spawn(future);
-            } else {
-                debug!(self.log, "Couldn't spawn task. Runtime shutting down");
-            }
+    /// Spawn a future on the tokio runtime.
+    ///
+    /// The future is wrapped in an `exit_future::Exit`. The task is canceled when the corresponding
+    /// exit_future `Signal` is fired/dropped.
+    ///
+    /// The future is monitored via another spawned future to ensure that it doesn't panic. In case
+    /// of a panic, the executor will be shut down via `self.signal_tx`.
+    ///
+    /// This function generates prometheus metrics on number of tasks and task duration.
+    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, name: &'static str) {
+        if let Some(task_handle) = self.spawn_handle(task, name) {
+            self.spawn_monitor(task_handle, name)
         }
     }
 
@@ -150,38 +172,11 @@ impl TaskExecutor {
     where
         F: FnOnce() + Send + 'static,
     {
-        let log = self.log.clone();
-
-        if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) {
-            if let Some(int_gauge) = metrics::get_int_gauge(&metrics::BLOCKING_TASKS_COUNT, &[name])
-            {
-                let int_gauge_1 = int_gauge.clone();
-                let timer = metric.start_timer();
-                let join_handle = if let Some(runtime) = self.runtime.upgrade() {
-                    runtime.spawn_blocking(task)
-                } else {
-                    debug!(self.log, "Couldn't spawn task. Runtime shutting down");
-                    return;
-                };
-
-                let future = async move {
-                    match join_handle.await {
-                        Ok(_) => trace!(log, "Blocking task completed"; "task" => name),
-                        Err(e) => debug!(log, "Blocking task failed"; "error" => %e),
-                    };
-                    timer.observe_duration();
-                    int_gauge_1.dec();
-                };
-
-                int_gauge.inc();
-                if let Some(runtime) = self.runtime.upgrade() {
-                    runtime.spawn(future);
-                } else {
-                    debug!(self.log, "Couldn't spawn task. Runtime shutting down");
-                }
-            }
+        if let Some(task_handle) = self.spawn_blocking_handle(task, name) {
+            self.spawn_monitor(task_handle, name)
         }
     }
 
+    /// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional
     /// join handle to the future.
     /// The task is canceled when the corresponding exit_future `Signal` is fired/dropped.
@@ -200,9 +195,9 @@ impl TaskExecutor {
             let int_gauge_1 = int_gauge.clone();
             let future = future::select(Box::pin(task), exit).then(move |either| {
                 let result = match either {
-                    future::Either::Left((task, _)) => {
+                    future::Either::Left((value, _)) => {
                         trace!(log, "Async task completed"; "task" => name);
-                        Some(task)
+                        Some(value)
                     }
                     future::Either::Right(_) => {
                         debug!(log, "Async task shutdown, exit received"; "task" => name);
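(Reviewer aside, not part of the patch: `spawn_blocking` panics surface through the same `JoinError` path, which is why `spawn` and `spawn_blocking` can share `spawn_monitor`. One caveat worth knowing: `try_into_panic` yields a `Box<dyn Any + Send>`, and a `&str` downcast only catches literal panic messages; formatted panics carry a `String` payload and would be logged as `<none>`. A standalone sketch, using only tokio 1.x APIs, follows.)

```rust
#[tokio::main]
async fn main() {
    // A blocking task that panics with a formatted (String) payload.
    let handle = tokio::task::spawn_blocking(|| {
        panic!("boom at line {}", line!());
    });

    if let Err(join_error) = handle.await {
        if join_error.is_panic() {
            let payload = join_error.try_into_panic().expect("is_panic checked above");
            // Check both payload types that `panic!` commonly produces.
            let message = payload
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
                .unwrap_or("<none>");
            eprintln!("blocking task panicked: {}", message);
        }
    }
}
```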
diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs
index 10f53ff7b20..97e00477af3 100644
--- a/lighthouse/src/main.rs
+++ b/lighthouse/src/main.rs
@@ -32,6 +32,11 @@ fn bls_library_name() -> &'static str {
 }
 
 fn main() {
+    // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
+    if std::env::var("RUST_BACKTRACE").is_err() {
+        std::env::set_var("RUST_BACKTRACE", "1");
+    }
+
     // Parse the CLI parameters.
     let matches = App::new("Lighthouse")
         .version(VERSION.replace("Lighthouse/", "").as_str())
@@ -344,20 +349,23 @@ fn run(
                 .map_err(|e| format!("Error serializing config: {:?}", e))?;
             };
 
-            environment.runtime().spawn(async move {
-                if let Err(e) = ProductionBeaconNode::new(context.clone(), config).await {
-                    crit!(log, "Failed to start beacon node"; "reason" => e);
-                    // Ignore the error since it always occurs during normal operation when
-                    // shutting down.
-                    let _ = executor
-                        .shutdown_sender()
-                        .try_send(ShutdownReason::Failure("Failed to start beacon node"));
-                } else if shutdown_flag {
-                    let _ = executor.shutdown_sender().try_send(ShutdownReason::Success(
-                        "Beacon node immediate shutdown triggered.",
-                    ));
-                }
-            });
+            executor.clone().spawn(
+                async move {
+                    if let Err(e) = ProductionBeaconNode::new(context.clone(), config).await {
+                        crit!(log, "Failed to start beacon node"; "reason" => e);
+                        // Ignore the error since it always occurs during normal operation when
+                        // shutting down.
+                        let _ = executor
+                            .shutdown_sender()
+                            .try_send(ShutdownReason::Failure("Failed to start beacon node"));
+                    } else if shutdown_flag {
+                        let _ = executor.shutdown_sender().try_send(ShutdownReason::Success(
+                            "Beacon node immediate shutdown triggered.",
+                        ));
+                    }
+                },
+                "beacon_node",
+            );
         }
         ("validator_client", Some(matches)) => {
             let context = environment.core_context();
@@ -374,19 +382,22 @@ fn run(
                 .map_err(|e| format!("Error serializing config: {:?}", e))?;
             };
             if !shutdown_flag {
-                environment.runtime().spawn(async move {
-                    if let Err(e) = ProductionValidatorClient::new(context, config)
-                        .await
-                        .and_then(|mut vc| vc.start_service())
-                    {
-                        crit!(log, "Failed to start validator client"; "reason" => e);
-                        // Ignore the error since it always occurs during normal operation when
-                        // shutting down.
-                        let _ = executor
-                            .shutdown_sender()
-                            .try_send(ShutdownReason::Failure("Failed to start validator client"));
-                    }
-                });
+                executor.clone().spawn(
+                    async move {
+                        if let Err(e) = ProductionValidatorClient::new(context, config)
+                            .await
+                            .and_then(|mut vc| vc.start_service())
+                        {
+                            crit!(log, "Failed to start validator client"; "reason" => e);
+                            // Ignore the error since it always occurs during normal operation when
+                            // shutting down.
+                            let _ = executor.shutdown_sender().try_send(ShutdownReason::Failure(
+                                "Failed to start validator client",
+                            ));
+                        }
+                    },
+                    "validator_client",
+                );
             } else {
                 let _ = executor.shutdown_sender().try_send(ShutdownReason::Success(
                     "Validator client immediate shutdown triggered.",
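(Reviewer aside, not part of the patch: the backtrace defaulting in `main` can be exercised on its own. A hypothetical standalone program showing that the default only kicks in when `RUST_BACKTRACE` is absent, so an explicit `RUST_BACKTRACE=0` or `RUST_BACKTRACE=full` is respected:)

```rust
fn main() {
    // Only set a default when the user hasn't provided a value themselves.
    // Note: `var` returns Err both when the variable is unset and when its
    // value isn't valid Unicode; defaulting to "1" is harmless in both cases.
    if std::env::var("RUST_BACKTRACE").is_err() {
        std::env::set_var("RUST_BACKTRACE", "1");
    }

    println!(
        "RUST_BACKTRACE={}",
        std::env::var("RUST_BACKTRACE").unwrap_or_default()
    );

    // With the default in place, this panic prints a backtrace.
    panic!("demonstration panic");
}
```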