Skip to content

Commit

Permalink
feat(lib): remove stream cargo feature (#2896)
Browse files Browse the repository at this point in the history
Closes #2855
  • Loading branch information
oddgrd authored Jun 23, 2022
1 parent a563404 commit ce72f73
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 410 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:

- name: Test
# Can't enable tcp feature since Miri does not support the tokio runtime
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,stream,nightly
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,nightly

features:
name: features
Expand Down
7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
http-body = { git = "https://github.com/hyperium/http-body", branch = "master" }
http-body-util = { git = "https://github.com/hyperium/http-body", branch = "master" }
httpdate = "1.0"
httparse = "1.6"
h2 = { version = "0.3.9", optional = true }
Expand Down Expand Up @@ -80,7 +81,6 @@ full = [
"http1",
"http2",
"server",
"stream",
"runtime",
]

Expand All @@ -92,9 +92,6 @@ http2 = ["h2"]
client = []
server = []

# `impl Stream` for things
stream = []

# Tokio support
runtime = [
"tcp",
Expand Down
5 changes: 3 additions & 2 deletions benches/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate test;
use bytes::Buf;
use futures_util::stream;
use futures_util::StreamExt;
use hyper::body::Body;
use http_body_util::StreamBody;

macro_rules! bench_stream {
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
Expand All @@ -20,9 +20,10 @@ macro_rules! bench_stream {

$bencher.iter(|| {
rt.block_on(async {
let $body_pat = Body::wrap_stream(
let $body_pat = StreamBody::new(
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)),
);

$block;
});
});
Expand Down
7 changes: 4 additions & 3 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use std::sync::mpsc;
use std::time::Duration;

use futures_util::{stream, StreamExt};
use http_body_util::StreamBody;
use tokio::sync::oneshot;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};
use hyper::{Response, Server};

macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => {{
Expand Down Expand Up @@ -101,7 +102,7 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("content-length", "1000000"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}

Expand All @@ -123,7 +124,7 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) {
fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("transfer-encoding", "chunked"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}

Expand Down
20 changes: 10 additions & 10 deletions examples/echo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![deny(warnings)]

use futures_util::TryStreamExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};

Expand All @@ -16,16 +15,17 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// Simply echo the body back to the client.
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),

// TODO: Fix this, broken in PR #2896
// Convert to uppercase before sending back to client using a stream.
(&Method::POST, "/echo/uppercase") => {
let chunk_stream = req.into_body().map_ok(|chunk| {
chunk
.iter()
.map(|byte| byte.to_ascii_uppercase())
.collect::<Vec<u8>>()
});
Ok(Response::new(Body::wrap_stream(chunk_stream)))
}
// (&Method::POST, "/echo/uppercase") => {
// let chunk_stream = req.into_body().map_ok(|chunk| {
// chunk
// .iter()
// .map(|byte| byte.to_ascii_uppercase())
// .collect::<Vec<u8>>()
// });
// Ok(Response::new(Body::wrap_stream(chunk_stream)))
// }

// Reverse the entire body before sending back to the client.
//
Expand Down
11 changes: 2 additions & 9 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#![deny(warnings)]

use tokio::fs::File;

use tokio_util::codec::{BytesCodec, FramedRead};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Result, Server, StatusCode};

Expand Down Expand Up @@ -48,11 +44,8 @@ fn not_found() -> Response<Body> {
}

async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
// Serve a file by asynchronously reading it by chunks using tokio-util crate.

if let Ok(file) = File::open(filename).await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
if let Ok(contents) = tokio::fs::read(filename).await {
let body = contents.into();
return Ok(Response::new(body));
}

Expand Down
15 changes: 3 additions & 12 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![deny(warnings)]

use bytes::Buf;
use futures_util::{stream, StreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
Expand All @@ -24,18 +23,10 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
.unwrap();

let web_res = client.request(req).await?;
// Compare the JSON we sent (before) with what we received (after):
let before = stream::once(async {
Ok(format!(
"<b>POST request body</b>: {}<br><b>Response</b>: ",
POST_DATA,
)
.into())
});
let after = web_res.into_body();
let body = Body::wrap_stream(before.chain(after));

Ok(Response::new(body))
let res_body = web_res.into_body();

Ok(Response::new(res_body))
}

async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
Expand Down
82 changes: 0 additions & 82 deletions src/body/body.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
use std::borrow::Cow;
#[cfg(feature = "stream")]
use std::error::Error as StdError;
use std::fmt;

use bytes::Bytes;
use futures_channel::mpsc;
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")]
use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
use crate::common::Never;
Expand Down Expand Up @@ -56,12 +50,6 @@ enum Kind {
},
#[cfg(feature = "ffi")]
Ffi(crate::ffi::UserBody),
#[cfg(feature = "stream")]
Wrapped(
SyncWrapper<
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
}

struct Extra {
Expand Down Expand Up @@ -164,39 +152,6 @@ impl Body {
(tx, rx)
}

/// Wrap a futures `Stream` in a box inside `Body`.
///
/// # Example
///
/// ```
/// # use hyper::Body;
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
/// Ok("hello"),
/// Ok(" "),
/// Ok("world"),
/// ];
///
/// let stream = futures_util::stream::iter(chunks);
///
/// let body = Body::wrap_stream(stream);
/// ```
///
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S, O, E>(stream: S) -> Body
where
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}

fn new(kind: Kind) -> Body {
Body { kind, extra: None }
}
Expand Down Expand Up @@ -329,12 +284,6 @@ impl Body {

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),

#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
}
}

Expand Down Expand Up @@ -405,8 +354,6 @@ impl HttpBody for Body {
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => false,
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false,
}
}

Expand All @@ -426,8 +373,6 @@ impl HttpBody for Body {
match self.kind {
Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
#[cfg(feature = "stream")]
Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } => opt_len!(content_length),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { content_length, .. } => opt_len!(content_length),
Expand Down Expand Up @@ -457,33 +402,6 @@ impl fmt::Debug for Body {
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
type Item = crate::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
HttpBody::poll_data(self, cx)
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
#[inline]
fn from(
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
) -> Body {
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
}
}

impl From<Bytes> for Body {
#[inline]
fn from(chunk: Bytes) -> Body {
Expand Down
5 changes: 1 addition & 4 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ pub(crate) mod io;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
mod lazy;
mod never;
#[cfg(any(
feature = "stream",
all(feature = "client", any(feature = "http1", feature = "http2"))
))]
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;
Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(super) enum Kind {
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
HeaderTimeout,
/// Error while reading a body from connection.
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
#[cfg(any(feature = "http1", feature = "http2"))]
Body,
/// Error while writing a body to connection.
#[cfg(any(feature = "http1", feature = "http2"))]
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Error {
Error::new(Kind::ChannelClosed)
}

#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) fn new_body<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Body).with(cause)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ impl Error {
Kind::Accept => "error accepting connection",
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
Kind::HeaderTimeout => "read header from client timeout",
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::Body => "error reading a body from connection",
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::BodyWrite => "error writing a body to connection",
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
//! - `runtime`: Enables convenient integration with `tokio`, providing
//! connectors and acceptors for TCP, and a default executor.
//! - `tcp`: Enables convenient implementations over TCP (using tokio).
//! - `stream`: Provides `futures::Stream` capabilities.
//!
//! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section

Expand Down
Loading

0 comments on commit ce72f73

Please sign in to comment.