Skip to content

Commit

Permalink
chore(core): Expose shutdown errors (#18153)
Browse files Browse the repository at this point in the history
* Refactor app signal handling into functions

* Introduce `ShutdownError` to pass along the cause of a shutdown

* Add handling of component shutdown errors

* Add component name to shutdown errors

* Add error message to component errors

* Add API error message

* Add snafu-based display messages for the shutdown errors

* Fix broken dep in Cargo.lock

* Fix Windows compile error

* More fix Windows builds
  • Loading branch information
bruceg authored Aug 9, 2023
1 parent 4cc9cdf commit 910f836
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 108 deletions.
183 changes: 103 additions & 80 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
config::{self, Config, ConfigPath},
heartbeat,
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
signal::{ShutdownError, SignalHandler, SignalPair, SignalRx, SignalTo},
topology::{
self, ReloadOutcome, RunningTopology, SharedTopologyController, TopologyController,
},
Expand All @@ -49,8 +49,8 @@ use tokio::sync::broadcast::error::RecvError;
pub struct ApplicationConfig {
pub config_paths: Vec<config::ConfigPath>,
pub topology: RunningTopology,
pub graceful_crash_sender: mpsc::UnboundedSender<()>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
#[cfg(feature = "api")]
pub api: config::api::Options,
#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -139,9 +139,12 @@ impl ApplicationConfig {

Some(api_server)
}
Err(e) => {
error!("An error occurred that Vector couldn't handle: {}.", e);
_ = self.graceful_crash_sender.send(());
Err(error) => {
let error = error.to_string();
error!("An error occurred that Vector couldn't handle: {}.", error);
_ = self
.graceful_crash_sender
.send(ShutdownError::ApiFailed { error });
None
}
}
Expand All @@ -153,6 +156,7 @@ impl ApplicationConfig {
}

impl Application {
#[must_use]
pub fn run() -> ExitStatus {
let (runtime, app) = Self::prepare_start().unwrap_or_else(|code| std::process::exit(code));

Expand Down Expand Up @@ -245,16 +249,18 @@ impl Application {

pub struct StartedApplication {
pub config_paths: Vec<ConfigPath>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
}

impl StartedApplication {
#[must_use]
pub async fn run(self) -> ExitStatus {
self.main().await.shutdown().await
}

#[must_use]
pub async fn main(self) -> FinishedApplication {
let Self {
config_paths,
Expand All @@ -270,42 +276,19 @@ impl StartedApplication {

let signal = loop {
tokio::select! {
signal = signal_rx.recv() => {
match signal {
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
let mut topology_controller = topology_controller.lock().await;
let new_config = config_builder.build().map_err(handle_config_errors).ok();
if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
break SignalTo::Shutdown;
}
}
Ok(SignalTo::ReloadFromDisk) => {
let mut topology_controller = topology_controller.lock().await;

// Reload paths
if let Some(paths) = config::process_paths(&config_paths) {
topology_controller.config_paths = paths;
}

// Reload config
let new_config = config::load_from_paths_with_provider_and_secrets(&topology_controller.config_paths, &mut signal_handler)
.await
.map_err(handle_config_errors).ok();

if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
break SignalTo::Shutdown;
}
},
Err(RecvError::Lagged(amt)) => warn!("Overflow, dropped {} signals.", amt),
Err(RecvError::Closed) => break SignalTo::Shutdown,
Ok(signal) => break signal,
}
}
signal = signal_rx.recv() => if let Some(signal) = handle_signal(
signal,
&topology_controller,
&config_paths,
&mut signal_handler,
).await {
break signal;
},
// Trigger graceful shutdown if a component crashed, or all sources have ended.
_ = graceful_crash.next() => break SignalTo::Shutdown,
error = graceful_crash.next() => break SignalTo::Shutdown(error),
_ = TopologyController::sources_finished(topology_controller.clone()) => {
info!("All sources have finished.");
break SignalTo::Shutdown
break SignalTo::Shutdown(None)
} ,
else => unreachable!("Signal streams never end"),
}
Expand All @@ -319,17 +302,65 @@ impl StartedApplication {
}
}

async fn handle_signal(
signal: Result<SignalTo, RecvError>,
topology_controller: &SharedTopologyController,
config_paths: &[ConfigPath],
signal_handler: &mut SignalHandler,
) -> Option<SignalTo> {
match signal {
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
let mut topology_controller = topology_controller.lock().await;
let new_config = config_builder.build().map_err(handle_config_errors).ok();
match topology_controller.reload(new_config).await {
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
_ => None,
}
}
Ok(SignalTo::ReloadFromDisk) => {
let mut topology_controller = topology_controller.lock().await;

// Reload paths
if let Some(paths) = config::process_paths(config_paths) {
topology_controller.config_paths = paths;
}

// Reload config
let new_config = config::load_from_paths_with_provider_and_secrets(
&topology_controller.config_paths,
signal_handler,
)
.await
.map_err(handle_config_errors)
.ok();

match topology_controller.reload(new_config).await {
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
_ => None,
}
}
Err(RecvError::Lagged(amt)) => {
warn!("Overflow, dropped {} signals.", amt);
None
}
Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
Ok(signal) => Some(signal),
}
}

#[derive(Debug)]
pub struct FinishedApplication {
pub signal: SignalTo,
pub signal_rx: SignalRx,
pub topology_controller: SharedTopologyController,
}

impl FinishedApplication {
#[must_use]
pub async fn shutdown(self) -> ExitStatus {
let FinishedApplication {
signal,
mut signal_rx,
signal_rx,
topology_controller,
} = self;

Expand All @@ -341,49 +372,41 @@ impl FinishedApplication {
.into_inner();

match signal {
SignalTo::Shutdown => {
emit!(VectorStopped);
tokio::select! {
_ = topology_controller.stop() => ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::OK as u32
}
#[cfg(unix)]
exitcode::OK
}), // Graceful shutdown finished
_ = signal_rx.recv() => {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
// Dropping the shutdown future will immediately shut the server down
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}
SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
SignalTo::Quit => Self::quit(),
_ => unreachable!(),
}
}

#[must_use]
async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
emit!(VectorStopped);
tokio::select! {
_ = topology_controller.stop() => ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::OK as u32
}
}
SignalTo::Quit => {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
drop(topology_controller);
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}
_ => unreachable!(),
#[cfg(unix)]
exitcode::OK
}), // Graceful shutdown finished
_ = signal_rx.recv() => Self::quit(),
}
}

#[must_use]
fn quit() -> ExitStatus {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}
}

pub fn init_global() {
Expand Down
2 changes: 1 addition & 1 deletion src/secrets/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn query_backend(
loop {
tokio::select! {
biased;
Ok(signal::SignalTo::Shutdown | signal::SignalTo::Quit) = signal_rx.recv() => {
Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
drop(command);
return Err("Secret retrieval was interrupted.".into());
}
Expand Down
27 changes: 22 additions & 5 deletions src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#![allow(missing_docs)]

use snafu::Snafu;
use tokio::{runtime::Runtime, sync::broadcast};
use tokio_stream::{Stream, StreamExt};

use super::config::ConfigBuilder;
use super::config::{ComponentKey, ConfigBuilder};

pub type ShutdownTx = broadcast::Sender<()>;
pub type SignalTx = broadcast::Sender<SignalTo>;
Expand All @@ -18,11 +19,27 @@ pub enum SignalTo {
/// Signal to reload config from the filesystem.
ReloadFromDisk,
/// Signal to shutdown process.
Shutdown,
Shutdown(Option<ShutdownError>),
/// Shutdown process immediately.
Quit,
}

#[derive(Clone, Debug, Snafu)]
pub enum ShutdownError {
// For future work: It would be nice if we could keep the actual errors in here, but
// `crate::Error` doesn't implement `Clone`, and adding `DynClone` for errors is tricky.
#[snafu(display("The API failed to start: {error}"))]
ApiFailed { error: String },
#[snafu(display("Reload failed, and then failed to restore the previous config"))]
ReloadFailedToRestore,
#[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
SourceAborted { key: ComponentKey, error: String },
#[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
TransformAborted { key: ComponentKey, error: String },
#[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
SinkAborted { key: ComponentKey, error: String },
}

/// Convenience struct for app setup handling.
pub struct SignalPair {
pub handler: SignalHandler,
Expand Down Expand Up @@ -153,11 +170,11 @@ fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> {
let signal = tokio::select! {
_ = sigint.recv() => {
info!(message = "Signal received.", signal = "SIGINT");
SignalTo::Shutdown
SignalTo::Shutdown(None)
},
_ = sigterm.recv() => {
info!(message = "Signal received.", signal = "SIGTERM");
SignalTo::Shutdown
SignalTo::Shutdown(None)
} ,
_ = sigquit.recv() => {
info!(message = "Signal received.", signal = "SIGQUIT");
Expand All @@ -181,7 +198,7 @@ fn os_signals(_: &Runtime) -> impl Stream<Item = SignalTo> {

async_stream::stream! {
loop {
let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown).await;
let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
yield signal;
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/sinks/datadog/traces/apm_stats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::time::{sleep, Duration};

use crate::{
config::ConfigBuilder,
signal::ShutdownError,
sinks::datadog::traces::{apm_stats::StatsPayload, DatadogTracesConfig},
sources::datadog_agent::DatadogAgentConfig,
test_util::{start_topology, trace_init},
Expand Down Expand Up @@ -322,8 +323,8 @@ fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) {
async fn start_vector() -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<()>,
tokio::sync::mpsc::UnboundedReceiver<()>,
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port());
Expand Down
2 changes: 1 addition & 1 deletion src/tap/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitC
loop {
tokio::select! {
biased;
Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break,
Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect {
#[allow(clippy::print_stderr)]
Expand Down
5 changes: 3 additions & 2 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use zstd::Decoder as ZstdDecoder;

use crate::{
config::{Config, ConfigDiff, GenerateConfig},
signal::ShutdownError,
topology::{self, RunningTopology},
trace,
};
Expand Down Expand Up @@ -683,8 +684,8 @@ pub async fn start_topology(
) -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<()>,
tokio::sync::mpsc::UnboundedReceiver<()>,
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
config.healthchecks.set_require_healthy(require_healthy);
Expand Down
Loading

0 comments on commit 910f836

Please sign in to comment.