From 990ecc99d8248a66f1915b11e734e51ba104dcb3 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Mon, 31 Jul 2023 07:34:53 -0600 Subject: [PATCH] feat(ext/http): Upgrade to hyper1.0-rc4 (#19987) Includes a lightly-modified version of hyper-util's `TokioIo` utility. Hyper changes: v1.0.0-rc.4 (2023-07-10) Bug Fixes http1: http1 server graceful shutdown fix (#3261) ([f4b51300](https://github.com/hyperium/hyper/commit/f4b513009d81083081d1c60c1981847bbb17dd5d)) send error on Incoming body when connection errors (#3256) ([52f19259](https://github.com/hyperium/hyper/commit/52f192593fb9ebcf6d3894e0c85cbf710da4decd), closes https://github.com/hyperium/hyper/issues/3253) properly end chunked bodies when it was known to be empty (#3254) ([fec64cf0](https://github.com/hyperium/hyper/commit/fec64cf0abdc678e30ca5f1b310c5118b2e01999), closes https://github.com/hyperium/hyper/issues/3252) Features client: Make clients able to use non-Send executor (#3184) ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc), closes https://github.com/hyperium/hyper/issues/3017) rt: replace IO traits with hyper::rt ones (#3230) ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2), closes https://github.com/hyperium/hyper/issues/3110) add downcast on Sleep trait (#3125) ([d92d3917](https://github.com/hyperium/hyper/commit/d92d3917d950e4c61c37c2170f3ce273d2a0f7d1), closes https://github.com/hyperium/hyper/issues/3027) service: change Service::call to take &self (#3223) ([d894439e](https://github.com/hyperium/hyper/commit/d894439e009aa75103f6382a7ba98fb17da72f02), closes https://github.com/hyperium/hyper/issues/3040) Breaking Changes Any IO transport type provided must not implement hyper::rt::{Read, Write} instead of tokio::io traits. You can grab a helper type from hyper-util to wrap Tokio types, or implement the traits yourself, if it's a custom type. ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2)) client::conn::http2 types now use another generic for an Executor. Code that names Connection needs to include the additional generic parameter. ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc)) The Service::call function no longer takes a mutable reference to self. The FnMut trait bound on the service::util::service_fn function and the trait bound on the impl for the ServiceFn struct were changed from FnMut to Fn. --- Cargo.lock | 7 +- ext/http/Cargo.toml | 2 +- ext/http/http_next.rs | 8 +- ext/http/hyper_util_tokioio.rs | 206 +++++++++++++++++++++++++++++++++ ext/http/lib.rs | 7 +- 5 files changed, 220 insertions(+), 10 deletions(-) create mode 100644 ext/http/hyper_util_tokioio.rs diff --git a/Cargo.lock b/Cargo.lock index 8ecca48c6b43a7..349354d25c1144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1166,7 +1166,7 @@ dependencies = [ "http", "httparse", "hyper 0.14.26", - "hyper 1.0.0-rc.3", + "hyper 1.0.0-rc.4", "memmem", "mime", "once_cell", @@ -2530,13 +2530,12 @@ dependencies = [ [[package]] name = "hyper" -version = "1.0.0-rc.3" +version = "1.0.0-rc.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092" +checksum = "d280a71f348bcc670fc55b02b63c53a04ac0bf2daff2980795aeaf53edae10e6" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", "h2", "http", diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index cb47db6b2ede27..6c7d6e7bb66394 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -36,7 +36,7 @@ fly-accept-encoding = "0.2.0" http.workspace = true httparse.workspace = true hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] } -hyper1 = { package = "hyper", features = ["full"], version = "=1.0.0-rc.3" } +hyper1 = { package = "hyper", features = ["full"], version = "=1.0.0-rc.4" } memmem.workspace = true mime = "0.3.16" once_cell.workspace = true diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index cd63bc8991e6ab..7cf088e3065401 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -1,6 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use crate::compressible::is_content_compressible; use crate::extract_network_stream; +use crate::hyper_util_tokioio::TokioIo; use crate::network_buffered_stream::NetworkStreamPrefixCheck; use crate::request_body::HttpRequestBody; use crate::request_properties::HttpConnectionProperties; @@ -139,7 +140,7 @@ pub fn op_http_upgrade_raw( let mut http = slab_get(slab_id); *http.response() = response; http.complete(); - let mut upgraded = upgrade.await?; + let mut upgraded = TokioIo::new(upgrade.await?); upgraded.write_all(&bytes).await?; break upgraded; } @@ -709,7 +710,7 @@ fn serve_http11_unconditional( let conn = http1::Builder::new() .keep_alive(true) .writev(*USE_WRITEV) - .serve_connection(io, svc); + .serve_connection(TokioIo::new(io), svc); conn.with_upgrades().map_err(AnyError::from) } @@ -718,7 +719,8 @@ fn serve_http2_unconditional( io: impl HttpServeStream, svc: impl HttpService + 'static, ) -> impl Future> + 'static { - let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc); + let conn = + http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc); conn.map_err(AnyError::from) } diff --git a/ext/http/hyper_util_tokioio.rs b/ext/http/hyper_util_tokioio.rs new file mode 100644 index 00000000000000..a2d649cccfb18f --- /dev/null +++ b/ext/http/hyper_util_tokioio.rs @@ -0,0 +1,206 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// Copyright 2023 Sean McArthur +// MIT licensed copy of unreleased hyper-util code from +// https://raw.githubusercontent.com/hyperium/hyper-util/master/src/rt/tokio_io.rs + +#![allow(dead_code)] +//! Tokio IO integration for hyper +use hyper1 as hyper; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use pin_project::pin_project; + +/// A wrapping implementing hyper IO traits for a type that +/// implements Tokio's IO traits. +#[pin_project] +#[derive(Debug)] +pub struct TokioIo { + #[pin] + inner: T, +} + +impl TokioIo { + /// Wrap a type implementing Tokio's IO traits. + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Borrow the inner type. + pub fn inner(&self) -> &T { + &self.inner + } + + /// Consume this wrapper and get the inner type. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl hyper::rt::Read for TokioIo +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + // SAFETY: Imported code from hyper-util + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) + { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + // SAFETY: Imported code from hyper-util + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for TokioIo +where + T: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl tokio::io::AsyncRead for TokioIo +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + // SAFETY: Imported code from hyper-util + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) + { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + // SAFETY: Imported code from hyper-util + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl tokio::io::AsyncWrite for TokioIo +where + T: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +/// A wrapping implementing Tokio IO traits for a type that +/// implements Hyper's IO traits. +#[pin_project] +#[derive(Debug)] +pub struct TokioIoForHyper { + #[pin] + inner: T, +} + +impl TokioIoForHyper { + /// Wrap a type implementing Tokio's IO traits. + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Borrow the inner type. + pub fn inner(&self) -> &T { + &self.inner + } + + /// Consume this wrapper and get the inner type. + pub fn into_inner(self) -> T { + self.inner + } +} diff --git a/ext/http/lib.rs b/ext/http/lib.rs index c33c1d15e45332..5a8788f923b8d2 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -50,6 +50,7 @@ use hyper::Body; use hyper::HeaderMap; use hyper::Request; use hyper::Response; +use hyper_util_tokioio::TokioIo; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; @@ -76,6 +77,7 @@ use crate::reader_stream::ShutdownHandle; pub mod compressible; mod http_next; +mod hyper_util_tokioio; mod network_buffered_stream; mod reader_stream; mod request_body; @@ -1061,8 +1063,9 @@ impl CanDowncastUpgrade for hyper1::upgrade::Upgraded { fn downcast( self, ) -> Result<(T, Bytes), Self> { - let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?; - Ok((io, read_buf)) + let hyper1::upgrade::Parts { io, read_buf, .. } = + self.downcast::>()?; + Ok((io.into_inner(), read_buf)) } }