From da5a074237981ec2709683d37a9c37b8845b3fb7 Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Thu, 6 Jul 2017 18:37:14 -0700 Subject: [PATCH] feat(client): add connect, read, and write timeouts to client - Add a `TimeoutConnector` implememtation that can be used to wrap any implemention of `Connect`. - Add read and write timeout support to `Client` and client `Config`. The internal client implementation now wraps I/O with the tokio-io-timeout crate. Due to the way tokio-proto works, the user will see a "broken pipe" error instead of a "timed out" error when a read or write timeout occurs. Closes #1234 --- Cargo.toml | 1 + src/client/connect.rs | 86 ++++++++++++++++++++++++++++++++++++++++++- src/client/mod.rs | 56 +++++++++++++++++++++++++--- src/lib.rs | 1 + tests/client.rs | 49 ++++++++++++++++++++++++ 5 files changed, 185 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 13f28b3b88..336b6a7b7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ tokio-core = "0.1.6" tokio-proto = "0.1" tokio-service = "0.1" tokio-io = "0.1" +tokio-io-timeout = "0.1.0" unicase = "2.0" [dev-dependencies] diff --git a/src/client/connect.rs b/src/client/connect.rs index 716e2f94e1..d205c26e18 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -3,10 +3,12 @@ use std::fmt; use std::io; use std::mem; //use std::net::SocketAddr; +use std::time::Duration; use futures::{Future, Poll, Async}; +use futures::future::Either; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio::reactor::Handle; +use tokio::reactor::{Handle, Timeout}; use tokio::net::{TcpStream, TcpStreamNew}; use tokio_service::Service; use Uri; @@ -242,11 +244,77 @@ impl HttpsConnector { } */ +/// A connector that enforces as connection timeout +#[derive(Debug)] +pub struct TimeoutConnector { + /// A connector implementing the `Connect` trait + connector: T, + /// Handle to be used to set the timeout within tokio's core + handle: Handle, + /// Amount of time to wait connecting + connect_timeout: Duration, +} + +impl TimeoutConnector { + /// Construct a new TimeoutConnector + /// + /// Takes number of DNS worker threads + /// + /// This uses hyper's default `HttpConnector`. If you wish to use something besides the defaults, + /// create the connector and then use `TimeoutConnector::with_connector`. + pub fn new(threads: usize, handle: &Handle, timeout: Duration) -> Self { + let http = HttpConnector::new(threads, handle); + TimeoutConnector::with_connector(http, handle, timeout) + } +} + +impl TimeoutConnector { + /// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait + pub fn with_connector(connector: T, handle: &Handle, timeout: Duration) -> Self { + TimeoutConnector { + connector: connector, + handle: handle.clone(), + connect_timeout: timeout, + } + } +} + +impl Service for TimeoutConnector + where T: Service + 'static, + T::Response: AsyncRead + AsyncWrite, + T::Future: Future, +{ + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Future = Box>; + + fn call(&self, req: Self::Request) -> Self::Future { + let connecting = self.connector.call(req); + let timeout = Timeout::new(self.connect_timeout, &self.handle).unwrap(); + + Box::new(connecting.select2(timeout).then(|res| { + match res { + Ok(Either::A((connecting, _))) => Ok(connecting), + Ok(Either::B((_, _))) => { + Err(io::Error::new( + io::ErrorKind::TimedOut, + "Client timed out while connecting" + )) + } + Err(Either::A((e, _))) => Err(e), + Err(Either::B((e, _))) => Err(e), + } + })) + } +} + #[cfg(test)] mod tests { use std::io; + use std::time::Duration; use tokio::reactor::Core; - use super::{Connect, HttpConnector}; + use super::{Connect, HttpConnector, TimeoutConnector}; #[test] fn test_errors_missing_authority() { @@ -275,4 +343,18 @@ mod tests { assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput); } + + #[test] + fn test_timeout_connector() { + let mut core = Core::new().unwrap(); + // 10.255.255.1 is a not a routable IP address + let url = "http://10.255.255.1".parse().unwrap(); + let connector = TimeoutConnector::with_connector( + HttpConnector::new(1, &core.handle()), + &core.handle(), + Duration::from_millis(1) + ); + + assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::TimedOut); + } } diff --git a/src/client/mod.rs b/src/client/mod.rs index cf27367f7d..dd8ae8b153 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -16,6 +16,7 @@ use tokio_proto::streaming::Message; use tokio_proto::streaming::pipeline::ClientProto; use tokio_proto::util::client_proxy::ClientProxy; pub use tokio_service::Service; +use tokio_io_timeout::TimeoutStream; use header::{Headers, Host}; use http::{self, TokioBody}; @@ -27,7 +28,7 @@ use uri::{self, Uri}; pub use http::response::Response; pub use http::request::Request; -pub use self::connect::{HttpConnector, Connect}; +pub use self::connect::{HttpConnector, Connect, TimeoutConnector}; mod connect; mod dns; @@ -39,6 +40,9 @@ pub struct Client { connector: C, handle: Handle, pool: Pool>, + //connect_timeout: Option, + read_timeout: Option, + write_timeout: Option, } impl Client { @@ -87,6 +91,9 @@ impl Client { connector: config.connector, handle: handle.clone(), pool: Pool::new(config.keep_alive, config.keep_alive_timeout), + //connect_timeout: config.connect_timeout, + read_timeout: config.read_timeout, + write_timeout: config.write_timeout, } } } @@ -162,8 +169,13 @@ where C: Connect, let handle = self.handle.clone(); let pool = self.pool.clone(); let pool_key = Rc::new(domain.to_string()); + let read_timeout = self.read_timeout.clone(); + let write_timeout = self.write_timeout.clone(); self.connector.connect(url) .map(move |io| { + let mut io = TimeoutStream::new(io, &handle); + io.set_read_timeout(read_timeout); + io.set_write_timeout(write_timeout); let (tx, rx) = oneshot::channel(); let client = HttpClient { client_rx: RefCell::new(Some(rx)), @@ -209,6 +221,9 @@ impl Clone for Client { connector: self.connector.clone(), handle: self.handle.clone(), pool: self.pool.clone(), + //connect_timeout: self.connect_timeout.clone(), + read_timeout: self.read_timeout.clone(), + write_timeout: self.write_timeout.clone(), } } } @@ -273,7 +288,9 @@ where T: AsyncRead + AsyncWrite + 'static, /// Configuration for a Client pub struct Config { _body_type: PhantomData, - //connect_timeout: Duration, + //connect_timeout: Option, + read_timeout: Option, + write_timeout: Option, connector: C, keep_alive: bool, keep_alive_timeout: Option, @@ -289,7 +306,9 @@ impl Default for Config { fn default() -> Config { Config { _body_type: PhantomData::, - //connect_timeout: Duration::from_secs(10), + //connect_timeout: Some(Duration::from_secs(10)), + read_timeout: Some(Duration::from_secs(10)), + write_timeout: Some(Duration::from_secs(10)), connector: UseDefaultConnector(()), keep_alive: true, keep_alive_timeout: Some(Duration::from_secs(90)), @@ -312,11 +331,13 @@ impl Config { pub fn body(self) -> Config { Config { _body_type: PhantomData::, - //connect_timeout: self.connect_timeout, connector: self.connector, keep_alive: self.keep_alive, keep_alive_timeout: self.keep_alive_timeout, max_idle: self.max_idle, + //connect_timeout: self.connect_timeout, + read_timeout: self.read_timeout, + write_timeout: self.write_timeout, } } @@ -325,11 +346,13 @@ impl Config { pub fn connector(self, val: CC) -> Config { Config { _body_type: self._body_type, - //connect_timeout: self.connect_timeout, connector: val, keep_alive: self.keep_alive, keep_alive_timeout: self.keep_alive_timeout, max_idle: self.max_idle, + //connect_timeout: self.connect_timeout, + read_timeout: self.read_timeout, + write_timeout: self.write_timeout, } } @@ -358,11 +381,29 @@ impl Config { /// /// Default is 10 seconds. #[inline] - pub fn connect_timeout(mut self, val: Duration) -> Config { + pub fn connect_timeout(mut self, val: Option) -> Config { self.connect_timeout = val; self } */ + + /// Set the timeout for the response. + /// + /// Default is 10 seconds. + #[inline] + pub fn read_timeout(mut self, val: Option) -> Config { + self.read_timeout = val; + self + } + + /// Set the timeout for the request. + /// + /// Default is 10 seconds. + #[inline] + pub fn write_timeout(mut self, val: Option) -> Config { + self.write_timeout = val; + self + } } impl Config @@ -406,6 +447,9 @@ impl Clone for Config { keep_alive: self.keep_alive, keep_alive_timeout: self.keep_alive_timeout, max_idle: self.max_idle, + //connect_timeout: self.connect_timeout, + read_timeout: self.read_timeout, + write_timeout: self.write_timeout, } } } diff --git a/src/lib.rs b/src/lib.rs index 0955a4757b..4b9ab5f55d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ extern crate tokio_core as tokio; extern crate tokio_proto; extern crate tokio_service; extern crate unicase; +extern crate tokio_io_timeout; #[cfg(all(test, feature = "nightly"))] extern crate test; diff --git a/tests/client.rs b/tests/client.rs index e93426c87d..7c8cdc2d8f 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -4,6 +4,7 @@ extern crate futures; extern crate tokio_core; extern crate pretty_env_logger; +use std::error::Error; use std::io::{self, Read, Write}; use std::net::TcpListener; use std::thread; @@ -262,6 +263,54 @@ fn client_keep_alive() { core.run(res.join(rx).map(|r| r.0)).unwrap(); } +#[test] +fn client_read_timeout() { + let _ = pretty_env_logger::init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let client = Client::configure().read_timeout(Some(Duration::from_millis(1))).build(&core.handle()); + + let (tx, rx) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + let delay = Duration::from_millis(2); + thread::sleep(delay); + let _ = tx.send(()); + }); + + let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let res = client.get(format!("http://{}/a", addr).parse().unwrap()); + + // tokio-proto currently swallows the read timeout error and returns "broken pipe" insetead + assert_eq!(core.run(res.join(rx).map(|r| r.0)).unwrap_err().description(), "broken pipe"); +} + +#[test] +fn client_write_timeout() { + let _ = pretty_env_logger::init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let client = Client::configure().write_timeout(Some(Duration::from_millis(1))).build(&core.handle()); + + let (tx, rx) = oneshot::channel(); + thread::spawn(move || { + let _sock = server.accept().unwrap().0; + let delay = Duration::from_millis(2); + thread::sleep(delay); + let _ = tx.send(()); + }); + + let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let res = client.get(format!("http://{}/a", addr).parse().unwrap()); + + // tokio-proto currently swallows the write timeout error and returns "broken pipe" insetead + assert_eq!(core.run(res.join(rx).map(|r| r.0)).unwrap_err().description(), "broken pipe"); +} + /* TODO: re-enable once retry works, its currently a flaky test #[test]