Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(body): rename Recv to Incoming #3022

Merged
merged 1 commit into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::body::Body as _;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Recv, Request, Response, StatusCode};
use hyper::{body::Body, Method, Request, Response, StatusCode};
use tokio::net::TcpListener;

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(req: Request<Recv>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
async fn echo(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => Ok(Response::new(full(
Expand Down
4 changes: 2 additions & 2 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Recv, Request, Response};
use hyper::{Request, Response};
use tokio::net::TcpListener;

async fn hello(_: Request<Recv>) -> Result<Response<Full<Bytes>>, Infallible> {
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}

Expand Down
6 changes: 4 additions & 2 deletions examples/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use hyper::client::conn::http1::Builder;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::upgrade::Upgraded;
use hyper::{Method, Recv, Request, Response};
use hyper::{Method, Request, Response};

use tokio::net::{TcpListener, TcpStream};

Expand Down Expand Up @@ -43,7 +43,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

async fn proxy(req: Request<Recv>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
async fn proxy(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
println!("req: {:?}", req);

if Method::CONNECT == req.method() {
Expand Down
6 changes: 3 additions & 3 deletions examples/multi_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use futures_util::future::join;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Recv, Request, Response};
use hyper::{Request, Response};
use tokio::net::TcpListener;

static INDEX1: &[u8] = b"The 1st service!";
static INDEX2: &[u8] = b"The 2nd service!";

async fn index1(_: Request<Recv>) -> Result<Response<Full<Bytes>>, hyper::Error> {
async fn index1(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX1))))
}

async fn index2(_: Request<Recv>) -> Result<Response<Full<Bytes>>, hyper::Error> {
async fn index2(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX2))))
}

Expand Down
4 changes: 2 additions & 2 deletions examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Recv, Request, Response, StatusCode};
use hyper::{Method, Request, Response, StatusCode};
use tokio::net::TcpListener;

use std::collections::HashMap;
Expand All @@ -19,7 +19,7 @@ static NOTNUMERIC: &[u8] = b"Number field is not numeric";

// Using service_fn, we can turn this function into a `Service`.
async fn param_example(
req: Request<Recv>,
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))),
Expand Down
4 changes: 2 additions & 2 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::net::TcpListener;
use bytes::Bytes;
use http_body_util::Full;
use hyper::service::service_fn;
use hyper::{Method, Recv, Request, Response, Result, StatusCode};
use hyper::{Method, Request, Response, Result, StatusCode};

static INDEX: &str = "examples/send_file_index.html";
static NOTFOUND: &[u8] = b"Not Found";
Expand Down Expand Up @@ -36,7 +36,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
}

async fn response_examples(req: Request<Recv>) -> Result<Response<Full<Bytes>>> {
async fn response_examples(req: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => simple_file_send(INDEX).await,
(&Method::GET, "/no_file.html") => {
Expand Down
6 changes: 3 additions & 3 deletions examples/service_struct_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::Service;
use hyper::{Recv, Request, Response};
use hyper::{body::Incoming as IncomingBody, Request, Response};
use tokio::net::TcpListener;

use std::future::Future;
Expand Down Expand Up @@ -36,12 +36,12 @@ struct Svc {
counter: Counter,
}

impl Service<Request<Recv>> for Svc {
impl Service<Request<IncomingBody>> for Svc {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&mut self, req: Request<Recv>) -> Self::Future {
fn call(&mut self, req: Request<IncomingBody>) -> Self::Future {
fn mk_response(s: String) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/upgrades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hyper::header::{HeaderValue, UPGRADE};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::upgrade::Upgraded;
use hyper::{Recv, Request, Response, StatusCode};
use hyper::{Request, Response, StatusCode};

// A simple type alias so as to DRY.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand All @@ -38,7 +38,7 @@ async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> {
}

/// Our server HTTP handler to initiate HTTP upgrades.
async fn server_upgrade(mut req: Request<Recv>) -> Result<Response<Empty<Bytes>>> {
async fn server_upgrade(mut req: Request<hyper::body::Incoming>) -> Result<Response<Empty<Bytes>>> {
let mut res = Response::new(Empty::new());

// Send a 400 to any request that doesn't have
Expand Down
6 changes: 3 additions & 3 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bytes::{Buf, Bytes};
use http_body_util::{BodyExt, Full};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{header, Method, Recv, Request, Response, StatusCode};
use hyper::{body::Incoming as IncomingBody, header, Method, Request, Response, StatusCode};
use tokio::net::{TcpListener, TcpStream};

type GenericError = Box<dyn std::error::Error + Send + Sync>;
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn client_request_response() -> Result<Response<BoxBody>> {
Ok(Response::new(res_body))
}

async fn api_post_response(req: Request<Recv>) -> Result<Response<BoxBody>> {
async fn api_post_response(req: Request<IncomingBody>) -> Result<Response<BoxBody>> {
// Aggregate the body...
let whole_body = req.collect().await?.aggregate();
// Decode as JSON...
Expand Down Expand Up @@ -77,7 +77,7 @@ async fn api_get_response() -> Result<Response<BoxBody>> {
Ok(res)
}

async fn response_examples(req: Request<Recv>) -> Result<Response<BoxBody>> {
async fn response_examples(req: Request<IncomingBody>) -> Result<Response<BoxBody>> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => Ok(Response::new(full(INDEX))),
(&Method::GET, "/test.html") => client_request_response().await,
Expand Down
57 changes: 29 additions & 28 deletions src/body/body.rs → src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type TrailersSender = oneshot::Sender<HeaderMap>;

/// A stream of `Bytes`, used when receiving bodies from the network.
#[must_use = "streams do nothing unless polled"]
pub struct Recv {
pub struct Incoming {
kind: Kind,
}

Expand Down Expand Up @@ -65,17 +65,17 @@ pub(crate) struct Sender {
const WANT_PENDING: usize = 1;
const WANT_READY: usize = 2;

impl Recv {
impl Incoming {
/// Create a `Body` stream with an associated sender half.
///
/// Useful when wanting to stream chunks from another thread.
#[inline]
#[allow(unused)]
pub(crate) fn channel() -> (Sender, Recv) {
pub(crate) fn channel() -> (Sender, Incoming) {
Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
}

pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Recv) {
pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
let (data_tx, data_rx) = mpsc::channel(0);
let (trailers_tx, trailers_rx) = oneshot::channel();

Expand All @@ -90,7 +90,7 @@ impl Recv {
data_tx,
trailers_tx: Some(trailers_tx),
};
let rx = Recv::new(Kind::Chan {
let rx = Incoming::new(Kind::Chan {
content_length,
want_tx,
data_rx,
Expand All @@ -100,18 +100,18 @@ impl Recv {
(tx, rx)
}

fn new(kind: Kind) -> Recv {
Recv { kind }
fn new(kind: Kind) -> Incoming {
Incoming { kind }
}

#[allow(dead_code)]
pub(crate) fn empty() -> Recv {
Recv::new(Kind::Empty)
pub(crate) fn empty() -> Incoming {
Incoming::new(Kind::Empty)
}

#[cfg(feature = "ffi")]
pub(crate) fn ffi() -> Recv {
Recv::new(Kind::Ffi(crate::ffi::UserBody::new()))
pub(crate) fn ffi() -> Incoming {
Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
}

#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Expand All @@ -125,7 +125,7 @@ impl Recv {
if !content_length.is_exact() && recv.is_end_stream() {
content_length = DecodedLength::ZERO;
}
let body = Recv::new(Kind::H2 {
let body = Incoming::new(Kind::H2 {
data_done: false,
ping,
content_length,
Expand All @@ -151,7 +151,7 @@ impl Recv {
}
}

impl Body for Recv {
impl Body for Incoming {
type Data = Bytes;
type Error = crate::Error;

Expand Down Expand Up @@ -259,7 +259,7 @@ impl Body for Recv {
}
}

impl fmt::Debug for Recv {
impl fmt::Debug for Incoming {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[derive(Debug)]
struct Streaming;
Expand Down Expand Up @@ -375,15 +375,15 @@ mod tests {
use std::mem;
use std::task::Poll;

use super::{Body, DecodedLength, Recv, Sender, SizeHint};
use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
use http_body_util::BodyExt;

#[test]
fn test_size_of() {
// These are mostly to help catch *accidentally* increasing
// the size by too much.

let body_size = mem::size_of::<Recv>();
let body_size = mem::size_of::<Incoming>();
let body_expected_size = mem::size_of::<u64>() * 5;
assert!(
body_size <= body_expected_size,
Expand All @@ -392,7 +392,7 @@ mod tests {
body_expected_size,
);

//assert_eq!(body_size, mem::size_of::<Option<Recv>>(), "Option<Recv>");
//assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");

assert_eq!(
mem::size_of::<Sender>(),
Expand All @@ -409,18 +409,18 @@ mod tests {

#[test]
fn size_hint() {
fn eq(body: Recv, b: SizeHint, note: &str) {
fn eq(body: Incoming, b: SizeHint, note: &str) {
let a = body.size_hint();
assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
}

eq(Recv::empty(), SizeHint::with_exact(0), "empty");
eq(Incoming::empty(), SizeHint::with_exact(0), "empty");

eq(Recv::channel().1, SizeHint::new(), "channel");
eq(Incoming::channel().1, SizeHint::new(), "channel");

eq(
Recv::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
SizeHint::with_exact(4),
"channel with length",
);
Expand All @@ -429,7 +429,7 @@ mod tests {
#[cfg(not(miri))]
#[tokio::test]
async fn channel_abort() {
let (tx, mut rx) = Recv::channel();
let (tx, mut rx) = Incoming::channel();

tx.abort();

Expand All @@ -440,7 +440,7 @@ mod tests {
#[cfg(all(not(miri), feature = "http1"))]
#[tokio::test]
async fn channel_abort_when_buffer_is_full() {
let (mut tx, mut rx) = Recv::channel();
let (mut tx, mut rx) = Incoming::channel();

tx.try_send_data("chunk 1".into()).expect("send 1");
// buffer is full, but can still send abort
Expand All @@ -462,7 +462,7 @@ mod tests {
#[cfg(feature = "http1")]
#[test]
fn channel_buffers_one() {
let (mut tx, _rx) = Recv::channel();
let (mut tx, _rx) = Incoming::channel();

tx.try_send_data("chunk 1".into()).expect("send 1");

Expand All @@ -474,14 +474,14 @@ mod tests {
#[cfg(not(miri))]
#[tokio::test]
async fn channel_empty() {
let (_, mut rx) = Recv::channel();
let (_, mut rx) = Incoming::channel();

assert!(rx.frame().await.is_none());
}

#[test]
fn channel_ready() {
let (mut tx, _rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

let mut tx_ready = tokio_test::task::spawn(tx.ready());

Expand All @@ -490,7 +490,8 @@ mod tests {

#[test]
fn channel_wanter() {
let (mut tx, mut rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
let (mut tx, mut rx) =
Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

let mut tx_ready = tokio_test::task::spawn(tx.ready());
let mut rx_data = tokio_test::task::spawn(rx.frame());
Expand All @@ -511,7 +512,7 @@ mod tests {

#[test]
fn channel_notices_closure() {
let (mut tx, rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

let mut tx_ready = tokio_test::task::spawn(tx.ready());

Expand Down
Loading