Require the main (GraphQL) server to shut down before other servers #3557

Merged · 8 commits · Aug 10, 2023
7 changes: 7 additions & 0 deletions .changesets/fix_garypen_3521_keep_listening.md
@@ -0,0 +1,7 @@
### Require the main (GraphQL) route to shut down before other routes ([Issue #3521](https://github.com/apollographql/router/issues/3521))

This changes router execution to give more control over the sequencing of server shutdown. In particular, it changes how the different routes are shut down so that the main (GraphQL) route shuts down before the other routes. Prior to this change, all routes shut down in parallel, which meant that, for example, health checks stopped responding prematurely.

This is particularly undesirable when the router is running in Kubernetes, since continuing to answer liveness/readiness checks during shutdown is a requirement.

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/3557
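For readers of the changeset, here is a minimal, self-contained sketch of the ordering it describes, written with tokio's `oneshot` channels and two placeholder tasks. The names (`main_server`, `extra_servers`) and the use of `tokio::sync::oneshot` are illustrative only; the router itself wires this up with `futures::channel::oneshot` inside its Axum server factory, as shown in the diff below.

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // One shutdown channel per group: the main (GraphQL) server and the extras.
    let (main_tx, main_rx) = oneshot::channel::<()>();
    let (extra_tx, extra_rx) = oneshot::channel::<()>();

    // Stand-ins for the real servers: each runs until it receives its signal.
    let main_server = tokio::spawn(async move {
        let _ = main_rx.await;
        println!("main (GraphQL) server stopped");
    });
    let extra_servers = tokio::spawn(async move {
        let _ = extra_rx.await;
        println!("extra servers (health check, metrics, ...) stopped");
    });

    // Ordered shutdown: signal the main server, wait for it to finish,
    // and only then signal the extra servers.
    let _ = main_tx.send(());
    main_server.await.expect("main server task panicked");

    let _ = extra_tx.send(());
    extra_servers.await.expect("extra servers task panicked");
}
```

Two separate channels, rather than one signal fanned out to every server, are what make the ordering explicit: the extra routes are only told to stop after the main server's future has resolved, so health checks keep answering while the GraphQL server drains.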
42 changes: 32 additions & 10 deletions apollo-router/src/axum_factory/axum_http_server_factory.rs
@@ -15,7 +15,6 @@ use axum::response::*;
use axum::routing::get;
use axum::Router;
use futures::channel::oneshot;
use futures::future::join;
use futures::future::join_all;
use futures::prelude::*;
use http::header::ACCEPT_ENCODING;
@@ -322,29 +321,52 @@ impl HttpServerFactory for AxumHttpServerFactory {
)
});

let (servers, mut shutdowns): (Vec<_>, Vec<_>) = servers_and_shutdowns.unzip();
shutdowns.push(main_shutdown_sender);
let (servers, shutdowns): (Vec<_>, Vec<_>) = servers_and_shutdowns.unzip();

// graceful shutdown mechanism:
// we will fan out to all of the servers once we receive a signal
let (outer_shutdown_sender, outer_shutdown_receiver) = oneshot::channel::<()>();
// create two shutdown channels. One for the main (GraphQL) server and the other for
// the extra servers (health, metrics, etc...)
// We spawn a task for each server which just waits to propagate the message to:
// - main
// - all extras
// We have two separate channels because we want to ensure that main is notified
// separately from all other servers and we wait for main to shutdown before we notify
// extra servers.
let (outer_main_shutdown_sender, outer_main_shutdown_receiver) =
oneshot::channel::<()>();
tokio::task::spawn(async move {
let _ = outer_shutdown_receiver.await;
let _ = outer_main_shutdown_receiver.await;
if let Err(_err) = main_shutdown_sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown");
}
});

let (outer_extra_shutdown_sender, outer_extra_shutdown_receiver) =
oneshot::channel::<()>();
tokio::task::spawn(async move {
let _ = outer_extra_shutdown_receiver.await;
shutdowns.into_iter().for_each(|sender| {
if let Err(_err) = sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown")
};
})
});

// Spawn the server into a runtime
let server_future = tokio::task::spawn(join(main_server, join_all(servers)))
// Spawn the main (GraphQL) server into a task
let main_future = tokio::task::spawn(main_server)
.map_err(|_| ApolloRouterError::HttpServerLifecycleError)
.boxed();

// Spawn all other servers (health, metrics, etc...) into a task
let extra_futures = tokio::task::spawn(join_all(servers))
.map_err(|_| ApolloRouterError::HttpServerLifecycleError)
.boxed();

Ok(HttpServerHandle::new(
outer_shutdown_sender,
server_future,
outer_main_shutdown_sender,
outer_extra_shutdown_sender,
main_future,
extra_futures,
Some(actual_main_listen_address),
actual_extra_listen_adresses,
all_connections_stopped_sender,
91 changes: 62 additions & 29 deletions apollo-router/src/http_server_factory.rs
@@ -29,7 +29,7 @@ pub(crate) trait HttpServerFactory {
service_factory: RF,
configuration: Arc<Configuration>,
main_listener: Option<Listener>,
previous_listeners: Vec<(ListenAddr, Listener)>,
previous_listeners: ExtraListeners,
extra_endpoints: MultiMap<ListenAddr, Endpoint>,
license: LicenseState,
all_connections_stopped_sender: mpsc::Sender<()>,
@@ -40,7 +40,8 @@
fn ready(&self, ready: bool);
}

type MainAndExtraListeners = (Listener, Vec<(ListenAddr, Listener)>);
type ExtraListeners = Vec<(ListenAddr, Listener)>;

/// A handle with which a client can shut down the server gracefully.
/// This relies on the underlying server implementation doing the right thing.
/// There are various ways that a user could prevent this working, including holding open connections
@@ -49,12 +50,18 @@ type MainAndExtraListeners = (Listener, Vec<(ListenAddr, Listener)>);
#[derivative(Debug)]
pub(crate) struct HttpServerHandle {
/// Sender to use to notify of shutdown
shutdown_sender: oneshot::Sender<()>,
main_shutdown_sender: oneshot::Sender<()>,

/// Sender to use to notify extras of shutdown
extra_shutdown_sender: oneshot::Sender<()>,

/// Future to wait on for graceful shutdown
#[derivative(Debug = "ignore")]
server_future:
Pin<Box<dyn Future<Output = Result<MainAndExtraListeners, ApolloRouterError>> + Send>>,
main_future: Pin<Box<dyn Future<Output = Result<Listener, ApolloRouterError>> + Send>>,

/// More futures to wait on for graceful shutdown
#[derivative(Debug = "ignore")]
extra_futures: Pin<Box<dyn Future<Output = Result<ExtraListeners, ApolloRouterError>> + Send>>,

/// The listen addresses that the server is actually listening on.
/// This includes the `graphql_listen_address` as well as any other address a plugin listens on.
@@ -71,35 +78,37 @@ pub(crate) struct HttpServerHandle {

impl HttpServerHandle {
pub(crate) fn new(
shutdown_sender: oneshot::Sender<()>,
server_future: Pin<
Box<
dyn Future<Output = Result<MainAndExtraListeners, ApolloRouterError>>
+ Send
+ 'static,
>,
main_shutdown_sender: oneshot::Sender<()>,
extra_shutdown_sender: oneshot::Sender<()>,
main_future: Pin<
Box<dyn Future<Output = Result<Listener, ApolloRouterError>> + Send + 'static>,
>,
extra_futures: Pin<
Box<dyn Future<Output = Result<ExtraListeners, ApolloRouterError>> + Send + 'static>,
>,
graphql_listen_address: Option<ListenAddr>,
listen_addresses: Vec<ListenAddr>,
all_connections_stopped_sender: mpsc::Sender<()>,
) -> Self {
Self {
shutdown_sender,
server_future,
main_shutdown_sender,
extra_shutdown_sender,
main_future,
extra_futures,
graphql_listen_address,
listen_addresses,
all_connections_stopped_sender,
}
}

pub(crate) async fn shutdown(self) -> Result<(), ApolloRouterError> {
if let Err(_err) = self.shutdown_sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown")
};
let _listener = self.server_future.await?;
pub(crate) async fn shutdown(mut self) -> Result<(), ApolloRouterError> {
let listen_addresses = std::mem::take(&mut self.listen_addresses);

let (_main_listener, _extra_listener) = self.wait_for_servers().await?;

#[cfg(unix)]
// listen_addresses includes the main graphql_address
for listen_address in self.listen_addresses {
for listen_address in listen_addresses {
if let ListenAddr::UnixSocket(path) = listen_address {
let _ = tokio::fs::remove_file(path).await;
}
@@ -119,16 +128,14 @@ impl HttpServerHandle {
SF: HttpServerFactory,
RF: RouterFactory,
{
// we tell the currently running server to stop
if let Err(_err) = self.shutdown_sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown")
};
let all_connections_stopped_sender = self.all_connections_stopped_sender.clone();

// when the server receives the shutdown signal, it stops accepting new
// connections, and returns the TCP listener, to reuse it in the next server
// it is necessary to keep the queue of new TCP sockets associated with
// the listener instead of dropping them
let (main_listener, extra_listeners) = self.server_future.await?;
// the listeners instead of dropping them
let (main_listener, extra_listeners) = self.wait_for_servers().await?;

tracing::debug!("previous server stopped");

// we give the listeners to the new configuration, they'll clean up whatever needs to
@@ -140,7 +147,7 @@
extra_listeners,
web_endpoints,
license,
self.all_connections_stopped_sender.clone(),
all_connections_stopped_sender,
)
.await?;
tracing::debug!(
@@ -162,6 +169,19 @@ pub(crate) fn graphql_listen_address(&self) -> &Option<ListenAddr> {
pub(crate) fn graphql_listen_address(&self) -> &Option<ListenAddr> {
&self.graphql_listen_address
}

async fn wait_for_servers(self) -> Result<(Listener, ExtraListeners), ApolloRouterError> {
if let Err(_err) = self.main_shutdown_sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown")
};
let main_listener = self.main_future.await?;

if let Err(_err) = self.extra_shutdown_sender.send(()) {
tracing::error!("Failed to notify http thread of shutdown")
};
let extra_listeners = self.extra_futures.await?;
Ok((main_listener, extra_listeners))
}
}

pub(crate) enum Listener {
@@ -254,12 +274,15 @@ mod tests {
// TODO [igni]: add a check with extra endpoints
async fn sanity() {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (extra_shutdown_sender, extra_shutdown_receiver) = oneshot::channel();
let listener = Listener::Tcp(tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap());
let (all_connections_stopped_sender, _) = mpsc::channel::<()>(1);

HttpServerHandle::new(
shutdown_sender,
futures::future::ready(Ok((listener, vec![]))).boxed(),
extra_shutdown_sender,
futures::future::ready(Ok(listener)).boxed(),
futures::future::ready(Ok(vec![])).boxed(),
Some(SocketAddr::from_str("127.0.0.1:0").unwrap().into()),
Default::default(),
all_connections_stopped_sender,
@@ -271,6 +294,9 @@
shutdown_receiver
.await
.expect("Should have been send notification to shutdown");
extra_shutdown_receiver
.await
.expect("Should have been send notification to shutdown");
}

#[test(tokio::test)]
@@ -280,12 +306,15 @@
let temp_dir = tempfile::tempdir().unwrap();
let sock = temp_dir.as_ref().join("sock");
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (extra_shutdown_sender, extra_shutdown_receiver) = oneshot::channel();
let listener = Listener::Unix(tokio::net::UnixListener::bind(&sock).unwrap());
let (all_connections_stopped_sender, _) = mpsc::channel::<()>(1);

HttpServerHandle::new(
shutdown_sender,
futures::future::ready(Ok((listener, vec![]))).boxed(),
extra_shutdown_sender,
futures::future::ready(Ok(listener)).boxed(),
futures::future::ready(Ok(vec![])).boxed(),
Some(ListenAddr::UnixSocket(sock)),
Default::default(),
all_connections_stopped_sender,
@@ -297,5 +326,9 @@
shutdown_receiver
.await
.expect("Should have sent notification to shutdown");

extra_shutdown_receiver
.await
.expect("Should have sent notification to shutdown");
}
}
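As a closing aside, the hot-reload path above relies on each server returning its listener when it shuts down, so the next server can reuse it without dropping the sockets already queued on it. Below is a minimal sketch of that hand-off, assuming plain `tokio::net::TcpListener` and `tokio::sync::oneshot`; the function and variable names are illustrative, not the router's API.

```rust
use tokio::net::TcpListener;
use tokio::sync::oneshot;

// A toy server: accept connections until told to stop, then hand the bound
// listener back so the next server can reuse it instead of rebinding the port.
async fn serve(listener: TcpListener, mut shutdown: oneshot::Receiver<()>) -> TcpListener {
    loop {
        tokio::select! {
            _ = &mut shutdown => break,
            accepted = listener.accept() => {
                if let Ok((_socket, peer)) = accepted {
                    println!("accepted connection from {peer}");
                }
            }
        }
    }
    listener
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;

    let (old_tx, old_rx) = oneshot::channel();
    let old_server = tokio::spawn(serve(listener, old_rx));

    // "Reload": stop the old server and give its listener to the new one.
    let _ = old_tx.send(());
    let listener = old_server.await.expect("old server task panicked");

    let (_new_tx, new_rx) = oneshot::channel();
    // In a real router this handle would be kept and awaited on the next reload.
    let _new_server = tokio::spawn(serve(listener, new_rx));
    Ok(())
}
```

Because the listening socket itself is never dropped, connections that arrive during the swap stay in the kernel's accept queue and are picked up by the new server.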