Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(core): Expose shutdown errors #18153

Merged
merged 13 commits into from
Aug 10, 2023
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

177 changes: 97 additions & 80 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
config::{self, Config, ConfigPath},
heartbeat,
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
signal::{ShutdownError, SignalHandler, SignalPair, SignalRx, SignalTo},
topology::{
self, ReloadOutcome, RunningTopology, SharedTopologyController, TopologyController,
},
Expand All @@ -49,8 +49,8 @@ use tokio::sync::broadcast::error::RecvError;
pub struct ApplicationConfig {
pub config_paths: Vec<config::ConfigPath>,
pub topology: RunningTopology,
pub graceful_crash_sender: mpsc::UnboundedSender<()>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
#[cfg(feature = "api")]
pub api: config::api::Options,
#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -139,9 +139,12 @@ impl ApplicationConfig {

Some(api_server)
}
Err(e) => {
error!("An error occurred that Vector couldn't handle: {}.", e);
_ = self.graceful_crash_sender.send(());
Err(error) => {
let error = error.to_string();
error!("An error occurred that Vector couldn't handle: {}.", error);
_ = self
.graceful_crash_sender
.send(ShutdownError::ApiFailed { error });
None
}
}
Expand Down Expand Up @@ -245,7 +248,7 @@ impl Application {

pub struct StartedApplication {
pub config_paths: Vec<ConfigPath>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<()>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
}
Expand All @@ -270,42 +273,19 @@ impl StartedApplication {

let signal = loop {
tokio::select! {
signal = signal_rx.recv() => {
match signal {
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
let mut topology_controller = topology_controller.lock().await;
let new_config = config_builder.build().map_err(handle_config_errors).ok();
if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
break SignalTo::Shutdown;
}
}
Ok(SignalTo::ReloadFromDisk) => {
let mut topology_controller = topology_controller.lock().await;

// Reload paths
if let Some(paths) = config::process_paths(&config_paths) {
topology_controller.config_paths = paths;
}

// Reload config
let new_config = config::load_from_paths_with_provider_and_secrets(&topology_controller.config_paths, &mut signal_handler)
.await
.map_err(handle_config_errors).ok();

if let ReloadOutcome::FatalError = topology_controller.reload(new_config).await {
break SignalTo::Shutdown;
}
},
Err(RecvError::Lagged(amt)) => warn!("Overflow, dropped {} signals.", amt),
Err(RecvError::Closed) => break SignalTo::Shutdown,
Ok(signal) => break signal,
}
}
signal = signal_rx.recv() => if let Some(signal) = handle_signal(
signal,
&topology_controller,
&config_paths,
&mut signal_handler,
).await {
break signal;
},
// Trigger graceful shutdown if a component crashed, or all sources have ended.
_ = graceful_crash.next() => break SignalTo::Shutdown,
error = graceful_crash.next() => break SignalTo::Shutdown(error),
_ = TopologyController::sources_finished(topology_controller.clone()) => {
info!("All sources have finished.");
break SignalTo::Shutdown
break SignalTo::Shutdown(None)
} ,
else => unreachable!("Signal streams never end"),
}
Expand All @@ -319,6 +299,53 @@ impl StartedApplication {
}
}

async fn handle_signal(
signal: Result<SignalTo, RecvError>,
topology_controller: &SharedTopologyController,
config_paths: &[ConfigPath],
signal_handler: &mut SignalHandler,
) -> Option<SignalTo> {
match signal {
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
let mut topology_controller = topology_controller.lock().await;
let new_config = config_builder.build().map_err(handle_config_errors).ok();
match topology_controller.reload(new_config).await {
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
_ => None,
}
}
Ok(SignalTo::ReloadFromDisk) => {
let mut topology_controller = topology_controller.lock().await;

// Reload paths
if let Some(paths) = config::process_paths(config_paths) {
topology_controller.config_paths = paths;
}

// Reload config
let new_config = config::load_from_paths_with_provider_and_secrets(
&topology_controller.config_paths,
signal_handler,
)
.await
.map_err(handle_config_errors)
.ok();

match topology_controller.reload(new_config).await {
ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
_ => None,
}
}
Err(RecvError::Lagged(amt)) => {
warn!("Overflow, dropped {} signals.", amt);
None
}
Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
Ok(signal) => Some(signal),
}
}

#[derive(Debug)]
pub struct FinishedApplication {
pub signal: SignalTo,
pub signal_rx: SignalRx,
Expand All @@ -329,7 +356,7 @@ impl FinishedApplication {
pub async fn shutdown(self) -> ExitStatus {
let FinishedApplication {
signal,
mut signal_rx,
signal_rx,
topology_controller,
} = self;

Expand All @@ -341,49 +368,39 @@ impl FinishedApplication {
.into_inner();

match signal {
SignalTo::Shutdown => {
emit!(VectorStopped);
tokio::select! {
_ = topology_controller.stop() => ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::OK as u32
}
#[cfg(unix)]
exitcode::OK
}), // Graceful shutdown finished
_ = signal_rx.recv() => {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
// Dropping the shutdown future will immediately shut the server down
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}
SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
SignalTo::Quit => Self::quit(),
_ => unreachable!(),
}
}

async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
emit!(VectorStopped);
tokio::select! {
_ = topology_controller.stop() => ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::OK as u32
}
}
SignalTo::Quit => {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
drop(topology_controller);
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}
_ => unreachable!(),
#[cfg(unix)]
exitcode::OK
}), // Graceful shutdown finished
_ = signal_rx.recv() => Self::quit(),
}
}

fn quit() -> ExitStatus {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
ExitStatus::from_raw({
#[cfg(windows)]
{
exitcode::UNAVAILABLE as u32
}
#[cfg(unix)]
exitcode::OK
})
}
}

pub fn init_global() {
Expand Down
2 changes: 1 addition & 1 deletion src/secrets/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn query_backend(
loop {
tokio::select! {
biased;
Ok(signal::SignalTo::Shutdown | signal::SignalTo::Quit) = signal_rx.recv() => {
Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
drop(command);
return Err("Secret retrieval was interrupted.".into());
}
Expand Down
27 changes: 22 additions & 5 deletions src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#![allow(missing_docs)]

use snafu::Snafu;
use tokio::{runtime::Runtime, sync::broadcast};
use tokio_stream::{Stream, StreamExt};

use super::config::ConfigBuilder;
use super::config::{ComponentKey, ConfigBuilder};

pub type ShutdownTx = broadcast::Sender<()>;
pub type SignalTx = broadcast::Sender<SignalTo>;
Expand All @@ -18,11 +19,27 @@ pub enum SignalTo {
/// Signal to reload config from the filesystem.
ReloadFromDisk,
/// Signal to shutdown process.
Shutdown,
Shutdown(Option<ShutdownError>),
/// Shutdown process immediately.
Quit,
}

#[derive(Clone, Debug, Snafu)]
pub enum ShutdownError {
// For future work: It would be nice if we could keep the actual errors in here, but
// `crate::Error` doesn't implement `Clone`, and adding `DynClone` for errors is tricky.
#[snafu(display("The API failed to start: {error}"))]
ApiFailed { error: String },
#[snafu(display("Reload failed, and then failed to restore the previous config"))]
ReloadFailedToRestore,
#[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
SourceAborted { key: ComponentKey, error: String },
#[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
TransformAborted { key: ComponentKey, error: String },
#[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
SinkAborted { key: ComponentKey, error: String },
}

/// Convenience struct for app setup handling.
pub struct SignalPair {
pub handler: SignalHandler,
Expand Down Expand Up @@ -153,11 +170,11 @@ fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> {
let signal = tokio::select! {
_ = sigint.recv() => {
info!(message = "Signal received.", signal = "SIGINT");
SignalTo::Shutdown
SignalTo::Shutdown(None)
},
_ = sigterm.recv() => {
info!(message = "Signal received.", signal = "SIGTERM");
SignalTo::Shutdown
SignalTo::Shutdown(None)
} ,
_ = sigquit.recv() => {
info!(message = "Signal received.", signal = "SIGQUIT");
Expand All @@ -181,7 +198,7 @@ fn os_signals(_: &Runtime) -> impl Stream<Item = SignalTo> {

async_stream::stream! {
loop {
let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown).await;
let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
yield signal;
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/sinks/datadog/traces/apm_stats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::time::{sleep, Duration};

use crate::{
config::ConfigBuilder,
signal::ShutdownError,
sinks::datadog::traces::{apm_stats::StatsPayload, DatadogTracesConfig},
sources::datadog_agent::DatadogAgentConfig,
test_util::{start_topology, trace_init},
Expand Down Expand Up @@ -322,8 +323,8 @@ fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) {
async fn start_vector() -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<()>,
tokio::sync::mpsc::UnboundedReceiver<()>,
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port());
Expand Down
2 changes: 1 addition & 1 deletion src/tap/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitC
loop {
tokio::select! {
biased;
Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break,
Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect {
#[allow(clippy::print_stderr)]
Expand Down
5 changes: 3 additions & 2 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use zstd::Decoder as ZstdDecoder;

use crate::{
config::{Config, ConfigDiff, GenerateConfig},
signal::ShutdownError,
topology::{self, RunningTopology},
trace,
};
Expand Down Expand Up @@ -683,8 +684,8 @@ pub async fn start_topology(
) -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<()>,
tokio::sync::mpsc::UnboundedReceiver<()>,
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
config.healthchecks.set_require_healthy(require_healthy);
Expand Down
Loading