Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): add connect, read, and write timeouts to client #1255

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tokio-core = "0.1.6"
tokio-proto = "0.1"
tokio-service = "0.1"
tokio-io = "0.1"
tokio-io-timeout = "0.1.0"
unicase = "2.0"

[dev-dependencies]
Expand Down
86 changes: 84 additions & 2 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use std::fmt;
use std::io;
use std::mem;
//use std::net::SocketAddr;
use std::time::Duration;

use futures::{Future, Poll, Async};
use futures::future::Either;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::reactor::{Handle, Timeout};
use tokio::net::{TcpStream, TcpStreamNew};
use tokio_service::Service;
use Uri;
Expand Down Expand Up @@ -242,11 +244,77 @@ impl<S: SslClient> HttpsConnector<S> {
}
*/

/// A connector that enforces as connection timeout
#[derive(Debug)]
pub struct TimeoutConnector<T> {
/// A connector implementing the `Connect` trait
connector: T,
/// Handle to be used to set the timeout within tokio's core
handle: Handle,
/// Amount of time to wait connecting
connect_timeout: Duration,
}

impl TimeoutConnector<HttpConnector> {
/// Construct a new TimeoutConnector
///
/// Takes number of DNS worker threads
///
/// This uses hyper's default `HttpConnector`. If you wish to use something besides the defaults,
/// create the connector and then use `TimeoutConnector::with_connector`.
pub fn new(threads: usize, handle: &Handle, timeout: Duration) -> Self {
let http = HttpConnector::new(threads, handle);
TimeoutConnector::with_connector(http, handle, timeout)
}
}

impl<T: Connect> TimeoutConnector<T> {
/// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait
pub fn with_connector(connector: T, handle: &Handle, timeout: Duration) -> Self {
TimeoutConnector {
connector: connector,
handle: handle.clone(),
connect_timeout: timeout,
}
}
}

impl<T> Service for TimeoutConnector<T>
where T: Service<Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
let connecting = self.connector.call(req);
let timeout = Timeout::new(self.connect_timeout, &self.handle).unwrap();

Box::new(connecting.select2(timeout).then(|res| {
match res {
Ok(Either::A((connecting, _))) => Ok(connecting),
Ok(Either::B((_, _))) => {
Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client timed out while connecting"
))
}
Err(Either::A((e, _))) => Err(e),
Err(Either::B((e, _))) => Err(e),
}
}))
}
}

#[cfg(test)]
mod tests {
use std::io;
use std::time::Duration;
use tokio::reactor::Core;
use super::{Connect, HttpConnector};
use super::{Connect, HttpConnector, TimeoutConnector};

#[test]
fn test_errors_missing_authority() {
Expand Down Expand Up @@ -275,4 +343,18 @@ mod tests {

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}

#[test]
fn test_timeout_connector() {
let mut core = Core::new().unwrap();
// 10.255.255.1 is a not a routable IP address
let url = "http://10.255.255.1".parse().unwrap();
let connector = TimeoutConnector::with_connector(
HttpConnector::new(1, &core.handle()),
&core.handle(),
Duration::from_millis(1)
);

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::TimedOut);
}
}
56 changes: 50 additions & 6 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio_proto::streaming::Message;
use tokio_proto::streaming::pipeline::ClientProto;
use tokio_proto::util::client_proxy::ClientProxy;
pub use tokio_service::Service;
use tokio_io_timeout::TimeoutStream;

use header::{Headers, Host};
use http::{self, TokioBody};
Expand All @@ -27,7 +28,7 @@ use uri::{self, Uri};

pub use http::response::Response;
pub use http::request::Request;
pub use self::connect::{HttpConnector, Connect};
pub use self::connect::{HttpConnector, Connect, TimeoutConnector};

mod connect;
mod dns;
Expand All @@ -39,6 +40,9 @@ pub struct Client<C, B = http::Body> {
connector: C,
handle: Handle,
pool: Pool<TokioClient<B>>,
//connect_timeout: Option<Duration>,
read_timeout: Option<Duration>,
write_timeout: Option<Duration>,
}

impl Client<HttpConnector, http::Body> {
Expand Down Expand Up @@ -87,6 +91,9 @@ impl<C, B> Client<C, B> {
connector: config.connector,
handle: handle.clone(),
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
//connect_timeout: config.connect_timeout,
read_timeout: config.read_timeout,
write_timeout: config.write_timeout,
}
}
}
Expand Down Expand Up @@ -162,8 +169,13 @@ where C: Connect,
let handle = self.handle.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
let read_timeout = self.read_timeout.clone();
let write_timeout = self.write_timeout.clone();
self.connector.connect(url)
.map(move |io| {
let mut io = TimeoutStream::new(io, &handle);
io.set_read_timeout(read_timeout);
io.set_write_timeout(write_timeout);
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
Expand Down Expand Up @@ -209,6 +221,9 @@ impl<C: Clone, B> Clone for Client<C, B> {
connector: self.connector.clone(),
handle: self.handle.clone(),
pool: self.pool.clone(),
//connect_timeout: self.connect_timeout.clone(),
read_timeout: self.read_timeout.clone(),
write_timeout: self.write_timeout.clone(),
}
}
}
Expand Down Expand Up @@ -273,7 +288,9 @@ where T: AsyncRead + AsyncWrite + 'static,
/// Configuration for a Client
pub struct Config<C, B> {
_body_type: PhantomData<B>,
//connect_timeout: Duration,
//connect_timeout: Option<Duration>,
read_timeout: Option<Duration>,
write_timeout: Option<Duration>,
connector: C,
keep_alive: bool,
keep_alive_timeout: Option<Duration>,
Expand All @@ -289,7 +306,9 @@ impl Default for Config<UseDefaultConnector, http::Body> {
fn default() -> Config<UseDefaultConnector, http::Body> {
Config {
_body_type: PhantomData::<http::Body>,
//connect_timeout: Duration::from_secs(10),
//connect_timeout: Some(Duration::from_secs(10)),
read_timeout: Some(Duration::from_secs(10)),
write_timeout: Some(Duration::from_secs(10)),
connector: UseDefaultConnector(()),
keep_alive: true,
keep_alive_timeout: Some(Duration::from_secs(90)),
Expand All @@ -312,11 +331,13 @@ impl<C, B> Config<C, B> {
pub fn body<BB>(self) -> Config<C, BB> {
Config {
_body_type: PhantomData::<BB>,
//connect_timeout: self.connect_timeout,
connector: self.connector,
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
//connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
write_timeout: self.write_timeout,
}
}

Expand All @@ -325,11 +346,13 @@ impl<C, B> Config<C, B> {
pub fn connector<CC>(self, val: CC) -> Config<CC, B> {
Config {
_body_type: self._body_type,
//connect_timeout: self.connect_timeout,
connector: val,
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
//connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
write_timeout: self.write_timeout,
}
}

Expand Down Expand Up @@ -358,11 +381,29 @@ impl<C, B> Config<C, B> {
///
/// Default is 10 seconds.
#[inline]
pub fn connect_timeout(mut self, val: Duration) -> Config<C, B> {
pub fn connect_timeout(mut self, val: Option<Duration>) -> Config<C, B> {
self.connect_timeout = val;
self
}
*/

/// Set the timeout for the response.
///
/// Default is 10 seconds.
#[inline]
pub fn read_timeout(mut self, val: Option<Duration>) -> Config<C, B> {
self.read_timeout = val;
self
}

/// Set the timeout for the request.
///
/// Default is 10 seconds.
#[inline]
pub fn write_timeout(mut self, val: Option<Duration>) -> Config<C, B> {
self.write_timeout = val;
self
}
}

impl<C, B> Config<C, B>
Expand Down Expand Up @@ -406,6 +447,9 @@ impl<C: Clone, B> Clone for Config<C, B> {
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
//connect_timeout: self.connect_timeout,
read_timeout: self.read_timeout,
write_timeout: self.write_timeout,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern crate tokio_core as tokio;
extern crate tokio_proto;
extern crate tokio_service;
extern crate unicase;
extern crate tokio_io_timeout;

#[cfg(all(test, feature = "nightly"))]
extern crate test;
Expand Down
49 changes: 49 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate futures;
extern crate tokio_core;
extern crate pretty_env_logger;

use std::error::Error;
use std::io::{self, Read, Write};
use std::net::TcpListener;
use std::thread;
Expand Down Expand Up @@ -262,6 +263,54 @@ fn client_keep_alive() {
core.run(res.join(rx).map(|r| r.0)).unwrap();
}

#[test]
fn client_read_timeout() {
let _ = pretty_env_logger::init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let client = Client::configure().read_timeout(Some(Duration::from_millis(1))).build(&core.handle());

let (tx, rx) = oneshot::channel();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
let delay = Duration::from_millis(2);
thread::sleep(delay);
let _ = tx.send(());
});

let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
let res = client.get(format!("http://{}/a", addr).parse().unwrap());

// tokio-proto currently swallows the read timeout error and returns "broken pipe" insetead
assert_eq!(core.run(res.join(rx).map(|r| r.0)).unwrap_err().description(), "broken pipe");
}

#[test]
fn client_write_timeout() {
let _ = pretty_env_logger::init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let client = Client::configure().write_timeout(Some(Duration::from_millis(1))).build(&core.handle());

let (tx, rx) = oneshot::channel();
thread::spawn(move || {
let _sock = server.accept().unwrap().0;
let delay = Duration::from_millis(2);
thread::sleep(delay);
let _ = tx.send(());
});

let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
let res = client.get(format!("http://{}/a", addr).parse().unwrap());

// tokio-proto currently swallows the write timeout error and returns "broken pipe" insetead
assert_eq!(core.run(res.join(rx).map(|r| r.0)).unwrap_err().description(), "broken pipe");
}


/* TODO: re-enable once retry works, its currently a flaky test
#[test]
Expand Down