diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8965dd072..957186a0d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,7 +71,6 @@ jobs: - "feat.: default-tls disabled" - "feat.: rustls-tls" - "feat.: rustls-tls-manual-roots" - - "feat.: rustls-tls-native-roots" - "feat.: native-tls" - "feat.: default-tls and rustls-tls" - "feat.: cookies" @@ -102,23 +101,23 @@ jobs: - name: windows / stable-x86_64-msvc os: windows-latest target: x86_64-pc-windows-msvc - features: "--features blocking,gzip,brotli,deflate,json,multipart,stream" + features: "--features blocking,gzip,brotli,deflate,json,multipart" - name: windows / stable-i686-msvc os: windows-latest target: i686-pc-windows-msvc - features: "--features blocking,gzip,brotli,deflate,json,multipart,stream" + features: "--features blocking,gzip,brotli,deflate,json,multipart" - name: windows / stable-x86_64-gnu os: windows-latest rust: stable-x86_64-pc-windows-gnu target: x86_64-pc-windows-gnu - features: "--features blocking,gzip,brotli,deflate,json,multipart,stream" + features: "--features blocking,gzip,brotli,deflate,json,multipart" package_name: mingw-w64-x86_64-gcc mingw64_path: "C:\\msys64\\mingw64\\bin" - name: windows / stable-i686-gnu os: windows-latest rust: stable-i686-pc-windows-gnu target: i686-pc-windows-gnu - features: "--features blocking,gzip,brotli,deflate,json,multipart,stream" + features: "--features blocking,gzip,brotli,deflate,json,multipart" package_name: mingw-w64-i686-gcc mingw64_path: "C:\\msys64\\mingw32\\bin" @@ -128,8 +127,6 @@ jobs: features: "--no-default-features --features rustls-tls" - name: "feat.: rustls-tls-manual-roots" features: "--no-default-features --features rustls-tls-manual-roots" - - name: "feat.: rustls-tls-native-roots" - features: "--no-default-features --features rustls-tls-native-roots" - name: "feat.: native-tls" features: "--features native-tls" - name: "feat.: default-tls and rustls-tls" @@ -139,11 +136,11 @@ jobs: - name: "feat.: blocking" features: "--features blocking" - name: "feat.: gzip" - features: "--features gzip,stream" + features: "--features gzip" - name: "feat.: brotli" - features: "--features brotli,stream" + features: "--features brotli" - name: "feat.: deflate" - features: "--features deflate,stream" + features: "--features deflate" - name: "feat.: json" features: "--features json" - name: "feat.: multipart" @@ -207,12 +204,11 @@ jobs: with: toolchain: 'stable' - #- name: Check - # run: RUSTFLAGS="--cfg reqwest_unstable" cargo check --features http3 + - name: Check + run: RUSTFLAGS="--cfg reqwest_unstable" cargo check --features http3 docs: name: Docs - needs: [test] runs-on: ubuntu-latest steps: diff --git a/Cargo.toml b/Cargo.toml index 96447554e..951afd4e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,17 +27,15 @@ features = [ ] [features] -default = ["default-tls", "http2"] +default = ["default-tls"] # Note: this doesn't enable the 'native-tls' feature, which adds specific # functionality for it. default-tls = ["hyper-tls", "native-tls-crate", "__tls", "tokio-native-tls"] -http2 = ["h2", "hyper/http2", "hyper-util/http2"] - # Enables native-tls specific functionality not available by default. native-tls = ["default-tls"] -native-tls-alpn = ["native-tls", "native-tls-crate/alpn", "hyper-tls/alpn"] +native-tls-alpn = ["native-tls", "native-tls-crate/alpn"] native-tls-vendored = ["native-tls", "native-tls-crate/vendored"] rustls-tls = ["rustls-tls-webpki-roots"] @@ -45,7 +43,7 @@ rustls-tls-manual-roots = ["__rustls"] rustls-tls-webpki-roots = ["webpki-roots", "__rustls"] rustls-tls-native-roots = ["rustls-native-certs", "__rustls"] -blocking = ["futures-channel/sink", "futures-util/io", "futures-util/sink", "tokio/rt-multi-thread", "tokio/sync"] +blocking = ["futures-util/io", "tokio/sync"] cookies = ["cookie_crate", "cookie_store"] @@ -66,25 +64,24 @@ stream = ["tokio/fs", "tokio-util", "wasm-streams"] socks = ["tokio-socks"] # Experimental HTTP/3 client. -# Disabled while waiting for quinn to upgrade. -#http3 = ["rustls-tls-manual-roots", "h3", "h3-quinn", "quinn", "futures-channel"] +http3 = ["rustls-tls-manual-roots", "h3", "h3-quinn", "quinn", "futures-channel"] # Internal (PRIVATE!) features used to aid testing. # Don't rely on these whatsoever. They may disappear at anytime. # Enables common types used for TLS. Useless on its own. -__tls = ["dep:rustls-pemfile", "tokio/io-util"] +__tls = ["dep:rustls-pemfile"] # Enables common rustls code. # Equivalent to rustls-tls-manual-roots but shorter :) -__rustls = ["hyper-rustls", "tokio-rustls", "rustls", "__tls", "dep:rustls-pemfile", "rustls-pki-types"] +__rustls = ["hyper-rustls", "tokio-rustls", "rustls", "__tls"] # When enabled, disable using the cached SYS_PROXIES. __internal_proxy_sys_no_cache = [] [dependencies] base64 = "0.21" -http = "1" +http = "0.2" url = "2.2" bytes = "1.0" serde = "1.0" @@ -103,11 +100,9 @@ mime_guess = { version = "2.0", default-features = false, optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] encoding_rs = "0.8" -http-body = "1" -http-body-util = "0.1" -hyper = { version = "1", features = ["http1", "client"] } -hyper-util = { version = "0.1.3", features = ["http1", "client", "client-legacy", "tokio"] } -h2 = { version = "0.4", optional = true } +http-body = "0.4.0" +hyper = { version = "0.14.21", default-features = false, features = ["tcp", "http1", "http2", "client", "runtime"] } +h2 = "0.3.14" once_cell = "1" log = "0.4" mime = "0.3.16" @@ -120,17 +115,16 @@ ipnet = "2.3" rustls-pemfile = { version = "1.0", optional = true } ## default-tls -hyper-tls = { version = "0.6", optional = true } +hyper-tls = { version = "0.5", optional = true } native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" } tokio-native-tls = { version = "0.3.0", optional = true } # rustls-tls -hyper-rustls = { version = "0.26.0", default-features = false, optional = true } -rustls = { version = "0.22.2", optional = true } -rustls-pki-types = { version = "1.1.0", features = ["alloc"] ,optional = true } -tokio-rustls = { version = "0.25", optional = true } -webpki-roots = { version = "0.26.0", optional = true } -rustls-native-certs = { version = "0.7", optional = true } +hyper-rustls = { version = "0.24.0", default-features = false, optional = true } +rustls = { version = "0.21.6", features = ["dangerous_configuration"], optional = true } +tokio-rustls = { version = "0.24", optional = true } +webpki-roots = { version = "0.25", optional = true } +rustls-native-certs = { version = "0.6", optional = true } ## cookies cookie_crate = { version = "0.17.0", package = "cookie", optional = true } @@ -147,16 +141,15 @@ tokio-socks = { version = "0.5.1", optional = true } trust-dns-resolver = { version = "0.23", optional = true, features = ["tokio-runtime"] } # HTTP/3 experimental support -h3 = { version = "0.0.4", optional = true } -h3-quinn = { version = "0.0.5", optional = true } +h3 = { version = "0.0.3", optional = true } +h3-quinn = { version = "0.0.4", optional = true } quinn = { version = "0.10", default-features = false, features = ["tls-rustls", "ring", "runtime-tokio"], optional = true } futures-channel = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] env_logger = "0.10" -hyper = { version = "1.1.0", default-features = false, features = ["http1", "http2", "client", "server"] } -hyper-util = { version = "0.1", features = ["http1", "http2", "client", "client-legacy", "server-auto", "tokio"] } +hyper = { version = "0.14", default-features = false, features = ["tcp", "stream", "http1", "http2", "client", "server", "runtime"] } serde = { version = "1.0", features = ["derive"] } libflate = "1.0" brotli_crate = { package = "brotli", version = "3.3.0" } @@ -244,17 +237,17 @@ required-features = ["cookies"] [[test]] name = "gzip" path = "tests/gzip.rs" -required-features = ["gzip", "stream"] +required-features = ["gzip"] [[test]] name = "brotli" path = "tests/brotli.rs" -required-features = ["brotli", "stream"] +required-features = ["brotli"] [[test]] name = "deflate" path = "tests/deflate.rs" -required-features = ["deflate", "stream"] +required-features = ["deflate"] [[test]] name = "multipart" diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index ff5446e53..0d0357cb6 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -4,9 +4,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; +use futures_core::Stream; use http_body::Body as HttpBody; -use http_body_util::combinators::BoxBody; -//use sync_wrapper::SyncWrapper; +use pin_project_lite::pin_project; +use sync_wrapper::SyncWrapper; #[cfg(feature = "stream")] use tokio::fs::File; use tokio::time::Sleep; @@ -18,22 +19,31 @@ pub struct Body { inner: Inner, } +// The `Stream` trait isn't stable, so the impl isn't public. +pub(crate) struct ImplStream(Body); + enum Inner { Reusable(Bytes), - Streaming(BoxBody>), + Streaming { + body: Pin< + Box< + dyn HttpBody> + + Send + + Sync, + >, + >, + timeout: Option>>, + }, } -/// A body with a total timeout. -/// -/// The timeout does not reset upon each chunk, but rather requires the whole -/// body be streamed before the deadline is reached. -pub(crate) struct TotalTimeoutBody { - inner: B, - timeout: Pin>, +pin_project! { + struct WrapStream { + #[pin] + inner: SyncWrapper, + } } -/// Converts any `impl Body` into a `impl Stream` of just its DATA frames. -pub(crate) struct DataStream(pub(crate) B); +struct WrapHyper(hyper::Body); impl Body { /// Returns a reference to the internal data of the `Body`. @@ -42,7 +52,7 @@ impl Body { pub fn as_bytes(&self) -> Option<&[u8]> { match &self.inner { Inner::Reusable(bytes) => Some(bytes.as_ref()), - Inner::Streaming(..) => None, + Inner::Streaming { .. } => None, } } @@ -73,44 +83,50 @@ impl Body { #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] pub fn wrap_stream(stream: S) -> Body where - S: futures_core::stream::TryStream + Send + Sync + 'static, + S: futures_core::stream::TryStream + Send + 'static, S::Error: Into>, Bytes: From, { Body::stream(stream) } - #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))] pub(crate) fn stream(stream: S) -> Body where - S: futures_core::stream::TryStream + Send + Sync + 'static, + S: futures_core::stream::TryStream + Send + 'static, S::Error: Into>, Bytes: From, { use futures_util::TryStreamExt; - use http_body::Frame; - use http_body_util::StreamBody; - - let body = http_body_util::BodyExt::boxed(StreamBody::new( - stream - .map_ok(|d| Frame::data(Bytes::from(d))) - .map_err(Into::into), - )); + + let body = Box::pin(WrapStream { + inner: SyncWrapper::new(stream.map_ok(Bytes::from).map_err(Into::into)), + }); Body { - inner: Inner::Streaming(body), + inner: Inner::Streaming { + body, + timeout: None, + }, + } + } + + pub(crate) fn response(body: hyper::Body, timeout: Option>>) -> Body { + Body { + inner: Inner::Streaming { + body: Box::pin(WrapHyper(body)), + timeout, + }, } } - /* #[cfg(feature = "blocking")] pub(crate) fn wrap(body: hyper::Body) -> Body { Body { inner: Inner::Streaming { body: Box::pin(WrapHyper(body)), + timeout: None, }, } } - */ pub(crate) fn empty() -> Body { Body::reusable(Bytes::new()) @@ -122,25 +138,6 @@ impl Body { } } - // pub? - pub(crate) fn streaming(inner: B) -> Body - where - B: HttpBody + Send + Sync + 'static, - B::Data: Into, - B::Error: Into>, - { - use http_body_util::BodyExt; - - let boxed = inner - .map_frame(|f| f.map_data(Into::into)) - .map_err(Into::into) - .boxed(); - - Body { - inner: Inner::Streaming(boxed), - } - } - pub(crate) fn try_reuse(self) -> (Option, Self) { let reuse = match self.inner { Inner::Reusable(ref chunk) => Some(chunk.clone()), @@ -157,39 +154,30 @@ impl Body { } } - #[cfg(feature = "multipart")] - pub(crate) fn into_stream(self) -> DataStream { - DataStream(self) + pub(crate) fn into_stream(self) -> ImplStream { + ImplStream(self) } #[cfg(feature = "multipart")] pub(crate) fn content_length(&self) -> Option { match self.inner { Inner::Reusable(ref bytes) => Some(bytes.len() as u64), - Inner::Streaming(ref body) => body.size_hint().exact(), + Inner::Streaming { ref body, .. } => body.size_hint().exact(), } } } -impl Default for Body { - #[inline] - fn default() -> Body { - Body::empty() - } -} - -/* impl From for Body { #[inline] fn from(body: hyper::Body) -> Body { Self { inner: Inner::Streaming { body: Box::pin(WrapHyper(body)), + timeout: None, }, } } } -*/ impl From for Body { #[inline] @@ -241,112 +229,132 @@ impl fmt::Debug for Body { } } -impl HttpBody for Body { +// ===== impl ImplStream ===== + +impl HttpBody for ImplStream { type Data = Bytes; type Error = crate::Error; - fn poll_frame( + fn poll_data( mut self: Pin<&mut Self>, cx: &mut Context, - ) -> Poll, Self::Error>>> { - match self.inner { + ) -> Poll>> { + let opt_try_chunk = match self.0.inner { + Inner::Streaming { + ref mut body, + ref mut timeout, + } => { + if let Some(ref mut timeout) = timeout { + if let Poll::Ready(()) = timeout.as_mut().poll(cx) { + return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); + } + } + futures_core::ready!(Pin::new(body).poll_data(cx)) + .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body)) + } Inner::Reusable(ref mut bytes) => { - let out = bytes.split_off(0); - if out.is_empty() { - Poll::Ready(None) + if bytes.is_empty() { + None } else { - Poll::Ready(Some(Ok(hyper::body::Frame::data(out)))) + Some(Ok(std::mem::replace(bytes, Bytes::new()))) } } - Inner::Streaming(ref mut body) => Poll::Ready( - futures_core::ready!(Pin::new(body).poll_frame(cx)) - .map(|opt_chunk| opt_chunk.map_err(crate::error::body)), - ), + }; + + Poll::Ready(opt_try_chunk) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } + + fn is_end_stream(&self) -> bool { + match self.0.inner { + Inner::Streaming { ref body, .. } => body.is_end_stream(), + Inner::Reusable(ref bytes) => bytes.is_empty(), } } fn size_hint(&self) -> http_body::SizeHint { - match self.inner { + match self.0.inner { + Inner::Streaming { ref body, .. } => body.size_hint(), Inner::Reusable(ref bytes) => { let mut hint = http_body::SizeHint::default(); hint.set_exact(bytes.len() as u64); hint } - Inner::Streaming(ref body) => body.size_hint(), } } } -// ===== impl TotalTimeoutBody ===== +impl Stream for ImplStream { + type Item = Result; -pub(crate) fn total_timeout(body: B, timeout: Pin>) -> TotalTimeoutBody { - TotalTimeoutBody { - inner: body, - timeout, + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.poll_data(cx) } } -impl hyper::body::Body for TotalTimeoutBody +// ===== impl WrapStream ===== + +impl HttpBody for WrapStream where - B: hyper::body::Body + Unpin, - B::Error: Into>, + S: Stream>, + D: Into, + E: Into>, { - type Data = B::Data; - type Error = crate::Error; + type Data = Bytes; + type Error = E; - fn poll_frame( - mut self: Pin<&mut Self>, + fn poll_data( + self: Pin<&mut Self>, cx: &mut Context, - ) -> Poll, Self::Error>>> { - if let Poll::Ready(()) = self.timeout.as_mut().poll(cx) { - return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); - } - Poll::Ready( - futures_core::ready!(Pin::new(&mut self.inner).poll_frame(cx)) - .map(|opt_chunk| opt_chunk.map_err(crate::error::body)), - ) + ) -> Poll>> { + let item = futures_core::ready!(self.project().inner.get_pin_mut().poll_next(cx)?); + + Poll::Ready(item.map(|val| Ok(val.into()))) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) } } -pub(crate) type ResponseBody = - http_body_util::combinators::BoxBody>; +// ===== impl WrapHyper ===== -pub(crate) fn response( - body: hyper::body::Incoming, - timeout: Option>>, -) -> ResponseBody { - use http_body_util::BodyExt; +impl HttpBody for WrapHyper { + type Data = Bytes; + type Error = Box; - if let Some(timeout) = timeout { - total_timeout(body, timeout).map_err(Into::into).boxed() - } else { - body.map_err(Into::into).boxed() + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + // safe pin projection + Pin::new(&mut self.0) + .poll_data(cx) + .map(|opt| opt.map(|res| res.map_err(Into::into))) } -} -// ===== impl DataStream ===== + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } -impl futures_core::Stream for DataStream -where - B: HttpBody + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - loop { - return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) { - Some(Ok(frame)) => { - // skip non-data frames - if let Ok(buf) = frame.into_data() { - Poll::Ready(Some(Ok(buf))) - } else { - continue; - } - } - Some(Err(err)) => Poll::Ready(Some(Err(err))), - None => Poll::Ready(None), - }; - } + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } + + fn size_hint(&self) -> http_body::SizeHint { + HttpBody::size_hint(&self.0) } } diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 1f2f9e605..8acecc596 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -13,7 +13,7 @@ use http::header::{ }; use http::uri::Scheme; use http::Uri; -use hyper_util::client::legacy::connect::HttpConnector; +use hyper::client::{HttpConnector, ResponseFuture as HyperResponseFuture}; #[cfg(feature = "native-tls-crate")] use native_tls_crate::TlsConnector; use pin_project_lite::pin_project; @@ -46,14 +46,12 @@ use crate::Certificate; #[cfg(any(feature = "native-tls", feature = "__rustls"))] use crate::Identity; use crate::{IntoUrl, Method, Proxy, StatusCode, Url}; -use log::debug; +use log::{debug, trace}; #[cfg(feature = "http3")] use quinn::TransportConfig; #[cfg(feature = "http3")] use quinn::VarInt; -type HyperResponseFuture = hyper_util::client::legacy::ResponseFuture; - /// An asynchronous `Client` to make Requests with. /// /// The Client has various configuration values to tweak, but the defaults @@ -80,7 +78,6 @@ pub struct ClientBuilder { enum HttpVersionPref { Http1, - #[cfg(feature = "http2")] Http2, #[cfg(feature = "http3")] Http3, @@ -127,23 +124,14 @@ struct Config { http1_allow_obsolete_multiline_headers_in_responses: bool, http1_ignore_invalid_headers_in_responses: bool, http1_allow_spaces_after_header_name_in_responses: bool, - #[cfg(feature = "http2")] http2_initial_stream_window_size: Option, - #[cfg(feature = "http2")] http2_initial_connection_window_size: Option, - #[cfg(feature = "http2")] http2_adaptive_window: bool, - #[cfg(feature = "http2")] http2_max_frame_size: Option, - #[cfg(feature = "http2")] http2_keep_alive_interval: Option, - #[cfg(feature = "http2")] http2_keep_alive_timeout: Option, - #[cfg(feature = "http2")] http2_keep_alive_while_idle: bool, local_address: Option, - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - interface: Option, nodelay: bool, #[cfg(feature = "cookies")] cookie_store: Option>, @@ -221,23 +209,14 @@ impl ClientBuilder { http1_allow_obsolete_multiline_headers_in_responses: false, http1_ignore_invalid_headers_in_responses: false, http1_allow_spaces_after_header_name_in_responses: false, - #[cfg(feature = "http2")] http2_initial_stream_window_size: None, - #[cfg(feature = "http2")] http2_initial_connection_window_size: None, - #[cfg(feature = "http2")] http2_adaptive_window: false, - #[cfg(feature = "http2")] http2_max_frame_size: None, - #[cfg(feature = "http2")] http2_keep_alive_interval: None, - #[cfg(feature = "http2")] http2_keep_alive_timeout: None, - #[cfg(feature = "http2")] http2_keep_alive_while_idle: false, local_address: None, - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - interface: None, nodelay: true, trust_dns: cfg!(feature = "trust-dns"), #[cfg(feature = "cookies")] @@ -368,7 +347,6 @@ impl ClientBuilder { HttpVersionPref::Http1 => { tls.request_alpns(&["http/1.1"]); } - #[cfg(feature = "http2")] HttpVersionPref::Http2 => { tls.request_alpns(&["h2"]); } @@ -434,12 +412,6 @@ impl ClientBuilder { proxies.clone(), user_agent(&config.headers), config.local_address, - #[cfg(any( - target_os = "android", - target_os = "fuchsia", - target_os = "linux" - ))] - config.interface.as_deref(), config.nodelay, config.tls_info, )? @@ -451,8 +423,6 @@ impl ClientBuilder { proxies.clone(), user_agent(&config.headers), config.local_address, - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - config.interface.as_deref(), config.nodelay, config.tls_info, ), @@ -468,12 +438,6 @@ impl ClientBuilder { config.quic_receive_window, config.quic_send_window, config.local_address, - #[cfg(any( - target_os = "android", - target_os = "fuchsia", - target_os = "linux" - ))] - config.interface.as_deref(), &config.http_version_pref, )?; } @@ -484,12 +448,6 @@ impl ClientBuilder { proxies.clone(), user_agent(&config.headers), config.local_address, - #[cfg(any( - target_os = "android", - target_os = "fuchsia", - target_os = "linux" - ))] - config.interface.as_deref(), config.nodelay, config.tls_info, ) @@ -506,7 +464,18 @@ impl ClientBuilder { #[cfg(feature = "rustls-tls-webpki-roots")] if config.tls_built_in_root_certs { - root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + use rustls::OwnedTrustAnchor; + + let trust_anchors = + webpki_roots::TLS_SERVER_ROOTS.iter().map(|trust_anchor| { + OwnedTrustAnchor::from_subject_spki_name_constraints( + trust_anchor.subject, + trust_anchor.spki, + trust_anchor.name_constraints, + ) + }); + + root_cert_store.add_trust_anchors(trust_anchors); } #[cfg(feature = "rustls-tls-native-roots")] @@ -516,14 +485,17 @@ impl ClientBuilder { for cert in rustls_native_certs::load_native_certs() .map_err(crate::error::builder)? { + let cert = rustls::Certificate(cert.0); // Continue on parsing errors, as native stores often include ancient or syntactically // invalid certificates, like root certificates without any X509 extensions. // Inspiration: https://github.com/rustls/rustls/blob/633bf4ba9d9521a95f68766d04c22e2b01e68318/rustls/src/anchors.rs#L105-L112 - match root_cert_store.add(cert.into()) { + match root_cert_store.add(&cert) { Ok(_) => valid_count += 1, Err(err) => { invalid_count += 1; - log::debug!("rustls failed to parse DER certificate: {err:?}"); + log::warn!( + "rustls failed to parse DER certificate {err:?} {cert:?}" + ); } } } @@ -558,8 +530,12 @@ impl ClientBuilder { } // Build TLS config - let config_builder = - rustls::ClientConfig::builder().with_root_certificates(root_cert_store); + let config_builder = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&versions) + .map_err(crate::error::builder)? + .with_root_certificates(root_cert_store); // Finalize TLS config let mut tls = if let Some(id) = config.identity { @@ -581,7 +557,6 @@ impl ClientBuilder { HttpVersionPref::Http1 => { tls.alpn_protocols = vec!["http/1.1".into()]; } - #[cfg(feature = "http2")] HttpVersionPref::Http2 => { tls.alpn_protocols = vec!["h2".into()]; } @@ -616,12 +591,6 @@ impl ClientBuilder { proxies.clone(), user_agent(&config.headers), config.local_address, - #[cfg(any( - target_os = "android", - target_os = "fuchsia", - target_os = "linux" - ))] - config.interface.as_deref(), config.nodelay, config.tls_info, ) @@ -641,42 +610,35 @@ impl ClientBuilder { connector.set_timeout(config.connect_timeout); connector.set_verbose(config.connection_verbose); - let mut builder = - hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()); - #[cfg(feature = "http2")] - { - if matches!(config.http_version_pref, HttpVersionPref::Http2) { - builder.http2_only(true); - } + let mut builder = hyper::Client::builder(); + if matches!(config.http_version_pref, HttpVersionPref::Http2) { + builder.http2_only(true); + } - if let Some(http2_initial_stream_window_size) = config.http2_initial_stream_window_size - { - builder.http2_initial_stream_window_size(http2_initial_stream_window_size); - } - if let Some(http2_initial_connection_window_size) = - config.http2_initial_connection_window_size - { - builder.http2_initial_connection_window_size(http2_initial_connection_window_size); - } - if config.http2_adaptive_window { - builder.http2_adaptive_window(true); - } - if let Some(http2_max_frame_size) = config.http2_max_frame_size { - builder.http2_max_frame_size(http2_max_frame_size); - } - if let Some(http2_keep_alive_interval) = config.http2_keep_alive_interval { - builder.http2_keep_alive_interval(http2_keep_alive_interval); - } - if let Some(http2_keep_alive_timeout) = config.http2_keep_alive_timeout { - builder.http2_keep_alive_timeout(http2_keep_alive_timeout); - } - if config.http2_keep_alive_while_idle { - builder.http2_keep_alive_while_idle(true); - } + if let Some(http2_initial_stream_window_size) = config.http2_initial_stream_window_size { + builder.http2_initial_stream_window_size(http2_initial_stream_window_size); + } + if let Some(http2_initial_connection_window_size) = + config.http2_initial_connection_window_size + { + builder.http2_initial_connection_window_size(http2_initial_connection_window_size); + } + if config.http2_adaptive_window { + builder.http2_adaptive_window(true); + } + if let Some(http2_max_frame_size) = config.http2_max_frame_size { + builder.http2_max_frame_size(http2_max_frame_size); + } + if let Some(http2_keep_alive_interval) = config.http2_keep_alive_interval { + builder.http2_keep_alive_interval(http2_keep_alive_interval); + } + if let Some(http2_keep_alive_timeout) = config.http2_keep_alive_timeout { + builder.http2_keep_alive_timeout(http2_keep_alive_timeout); + } + if config.http2_keep_alive_while_idle { + builder.http2_keep_alive_while_idle(true); } - #[cfg(not(target_arch = "wasm32"))] - builder.timer(hyper_util::rt::TokioTimer::new()); builder.pool_idle_timeout(config.pool_idle_timeout); builder.pool_max_idle_per_host(config.pool_max_idle_per_host); connector.set_keepalive(config.tcp_keepalive); @@ -1140,7 +1102,6 @@ impl ClientBuilder { } /// Only use HTTP/2. - #[cfg(feature = "http2")] pub fn http2_prior_knowledge(mut self) -> ClientBuilder { self.config.http_version_pref = HttpVersionPref::Http2; self @@ -1157,7 +1118,6 @@ impl ClientBuilder { /// Sets the `SETTINGS_INITIAL_WINDOW_SIZE` option for HTTP2 stream-level flow control. /// /// Default is currently 65,535 but may change internally to optimize for common uses. - #[cfg(feature = "http2")] pub fn http2_initial_stream_window_size(mut self, sz: impl Into>) -> ClientBuilder { self.config.http2_initial_stream_window_size = sz.into(); self @@ -1166,7 +1126,6 @@ impl ClientBuilder { /// Sets the max connection-level flow control for HTTP2 /// /// Default is currently 65,535 but may change internally to optimize for common uses. - #[cfg(feature = "http2")] pub fn http2_initial_connection_window_size( mut self, sz: impl Into>, @@ -1179,7 +1138,6 @@ impl ClientBuilder { /// /// Enabling this will override the limits set in `http2_initial_stream_window_size` and /// `http2_initial_connection_window_size`. - #[cfg(feature = "http2")] pub fn http2_adaptive_window(mut self, enabled: bool) -> ClientBuilder { self.config.http2_adaptive_window = enabled; self @@ -1188,7 +1146,6 @@ impl ClientBuilder { /// Sets the maximum frame size to use for HTTP2. /// /// Default is currently 16,384 but may change internally to optimize for common uses. - #[cfg(feature = "http2")] pub fn http2_max_frame_size(mut self, sz: impl Into>) -> ClientBuilder { self.config.http2_max_frame_size = sz.into(); self @@ -1198,7 +1155,6 @@ impl ClientBuilder { /// /// Pass `None` to disable HTTP2 keep-alive. /// Default is currently disabled. - #[cfg(feature = "http2")] pub fn http2_keep_alive_interval( mut self, interval: impl Into>, @@ -1212,7 +1168,6 @@ impl ClientBuilder { /// If the ping is not acknowledged within the timeout, the connection will be closed. /// Does nothing if `http2_keep_alive_interval` is disabled. /// Default is currently disabled. - #[cfg(feature = "http2")] pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> ClientBuilder { self.config.http2_keep_alive_timeout = Some(timeout); self @@ -1224,7 +1179,6 @@ impl ClientBuilder { /// If enabled, pings are also sent when no streams are active. /// Does nothing if `http2_keep_alive_interval` is disabled. /// Default is `false`. - #[cfg(feature = "http2")] pub fn http2_keep_alive_while_idle(mut self, enabled: bool) -> ClientBuilder { self.config.http2_keep_alive_while_idle = enabled; self @@ -1259,22 +1213,6 @@ impl ClientBuilder { self } - /// Bind to an interface by `SO_BINDTODEVICE`. - /// - /// # Example - /// - /// ``` - /// let interface = "lo"; - /// let client = reqwest::Client::builder() - /// .interface(interface) - /// .build().unwrap(); - /// ``` - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - pub fn interface(mut self, interface: &str) -> ClientBuilder { - self.config.interface = Some(interface.to_string()); - self - } - /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. /// /// If `None`, the option will not be set. @@ -1718,7 +1656,7 @@ impl ClientBuilder { } } -type HyperClient = hyper_util::client::legacy::Client; +type HyperClient = hyper::Client; impl Default for Client { fn default() -> Self { @@ -1898,7 +1836,9 @@ impl Client { ResponseFuture::H3(self.inner.h3_client.as_ref().unwrap().request(req)) } _ => { - let mut req = builder.body(body).expect("valid request parts"); + let mut req = builder + .body(body.into_stream()) + .expect("valid request parts"); *req.headers_mut() = headers.clone(); ResponseFuture::Default(self.inner.hyper.request(req)) } @@ -2048,7 +1988,6 @@ impl Config { f.field("http1_only", &true); } - #[cfg(feature = "http2")] if matches!(self.http_version_pref, HttpVersionPref::Http2) { f.field("http2_prior_knowledge", &true); } @@ -2065,11 +2004,6 @@ impl Config { f.field("local_address", v); } - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - if let Some(ref v) = self.interface { - f.field("interface", v); - } - if self.nodelay { f.field("tcp_nodelay", &true); } @@ -2223,10 +2157,7 @@ impl PendingRequest { self.project().headers } - #[cfg(feature = "http2")] fn retry_error(mut self: Pin<&mut Self>, err: &(dyn std::error::Error + 'static)) -> bool { - use log::trace; - if !is_retryable_error(err) { return false; } @@ -2272,7 +2203,7 @@ impl PendingRequest { let mut req = hyper::Request::builder() .method(self.method.clone()) .uri(uri) - .body(body) + .body(body.into_stream()) .expect("valid request parts"); *req.headers_mut() = self.headers.clone(); ResponseFuture::Default(self.client.hyper.request(req)) @@ -2283,15 +2214,7 @@ impl PendingRequest { } } -#[cfg(feature = "http2")] fn is_retryable_error(err: &(dyn std::error::Error + 'static)) -> bool { - // pop the legacy::Error - let err = if let Some(err) = err.source() { - err - } else { - return false; - }; - #[cfg(feature = "http3")] if let Some(cause) = err.source() { if let Some(err) = cause.downcast_ref::() { @@ -2361,7 +2284,6 @@ impl Future for PendingRequest { let res = match self.as_mut().in_flight().get_mut() { ResponseFuture::Default(r) => match Pin::new(r).poll(cx) { Poll::Ready(Err(e)) => { - #[cfg(feature = "http2")] if self.as_mut().retry_error(&e) { continue; } @@ -2517,7 +2439,7 @@ impl Future for PendingRequest { let mut req = hyper::Request::builder() .method(self.method.clone()) .uri(uri.clone()) - .body(body) + .body(body.into_stream()) .expect("valid request parts"); *req.headers_mut() = headers.clone(); std::mem::swap(self.as_mut().headers(), &mut headers); diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index 128f77ecb..86eb6e5d9 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -16,15 +16,14 @@ use bytes::Bytes; use futures_core::Stream; use futures_util::stream::Peekable; use http::HeaderMap; -use hyper::body::Body as HttpBody; -use hyper::body::Frame; +use hyper::body::HttpBody; #[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))] use tokio_util::codec::{BytesCodec, FramedRead}; #[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))] use tokio_util::io::StreamReader; -use super::body::ResponseBody; +use super::super::Body; use crate::error; #[derive(Clone, Copy, Debug)] @@ -37,19 +36,6 @@ pub(super) struct Accepts { pub(super) deflate: bool, } -impl Accepts { - pub fn none() -> Self { - Self { - #[cfg(feature = "gzip")] - gzip: false, - #[cfg(feature = "brotli")] - brotli: false, - #[cfg(feature = "deflate")] - deflate: false, - } - } -} - /// A response decompressor over a non-blocking stream of chunks. /// /// The inner decoder may be constructed asynchronously. @@ -64,7 +50,7 @@ type PeekableIoStreamReader = StreamReader; enum Inner { /// A `PlainText` decoder just returns the response content as is. - PlainText(ResponseBody), + PlainText(super::body::ImplStream), /// A `Gzip` decoder will uncompress the gzipped response content before returning it. #[cfg(feature = "gzip")] @@ -86,7 +72,7 @@ enum Inner { /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. struct Pending(PeekableIoStream, DecoderType); -pub(crate) struct IoStream(B); +struct IoStream(super::body::ImplStream); enum DecoderType { #[cfg(feature = "gzip")] @@ -107,21 +93,16 @@ impl Decoder { #[cfg(feature = "blocking")] pub(crate) fn empty() -> Decoder { Decoder { - inner: Inner::PlainText(empty()), + inner: Inner::PlainText(Body::empty().into_stream()), } } - #[cfg(feature = "blocking")] - pub(crate) fn into_stream(self) -> IoStream { - IoStream(self) - } - /// A plain text decoder. /// /// This decoder will emit the underlying chunks as-is. - fn plain_text(body: ResponseBody) -> Decoder { + fn plain_text(body: Body) -> Decoder { Decoder { - inner: Inner::PlainText(body), + inner: Inner::PlainText(body.into_stream()), } } @@ -129,12 +110,12 @@ impl Decoder { /// /// This decoder will buffer and decompress chunks that are gzipped. #[cfg(feature = "gzip")] - fn gzip(body: ResponseBody) -> Decoder { + fn gzip(body: Body) -> Decoder { use futures_util::StreamExt; Decoder { inner: Inner::Pending(Box::pin(Pending( - IoStream(body).peekable(), + IoStream(body.into_stream()).peekable(), DecoderType::Gzip, ))), } @@ -144,12 +125,12 @@ impl Decoder { /// /// This decoder will buffer and decompress chunks that are brotlied. #[cfg(feature = "brotli")] - fn brotli(body: ResponseBody) -> Decoder { + fn brotli(body: Body) -> Decoder { use futures_util::StreamExt; Decoder { inner: Inner::Pending(Box::pin(Pending( - IoStream(body).peekable(), + IoStream(body.into_stream()).peekable(), DecoderType::Brotli, ))), } @@ -159,12 +140,12 @@ impl Decoder { /// /// This decoder will buffer and decompress chunks that are deflated. #[cfg(feature = "deflate")] - fn deflate(body: ResponseBody) -> Decoder { + fn deflate(body: Body) -> Decoder { use futures_util::StreamExt; Decoder { inner: Inner::Pending(Box::pin(Pending( - IoStream(body).peekable(), + IoStream(body.into_stream()).peekable(), DecoderType::Deflate, ))), } @@ -206,11 +187,7 @@ impl Decoder { /// how to decode the content body of the request. /// /// Uses the correct variant by inspecting the Content-Encoding header. - pub(super) fn detect( - _headers: &mut HeaderMap, - body: ResponseBody, - _accepts: Accepts, - ) -> Decoder { + pub(super) fn detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder { #[cfg(feature = "gzip")] { if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip") { @@ -236,35 +213,26 @@ impl Decoder { } } -impl HttpBody for Decoder { - type Data = Bytes; - type Error = crate::Error; +impl Stream for Decoder { + type Item = Result; - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll, Self::Error>>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // Do a read or poll for a pending decoder value. match self.inner { #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))] Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { Poll::Ready(Ok(inner)) => { self.inner = inner; - self.poll_frame(cx) + self.poll_next(cx) } Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))), Poll::Pending => Poll::Pending, }, - Inner::PlainText(ref mut body) => { - match futures_core::ready!(Pin::new(body).poll_frame(cx)) { - Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))), - None => Poll::Ready(None), - } - } + Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx), #[cfg(feature = "gzip")] Inner::Gzip(ref mut decoder) => { match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), } @@ -272,7 +240,7 @@ impl HttpBody for Decoder { #[cfg(feature = "brotli")] Inner::Brotli(ref mut decoder) => { match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), } @@ -280,13 +248,32 @@ impl HttpBody for Decoder { #[cfg(feature = "deflate")] Inner::Deflate(ref mut decoder) => { match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))), + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), } } } } +} + +impl HttpBody for Decoder { + type Data = Bytes; + type Error = crate::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + self.poll_next(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } fn size_hint(&self) -> http_body::SizeHint { match self.inner { @@ -298,11 +285,6 @@ impl HttpBody for Decoder { } } -fn empty() -> ResponseBody { - use http_body_util::{combinators::BoxBody, BodyExt, Empty}; - BoxBody::new(Empty::new().map_err(|never| match never {})) -} - impl Future for Pending { type Output = Result; @@ -321,10 +303,13 @@ impl Future for Pending { .expect("just peeked Some") .unwrap_err())); } - None => return Poll::Ready(Ok(Inner::PlainText(empty()))), + None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), }; - let _body = std::mem::replace(&mut self.0, IoStream(empty()).peekable()); + let _body = std::mem::replace( + &mut self.0, + IoStream(Body::empty().into_stream()).peekable(), + ); match self.1 { #[cfg(feature = "brotli")] @@ -346,27 +331,14 @@ impl Future for Pending { } } -impl Stream for IoStream -where - B: HttpBody + Unpin, - B::Error: Into>, -{ +impl Stream for IoStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - loop { - return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) { - Some(Ok(frame)) => { - // skip non-data frames - if let Ok(buf) = frame.into_data() { - Poll::Ready(Some(Ok(buf))) - } else { - continue; - } - } - Some(Err(err)) => Poll::Ready(Some(Err(error::into_io(err.into())))), - None => Poll::Ready(None), - }; + match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { + Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), + Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), + None => Poll::Ready(None), } } } @@ -374,7 +346,6 @@ where // ===== impl Accepts ===== impl Accepts { - /* pub(super) fn none() -> Self { Accepts { #[cfg(feature = "gzip")] @@ -385,7 +356,6 @@ impl Accepts { deflate: false, } } - */ pub(super) fn as_str(&self) -> Option<&'static str> { match (self.is_gzip(), self.is_brotli(), self.is_deflate()) { diff --git a/src/async_impl/h3_client/connect.rs b/src/async_impl/h3_client/connect.rs index ec732f66a..968704713 100644 --- a/src/async_impl/h3_client/connect.rs +++ b/src/async_impl/h3_client/connect.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use h3::client::SendRequest; use h3_quinn::{Connection, OpenStreams}; use http::Uri; -use hyper_util::client::legacy::connect::dns::Name; +use hyper::client::connect::dns::Name; use quinn::{ClientConfig, Endpoint, TransportConfig}; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; diff --git a/src/async_impl/h3_client/dns.rs b/src/async_impl/h3_client/dns.rs index bd59daaed..9cb50d1e3 100644 --- a/src/async_impl/h3_client/dns.rs +++ b/src/async_impl/h3_client/dns.rs @@ -1,5 +1,5 @@ use core::task; -use hyper_util::client::legacy::connect::dns::Name; +use hyper::client::connect::dns::Name; use std::future::Future; use std::net::SocketAddr; use std::task::Poll; diff --git a/src/async_impl/h3_client/pool.rs b/src/async_impl/h3_client/pool.rs index d6442c81a..d9ca3a661 100644 --- a/src/async_impl/h3_client/pool.rs +++ b/src/async_impl/h3_client/pool.rs @@ -13,7 +13,7 @@ use h3::client::SendRequest; use h3_quinn::{Connection, OpenStreams}; use http::uri::{Authority, Scheme}; use http::{Request, Response, Uri}; -use hyper::body as HyperBody; +use hyper::Body as HyperBody; use log::trace; pub(super) type Key = (Scheme, Authority); diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index a947b5151..77a3e53aa 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -4,9 +4,9 @@ use std::pin::Pin; use bytes::Bytes; use encoding_rs::{Encoding, UTF_8}; -use http_body_util::BodyExt; +use futures_util::stream::StreamExt; +use hyper::client::connect::HttpInfo; use hyper::{HeaderMap, StatusCode, Version}; -use hyper_util::client::legacy::connect::HttpInfo; use mime::Mime; #[cfg(feature = "json")] use serde::de::DeserializeOwned; @@ -17,9 +17,9 @@ use url::Url; use super::body::Body; use super::decoder::{Accepts, Decoder}; -use crate::async_impl::body::ResponseBody; #[cfg(feature = "cookies")] use crate::cookie; +use crate::response::ResponseUrl; /// A Response to a submitted `Request`. pub struct Response { @@ -31,17 +31,13 @@ pub struct Response { impl Response { pub(super) fn new( - res: hyper::Response, + res: hyper::Response, url: Url, accepts: Accepts, timeout: Option>>, ) -> Response { let (mut parts, body) = res.into_parts(); - let decoder = Decoder::detect( - &mut parts.headers, - super::body::response(body, timeout), - accepts, - ); + let decoder = Decoder::detect(&mut parts.headers, Body::response(body, timeout), accepts); let res = hyper::Response::from_parts(parts, decoder); Response { @@ -82,9 +78,9 @@ impl Response { /// - The response is compressed and automatically decoded (thus changing /// the actual decoded length). pub fn content_length(&self) -> Option { - use hyper::body::Body; + use hyper::body::HttpBody; - Body::size_hint(self.res.body()).exact() + HttpBody::size_hint(self.res.body()).exact() } /// Retrieve the cookies contained in the response. @@ -260,11 +256,7 @@ impl Response { /// # } /// ``` pub async fn bytes(self) -> crate::Result { - use http_body_util::BodyExt; - - BodyExt::collect(self.res.into_body()) - .await - .map(|buf| buf.to_bytes()) + hyper::body::to_bytes(self.res.into_body()).await } /// Stream a chunk of the response body. @@ -284,19 +276,10 @@ impl Response { /// # } /// ``` pub async fn chunk(&mut self) -> crate::Result> { - use http_body_util::BodyExt; - - // loop to ignore unrecognized frames - loop { - if let Some(res) = self.res.body_mut().frame().await { - let frame = res?; - if let Ok(buf) = frame.into_data() { - return Ok(Some(buf)); - } - // else continue - } else { - return Ok(None); - } + if let Some(item) = self.res.body_mut().next().await { + Ok(Some(item?)) + } else { + Ok(None) } } @@ -325,7 +308,7 @@ impl Response { #[cfg(feature = "stream")] #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] pub fn bytes_stream(self) -> impl futures_core::Stream> { - super::body::DataStream(self.res.into_body()) + self.res.into_body() } // util methods @@ -413,26 +396,11 @@ impl fmt::Debug for Response { } } -/// A `Response` can be piped as the `Body` of another request. -impl From for Body { - fn from(r: Response) -> Body { - Body::streaming(r.res.into_body()) - } -} - -// I'm not sure this conversion is that useful... People should be encouraged -// to use `http::Resposne`, not `reqwest::Response`. impl> From> for Response { fn from(r: http::Response) -> Response { - use crate::response::ResponseUrl; - let (mut parts, body) = r.into_parts(); - let body: crate::async_impl::body::Body = body.into(); - let decoder = Decoder::detect( - &mut parts.headers, - ResponseBody::new(body.map_err(Into::into)), - Accepts::none(), - ); + let body = body.into(); + let decoder = Decoder::detect(&mut parts.headers, body, Accepts::none()); let url = parts .extensions .remove::() @@ -446,6 +414,13 @@ impl> From> for Response { } } +/// A `Response` can be piped as the `Body` of another request. +impl From for Body { + fn from(r: Response) -> Body { + Body::stream(r.res.into_body()) + } +} + #[cfg(test)] mod tests { use super::Response; diff --git a/src/async_impl/upgrade.rs b/src/async_impl/upgrade.rs index 3b599d0ad..4a69b4db5 100644 --- a/src/async_impl/upgrade.rs +++ b/src/async_impl/upgrade.rs @@ -3,12 +3,11 @@ use std::task::{self, Poll}; use std::{fmt, io}; use futures_util::TryFutureExt; -use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; /// An upgraded HTTP connection. pub struct Upgraded { - inner: TokioIo, + inner: hyper::upgrade::Upgraded, } impl AsyncRead for Upgraded { @@ -59,9 +58,7 @@ impl fmt::Debug for Upgraded { impl From for Upgraded { fn from(inner: hyper::upgrade::Upgraded) -> Self { - Upgraded { - inner: TokioIo::new(inner), - } + Upgraded { inner } } } diff --git a/src/blocking/body.rs b/src/blocking/body.rs index dd44c6fa2..db46cde05 100644 --- a/src/blocking/body.rs +++ b/src/blocking/body.rs @@ -9,7 +9,6 @@ use std::ptr; use bytes::buf::UninitSlice; use bytes::Bytes; -use futures_channel::mpsc; use crate::async_impl; @@ -134,12 +133,12 @@ impl Body { pub(crate) fn into_async(self) -> (Option, async_impl::Body, Option) { match self.kind { Kind::Reader(read, len) => { - let (tx, rx) = mpsc::channel(0); + let (tx, rx) = hyper::Body::channel(); let tx = Sender { body: (read, len), tx, }; - (Some(tx), async_impl::Body::stream(rx), len) + (Some(tx), async_impl::Body::wrap(rx), len) } Kind::Bytes(chunk) => { let len = chunk.len() as u64; @@ -258,23 +257,11 @@ impl Read for Reader { pub(crate) struct Sender { body: (Box, Option), - tx: mpsc::Sender>, + tx: hyper::body::Sender, } -#[derive(Debug)] -struct Abort; - -impl fmt::Display for Abort { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("abort request body") - } -} - -impl std::error::Error for Abort {} - async fn send_future(sender: Sender) -> Result<(), crate::Error> { use bytes::{BufMut, BytesMut}; - use futures_util::SinkExt; use std::cmp; let con_len = sender.body.1; @@ -325,11 +312,7 @@ async fn send_future(sender: Sender) -> Result<(), crate::Error> { buf.advance_mut(n); }, Err(e) => { - let _ = tx - .take() - .expect("tx only taken on error") - .clone() - .try_send(Err(Abort)); + tx.take().expect("tx only taken on error").abort(); return Err(crate::error::body(e)); } } @@ -341,7 +324,7 @@ async fn send_future(sender: Sender) -> Result<(), crate::Error> { let buf_len = buf.len() as u64; tx.as_mut() .expect("tx only taken on error") - .send(Ok(buf.split().freeze())) + .send_data(buf.split().freeze()) .await .map_err(crate::error::body)?; diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 6c6fb9993..689e6a0d8 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -513,21 +513,6 @@ impl ClientBuilder { self.with_inner(move |inner| inner.local_address(addr)) } - /// Bind to an interface by `SO_BINDTODEVICE`. - /// - /// # Example - /// - /// ``` - /// let interface = "lo"; - /// let client = reqwest::blocking::Client::builder() - /// .interface(interface) - /// .build().unwrap(); - /// ``` - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - pub fn interface(self, interface: &str) -> ClientBuilder { - self.with_inner(move |inner| inner.interface(interface)) - } - /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. /// /// If `None`, the option will not be set. @@ -1187,7 +1172,7 @@ impl Default for Timeout { } } -pub(crate) struct KeepCoreThreadAlive(#[allow(dead_code)] Option>); +pub(crate) struct KeepCoreThreadAlive(#[allow(unused)] Option>); impl KeepCoreThreadAlive { pub(crate) fn empty() -> KeepCoreThreadAlive { diff --git a/src/blocking/response.rs b/src/blocking/response.rs index 6ece95ba6..2da634f68 100644 --- a/src/blocking/response.rs +++ b/src/blocking/response.rs @@ -397,7 +397,7 @@ impl Response { if self.body.is_none() { let body = mem::replace(self.inner.body_mut(), async_impl::Decoder::empty()); - let body = body.into_stream().into_async_read(); + let body = body.map_err(crate::error::into_io).into_async_read(); self.body = Some(Box::pin(body)); } diff --git a/src/connect.rs b/src/connect.rs index 68ef26924..b6b51130e 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -2,13 +2,11 @@ use http::header::HeaderValue; use http::uri::{Authority, Scheme}; use http::Uri; -use hyper::rt::{Read, ReadBufCursor, Write}; -use hyper_util::client::legacy::connect::{Connected, Connection}; -#[cfg(feature = "__tls")] -use hyper_util::rt::TokioIo; +use hyper::client::connect::{Connected, Connection}; +use hyper::service::Service; #[cfg(feature = "native-tls-crate")] use native_tls_crate::{TlsConnector, TlsConnectorBuilder}; -use tower_service::Service; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; use std::future::Future; @@ -27,7 +25,7 @@ use crate::dns::DynResolver; use crate::error::BoxError; use crate::proxy::{Proxy, ProxyScheme}; -pub(crate) type HttpConnector = hyper_util::client::legacy::connect::HttpConnector; +pub(crate) type HttpConnector = hyper::client::HttpConnector; #[derive(Clone)] pub(crate) struct Connector { @@ -86,8 +84,6 @@ impl Connector { proxies: Arc>, user_agent: Option, local_addr: T, - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - interface: Option<&str>, nodelay: bool, tls_info: bool, ) -> crate::Result @@ -96,15 +92,7 @@ impl Connector { { let tls = tls.build().map_err(crate::error::builder)?; Ok(Self::from_built_default_tls( - http, - tls, - proxies, - user_agent, - local_addr, - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - interface, - nodelay, - tls_info, + http, tls, proxies, user_agent, local_addr, nodelay, tls_info, )) } @@ -115,8 +103,6 @@ impl Connector { proxies: Arc>, user_agent: Option, local_addr: T, - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - interface: Option<&str>, nodelay: bool, tls_info: bool, ) -> Connector @@ -124,10 +110,6 @@ impl Connector { T: Into>, { http.set_local_address(local_addr.into()); - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - if let Some(interface) = interface { - http.set_interface(interface); - } http.set_nodelay(nodelay); http.enforce_http(false); @@ -149,8 +131,6 @@ impl Connector { proxies: Arc>, user_agent: Option, local_addr: T, - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - interface: Option<&str>, nodelay: bool, tls_info: bool, ) -> Connector @@ -158,10 +138,6 @@ impl Connector { T: Into>, { http.set_local_address(local_addr.into()); - #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] - if let Some(interface) = interface { - http.set_interface(interface.to_owned()); - } http.set_nodelay(nodelay); http.enforce_http(false); @@ -217,11 +193,8 @@ impl Connector { if dst.scheme() == Some(&Scheme::HTTPS) { let host = dst.host().ok_or("no host in url")?.to_string(); let conn = socks::connect(proxy, dst, dns).await?; - let conn = hyper_util::rt::TokioIo::new(conn); - let conn = hyper_util::rt::TokioIo::new(conn); let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone()); let io = tls_connector.connect(&host, conn).await?; - let io = hyper_util::rt::TokioIo::new(io); return Ok(Conn { inner: self.verbose.wrap(NativeTlsConn { inner: io }), is_proxy: false, @@ -238,15 +211,11 @@ impl Connector { let tls = tls_proxy.clone(); let host = dst.host().ok_or("no host in url")?.to_string(); let conn = socks::connect(proxy, dst, dns).await?; - let conn = hyper_util::rt::TokioIo::new(conn); - let conn = hyper_util::rt::TokioIo::new(conn); - let server_name = - rustls_pki_types::ServerName::try_from(host.as_str().to_owned()) - .map_err(|_| "Invalid Server Name")?; + let server_name = rustls::ServerName::try_from(host.as_str()) + .map_err(|_| "Invalid Server Name")?; let io = RustlsConnector::from(tls) .connect(server_name, conn) .await?; - let io = hyper_util::rt::TokioIo::new(io); return Ok(Conn { inner: self.verbose.wrap(RustlsTlsConn { inner: io }), is_proxy: false, @@ -259,7 +228,7 @@ impl Connector { } socks::connect(proxy, dst, dns).await.map(|tcp| Conn { - inner: self.verbose.wrap(hyper_util::rt::TokioIo::new(tcp)), + inner: self.verbose.wrap(tcp), is_proxy: false, tls_info: false, }) @@ -293,14 +262,7 @@ impl Connector { if let hyper_tls::MaybeHttpsStream::Https(stream) = io { if !self.nodelay { - stream - .inner() - .get_ref() - .get_ref() - .get_ref() - .inner() - .inner() - .set_nodelay(false)?; + stream.get_ref().get_ref().get_ref().set_nodelay(false)?; } Ok(Conn { inner: self.verbose.wrap(NativeTlsConn { inner: stream }), @@ -331,8 +293,8 @@ impl Connector { if let hyper_rustls::MaybeHttpsStream::Https(stream) = io { if !self.nodelay { - let (io, _) = stream.inner().get_ref(); - io.inner().inner().set_nodelay(false)?; + let (io, _) = stream.get_ref(); + io.set_nodelay(false)?; } Ok(Conn { inner: self.verbose.wrap(RustlsTlsConn { inner: stream }), @@ -388,12 +350,10 @@ impl Connector { .await?; let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone()); let io = tls_connector - .connect(host.ok_or("no host in url")?, TokioIo::new(tunneled)) + .connect(host.ok_or("no host in url")?, tunneled) .await?; return Ok(Conn { - inner: self.verbose.wrap(NativeTlsConn { - inner: TokioIo::new(io), - }), + inner: self.verbose.wrap(NativeTlsConn { inner: io }), is_proxy: false, tls_info: false, }); @@ -406,7 +366,7 @@ impl Connector { tls_proxy, } => { if dst.scheme() == Some(&Scheme::HTTPS) { - use rustls_pki_types::ServerName; + use rustls::ServerName; use std::convert::TryFrom; use tokio_rustls::TlsConnector as RustlsConnector; @@ -417,18 +377,16 @@ impl Connector { let tls = tls.clone(); let conn = http.call(proxy_dst).await?; log::trace!("tunneling HTTPS over proxy"); - let maybe_server_name = ServerName::try_from(host.as_str().to_owned()) - .map_err(|_| "Invalid Server Name"); + let maybe_server_name = + ServerName::try_from(host.as_str()).map_err(|_| "Invalid Server Name"); let tunneled = tunnel(conn, host, port, self.user_agent.clone(), auth).await?; let server_name = maybe_server_name?; let io = RustlsConnector::from(tls) - .connect(server_name, TokioIo::new(tunneled)) + .connect(server_name, tunneled) .await?; return Ok(Conn { - inner: self.verbose.wrap(RustlsTlsConn { - inner: TokioIo::new(io), - }), + inner: self.verbose.wrap(RustlsTlsConn { inner: io }), is_proxy: false, tls_info: false, }); @@ -518,15 +476,18 @@ impl TlsInfoFactory for tokio::net::TcpStream { } } -#[cfg(feature = "__tls")] -impl TlsInfoFactory for TokioIo { +#[cfg(feature = "default-tls")] +impl TlsInfoFactory for hyper_tls::MaybeHttpsStream { fn tls_info(&self) -> Option { - self.inner().tls_info() + match self { + hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(), + hyper_tls::MaybeHttpsStream::Http(_) => None, + } } } #[cfg(feature = "default-tls")] -impl TlsInfoFactory for tokio_native_tls::TlsStream>> { +impl TlsInfoFactory for hyper_tls::TlsStream> { fn tls_info(&self) -> Option { let peer_certificate = self .get_ref() @@ -539,11 +500,7 @@ impl TlsInfoFactory for tokio_native_tls::TlsStream>>, - > -{ +impl TlsInfoFactory for tokio_native_tls::TlsStream { fn tls_info(&self) -> Option { let peer_certificate = self .get_ref() @@ -555,35 +512,32 @@ impl TlsInfoFactory } } -#[cfg(feature = "default-tls")] -impl TlsInfoFactory for hyper_tls::MaybeHttpsStream> { +#[cfg(feature = "__rustls")] +impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream { fn tls_info(&self) -> Option { match self { - hyper_tls::MaybeHttpsStream::Https(tls) => tls.tls_info(), - hyper_tls::MaybeHttpsStream::Http(_) => None, + hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(), + hyper_rustls::MaybeHttpsStream::Http(_) => None, } } } #[cfg(feature = "__rustls")] -impl TlsInfoFactory for tokio_rustls::client::TlsStream>> { +impl TlsInfoFactory for tokio_rustls::TlsStream { fn tls_info(&self) -> Option { let peer_certificate = self .get_ref() .1 .peer_certificates() .and_then(|certs| certs.first()) - .map(|c| c.first()) - .and_then(|c| c.map(|cc| vec![*cc])); + .map(|c| c.0.clone()); Some(crate::tls::TlsInfo { peer_certificate }) } } #[cfg(feature = "__rustls")] impl TlsInfoFactory - for tokio_rustls::client::TlsStream< - TokioIo>>, - > + for tokio_rustls::client::TlsStream> { fn tls_info(&self) -> Option { let peer_certificate = self @@ -591,28 +545,30 @@ impl TlsInfoFactory .1 .peer_certificates() .and_then(|certs| certs.first()) - .map(|c| c.first()) - .and_then(|c| c.map(|cc| vec![*cc])); + .map(|c| c.0.clone()); Some(crate::tls::TlsInfo { peer_certificate }) } } #[cfg(feature = "__rustls")] -impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream> { +impl TlsInfoFactory for tokio_rustls::client::TlsStream { fn tls_info(&self) -> Option { - match self { - hyper_rustls::MaybeHttpsStream::Https(tls) => tls.tls_info(), - hyper_rustls::MaybeHttpsStream::Http(_) => None, - } + let peer_certificate = self + .get_ref() + .1 + .peer_certificates() + .and_then(|certs| certs.first()) + .map(|c| c.0.clone()); + Some(crate::tls::TlsInfo { peer_certificate }) } } pub(crate) trait AsyncConn: - Read + Write + Connection + Send + Sync + Unpin + 'static + AsyncRead + AsyncWrite + Connection + Send + Sync + Unpin + 'static { } -impl AsyncConn for T {} +impl AsyncConn for T {} #[cfg(feature = "__tls")] trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {} @@ -658,25 +614,25 @@ impl Connection for Conn { } } -impl Read for Conn { +impl AsyncRead for Conn { fn poll_read( self: Pin<&mut Self>, cx: &mut Context, - buf: ReadBufCursor<'_>, + buf: &mut ReadBuf<'_>, ) -> Poll> { let this = self.project(); - Read::poll_read(this.inner, cx, buf) + AsyncRead::poll_read(this.inner, cx, buf) } } -impl Write for Conn { +impl AsyncWrite for Conn { fn poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll> { let this = self.project(); - Write::poll_write(this.inner, cx, buf) + AsyncWrite::poll_write(this.inner, cx, buf) } fn poll_write_vectored( @@ -685,7 +641,7 @@ impl Write for Conn { bufs: &[IoSlice<'_>], ) -> Poll> { let this = self.project(); - Write::poll_write_vectored(this.inner, cx, bufs) + AsyncWrite::poll_write_vectored(this.inner, cx, bufs) } fn is_write_vectored(&self) -> bool { @@ -694,12 +650,12 @@ impl Write for Conn { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); - Write::poll_flush(this.inner, cx) + AsyncWrite::poll_flush(this.inner, cx) } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); - Write::poll_shutdown(this.inner, cx) + AsyncWrite::poll_shutdown(this.inner, cx) } } @@ -714,9 +670,8 @@ async fn tunnel( auth: Option, ) -> Result where - T: Read + Write + Unpin, + T: AsyncRead + AsyncWrite + Unpin, { - use hyper_util::rt::TokioIo; use tokio::io::{AsyncReadExt, AsyncWriteExt}; let mut buf = format!( @@ -745,15 +700,13 @@ where // headers end buf.extend_from_slice(b"\r\n"); - let mut tokio_conn = TokioIo::new(&mut conn); - - tokio_conn.write_all(&buf).await?; + conn.write_all(&buf).await?; let mut buf = [0; 8192]; let mut pos = 0; loop { - let n = tokio_conn.read(&mut buf[pos..]).await?; + let n = conn.read(&mut buf[pos..]).await?; if n == 0 { return Err(tunnel_eof()); @@ -785,69 +738,62 @@ fn tunnel_eof() -> BoxError { #[cfg(feature = "default-tls")] mod native_tls_conn { use super::TlsInfoFactory; - use hyper::rt::{Read, ReadBufCursor, Write}; - use hyper_tls::MaybeHttpsStream; - use hyper_util::client::legacy::connect::{Connected, Connection}; - use hyper_util::rt::TokioIo; + use hyper::client::connect::{Connected, Connection}; use pin_project_lite::pin_project; use std::{ io::{self, IoSlice}, pin::Pin, task::{Context, Poll}, }; - use tokio::io::{AsyncRead, AsyncWrite}; - use tokio::net::TcpStream; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_native_tls::TlsStream; pin_project! { pub(super) struct NativeTlsConn { - #[pin] pub(super) inner: TokioIo>, + #[pin] pub(super) inner: TlsStream, } } - impl Connection for NativeTlsConn>> { + impl Connection for NativeTlsConn { + #[cfg(feature = "native-tls-alpn")] fn connected(&self) -> Connected { - self.inner - .inner() - .get_ref() - .get_ref() - .get_ref() - .inner() - .connected() + match self.inner.get_ref().negotiated_alpn().ok() { + Some(Some(alpn_protocol)) if alpn_protocol == b"h2" => self + .inner + .get_ref() + .get_ref() + .get_ref() + .connected() + .negotiated_h2(), + _ => self.inner.get_ref().get_ref().get_ref().connected(), + } } - } - impl Connection for NativeTlsConn>>> { + #[cfg(not(feature = "native-tls-alpn"))] fn connected(&self) -> Connected { - self.inner - .inner() - .get_ref() - .get_ref() - .get_ref() - .inner() - .connected() + self.inner.get_ref().get_ref().get_ref().connected() } } - impl Read for NativeTlsConn { + impl AsyncRead for NativeTlsConn { fn poll_read( self: Pin<&mut Self>, cx: &mut Context, - buf: ReadBufCursor<'_>, + buf: &mut ReadBuf<'_>, ) -> Poll> { let this = self.project(); - Read::poll_read(this.inner, cx, buf) + AsyncRead::poll_read(this.inner, cx, buf) } } - impl Write for NativeTlsConn { + impl AsyncWrite for NativeTlsConn { fn poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll> { let this = self.project(); - Write::poll_write(this.inner, cx, buf) + AsyncWrite::poll_write(this.inner, cx, buf) } fn poll_write_vectored( @@ -856,7 +802,7 @@ mod native_tls_conn { bufs: &[IoSlice<'_>], ) -> Poll> { let this = self.project(); - Write::poll_write_vectored(this.inner, cx, bufs) + AsyncWrite::poll_write_vectored(this.inner, cx, bufs) } fn is_write_vectored(&self) -> bool { @@ -868,7 +814,7 @@ mod native_tls_conn { cx: &mut Context, ) -> Poll> { let this = self.project(); - Write::poll_flush(this.inner, cx) + AsyncWrite::poll_flush(this.inner, cx) } fn poll_shutdown( @@ -876,14 +822,17 @@ mod native_tls_conn { cx: &mut Context, ) -> Poll> { let this = self.project(); - Write::poll_shutdown(this.inner, cx) + AsyncWrite::poll_shutdown(this.inner, cx) } } - impl TlsInfoFactory for NativeTlsConn - where - TokioIo>: TlsInfoFactory, - { + impl TlsInfoFactory for NativeTlsConn { + fn tls_info(&self) -> Option { + self.inner.tls_info() + } + } + + impl TlsInfoFactory for NativeTlsConn> { fn tls_info(&self) -> Option { self.inner.tls_info() } @@ -893,76 +842,51 @@ mod native_tls_conn { #[cfg(feature = "__rustls")] mod rustls_tls_conn { use super::TlsInfoFactory; - use hyper::rt::{Read, ReadBufCursor, Write}; - use hyper_rustls::MaybeHttpsStream; - use hyper_util::client::legacy::connect::{Connected, Connection}; - use hyper_util::rt::TokioIo; + use hyper::client::connect::{Connected, Connection}; use pin_project_lite::pin_project; use std::{ io::{self, IoSlice}, pin::Pin, task::{Context, Poll}, }; - use tokio::io::{AsyncRead, AsyncWrite}; - use tokio::net::TcpStream; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_rustls::client::TlsStream; pin_project! { pub(super) struct RustlsTlsConn { - #[pin] pub(super) inner: TokioIo>, + #[pin] pub(super) inner: TlsStream, } } - impl Connection for RustlsTlsConn>> { + impl Connection for RustlsTlsConn { fn connected(&self) -> Connected { - if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") { - self.inner - .inner() - .get_ref() - .0 - .inner() - .connected() - .negotiated_h2() - } else { - self.inner.inner().get_ref().0.inner().connected() - } - } - } - impl Connection for RustlsTlsConn>>> { - fn connected(&self) -> Connected { - if self.inner.inner().get_ref().1.alpn_protocol() == Some(b"h2") { - self.inner - .inner() - .get_ref() - .0 - .inner() - .connected() - .negotiated_h2() + if self.inner.get_ref().1.alpn_protocol() == Some(b"h2") { + self.inner.get_ref().0.connected().negotiated_h2() } else { - self.inner.inner().get_ref().0.inner().connected() + self.inner.get_ref().0.connected() } } } - impl Read for RustlsTlsConn { + impl AsyncRead for RustlsTlsConn { fn poll_read( self: Pin<&mut Self>, cx: &mut Context, - buf: ReadBufCursor<'_>, + buf: &mut ReadBuf<'_>, ) -> Poll> { let this = self.project(); - Read::poll_read(this.inner, cx, buf) + AsyncRead::poll_read(this.inner, cx, buf) } } - impl Write for RustlsTlsConn { + impl AsyncWrite for RustlsTlsConn { fn poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll> { let this = self.project(); - Write::poll_write(this.inner, cx, buf) + AsyncWrite::poll_write(this.inner, cx, buf) } fn poll_write_vectored( @@ -971,7 +895,7 @@ mod rustls_tls_conn { bufs: &[IoSlice<'_>], ) -> Poll> { let this = self.project(); - Write::poll_write_vectored(this.inner, cx, bufs) + AsyncWrite::poll_write_vectored(this.inner, cx, bufs) } fn is_write_vectored(&self) -> bool { @@ -983,7 +907,7 @@ mod rustls_tls_conn { cx: &mut Context, ) -> Poll> { let this = self.project(); - Write::poll_flush(this.inner, cx) + AsyncWrite::poll_flush(this.inner, cx) } fn poll_shutdown( @@ -991,13 +915,17 @@ mod rustls_tls_conn { cx: &mut Context, ) -> Poll> { let this = self.project(); - Write::poll_shutdown(this.inner, cx) + AsyncWrite::poll_shutdown(this.inner, cx) } } - impl TlsInfoFactory for RustlsTlsConn - where - TokioIo>: TlsInfoFactory, - { + + impl TlsInfoFactory for RustlsTlsConn { + fn tls_info(&self) -> Option { + self.inner.tls_info() + } + } + + impl TlsInfoFactory for RustlsTlsConn> { fn tls_info(&self) -> Option { self.inner.tls_info() } @@ -1070,13 +998,13 @@ mod socks { } mod verbose { - use hyper::rt::{Read, ReadBufCursor, Write}; - use hyper_util::client::legacy::connect::{Connected, Connection}; + use hyper::client::connect::{Connected, Connection}; use std::cmp::min; use std::fmt; use std::io::{self, IoSlice}; use std::pin::Pin; use std::task::{Context, Poll}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub(super) const OFF: Wrapper = Wrapper(false); @@ -1102,24 +1030,21 @@ mod verbose { inner: T, } - impl Connection for Verbose { + impl Connection for Verbose { fn connected(&self) -> Connected { self.inner.connected() } } - impl Read for Verbose { + impl AsyncRead for Verbose { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context, - buf: ReadBufCursor<'_>, + buf: &mut ReadBuf<'_>, ) -> Poll> { match Pin::new(&mut self.inner).poll_read(cx, buf) { Poll::Ready(Ok(())) => { - /* log::trace!("{:08x} read: {:?}", self.id, Escape(buf.filled())); - */ - log::trace!("TODO: verbose poll_read"); Poll::Ready(Ok(())) } Poll::Ready(Err(e)) => Poll::Ready(Err(e)), @@ -1128,7 +1053,7 @@ mod verbose { } } - impl Write for Verbose { + impl AsyncWrite for Verbose { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context, @@ -1244,7 +1169,6 @@ mod verbose { mod tests { use super::tunnel; use crate::proxy; - use hyper_util::rt::TokioIo; use std::io::{Read, Write}; use std::net::TcpListener; use std::thread; @@ -1307,7 +1231,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TcpStream::connect(&addr).await?; let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1325,7 +1249,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TcpStream::connect(&addr).await?; let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1343,7 +1267,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TcpStream::connect(&addr).await?; let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1367,7 +1291,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TcpStream::connect(&addr).await?; let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1389,7 +1313,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TcpStream::connect(&addr).await?; let host = addr.ip().to_string(); let port = addr.port(); tunnel( diff --git a/src/dns/gai.rs b/src/dns/gai.rs index 00c981f0a..f32f3b0e0 100644 --- a/src/dns/gai.rs +++ b/src/dns/gai.rs @@ -1,6 +1,6 @@ use futures_util::future::FutureExt; -use hyper_util::client::legacy::connect::dns::{GaiResolver as HyperGaiResolver, Name}; -use tower_service::Service; +use hyper::client::connect::dns::{GaiResolver as HyperGaiResolver, Name}; +use hyper::service::Service; use crate::dns::{Addrs, Resolve, Resolving}; use crate::error::BoxError; diff --git a/src/dns/resolve.rs b/src/dns/resolve.rs index 4c36f30ec..3686765a0 100644 --- a/src/dns/resolve.rs +++ b/src/dns/resolve.rs @@ -1,5 +1,5 @@ -use hyper_util::client::legacy::connect::dns::Name; -use tower_service::Service; +use hyper::client::connect::dns::Name; +use hyper::service::Service; use std::collections::HashMap; use std::future::Future; diff --git a/src/dns/trust_dns.rs b/src/dns/trust_dns.rs index fc93f08b1..a25326085 100644 --- a/src/dns/trust_dns.rs +++ b/src/dns/trust_dns.rs @@ -1,6 +1,6 @@ //! DNS resolution via the [trust_dns_resolver](https://github.com/bluejekyll/trust-dns) crate -use hyper_util::client::legacy::connect::dns::Name; +use hyper::client::connect::dns::Name; use once_cell::sync::OnceCell; use trust_dns_resolver::{lookup_ip::LookupIpIntoIter, system_conf, TokioAsyncResolver}; diff --git a/src/error.rs b/src/error.rs index c558ebbac..9ffb6ed17 100644 --- a/src/error.rs +++ b/src/error.rs @@ -127,7 +127,7 @@ impl Error { let mut source = self.source(); while let Some(err) = source { - if let Some(hyper_err) = err.downcast_ref::() { + if let Some(hyper_err) = err.downcast_ref::() { if hyper_err.is_connect() { return true; } @@ -291,8 +291,9 @@ pub(crate) fn upgrade>(e: E) -> Error { // io::Error helpers -pub(crate) fn into_io(e: BoxError) -> io::Error { - io::Error::new(io::ErrorKind::Other, e) +#[allow(unused)] +pub(crate) fn into_io(e: Error) -> io::Error { + e.into_io() } #[allow(unused)] diff --git a/src/lib.rs b/src/lib.rs index e7f5545b6..f3da39a94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,7 +173,6 @@ //! The following are a list of [Cargo features][cargo-features] that can be //! enabled or disabled: //! -//! - **http2** *(enabled by default)*: Enables HTTP/2 support. //! - **default-tls** *(enabled by default)*: Provides TLS support to connect //! over HTTPS. //! - **native-tls**: Enables TLS functionality provided by `native-tls`. diff --git a/src/tls.rs b/src/tls.rs index 27101d733..3f53d875f 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -46,11 +46,9 @@ #[cfg(feature = "__rustls")] use rustls::{ - client::danger::HandshakeSignatureValid, client::danger::ServerCertVerified, - client::danger::ServerCertVerifier, DigitallySignedStruct, Error as TLSError, SignatureScheme, + client::HandshakeSignatureValid, client::ServerCertVerified, client::ServerCertVerifier, + DigitallySignedStruct, Error as TLSError, ServerName, }; -#[cfg(feature = "__rustls")] -use rustls_pki_types::{ServerName, UnixTime}; use std::{ fmt, io::{BufRead, BufReader}, @@ -79,6 +77,7 @@ pub struct Identity { inner: ClientCert, } +#[derive(Clone)] enum ClientCert { #[cfg(feature = "native-tls")] Pkcs12(native_tls_crate::Identity), @@ -86,32 +85,11 @@ enum ClientCert { Pkcs8(native_tls_crate::Identity), #[cfg(feature = "__rustls")] Pem { - key: rustls_pki_types::PrivateKeyDer<'static>, - certs: Vec>, + key: rustls::PrivateKey, + certs: Vec, }, } -impl Clone for ClientCert { - fn clone(&self) -> Self { - match self { - #[cfg(feature = "native-tls")] - Self::Pkcs8(i) => Self::Pkcs8(i.clone()), - #[cfg(feature = "native-tls")] - Self::Pkcs12(i) => Self::Pkcs12(i.clone()), - #[cfg(feature = "__rustls")] - ClientCert::Pem { key, certs } => ClientCert::Pem { - key: key.clone_key(), - certs: certs.clone(), - }, - #[cfg_attr( - any(feature = "native-tls", feature = "__rustls"), - allow(unreachable_patterns) - )] - _ => unreachable!(), - } - } -} - impl Certificate { /// Create a `Certificate` from a binary DER encoded certificate /// @@ -203,14 +181,14 @@ impl Certificate { match self.original { Cert::Der(buf) => root_cert_store - .add(buf.into()) + .add(&rustls::Certificate(buf)) .map_err(crate::error::builder)?, Cert::Pem(buf) => { let mut reader = Cursor::new(buf); let certs = Self::read_pem_certs(&mut reader)?; for c in certs { root_cert_store - .add(c.into()) + .add(&rustls::Certificate(c)) .map_err(crate::error::builder)?; } } @@ -330,8 +308,8 @@ impl Identity { let (key, certs) = { let mut pem = Cursor::new(buf); - let mut sk = Vec::::new(); - let mut certs = Vec::::new(); + let mut sk = Vec::::new(); + let mut certs = Vec::::new(); for item in std::iter::from_fn(|| rustls_pemfile::read_one(&mut pem).transpose()) { match item.map_err(|_| { @@ -339,16 +317,12 @@ impl Identity { "Invalid identity PEM file", ))) })? { - rustls_pemfile::Item::X509Certificate(cert) => certs.push(cert.into()), - rustls_pemfile::Item::PKCS8Key(key) => { - sk.push(rustls_pki_types::PrivateKeyDer::Pkcs8(key.into())) - } - rustls_pemfile::Item::RSAKey(key) => { - sk.push(rustls_pki_types::PrivateKeyDer::Pkcs1(key.into())) - } - rustls_pemfile::Item::ECKey(key) => { - sk.push(rustls_pki_types::PrivateKeyDer::Sec1(key.into())) + rustls_pemfile::Item::X509Certificate(cert) => { + certs.push(rustls::Certificate(cert)) } + rustls_pemfile::Item::PKCS8Key(key) => sk.push(rustls::PrivateKey(key)), + rustls_pemfile::Item::RSAKey(key) => sk.push(rustls::PrivateKey(key)), + rustls_pemfile::Item::ECKey(key) => sk.push(rustls::PrivateKey(key)), _ => { return Err(crate::error::builder(TLSError::General(String::from( "No valid certificate was found", @@ -391,8 +365,7 @@ impl Identity { self, config_builder: rustls::ConfigBuilder< rustls::ClientConfig, - // Not sure here - rustls::client::WantsClientCert, + rustls::client::WantsTransparencyPolicyOrClientCert, >, ) -> crate::Result { match self.inner { @@ -518,18 +491,18 @@ impl Default for TlsBackend { } #[cfg(feature = "__rustls")] -#[derive(Debug)] pub(crate) struct NoVerifier; #[cfg(feature = "__rustls")] impl ServerCertVerifier for NoVerifier { fn verify_server_cert( &self, - _end_entity: &rustls_pki_types::CertificateDer, - _intermediates: &[rustls_pki_types::CertificateDer], + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], _server_name: &ServerName, + _scts: &mut dyn Iterator, _ocsp_response: &[u8], - _now: UnixTime, + _now: std::time::SystemTime, ) -> Result { Ok(ServerCertVerified::assertion()) } @@ -537,7 +510,7 @@ impl ServerCertVerifier for NoVerifier { fn verify_tls12_signature( &self, _message: &[u8], - _cert: &rustls_pki_types::CertificateDer, + _cert: &rustls::Certificate, _dss: &DigitallySignedStruct, ) -> Result { Ok(HandshakeSignatureValid::assertion()) @@ -546,29 +519,11 @@ impl ServerCertVerifier for NoVerifier { fn verify_tls13_signature( &self, _message: &[u8], - _cert: &rustls_pki_types::CertificateDer, + _cert: &rustls::Certificate, _dss: &DigitallySignedStruct, ) -> Result { Ok(HandshakeSignatureValid::assertion()) } - - fn supported_verify_schemes(&self) -> Vec { - vec![ - SignatureScheme::RSA_PKCS1_SHA1, - SignatureScheme::ECDSA_SHA1_Legacy, - SignatureScheme::RSA_PKCS1_SHA256, - SignatureScheme::ECDSA_NISTP256_SHA256, - SignatureScheme::RSA_PKCS1_SHA384, - SignatureScheme::ECDSA_NISTP384_SHA384, - SignatureScheme::RSA_PKCS1_SHA512, - SignatureScheme::ECDSA_NISTP521_SHA512, - SignatureScheme::RSA_PSS_SHA256, - SignatureScheme::RSA_PSS_SHA384, - SignatureScheme::RSA_PSS_SHA512, - SignatureScheme::ED25519, - SignatureScheme::ED448, - ] - } } /// Hyper extension carrying extra TLS layer information. diff --git a/tests/blocking.rs b/tests/blocking.rs index 314b3e504..fa6c8d01c 100644 --- a/tests/blocking.rs +++ b/tests/blocking.rs @@ -1,9 +1,7 @@ mod support; -#[cfg(feature = "json")] use http::header::CONTENT_TYPE; -use http_body_util::BodyExt; -#[cfg(feature = "json")] +use http::HeaderValue; use std::collections::HashMap; use support::server; @@ -90,7 +88,7 @@ fn test_post() { assert_eq!(req.method(), "POST"); assert_eq!(req.headers()["content-length"], "5"); - let data = req.into_body().collect().await.unwrap().to_bytes(); + let data = hyper::body::to_bytes(req.into_body()).await.unwrap(); assert_eq!(&*data, b"Hello"); http::Response::default() @@ -117,7 +115,7 @@ fn test_post_form() { "application/x-www-form-urlencoded" ); - let data = req.into_body().collect().await.unwrap().to_bytes(); + let data = hyper::body::to_bytes(req.into_body()).await.unwrap(); assert_eq!(&*data, b"hello=world&sean=monstar"); http::Response::default() @@ -338,8 +336,6 @@ fn test_body_from_bytes() { #[test] #[cfg(feature = "json")] fn blocking_add_json_default_content_type_if_not_set_manually() { - use http::header::HeaderValue; - let mut map = HashMap::new(); map.insert("body", "json"); let content_type = HeaderValue::from_static("application/vnd.api+json"); diff --git a/tests/brotli.rs b/tests/brotli.rs index 5c2b01849..dc7d6d767 100644 --- a/tests/brotli.rs +++ b/tests/brotli.rs @@ -19,6 +19,7 @@ async fn test_brotli_empty_body() { http::Response::builder() .header("content-encoding", "br") + .header("content-length", 100) .body(Default::default()) .unwrap() }); @@ -124,7 +125,7 @@ async fn brotli_case(response_size: usize, chunk_size: usize) { Some((chunk, (brotlied, pos + 1))) }); - let body = reqwest::Body::wrap_stream(stream.map(Ok::<_, std::convert::Infallible>)); + let body = hyper::Body::wrap_stream(stream.map(Ok::<_, std::convert::Infallible>)); http::Response::builder() .header("content-encoding", "br") diff --git a/tests/client.rs b/tests/client.rs index 8a17ca3da..51fc28254 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,6 +1,8 @@ #![cfg(not(target_arch = "wasm32"))] mod support; +use futures_util::stream::StreamExt; +use support::delay_server; use support::server; #[cfg(feature = "json")] @@ -130,23 +132,19 @@ async fn response_json() { #[tokio::test] async fn body_pipe_response() { - use http_body_util::BodyExt; let _ = env_logger::try_init(); - let server = server::http(move |req| async move { + let server = server::http(move |mut req| async move { if req.uri() == "/get" { http::Response::new("pipe me".into()) } else { assert_eq!(req.uri(), "/pipe"); assert_eq!(req.headers()["transfer-encoding"], "chunked"); - let full: Vec = req - .into_body() - .collect() - .await - .expect("must succeed") - .to_bytes() - .to_vec(); + let mut full: Vec = Vec::new(); + while let Some(item) = req.body_mut().next().await { + full.extend(&*item.unwrap()); + } assert_eq!(full, b"pipe me"); @@ -323,6 +321,7 @@ fn use_preconfigured_rustls_default() { let root_cert_store = rustls::RootCertStore::empty(); let tls = rustls::ClientConfig::builder() + .with_safe_defaults() .with_root_certificates(root_cert_store) .with_no_client_auth(); @@ -441,7 +440,6 @@ async fn test_tls_info() { // fail, because the only thread would block until `panic_rx` receives a // notification while the client needs to be driven to get the graceful shutdown // done. -#[cfg(feature = "http2")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn highly_concurrent_requests_to_http2_server_with_low_max_concurrent_streams() { let client = reqwest::Client::builder() @@ -454,9 +452,7 @@ async fn highly_concurrent_requests_to_http2_server_with_low_max_concurrent_stre assert_eq!(req.version(), http::Version::HTTP_2); http::Response::default() }, - |builder| { - builder.http2().max_concurrent_streams(1); - }, + |builder| builder.http2_only(true).http2_max_concurrent_streams(1), ); let url = format!("http://{}", server.addr()); @@ -472,11 +468,8 @@ async fn highly_concurrent_requests_to_http2_server_with_low_max_concurrent_stre futures_util::future::join_all(futs).await; } -#[cfg(feature = "http2")] #[tokio::test] async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent_streams() { - use support::delay_server; - let client = reqwest::Client::builder() .http2_prior_knowledge() .build() @@ -487,8 +480,9 @@ async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent assert_eq!(req.version(), http::Version::HTTP_2); http::Response::default() }, - |http| { - http.http2().max_concurrent_streams(1); + |mut http| { + http.http2_only(true).http2_max_concurrent_streams(1); + http }, std::time::Duration::from_secs(2), ) diff --git a/tests/deflate.rs b/tests/deflate.rs index ec27ba180..3b8d9e021 100644 --- a/tests/deflate.rs +++ b/tests/deflate.rs @@ -19,6 +19,7 @@ async fn test_deflate_empty_body() { http::Response::builder() .header("content-encoding", "deflate") + .header("content-length", 100) .body(Default::default()) .unwrap() }); @@ -127,7 +128,7 @@ async fn deflate_case(response_size: usize, chunk_size: usize) { Some((chunk, (deflated, pos + 1))) }); - let body = reqwest::Body::wrap_stream(stream.map(Ok::<_, std::convert::Infallible>)); + let body = hyper::Body::wrap_stream(stream.map(Ok::<_, std::convert::Infallible>)); http::Response::builder() .header("content-encoding", "deflate") diff --git a/tests/gzip.rs b/tests/gzip.rs index 57189e0ac..66e1b7f25 100644 --- a/tests/gzip.rs +++ b/tests/gzip.rs @@ -20,6 +20,7 @@ async fn test_gzip_empty_body() { http::Response::builder() .header("content-encoding", "gzip") + .header("content-length", 100) .body(Default::default()) .unwrap() }); @@ -128,7 +129,7 @@ async fn gzip_case(response_size: usize, chunk_size: usize) { Some((chunk, (gzipped, pos + 1))) }); - let body = reqwest::Body::wrap_stream(stream.map(Ok::<_, std::convert::Infallible>)); + let body = hyper::Body::wrap_stream(stream.map(Ok::<_, std::convert::Infallible>)); http::Response::builder() .header("content-encoding", "gzip") diff --git a/tests/multipart.rs b/tests/multipart.rs index 425c830a7..59ada280d 100644 --- a/tests/multipart.rs +++ b/tests/multipart.rs @@ -1,6 +1,6 @@ #![cfg(not(target_arch = "wasm32"))] mod support; -use http_body_util::BodyExt; +use futures_util::stream::StreamExt; use support::server; #[tokio::test] @@ -33,8 +33,8 @@ async fn text_part() { ); let mut full: Vec = Vec::new(); - while let Some(item) = req.body_mut().frame().await { - full.extend(&*item.unwrap().into_data().unwrap()); + while let Some(item) = req.body_mut().next().await { + full.extend(&*item.unwrap()); } assert_eq!(full, expected_body.as_bytes()); @@ -97,7 +97,10 @@ async fn stream_part() { assert_eq!(req.headers()["content-type"], ct); assert_eq!(req.headers()["transfer-encoding"], "chunked"); - let full = req.collect().await.unwrap().to_bytes(); + let mut full: Vec = Vec::new(); + while let Some(item) = req.body_mut().next().await { + full.extend(&*item.unwrap()); + } assert_eq!(full, expected_body.as_bytes()); @@ -156,7 +159,10 @@ fn blocking_file_part() { expected_body.len().to_string() ); - let full = req.collect().await.unwrap().to_bytes(); + let mut full: Vec = Vec::new(); + while let Some(item) = req.body_mut().next().await { + full.extend(&*item.unwrap()); + } assert_eq!(full, expected_body.as_bytes()); diff --git a/tests/redirect.rs b/tests/redirect.rs index c98c799ef..9df6265a4 100644 --- a/tests/redirect.rs +++ b/tests/redirect.rs @@ -1,7 +1,7 @@ #![cfg(not(target_arch = "wasm32"))] mod support; -use http_body_util::BodyExt; -use reqwest::Body; +use futures_util::stream::StreamExt; +use hyper::Body; use support::server; #[tokio::test] @@ -87,14 +87,7 @@ async fn test_redirect_307_and_308_tries_to_post_again() { assert_eq!(req.method(), "POST"); assert_eq!(req.headers()["content-length"], "5"); - let data = req - .body_mut() - .frame() - .await - .unwrap() - .unwrap() - .into_data() - .unwrap(); + let data = req.body_mut().next().await.unwrap().unwrap(); assert_eq!(&*data, b"Hello"); if req.uri() == &*format!("/{code}") { @@ -137,14 +130,7 @@ fn test_redirect_307_does_not_try_if_reader_cannot_reset() { assert_eq!(req.uri(), &*format!("/{code}")); assert_eq!(req.headers()["transfer-encoding"], "chunked"); - let data = req - .body_mut() - .frame() - .await - .unwrap() - .unwrap() - .into_data() - .unwrap(); + let data = req.body_mut().next().await.unwrap().unwrap(); assert_eq!(&*data, b"Hello"); http::Response::builder() diff --git a/tests/support/delay_server.rs b/tests/support/delay_server.rs index f79c2a4df..08f421598 100644 --- a/tests/support/delay_server.rs +++ b/tests/support/delay_server.rs @@ -1,13 +1,14 @@ #![cfg(not(target_arch = "wasm32"))] -#![allow(unused)] use std::convert::Infallible; use std::future::Future; use std::net; +use std::sync::Arc; use std::time::Duration; use futures_util::FutureExt; use http::{Request, Response}; use hyper::service::service_fn; +use hyper::Body; use tokio::net::TcpListener; use tokio::select; use tokio::sync::oneshot; @@ -28,14 +29,12 @@ pub struct Server { server_terminated_rx: oneshot::Receiver<()>, } -type Builder = hyper_util::server::conn::auto::Builder; - impl Server { - pub async fn new(func: F1, apply_config: F2, delay: Duration) -> Self + pub async fn new(func: F1, apply_config: F2, delay: Duration) -> Self where - F1: Fn(Request) -> Fut + Clone + Send + 'static, - Fut: Future> + Send + 'static, - F2: FnOnce(&mut Builder) -> Bu + Send + 'static, + F1: Fn(Request) -> Fut + Clone + Send + 'static, + Fut: Future> + Send + 'static, + F2: FnOnce(hyper::server::conn::Http) -> hyper::server::conn::Http + Send + 'static, { let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (server_terminated_tx, server_terminated_rx) = oneshot::channel(); @@ -44,12 +43,9 @@ impl Server { let addr = tcp_listener.local_addr().unwrap(); tokio::spawn(async move { - let mut builder = - hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()); - apply_config(&mut builder); + let http = Arc::new(apply_config(hyper::server::conn::Http::new())); tokio::spawn(async move { - let builder = builder; let (connection_shutdown_tx, connection_shutdown_rx) = oneshot::channel(); let connection_shutdown_rx = connection_shutdown_rx.shared(); let mut shutdown_rx = std::pin::pin!(shutdown_rx); @@ -63,24 +59,24 @@ impl Server { } res = tcp_listener.accept() => { let (stream, _) = res.unwrap(); - let io = hyper_util::rt::TokioIo::new(stream); let handle = tokio::spawn({ let connection_shutdown_rx = connection_shutdown_rx.clone(); + let http = http.clone(); let func = func.clone(); - let svc = service_fn(move |req| { - let fut = func(req); - async move { - Ok::<_, Infallible>(fut.await) - }}); - let builder = builder.clone(); async move { - let fut = builder.serve_connection_with_upgrades(io, svc); tokio::time::sleep(delay).await; - let mut conn = std::pin::pin!(fut); + let mut conn = std::pin::pin!(http.serve_connection( + stream, + service_fn(move |req| { + let fut = func(req); + async move { + Ok::<_, Infallible>(fut.await) + }}) + )); select! { _ = conn.as_mut() => {} diff --git a/tests/support/server.rs b/tests/support/server.rs index f9c45b4d2..5193a5fbe 100644 --- a/tests/support/server.rs +++ b/tests/support/server.rs @@ -1,11 +1,12 @@ #![cfg(not(target_arch = "wasm32"))] -use std::convert::Infallible; +use std::convert::{identity, Infallible}; use std::future::Future; use std::net; use std::sync::mpsc as std_mpsc; use std::thread; use std::time::Duration; +use hyper::server::conn::AddrIncoming; use tokio::runtime; use tokio::sync::oneshot; @@ -37,19 +38,19 @@ impl Drop for Server { pub fn http(func: F) -> Server where - F: Fn(http::Request) -> Fut + Clone + Send + 'static, - Fut: Future> + Send + 'static, + F: Fn(http::Request) -> Fut + Clone + Send + 'static, + Fut: Future> + Send + 'static, { - http_with_config(func, |_builder| {}) + http_with_config(func, identity) } -type Builder = hyper_util::server::conn::auto::Builder; - -pub fn http_with_config(func: F1, apply_config: F2) -> Server +pub fn http_with_config(func: F1, apply_config: F2) -> Server where - F1: Fn(http::Request) -> Fut + Clone + Send + 'static, - Fut: Future> + Send + 'static, - F2: FnOnce(&mut Builder) -> Bu + Send + 'static, + F1: Fn(http::Request) -> Fut + Clone + Send + 'static, + Fut: Future> + Send + 'static, + F2: FnOnce(hyper::server::Builder) -> hyper::server::Builder + + Send + + 'static, { // Spawn new runtime in thread to prevent reactor execution context conflict thread::spawn(move || { @@ -57,14 +58,26 @@ where .enable_all() .build() .expect("new rt"); - let listener = rt.block_on(async move { - tokio::net::TcpListener::bind(&std::net::SocketAddr::from(([127, 0, 0, 1], 0))) - .await - .unwrap() + let srv = rt.block_on(async move { + let builder = hyper::Server::bind(&([127, 0, 0, 1], 0).into()); + + apply_config(builder).serve(hyper::service::make_service_fn(move |_| { + let func = func.clone(); + async move { + Ok::<_, Infallible>(hyper::service::service_fn(move |req| { + let fut = func(req); + async move { Ok::<_, Infallible>(fut.await) } + })) + } + })) + }); + + let addr = srv.local_addr(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let srv = srv.with_graceful_shutdown(async move { + let _ = shutdown_rx.await; }); - let addr = listener.local_addr().unwrap(); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let (panic_tx, panic_rx) = std_mpsc::channel(); let tname = format!( "test({})-support-server", @@ -73,34 +86,11 @@ where thread::Builder::new() .name(tname) .spawn(move || { - rt.block_on(async move { - let mut builder = - hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()); - apply_config(&mut builder); - - loop { - tokio::select! { - _ = &mut shutdown_rx => { - break; - } - accepted = listener.accept() => { - let (io, _) = accepted.expect("accepted"); - let func = func.clone(); - let svc = hyper::service::service_fn(move |req| { - let fut = func(req); - async move { Ok::<_, Infallible>(fut.await) } - }); - let builder = builder.clone(); - tokio::spawn(async move { - let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io), svc).await; - }); - } - } - } - let _ = panic_tx.send(()); - }); + rt.block_on(srv).unwrap(); + let _ = panic_tx.send(()); }) .expect("thread spawn"); + Server { addr, panic_rx, diff --git a/tests/timeouts.rs b/tests/timeouts.rs index ee690933e..6f6b0d588 100644 --- a/tests/timeouts.rs +++ b/tests/timeouts.rs @@ -143,7 +143,6 @@ async fn connect_many_timeout() { assert!(err.is_connect() && err.is_timeout()); } -#[cfg(feature = "stream")] #[tokio::test] async fn response_timeout() { let _ = env_logger::try_init(); @@ -151,7 +150,7 @@ async fn response_timeout() { let server = server::http(move |_req| { async { // immediate response, but delayed body - let body = reqwest::Body::wrap_stream(futures_util::stream::once(async { + let body = hyper::Body::wrap_stream(futures_util::stream::once(async { tokio::time::sleep(Duration::from_secs(2)).await; Ok::<_, std::convert::Infallible>("Hello") })); @@ -233,7 +232,6 @@ fn timeout_blocking_request() { } #[cfg(feature = "blocking")] -#[cfg(feature = "stream")] #[test] fn blocking_request_timeout_body() { let _ = env_logger::try_init(); @@ -249,7 +247,7 @@ fn blocking_request_timeout_body() { let server = server::http(move |_req| { async { // immediate response, but delayed body - let body = reqwest::Body::wrap_stream(futures_util::stream::once(async { + let body = hyper::Body::wrap_stream(futures_util::stream::once(async { tokio::time::sleep(Duration::from_secs(1)).await; Ok::<_, std::convert::Infallible>("Hello") })); diff --git a/tests/upgrade.rs b/tests/upgrade.rs index 5ea72acc2..de5c2904d 100644 --- a/tests/upgrade.rs +++ b/tests/upgrade.rs @@ -11,7 +11,7 @@ async fn http_upgrade() { assert_eq!(req.headers()["upgrade"], "foobar"); tokio::spawn(async move { - let mut upgraded = hyper_util::rt::TokioIo::new(hyper::upgrade::on(req).await.unwrap()); + let mut upgraded = hyper::upgrade::on(req).await.unwrap(); let mut buf = vec![0; 7]; upgraded.read_exact(&mut buf).await.unwrap(); @@ -25,7 +25,7 @@ async fn http_upgrade() { .status(http::StatusCode::SWITCHING_PROTOCOLS) .header(http::header::CONNECTION, "upgrade") .header(http::header::UPGRADE, "foobar") - .body(reqwest::Body::default()) + .body(hyper::Body::empty()) .unwrap() } });