Skip to content

Commit

Permalink
feat(client): implement the HTTP/2 extended CONNECT protocol from RFC…
Browse files Browse the repository at this point in the history
… 8441 (#2682)
  • Loading branch information
nox authored Feb 8, 2022
1 parent 6932896 commit 5ec094c
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 3 deletions.
17 changes: 17 additions & 0 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,23 @@ where
Poll::Ready(Ok(conn.take().unwrap().into_parts()))
})
}

/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
///
/// This setting is configured by the server peer by sending the
/// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
/// This method returns the currently acknowledged value recieved from the
/// remote.
///
/// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
/// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
#[cfg(feature = "http2")]
pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
match self.inner.as_ref().unwrap() {
ProtoClient::H1 { .. } => false,
ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
}
}
}

impl<T, B> Future for Connection<T, B>
Expand Down
60 changes: 59 additions & 1 deletion src/ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,67 @@
//! HTTP extensions
//! HTTP extensions.

use bytes::Bytes;
#[cfg(feature = "http1")]
use http::header::{HeaderName, IntoHeaderName, ValueIter};
use http::HeaderMap;
#[cfg(feature = "http2")]
use std::fmt;

#[cfg(feature = "http2")]
/// Represents the `:protocol` pseudo-header used by
/// the [Extended CONNECT Protocol].
///
/// [Extended CONNECT Protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
#[derive(Clone, Eq, PartialEq)]
pub struct Protocol {
inner: h2::ext::Protocol,
}

#[cfg(feature = "http2")]
impl Protocol {
/// Converts a static string to a protocol name.
pub const fn from_static(value: &'static str) -> Self {
Self {
inner: h2::ext::Protocol::from_static(value),
}
}

/// Returns a str representation of the header.
pub fn as_str(&self) -> &str {
self.inner.as_str()
}

pub(crate) fn from_inner(inner: h2::ext::Protocol) -> Self {
Self { inner }
}

pub(crate) fn into_inner(self) -> h2::ext::Protocol {
self.inner
}
}

#[cfg(feature = "http2")]
impl<'a> From<&'a str> for Protocol {
fn from(value: &'a str) -> Self {
Self {
inner: h2::ext::Protocol::from(value),
}
}
}

#[cfg(feature = "http2")]
impl AsRef<[u8]> for Protocol {
fn as_ref(&self) -> &[u8] {
self.inner.as_ref()
}
}

#[cfg(feature = "http2")]
impl fmt::Debug for Protocol {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}

/// A map from header names to their original casing as received in an HTTP message.
///
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ mod cfg;
mod common;
pub mod body;
mod error;
mod ext;
pub mod ext;
#[cfg(test)]
mod mock;
pub mod rt;
Expand Down
14 changes: 14 additions & 0 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::{debug, trace, warn};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
use crate::ext::Protocol;
use crate::headers;
use crate::proto::h2::UpgradedSendStream;
use crate::proto::Dispatched;
Expand Down Expand Up @@ -204,6 +205,15 @@ where
req_rx: ClientRx<B>,
}

impl<B> ClientTask<B>
where
B: HttpBody + 'static,
{
pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
self.h2_tx.is_extended_connect_protocol_enabled()
}
}

impl<B> Future for ClientTask<B>
where
B: HttpBody + Send + 'static,
Expand Down Expand Up @@ -260,6 +270,10 @@ where
}
}

if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
req.extensions_mut().insert(protocol.into_inner());
}

let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
Ok(ok) => ok,
Err(err) => {
Expand Down
12 changes: 11 additions & 1 deletion src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use super::{ping, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::exec::ConnStreamExec;
use crate::common::{date, task, Future, Pin, Poll};
use crate::ext::Protocol;
use crate::headers;
use crate::proto::h2::ping::Recorder;
use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
Expand All @@ -41,6 +42,7 @@ pub(crate) struct Config {
pub(crate) initial_conn_window_size: u32,
pub(crate) initial_stream_window_size: u32,
pub(crate) max_frame_size: u32,
pub(crate) enable_connect_protocol: bool,
pub(crate) max_concurrent_streams: Option<u32>,
#[cfg(feature = "runtime")]
pub(crate) keep_alive_interval: Option<Duration>,
Expand All @@ -56,6 +58,7 @@ impl Default for Config {
initial_conn_window_size: DEFAULT_CONN_WINDOW,
initial_stream_window_size: DEFAULT_STREAM_WINDOW,
max_frame_size: DEFAULT_MAX_FRAME_SIZE,
enable_connect_protocol: false,
max_concurrent_streams: None,
#[cfg(feature = "runtime")]
keep_alive_interval: None,
Expand Down Expand Up @@ -117,6 +120,9 @@ where
if let Some(max) = config.max_concurrent_streams {
builder.max_concurrent_streams(max);
}
if config.enable_connect_protocol {
builder.enable_connect_protocol();
}
let handshake = builder.handshake(io);

let bdp = if config.adaptive_window {
Expand Down Expand Up @@ -280,7 +286,7 @@ where

let is_connect = req.method() == Method::CONNECT;
let (mut parts, stream) = req.into_parts();
let (req, connect_parts) = if !is_connect {
let (mut req, connect_parts) = if !is_connect {
(
Request::from_parts(
parts,
Expand All @@ -307,6 +313,10 @@ where
)
};

if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
req.extensions_mut().insert(Protocol::from_inner(protocol));
}

let fut = H2Stream::new(service.call(req), connect_parts, respond);
exec.execute_h2stream(fut);
}
Expand Down
9 changes: 9 additions & 0 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,15 @@ impl<E> Http<E> {
self
}

/// Enables the [extended CONNECT protocol].
///
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
#[cfg(feature = "http2")]
pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
self.h2_builder.enable_connect_protocol = true;
self
}

/// Set the maximum buffer size for the connection.
///
/// Default is ~400kb.
Expand Down
9 changes: 9 additions & 0 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,15 @@ impl<I, E> Builder<I, E> {
self
}

/// Enables the [extended CONNECT protocol].
///
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
#[cfg(feature = "http2")]
pub fn http2_enable_connect_protocol(mut self) -> Self {
self.protocol.http2_enable_connect_protocol();
self
}

/// Sets the `Executor` to deal with connection tasks.
///
/// Default is `tokio::spawn`.
Expand Down

0 comments on commit 5ec094c

Please sign in to comment.