Skip to content

Commit

Permalink
feat(client): add connect, read, and write timeouts to client
Browse files Browse the repository at this point in the history
- Add a `TimeoutConnector` implememtation that can be used to wrap any
  implemention of `Connect`.
- Add read and write timeout support to `Client` and client `Config`.
  The internal client implementation now wraps I/O with the
  tokio-io-timeout crate. Due to the way tokio-proto works, the user
  will see a "broken pipe" error instead of a "timed out" error when
  a read or write timeout occurs.

Closes hyperium#1234
  • Loading branch information
hjr3 committed Jul 12, 2017
1 parent 258584a commit da5a074
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tokio-core = "0.1.6"
tokio-proto = "0.1"
tokio-service = "0.1"
tokio-io = "0.1"
tokio-io-timeout = "0.1.0"
unicase = "2.0"

[dev-dependencies]
Expand Down
86 changes: 84 additions & 2 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use std::fmt;
use std::io;
use std::mem;
//use std::net::SocketAddr;
use std::time::Duration;

use futures::{Future, Poll, Async};
use futures::future::Either;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::reactor::{Handle, Timeout};
use tokio::net::{TcpStream, TcpStreamNew};
use tokio_service::Service;
use Uri;
Expand Down Expand Up @@ -242,11 +244,77 @@ impl<S: SslClient> HttpsConnector<S> {
}
*/

/// A connector that enforces as connection timeout
#[derive(Debug)]
pub struct TimeoutConnector<T> {
/// A connector implementing the `Connect` trait
connector: T,
/// Handle to be used to set the timeout within tokio's core
handle: Handle,
/// Amount of time to wait connecting
connect_timeout: Duration,
}

impl TimeoutConnector<HttpConnector> {
/// Construct a new TimeoutConnector
///
/// Takes number of DNS worker threads
///
/// This uses hyper's default `HttpConnector`. If you wish to use something besides the defaults,
/// create the connector and then use `TimeoutConnector::with_connector`.
pub fn new(threads: usize, handle: &Handle, timeout: Duration) -> Self {
let http = HttpConnector::new(threads, handle);
TimeoutConnector::with_connector(http, handle, timeout)
}
}

impl<T: Connect> TimeoutConnector<T> {
/// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait
pub fn with_connector(connector: T, handle: &Handle, timeout: Duration) -> Self {
TimeoutConnector {
connector: connector,
handle: handle.clone(),
connect_timeout: timeout,
}
}
}

impl<T> Service for TimeoutConnector<T>
where T: Service<Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
let connecting = self.connector.call(req);
let timeout = Timeout::new(self.connect_timeout, &self.handle).unwrap();

Box::new(connecting.select2(timeout).then(|res| {
match res {
Ok(Either::A((connecting, _))) => Ok(connecting),
Ok(Either::B((_, _))) => {
Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client timed out while connecting"
))
}
Err(Either::A((e, _))) => Err(e),
Err(Either::B((e, _))) => Err(e),
}
}))
}
}

#[cfg(test)]
mod tests {
use std::io;
use std::time::Duration;
use tokio::reactor::Core;
use super::{Connect, HttpConnector};
use super::{Connect, HttpConnector, TimeoutConnector};

#[test]
fn test_errors_missing_authority() {
Expand Down Expand Up @@ -275,4 +343,18 @@ mod tests {

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}

#[test]
fn test_timeout_connector() {
let mut core = Core::new().unwrap();
// 10.255.255.1 is a not a routable IP address
let url = "http://10.255.255.1".parse().unwrap();
let connector = TimeoutConnector::with_connector(
HttpConnector::new(1, &core.handle()),
&core.handle(),
Duration::from_millis(1)
);

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::TimedOut);
}
}
56 changes: 50 additions & 6 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio_proto::streaming::Message;
use tokio_proto::streaming::pipeline::ClientProto;
use tokio_proto::util::client_proxy::ClientProxy;
pub use tokio_service::Service;
use tokio_io_timeout::TimeoutStream;

use header::{Headers, Host};
use http::{self, TokioBody};
Expand All @@ -27,7 +28,7 @@ use uri::{self, Uri};

pub use http::response::Response;
pub use http::request::Request;
pub use self::connect::{HttpConnector, Connect};
pub use self::connect::{HttpConnector, Connect, TimeoutConnector};

mod connect;
mod dns;
Expand All @@ -39,6 +40,9 @@ pub struct Client<C, B = http::Body> {
connector: C,
handle: Handle,
pool: Pool<TokioClient<B>>,
//connect_timeout: Option<Duration>,
read_timeout: Option<Duration>,
write_timeout: Option<Duration>,
}

impl Client<HttpConnector, http::Body> {
Expand Down Expand Up @@ -87,6 +91,9 @@ impl<C, B> Client<C, B> {
connector: config.connector,
handle: handle.clone(),
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
//connect_timeout: config.connect_timeout,
read_timeout: config.read_timeout,
write_timeout: config.write_timeout,
}
}
}
Expand Down Expand Up @@ -162,8 +169,13 @@ where C: Connect,
let handle = self.handle.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
let read_timeout = self.read_timeout.clone();
let write_timeout = self.write_timeout.clone();
self.connector.connect(url)
.map(move |io| {
let mut io = TimeoutStream::new(io, &handle);
io.set_read_timeout(read_timeout);
io.set_write_timeout(write_timeout);
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
Expand Down Expand Up @@ -209,6 +221,9 @@ impl<C: Clone, B> Clone for Client<C, B> {
connector: self.connector.clone(),
handle: self.handle.clone(),
pool: self.pool.clone(),
//connect_timeout: self.connect_timeout.clone(),
read_timeout: self.read_timeout.clone(),
write_timeout: self.write_timeout.clone(),
}
}
}
Expand Down Expand Up @@ -273,7 +288,9 @@ where T: AsyncRead + AsyncWrite + 'static,
/// Configuration for a Client
pub struct Config<C, B> {
_body_type: PhantomData<B>,
//connect_timeout: Duration,
//connect_timeout: Option<Duration>,
read_timeout: Option<Duration>,
write_timeout: Option<Duration>,
connector: C,
keep_alive: bool,
keep_alive_timeout: Option<Duration>,
Expand All @@ -289,7 +306,9 @@ impl Default for Config<UseDefaultConnector, http::Body> {
fn default() -> Config<UseDefaultConnector, http::Body> {
Config {
_body_type: PhantomData::<http::Body>,
//connect_timeout: Duration::from_secs(10),
//connect_timeout: Some(Duration::from_secs(10)),
read_timeout: Some(Duration::from_secs(10)),
write_timeout: Some(Duration::from_secs(10)),
connector: UseDefaultConnector(()),
keep_alive: true,
keep_alive_timeout: Some(Duration::from_secs(90)),
Expand All @@ -312,11 +331,13 @@ impl<C, B> Config<C, B> {
pub fn body<BB>(self) -> Config<C, BB> {
Config {
_body_type: PhantomData::<BB>,
//connect_timeout: self.connect_timeout,
connector: self.connector,
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
//connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
write_timeout: self.write_timeout,
}
}

Expand All @@ -325,11 +346,13 @@ impl<C, B> Config<C, B> {
pub fn connector<CC>(self, val: CC) -> Config<CC, B> {
Config {
_body_type: self._body_type,
//connect_timeout: self.connect_timeout,
connector: val,
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
//connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
write_timeout: self.write_timeout,
}
}

Expand Down Expand Up @@ -358,11 +381,29 @@ impl<C, B> Config<C, B> {
///
/// Default is 10 seconds.
#[inline]
pub fn connect_timeout(mut self, val: Duration) -> Config<C, B> {
pub fn connect_timeout(mut self, val: Option<Duration>) -> Config<C, B> {
self.connect_timeout = val;
self
}
*/

/// Set the timeout for the response.
///
/// Default is 10 seconds.
#[inline]
pub fn read_timeout(mut self, val: Option<Duration>) -> Config<C, B> {
self.read_timeout = val;
self
}

/// Set the timeout for the request.
///
/// Default is 10 seconds.
#[inline]
pub fn write_timeout(mut self, val: Option<Duration>) -> Config<C, B> {
self.write_timeout = val;
self
}
}

impl<C, B> Config<C, B>
Expand Down Expand Up @@ -406,6 +447,9 @@ impl<C: Clone, B> Clone for Config<C, B> {
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
//connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
write_timeout: self.write_timeout,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern crate tokio_core as tokio;
extern crate tokio_proto;
extern crate tokio_service;
extern crate unicase;
extern crate tokio_io_timeout;

#[cfg(all(test, feature = "nightly"))]
extern crate test;
Expand Down
49 changes: 49 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate futures;
extern crate tokio_core;
extern crate pretty_env_logger;

use std::error::Error;
use std::io::{self, Read, Write};
use std::net::TcpListener;
use std::thread;
Expand Down Expand Up @@ -262,6 +263,54 @@ fn client_keep_alive() {
core.run(res.join(rx).map(|r| r.0)).unwrap();
}

#[test]
fn client_read_timeout() {
let _ = pretty_env_logger::init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let client = Client::configure().read_timeout(Some(Duration::from_millis(1))).build(&core.handle());

let (tx, rx) = oneshot::channel();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
let delay = Duration::from_millis(2);
thread::sleep(delay);
let _ = tx.send(());
});

let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
let res = client.get(format!("http://{}/a", addr).parse().unwrap());

// tokio-proto currently swallows the read timeout error and returns "broken pipe" insetead
assert_eq!(core.run(res.join(rx).map(|r| r.0)).unwrap_err().description(), "broken pipe");
}

#[test]
fn client_write_timeout() {
let _ = pretty_env_logger::init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let client = Client::configure().write_timeout(Some(Duration::from_millis(1))).build(&core.handle());

let (tx, rx) = oneshot::channel();
thread::spawn(move || {
let _sock = server.accept().unwrap().0;
let delay = Duration::from_millis(2);
thread::sleep(delay);
let _ = tx.send(());
});

let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
let res = client.get(format!("http://{}/a", addr).parse().unwrap());

// tokio-proto currently swallows the write timeout error and returns "broken pipe" insetead
assert_eq!(core.run(res.join(rx).map(|r| r.0)).unwrap_err().description(), "broken pipe");
}


/* TODO: re-enable once retry works, its currently a flaky test
#[test]
Expand Down

0 comments on commit da5a074

Please sign in to comment.