From 168c7d2155952ba09f781c331fd67593b820af20 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 23 Aug 2018 11:30:44 -0700 Subject: [PATCH] feat(server): add `Server::with_graceful_shutdown` method This adds a "combinator" method to `Server`, which accepts a user's future to "select" on. All connections received by the `Server` will be tracked, and if the user's future finishes, graceful shutdown will begin. - The listener will be closed immediately. - The currently active connections will all be notified to start a graceful shutdown. For HTTP/1, that means finishing the existing response and using `connection: clone`. For HTTP/2, the graceful `GOAWAY` process is started. - Once all active connections have terminated, the graceful future will return. Closes #1575 --- src/common/drain.rs | 115 +++++++++++++++++++++++++++++++++++++++++ src/common/mod.rs | 1 + src/server/conn.rs | 17 +++--- src/server/mod.rs | 63 +++++++++++++++++++++- src/server/shutdown.rs | 93 +++++++++++++++++++++++++++++++++ 5 files changed, 281 insertions(+), 8 deletions(-) create mode 100644 src/common/drain.rs create mode 100644 src/server/shutdown.rs diff --git a/src/common/drain.rs b/src/common/drain.rs new file mode 100644 index 0000000000..0222c2ec41 --- /dev/null +++ b/src/common/drain.rs @@ -0,0 +1,115 @@ +use std::mem; + +use futures::{Async, Future, Poll, Stream}; +use futures::future::Shared; +use futures::sync::{mpsc, oneshot}; + +use super::Never; + +pub fn channel() -> (Signal, Watch) { + let (tx, rx) = oneshot::channel(); + let (drained_tx, drained_rx) = mpsc::channel(0); + ( + Signal { + drained_rx, + tx, + }, + Watch { + drained_tx, + rx: rx.shared(), + }, + ) +} + +pub struct Signal { + drained_rx: mpsc::Receiver, + tx: oneshot::Sender<()>, +} + +pub struct Draining { + drained_rx: mpsc::Receiver, +} + +#[derive(Clone)] +pub struct Watch { + drained_tx: mpsc::Sender, + rx: Shared>, +} + +pub struct Watching { + future: F, + state: State, + watch: Watch, +} + +enum State { + Watch(F), + Draining, +} + +impl Signal { + pub fn drain(self) -> Draining { + let _ = self.tx.send(()); + Draining { + drained_rx: self.drained_rx, + } + } +} + +impl Future for Draining { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match try_ready!(self.drained_rx.poll()) { + Some(never) => match never {}, + None => Ok(Async::Ready(())), + } + } +} + +impl Watch { + pub fn watch(self, future: F, on_drain: FN) -> Watching + where + F: Future, + FN: FnOnce(&mut F), + { + Watching { + future, + state: State::Watch(on_drain), + watch: self, + } + } +} + +impl Future for Watching +where + F: Future, + FN: FnOnce(&mut F), +{ + type Item = F::Item; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + loop { + match mem::replace(&mut self.state, State::Draining) { + State::Watch(on_drain) => { + match self.watch.rx.poll() { + Ok(Async::Ready(_)) | Err(_) => { + // Drain has been triggered! + on_drain(&mut self.future); + }, + Ok(Async::NotReady) => { + self.state = State::Watch(on_drain); + return self.future.poll(); + }, + } + }, + State::Draining => { + return self.future.poll(); + }, + } + } + } +} + diff --git a/src/common/mod.rs b/src/common/mod.rs index 585c82950f..c3411569f6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,4 +1,5 @@ mod buf; +pub(crate) mod drain; mod exec; pub(crate) mod io; mod lazy; diff --git a/src/server/conn.rs b/src/server/conn.rs index d66c3add0e..b8e0a1383a 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -85,7 +85,7 @@ pub struct Connecting { #[must_use = "futures do nothing unless polled"] #[derive(Debug)] pub(super) struct SpawnAll { - serve: Serve, + pub(super) serve: Serve, } /// A future binding a connection with a Service. @@ -618,7 +618,7 @@ impl SpawnAll { } } -impl Future for SpawnAll +impl SpawnAll where I: Stream, I::Error: Into>, @@ -630,16 +630,19 @@ where ::Future: Send + 'static, B: Payload, { - type Item = (); - type Error = ::Error; - - fn poll(&mut self) -> Poll { + pub(super) fn poll_with(&mut self, per_connection: F1) -> Poll<(), ::Error> + where + F1: Fn() -> F2, + F2: FnOnce(UpgradeableConnection) -> R + Send + 'static, + R: Future + Send + 'static, + { loop { if let Some(connecting) = try_ready!(self.serve.poll()) { + let and_then = per_connection(); let fut = connecting .map_err(::Error::new_user_new_service) // flatten basically - .and_then(|conn| conn.with_upgrades()) + .and_then(|conn| and_then(conn.with_upgrades())) .map_err(|err| debug!("conn error: {}", err)); self.serve.protocol.exec.execute(fut)?; } else { diff --git a/src/server/mod.rs b/src/server/mod.rs index f4160c30bf..ddb80c0a1c 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -51,6 +51,7 @@ //! ``` pub mod conn; +mod shutdown; #[cfg(feature = "runtime")] mod tcp; use std::fmt; @@ -67,6 +68,7 @@ use service::{NewService, Service}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... use self::conn::{Http as Http_, SpawnAll}; +use self::shutdown::Graceful; #[cfg(feature = "runtime")] use self::tcp::AddrIncoming; /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. @@ -136,6 +138,65 @@ impl Server { } } +impl Server +where + I: Stream, + I::Error: Into>, + I::Item: AsyncRead + AsyncWrite + Send + 'static, + S: NewService + Send + 'static, + S::Error: Into>, + S::Service: Send, + S::Future: Send + 'static, + ::Future: Send + 'static, + B: Payload, +{ + /// Prepares a server to handle graceful shutdown when the provided future + /// completes. + /// + /// # Example + /// + /// ``` + /// # extern crate hyper; + /// # extern crate futures; + /// # use futures::Future; + /// # fn main() {} + /// # #[cfg(feature = "runtime")] + /// # fn run() { + /// # use hyper::{Body, Response, Server}; + /// # use hyper::service::service_fn_ok; + /// # let new_service = || { + /// # service_fn_ok(|_req| { + /// # Response::new(Body::from("Hello World")) + /// # }) + /// # }; + /// + /// // Make a server from the previous examples... + /// let server = Server::bind(&([127, 0, 0, 1], 3000).into()) + /// .serve(new_service); + /// + /// // Prepare some signal for when the server should start + /// // shutting down... + /// let (tx, rx) = futures::sync::oneshot::channel::<()>(); + /// + /// let graceful = server + /// .with_graceful_shutdown(rx) + /// .map_err(|err| eprintln!("server error: {}", err)); + /// + /// // Spawn `server` onto an Executor... + /// hyper::rt::spawn(graceful); + /// + /// // And later, trigger the signal by calling `tx.send(())`. + /// let _ = tx.send(()); + /// # } + /// ``` + pub fn with_graceful_shutdown(self, signal: F) -> Graceful + where + F: Future + { + Graceful::new(self.spawn_all, signal) + } +} + impl Future for Server where I: Stream, @@ -152,7 +213,7 @@ where type Error = ::Error; fn poll(&mut self) -> Poll { - self.spawn_all.poll() + self.spawn_all.poll_with(|| |conn| conn) } } diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs new file mode 100644 index 0000000000..1240f41f63 --- /dev/null +++ b/src/server/shutdown.rs @@ -0,0 +1,93 @@ +use futures::{Async, Future, Stream, Poll}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use body::{Body, Payload}; +use common::drain::{self, Draining, Signal, Watch}; +use service::{Service, NewService}; +use super::SpawnAll; + +#[allow(missing_debug_implementations)] +pub struct Graceful { + state: State, +} + +enum State { + Running { + drain: Option<(Signal, Watch)>, + spawn_all: SpawnAll, + signal: F, + }, + Draining(Draining), +} + +impl Graceful { + pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { + let drain = Some(drain::channel()); + Graceful { + state: State::Running { + drain, + spawn_all, + signal, + }, + } + } +} + + +impl Future for Graceful +where + I: Stream, + I::Error: Into>, + I::Item: AsyncRead + AsyncWrite + Send + 'static, + S: NewService + Send + 'static, + S::Error: Into>, + S::Service: Send, + S::Future: Send + 'static, + ::Future: Send + 'static, + B: Payload, + F: Future, +{ + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + State::Running { + ref mut drain, + ref mut spawn_all, + ref mut signal, + } => match signal.poll() { + Ok(Async::Ready(())) | Err(_) => { + debug!("signal received, starting graceful shutdown"); + let sig = drain + .take() + .expect("drain channel") + .0; + State::Draining(sig.drain()) + }, + Ok(Async::NotReady) => { + let watch = &drain + .as_ref() + .expect("drain channel") + .1; + return spawn_all.poll_with(|| { + let watch = watch.clone(); + move |conn| { + watch.watch(conn, |conn| { + // on_drain, start conn graceful shutdown + conn.graceful_shutdown() + }) + } + }); + }, + }, + State::Draining(ref mut draining) => { + return draining.poll() + .map_err(|()| unreachable!("drain mpsc rx never errors")); + } + }; + self.state = next; + } + } +}