From b44898ce7ab9c5d38f564f21a7f961a34360ff66 Mon Sep 17 00:00:00 2001 From: oddgrd Date: Thu, 26 May 2022 00:07:26 +0200 Subject: [PATCH] feat(body): remove stream cargo feature remove stream cargo feature and any usage of stream, as it isn't stable and shouldn't be depended on closes issue #2855 --- Cargo.toml | 4 --- examples/send_file.rs | 11 ++---- examples/web_api.rs | 15 ++------ src/body/body.rs | 82 ------------------------------------------- src/server/accept.rs | 40 --------------------- 5 files changed, 5 insertions(+), 147 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a52d991002..c31a5525ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,6 @@ full = [ "http1", "http2", "server", - "stream", "runtime", ] @@ -90,9 +89,6 @@ http2 = ["h2"] client = [] server = [] -# `impl Stream` for things -stream = [] - # Tokio support runtime = [ "tcp", diff --git a/examples/send_file.rs b/examples/send_file.rs index 3f660abf72..8456268755 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -1,9 +1,5 @@ #![deny(warnings)] -use tokio::fs::File; - -use tokio_util::codec::{BytesCodec, FramedRead}; - use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Result, Server, StatusCode}; @@ -48,11 +44,8 @@ fn not_found() -> Response { } async fn simple_file_send(filename: &str) -> Result> { - // Serve a file by asynchronously reading it by chunks using tokio-util crate. - - if let Ok(file) = File::open(filename).await { - let stream = FramedRead::new(file, BytesCodec::new()); - let body = Body::wrap_stream(stream); + if let Ok(contents) = tokio::fs::read(filename).await { + let body = contents.into(); return Ok(Response::new(body)); } diff --git a/examples/web_api.rs b/examples/web_api.rs index 5226249b35..855ce5bc77 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -1,7 +1,6 @@ #![deny(warnings)] use bytes::Buf; -use futures_util::{stream, StreamExt}; use hyper::client::HttpConnector; use hyper::service::{make_service_fn, service_fn}; use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode}; @@ -24,18 +23,10 @@ async fn client_request_response(client: &Client) -> ResultPOST request body: {}
Response: ", - POST_DATA, - ) - .into()) - }); - let after = web_res.into_body(); - let body = Body::wrap_stream(before.chain(after)); - Ok(Response::new(body)) + let res_body = web_res.into_body(); + + Ok(Response::new(res_body)) } async fn api_post_response(req: Request) -> Result> { diff --git a/src/body/body.rs b/src/body/body.rs index 9dc1a034f9..0ba63a4b68 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -1,20 +1,14 @@ use std::borrow::Cow; -#[cfg(feature = "stream")] -use std::error::Error as StdError; use std::fmt; use bytes::Bytes; use futures_channel::mpsc; use futures_channel::oneshot; use futures_core::Stream; // for mpsc::Receiver -#[cfg(feature = "stream")] -use futures_util::TryStreamExt; use http::HeaderMap; use http_body::{Body as HttpBody, SizeHint}; use super::DecodedLength; -#[cfg(feature = "stream")] -use crate::common::sync_wrapper::SyncWrapper; use crate::common::Future; #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] use crate::common::Never; @@ -56,12 +50,6 @@ enum Kind { }, #[cfg(feature = "ffi")] Ffi(crate::ffi::UserBody), - #[cfg(feature = "stream")] - Wrapped( - SyncWrapper< - Pin>> + Send>>, - >, - ), } struct Extra { @@ -164,39 +152,6 @@ impl Body { (tx, rx) } - /// Wrap a futures `Stream` in a box inside `Body`. - /// - /// # Example - /// - /// ``` - /// # use hyper::Body; - /// let chunks: Vec> = vec![ - /// Ok("hello"), - /// Ok(" "), - /// Ok("world"), - /// ]; - /// - /// let stream = futures_util::stream::iter(chunks); - /// - /// let body = Body::wrap_stream(stream); - /// ``` - /// - /// # Optional - /// - /// This function requires enabling the `stream` feature in your - /// `Cargo.toml`. - #[cfg(feature = "stream")] - #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] - pub fn wrap_stream(stream: S) -> Body - where - S: Stream> + Send + 'static, - O: Into + 'static, - E: Into> + 'static, - { - let mapped = stream.map_ok(Into::into).map_err(Into::into); - Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) - } - fn new(kind: Kind) -> Body { Body { kind, extra: None } } @@ -329,12 +284,6 @@ impl Body { #[cfg(feature = "ffi")] Kind::Ffi(ref mut body) => body.poll_data(cx), - - #[cfg(feature = "stream")] - Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { - Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), - None => Poll::Ready(None), - }, } } @@ -405,8 +354,6 @@ impl HttpBody for Body { Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), #[cfg(feature = "ffi")] Kind::Ffi(..) => false, - #[cfg(feature = "stream")] - Kind::Wrapped(..) => false, } } @@ -426,8 +373,6 @@ impl HttpBody for Body { match self.kind { Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), Kind::Once(None) => SizeHint::with_exact(0), - #[cfg(feature = "stream")] - Kind::Wrapped(..) => SizeHint::default(), Kind::Chan { content_length, .. } => opt_len!(content_length), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { content_length, .. } => opt_len!(content_length), @@ -457,33 +402,6 @@ impl fmt::Debug for Body { } } -/// # Optional -/// -/// This function requires enabling the `stream` feature in your -/// `Cargo.toml`. -#[cfg(feature = "stream")] -impl Stream for Body { - type Item = crate::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - HttpBody::poll_data(self, cx) - } -} - -/// # Optional -/// -/// This function requires enabling the `stream` feature in your -/// `Cargo.toml`. -#[cfg(feature = "stream")] -impl From>> + Send>> for Body { - #[inline] - fn from( - stream: Box>> + Send>, - ) -> Body { - Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) - } -} - impl From for Body { #[inline] fn from(chunk: Bytes) -> Body { diff --git a/src/server/accept.rs b/src/server/accept.rs index 4b7a1487dd..d38dcb986f 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -6,11 +6,6 @@ //! connections. //! - Utilities like `poll_fn` to ease creating a custom `Accept`. -#[cfg(feature = "stream")] -use futures_core::Stream; -#[cfg(feature = "stream")] -use pin_project_lite::pin_project; - use crate::common::{ task::{self, Poll}, Pin, @@ -74,38 +69,3 @@ where PollFn(func) } - -/// Adapt a `Stream` of incoming connections into an `Accept`. -/// -/// # Optional -/// -/// This function requires enabling the `stream` feature in your -/// `Cargo.toml`. -#[cfg(feature = "stream")] -pub fn from_stream(stream: S) -> impl Accept -where - S: Stream>, -{ - pin_project! { - struct FromStream { - #[pin] - stream: S, - } - } - - impl Accept for FromStream - where - S: Stream>, - { - type Conn = IO; - type Error = E; - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>> { - self.project().stream.poll_next(cx) - } - } - - FromStream { stream } -}