Skip to content

Commit

Permalink
feat(http2): add HTTP2 keep-alive support for client and server
Browse files Browse the repository at this point in the history
This adds HTTP2 keep-alive support to client and server connections
based losely on GRPC keep-alive. When enabled, after no data has been
received for some configured interval, an HTTP2 PING frame is sent. If
the PING is not acknowledged with a configured timeout, the connection
is closed.

Clients have an additional option to enable keep-alive while the
connection is otherwise idle. When disabled, keep-alive PINGs are only
used while there are open request/response streams. If enabled, PINGs
are sent even when there are no active streams.

For now, since these features use `tokio::time::Delay`, the `runtime`
cargo feature is required to use them.
  • Loading branch information
seanmonstar committed Mar 20, 2020
1 parent d838d54 commit 9a8413d
Show file tree
Hide file tree
Showing 13 changed files with 1,166 additions and 255 deletions.
21 changes: 13 additions & 8 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::bdp;
use crate::proto::h2::ping;
use crate::proto::DecodedLength;
use crate::upgrade::OnUpgrade;

Expand All @@ -38,7 +38,7 @@ enum Kind {
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
},
H2 {
bdp: bdp::Sampler,
ping: ping::Recorder,
content_length: DecodedLength,
recv: h2::RecvStream,
},
Expand Down Expand Up @@ -180,10 +180,10 @@ impl Body {
pub(crate) fn h2(
recv: h2::RecvStream,
content_length: DecodedLength,
bdp: bdp::Sampler,
ping: ping::Recorder,
) -> Self {
let body = Body::new(Kind::H2 {
bdp,
ping,
content_length,
recv,
});
Expand Down Expand Up @@ -265,14 +265,14 @@ impl Body {
}
}
Kind::H2 {
ref bdp,
ref ping,
recv: ref mut h2,
content_length: ref mut len,
} => match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
bdp.sample(bytes.len());
ping.record_data(bytes.len());
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
Expand Down Expand Up @@ -321,9 +321,14 @@ impl HttpBody for Body {
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.kind {
Kind::H2 {
recv: ref mut h2, ..
recv: ref mut h2,
ref ping,
..
} => match ready!(h2.poll_trailers(cx)) {
Ok(t) => Poll::Ready(Ok(t)),
Ok(t) => {
ping.record_non_data();
Poll::Ready(Ok(t))
}
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
},
_ => Poll::Ready(Ok(None)),
Expand Down
56 changes: 56 additions & 0 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
//!
//! If don't have need to manage connections yourself, consider using the
//! higher-level [Client](super) API.
use std::fmt;
use std::mem;
use std::sync::Arc;
#[cfg(feature = "runtime")]
use std::time::Duration;

use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
Expand Down Expand Up @@ -517,6 +520,59 @@ impl Builder {
self
}

/// Sets an interval for HTTP2 Ping frames should be sent to keep a
/// connection alive.
///
/// Pass `None` to disable HTTP2 keep-alive.
///
/// Default is currently disabled.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
) -> &mut Self {
self.h2_builder.keep_alive_interval = interval.into();
self
}

/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
///
/// If the ping is not acknowledged within the timeout, the connection will
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
///
/// Default is 20 seconds.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.h2_builder.keep_alive_timeout = timeout;
self
}

/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
///
/// If disabled, keep-alive pings are only sent while there are open
/// request/responses streams. If enabled, pings are also sent when no
/// streams are active. Does nothing if `http2_keep_alive_interval` is
/// disabled.
///
/// Default is `false`.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
self.h2_builder.keep_alive_while_idle = enabled;
self
}

/// Constructs a connection with the configured options and IO.
pub fn handshake<T, B>(
&self,
Expand Down
54 changes: 54 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ impl Builder {
self.pool_config.max_idle_per_host = max_idle;
self
}

// HTTP/1 options

/// Set whether HTTP/1 connections should try to use vectored writes,
Expand Down Expand Up @@ -1036,6 +1037,59 @@ impl Builder {
self
}

/// Sets an interval for HTTP2 Ping frames should be sent to keep a
/// connection alive.
///
/// Pass `None` to disable HTTP2 keep-alive.
///
/// Default is currently disabled.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
) -> &mut Self {
self.conn_builder.http2_keep_alive_interval(interval);
self
}

/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
///
/// If the ping is not acknowledged within the timeout, the connection will
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
///
/// Default is 20 seconds.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.conn_builder.http2_keep_alive_timeout(timeout);
self
}

/// Sets whether HTTP2 keep-alive should apply while the connection is idle.
///
/// If disabled, keep-alive pings are only sent while there are open
/// request/responses streams. If enabled, pings are also sent when no
/// streams are active. Does nothing if `http2_keep_alive_interval` is
/// disabled.
///
/// Default is `false`.
///
/// # Cargo Feature
///
/// Requires the `runtime` cargo feature to be enabled.
#[cfg(feature = "runtime")]
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
self.conn_builder.http2_keep_alive_while_idle(enabled);
self
}

/// Set whether to retry requests that get disrupted before ever starting
/// to write.
///
Expand Down
37 changes: 31 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ pub(crate) enum User {
ManualUpgrade,
}

// Sentinel type to indicate the error was caused by a timeout.
#[derive(Debug)]
pub(crate) struct TimedOut;

impl Error {
/// Returns true if this was an HTTP parse error.
pub fn is_parse(&self) -> bool {
Expand Down Expand Up @@ -133,6 +137,11 @@ impl Error {
self.inner.kind == Kind::BodyWriteAborted
}

/// Returns true if the error was caused by a timeout.
pub fn is_timeout(&self) -> bool {
self.find_source::<TimedOut>().is_some()
}

/// Consumes the error, returning its cause.
pub fn into_cause(self) -> Option<Box<dyn StdError + Send + Sync>> {
self.inner.cause
Expand All @@ -153,19 +162,25 @@ impl Error {
&self.inner.kind
}

pub(crate) fn h2_reason(&self) -> h2::Reason {
// Find an h2::Reason somewhere in the cause stack, if it exists,
// otherwise assume an INTERNAL_ERROR.
fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
let mut cause = self.source();
while let Some(err) = cause {
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
return h2_err.reason().unwrap_or(h2::Reason::INTERNAL_ERROR);
if let Some(ref typed) = err.downcast_ref() {
return Some(typed);
}
cause = err.source();
}

// else
h2::Reason::INTERNAL_ERROR
None
}

pub(crate) fn h2_reason(&self) -> h2::Reason {
// Find an h2::Reason somewhere in the cause stack, if it exists,
// otherwise assume an INTERNAL_ERROR.
self.find_source::<h2::Error>()
.and_then(|h2_err| h2_err.reason())
.unwrap_or(h2::Reason::INTERNAL_ERROR)
}

pub(crate) fn new_canceled() -> Error {
Expand Down Expand Up @@ -397,6 +412,16 @@ trait AssertSendSync: Send + Sync + 'static {}
#[doc(hidden)]
impl AssertSendSync for Error {}

// ===== impl TimedOut ====

impl fmt::Display for TimedOut {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("operation timed out")
}
}

impl StdError for TimedOut {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 9a8413d

Please sign in to comment.