Skip to content

Commit

Permalink
feat(body): make body::Sender and Body::channel private (#2970)
Browse files Browse the repository at this point in the history
Closes #2962 

BREAKING CHANGE: A channel body will be available in `hyper-util`.
  • Loading branch information
oddgrd authored Aug 29, 2022
1 parent 7a41da5 commit d963e6a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
18 changes: 11 additions & 7 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ enum Kind {
/// [`Body::channel()`]: struct.Body.html#method.channel
/// [`Sender::abort()`]: struct.Sender.html#method.abort
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
pub(crate) struct Sender {
want_rx: watch::Receiver,
data_tx: BodySender,
trailers_tx: Option<TrailersSender>,
Expand All @@ -75,7 +75,8 @@ impl Recv {
///
/// Useful when wanting to stream chunks from another thread.
#[inline]
pub fn channel() -> (Sender, Recv) {
#[allow(unused)]
pub(crate) fn channel() -> (Sender, Recv) {
Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
}

Expand Down Expand Up @@ -289,7 +290,7 @@ impl fmt::Debug for Recv {

impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.data_tx
Expand All @@ -311,15 +312,17 @@ impl Sender {
}

/// Send data on data channel when it is ready.
pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
#[allow(unused)]
pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
self.ready().await?;
self.data_tx
.try_send(Ok(chunk))
.map_err(|_| crate::Error::new_closed())
}

/// Send trailers on trailers channel.
pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
#[allow(unused)]
pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
let tx = match self.trailers_tx.take() {
Some(tx) => tx,
None => return Err(crate::Error::new_closed()),
Expand All @@ -339,14 +342,15 @@ impl Sender {
/// This is mostly useful for when trying to send from some other thread
/// that doesn't have an async context. If in an async context, prefer
/// `send_data()` instead.
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.data_tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

/// Aborts the body in an abnormal fashion.
pub fn abort(self) {
#[allow(unused)]
pub(crate) fn abort(self) {
let _ = self
.data_tx
// clone so the send works even if buffer is full
Expand Down
3 changes: 2 additions & 1 deletion src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ pub use http_body::Body as HttpBody;
pub use http_body::SizeHint;

pub use self::aggregate::aggregate;
pub use self::body::{Recv, Sender};
pub use self::body::Recv;
pub(crate) use self::body::Sender;
pub(crate) use self::length::DecodedLength;
pub use self::to_bytes::to_bytes;

Expand Down
30 changes: 19 additions & 11 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,7 @@ test! {
}

mod conn {
use std::error::Error;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::pin::Pin;
Expand All @@ -1333,15 +1334,15 @@ mod conn {
use std::time::Duration;

use bytes::{Buf, Bytes};
use futures_channel::oneshot;
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use http_body_util::Empty;
use hyper::upgrade::OnUpgrade;
use http_body_util::{Empty, StreamBody};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf};
use tokio::net::{TcpListener as TkTcpListener, TcpStream};

use hyper::body::HttpBody;
use hyper::client::conn;
use hyper::upgrade::OnUpgrade;
use hyper::{self, Method, Recv, Request, Response, StatusCode};

use super::{concat, s, support, tcp_connect, FutureHyperExt};
Expand Down Expand Up @@ -1524,17 +1525,23 @@ mod conn {

rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ()));

let (mut sender, body) = Recv::channel();
let (mut sender, recv) = mpsc::channel::<Result<Bytes, Box<dyn Error + Send + Sync>>>(0);

let sender = thread::spawn(move || {
sender.try_send_data("hello".into()).expect("try_send_data");
sender.try_send(Ok("hello".into())).expect("try_send_data");
support::runtime().block_on(rx).unwrap();
sender.abort();

// Aborts the body in an abnormal fashion.
let _ = sender.try_send(Err(Box::new(std::io::Error::new(
io::ErrorKind::Other,
"body write aborted",
))));
});

let req = Request::builder()
.method(Method::POST)
.uri("/")
.body(body)
.body(StreamBody::new(recv))
.unwrap();
let res = client.send_request(req);
rt.block_on(res).unwrap_err();
Expand Down Expand Up @@ -2111,7 +2118,7 @@ mod conn {
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
.handshake::<_, Recv>(io)
.handshake(io)
.await
.expect("http handshake");

Expand All @@ -2120,9 +2127,10 @@ mod conn {
});

// Use a channel to keep request stream open
let (_tx, body) = hyper::Recv::channel();
let req1 = http::Request::new(body);
let _resp = client.send_request(req1).await.expect("send_request");
let (_tx, recv) = mpsc::channel::<Result<Bytes, Box<dyn Error + Send + Sync>>>(0);
let req = http::Request::new(StreamBody::new(recv));

let _resp = client.send_request(req).await.expect("send_request");

// sleep longer than keepalive would trigger
tokio::time::sleep(Duration::from_secs(4)).await;
Expand Down

0 comments on commit d963e6a

Please sign in to comment.