Skip to content

Commit

Permalink
feat: Update to hyper 1.0
Browse files Browse the repository at this point in the history
Fixes #24.

hyper has its own IO traits. The dependency on
tokio-io-timeout is dropped. Its types are implemented here,
implementing hyper IO traits instead of tokio.

- `tokio::io::AsyncRead` is switched to `hyper::rt::Read`, and AsyncWrite
to `hyper::rt::Write`.
- `tokio::io::AsyncSeek` is no longer needed.

The wrapper type `TimeoutConnectorStream` that formerly wrapped
tokio_io_timeout::TimeoutStream` is no longer needed.

`tokio::io` has the handy AsyncReadExt and AsyncWriteExt traits, which
were preivously used in tests. Now that we rely on Hyper IO, those had
to be ported over:

- AsyncReadExt -> ReadExt
- AsyncWriteExt -> WriteExt
- tokio internal Read -> ReadFut
- tokio internal Write -> WriteFut
  • Loading branch information
allan2 authored and hjr3 committed Dec 3, 2023
1 parent 92d5610 commit 24f6fca
Show file tree
Hide file tree
Showing 5 changed files with 688 additions and 100 deletions.
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hyper-timeout"
version = "0.4.1"
version = "0.5.0"
authors = ["Herman J. Radtke III <[email protected]>"]
edition = "2018"
description = "A connect, read and write timeout aware connector to be used with hyper Client."
Expand All @@ -11,12 +11,14 @@ repository = "https://github.com/hjr3/hyper-timeout"
readme = "README.md"

[dependencies]
hyper = { version = "0.14.2", features = ["client"] }
hyper = "1.0"
hyper-util = { version = "0.1", features = ["client-legacy", "http1"] }
pin-project-lite = "0.2"
tokio = "1.0.0"
tokio-io-timeout = "1.1.0"
tokio = { version = "1.34.0" }
tower-service = "0.3"

[dev-dependencies]
hyper = { version = "0.14", features = ["client", "http1", "tcp"] }
hyper-tls = "0.5"
tokio = { version = "1.0.0", features = ["io-std", "io-util", "macros"] }
hyper = { version = "1.0", features = ["http1"] }
hyper-tls = "0.6"
http-body-util = "0.1"
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This c
Hyper version compatibility:

* The `master` branch will track on going development for hyper.
* The `0.5` release supports hyper 1.0.
* The `0.4` release supports hyper 0.14.
* The `0.3` release supports hyper 0.13.
* The `0.2` release supports hyper 0.12.
* The `0.1` release supports hyper 0.11.

Assuming you are using hyper 0.14, add this to your `Cargo.toml`:
Assuming you are using hyper 1.0, add this to your `Cargo.toml`:

```toml
[dependencies]
hyper-timeout = "0.4"
hyper-timeout = "0.5"
```

See the [client example](./examples/client.rs) for a working example.
Expand Down
19 changes: 12 additions & 7 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::env;
use std::time::Duration;

use hyper::{body::HttpBody as _, Client};
use tokio::io::{self, AsyncWriteExt as _};
use http_body_util::{BodyExt, Empty};
use hyper::body::Bytes;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use tokio::io::{self, AsyncWriteExt};

use hyper_tls::HttpsConnector;

Expand All @@ -22,22 +24,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = url.parse::<hyper::Uri>().unwrap();

// This example uses `HttpsConnector`, but you can also use hyper `HttpConnector`
//let h = hyper::client::HttpConnector::new();
//let h = hyper_util::client::legacy::connect::HttpConnector::new();
let h = HttpsConnector::new();
let mut connector = TimeoutConnector::new(h);
connector.set_connect_timeout(Some(Duration::from_secs(5)));
connector.set_read_timeout(Some(Duration::from_secs(5)));
connector.set_write_timeout(Some(Duration::from_secs(5)));
let client = Client::builder().build::<_, hyper::Body>(connector);
let client = Client::builder(TokioExecutor::new()).build::<_, Empty<Bytes>>(connector);

let mut res = client.get(url).await?;

println!("Status: {}", res.status());
println!("Headers:\n{:#?}", res.headers());

while let Some(chunk) = res.body_mut().data().await {
let chunk = chunk?;
io::stdout().write_all(&chunk).await?
while let Some(frame) = res.body_mut().frame().await {
let bytes = frame?
.into_data()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Error when consuming frame"))?;
io::stdout().write_all(&bytes).await?;
}

Ok(())
}
60 changes: 29 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::io::{AsyncRead, AsyncWrite};
use hyper::rt::{Read, Write};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;

use hyper::client::connect::{Connected, Connection};
use hyper::{service::Service, Uri};
use hyper::Uri;
use hyper_util::client::legacy::connect::{Connected, Connection};
use tower_service::Service;

mod stream;

use stream::TimeoutConnectorStream;
use stream::TimeoutStream;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// A connector that enforces as connection timeout
/// A connector that enforces a connection timeout
#[derive(Debug, Clone)]
pub struct TimeoutConnector<T> {
/// A connector implementing the `Connect` trait
Expand All @@ -33,7 +32,7 @@ pub struct TimeoutConnector<T> {
impl<T> TimeoutConnector<T>
where
T: Service<Uri> + Send,
T::Response: AsyncRead + AsyncWrite + Send + Unpin,
T::Response: Read + Write + Send + Unpin,
T::Future: Send + 'static,
T::Error: Into<BoxError>,
{
Expand All @@ -51,11 +50,11 @@ where
impl<T> Service<Uri> for TimeoutConnector<T>
where
T: Service<Uri> + Send,
T::Response: AsyncRead + AsyncWrite + Connection + Send + Unpin,
T::Response: Read + Write + Connection + Send + Unpin,
T::Future: Send + 'static,
T::Error: Into<BoxError>,
{
type Response = Pin<Box<TimeoutConnectorStream<T::Response>>>;
type Response = Pin<Box<TimeoutStream<T::Response>>>;
type Error = BoxError;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
Expand All @@ -71,7 +70,7 @@ where
let connecting = self.connector.call(dst);

let fut = async move {
let stream = match connect_timeout {
let mut stream = match connect_timeout {
None => {
let io = connecting.await.map_err(Into::into)?;
TimeoutStream::new(io)
Expand All @@ -85,11 +84,9 @@ where
TimeoutStream::new(io)
}
};

let mut tm = TimeoutConnectorStream::new(stream);
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
Ok(Box::pin(tm))
stream.set_read_timeout(read_timeout);
stream.set_write_timeout(write_timeout);
Ok(Box::pin(stream))
};

Box::pin(fut)
Expand Down Expand Up @@ -124,8 +121,8 @@ impl<T> TimeoutConnector<T> {

impl<T> Connection for TimeoutConnector<T>
where
T: AsyncRead + AsyncWrite + Connection + Service<Uri> + Send + Unpin,
T::Response: AsyncRead + AsyncWrite + Send + Unpin,
T: Read + Write + Connection + Service<Uri> + Send + Unpin,
T::Response: Read + Write + Send + Unpin,
T::Future: Send + 'static,
T::Error: Into<BoxError>,
{
Expand All @@ -136,12 +133,15 @@ where

#[cfg(test)]
mod tests {
use std::error::Error;
use std::io;
use std::time::Duration;
use std::{error::Error, io};

use hyper::client::HttpConnector;
use hyper::Client;
use http_body_util::Empty;
use hyper::body::Bytes;
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};

use super::TimeoutConnector;

Expand All @@ -154,7 +154,7 @@ mod tests {
let mut connector = TimeoutConnector::new(http);
connector.set_connect_timeout(Some(Duration::from_millis(1)));

let client = Client::builder().build::<_, hyper::Body>(connector);
let client = Client::builder(TokioExecutor::new()).build::<_, Empty<Bytes>>(connector);

let res = client.get(url).await;

Expand All @@ -179,19 +179,17 @@ mod tests {
// A 1 ms read timeout should be so short that we trigger a timeout error
connector.set_read_timeout(Some(Duration::from_millis(1)));

let client = Client::builder().build::<_, hyper::Body>(connector);
let client = Client::builder(TokioExecutor::new()).build::<_, Empty<Bytes>>(connector);

let res = client.get(url).await;

match res {
Ok(_) => panic!("Expected a timeout"),
Err(e) => {
if let Some(io_e) = e.source().unwrap().downcast_ref::<io::Error>() {
assert_eq!(io_e.kind(), io::ErrorKind::TimedOut);
} else {
panic!("Expected timeout error");
if let Err(client_e) = res {
if let Some(hyper_e) = client_e.source() {
if let Some(io_e) = hyper_e.source().unwrap().downcast_ref::<io::Error>() {
return assert_eq!(io_e.kind(), io::ErrorKind::TimedOut);
}
}
}
panic!("Expected timeout error");
}
}
Loading

0 comments on commit 24f6fca

Please sign in to comment.