From bf42f0ed990ee1b518fc1398f510ec20c65b3059 Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Thu, 6 Jul 2017 08:57:55 -0700 Subject: [PATCH] Support timeouts with backend - hyper-timeout allows connect, read and write timeouts - updated Config with timeout values - refactor the way the worker is started Fixes #18 --- Cargo.lock | 29 ++++++++++++++++++++++ Cargo.toml | 2 ++ src/config.rs | 37 +++++++++++++++++++++++++--- src/lib.rs | 2 ++ src/mgmt/health.rs | 8 +++--- src/mgmt/mod.rs | 9 +++---- src/proxy.rs | 59 ++++++++++++++------------------------------- src/weldr.rs | 29 ++++++++++++++++++---- tests/test-proxy.rs | 11 ++++++--- 9 files changed, 124 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 95f521c..f4fc32d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,9 +9,11 @@ dependencies = [ "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper-timeout 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "native-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -238,6 +240,19 @@ dependencies = [ "unicase 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hyper-timeout" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io-timeout 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hyper-tls" version = "0.1.1" @@ -677,6 +692,18 @@ dependencies = [ "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-io-timeout" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-proto" version = "0.1.1" @@ -806,6 +833,7 @@ dependencies = [ "checksum gcc 0.3.51 (registry+https://github.com/rust-lang/crates.io-index)" = "120d07f202dcc3f72859422563522b66fe6463a4c513df062874daad05f85f0a" "checksum httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "af2f2dd97457e8fb1ae7c5a420db346af389926e36f43768b96f101546b04a07" "checksum hyper 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b8590f308416a428dca05ca67020283105344e94059fd2f02cc72e9c913c30fb" +"checksum hyper-timeout 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "11be8b6384dacc687ba18f2bc793130bc5f5879e00a83c2253dd2916513cda88" "checksum hyper-tls 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "faea5efeea2b1cd9596a5d2792ff13de558cc2e578f32310b3ad2e8c35c349f3" "checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be" "checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" @@ -859,6 +887,7 @@ dependencies = [ "checksum time 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "ffd7ccbf969a892bf83f1e441126968a07a3941c24ff522a26af9f9f4585d1a3" "checksum tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6a20ba4738d283cac7495ca36e045c80c2a8df3e05dd0909b17a06646af5a7ed" "checksum tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c2c3ce9739f7387a0fa65b5421e81feae92e04d603f008898f4257790ce8c2db" +"checksum tokio-io-timeout 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c23bb67562a1fafa11947d444fdab978b949961ab656e2b13f7c9d531850bd6f" "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" "checksum tokio-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "86f33def658c14724fc13ec6289b3875a8152ee8ae767a5b1ccbded363b03db8" diff --git a/Cargo.toml b/Cargo.toml index 07cbeef..91025ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ env_logger = "0.3.1" futures = "0.1.11" hyper = "0.11.0" hyper-tls = "0.1.1" +hyper-timeout = "0.1" +native-tls = "0.1" tokio-core = "0.1" tokio-io = "0.1" tokio-service = "0.1.0" diff --git a/src/config.rs b/src/config.rs index 982fd72..ece3f16 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,15 @@ -#[derive(Default)] +use std::time::Duration; + +#[derive(Debug, Default, Clone)] pub struct Config { pub health_check: HealthCheck, + pub timeout: Timeout, } +#[derive(Debug, Clone)] pub struct HealthCheck { /// The time (in seconds) between two consecutive health checks - pub interval: u64, + pub interval: Duration, /// The URI path to health check pub uri_path: String, @@ -20,7 +24,7 @@ pub struct HealthCheck { impl Default for HealthCheck { fn default() -> HealthCheck { HealthCheck { - interval: 10, + interval: Duration::from_secs(10), uri_path: "/".to_string(), failures: 3, passes: 2, @@ -28,9 +32,34 @@ impl Default for HealthCheck { } } +#[derive(Debug, Clone)] +pub struct Timeout { + /// Amount of time to wait connecting + pub connect: Option, + + /// Amount of time to wait writing request + pub write: Option, + + /// Amount of time to wait reading response + pub read: Option, +} + +impl Default for Timeout { + fn default() -> Timeout { + Timeout { + connect: Some(Duration::from_millis(200)), + write: Some(Duration::from_secs(2)), + read: Some(Duration::from_secs(2)), + } + } +} + #[test] fn test_config() { let conf = Config::default(); - assert_eq!(10, conf.health_check.interval); + assert_eq!(Duration::from_secs(10), conf.health_check.interval); assert_eq!("/", conf.health_check.uri_path); + assert_eq!(Some(Duration::from_millis(200)), conf.timeout.connect); + assert_eq!(Some(Duration::from_secs(2)), conf.timeout.write); + assert_eq!(Some(Duration::from_secs(2)), conf.timeout.read); } diff --git a/src/lib.rs b/src/lib.rs index 36dc15d..abfc997 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ extern crate env_logger; #[macro_use] extern crate hyper; extern crate hyper_tls; +extern crate hyper_timeout; +extern crate native_tls; extern crate serde; extern crate serde_json; #[macro_use] diff --git a/src/mgmt/health.rs b/src/mgmt/health.rs index 51fb0f2..794485c 100644 --- a/src/mgmt/health.rs +++ b/src/mgmt/health.rs @@ -106,7 +106,7 @@ impl BackendHealth { } } -pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health: BackendHealth) { +pub fn run(pool: Pool, handle: &Handle, config: &Config, manager: Manager, health: BackendHealth) { let client = Client::configure() .connector(HttpsConnector::new(4, &handle).unwrap()) .build(&handle); @@ -118,7 +118,7 @@ pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health: let handle1 = handle1.clone(); let server = backend.server(); let health = health.clone(); - let url = format!("{}{}", server.url(), &conf.health_check.uri_path); + let url = format!("{}{}", server.url(), &config.health_check.uri_path); let url = match Uri::from_str(&url) { Ok(url) => url, Err(e) => { @@ -133,8 +133,8 @@ pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health: } }; - let allowed_failures = conf.health_check.failures; - let allowed_successes = conf.health_check.passes; + let allowed_failures = config.health_check.failures; + let allowed_successes = config.health_check.passes; debug!("Health check {:?}", url); let req = client.get(url).then(move |res| match res { Ok(res) => { diff --git a/src/mgmt/mod.rs b/src/mgmt/mod.rs index 859db0a..f546a36 100644 --- a/src/mgmt/mod.rs +++ b/src/mgmt/mod.rs @@ -1,6 +1,5 @@ use std::io; use std::net::SocketAddr; -use std::time::Duration; use futures::Stream; use futures::stream::MergedItem; @@ -26,14 +25,14 @@ pub fn run( pool: Pool, mut core: Core, manager: Manager, - conf: &Config, + config: &Config, health: BackendHealth, ) -> io::Result<()> { let handle = core.handle(); let listener = TcpListener::bind(&sock, &handle)?; let timer = Timer::default(); let health_timer = timer - .interval(Duration::from_secs(conf.health_check.interval)) + .interval(config.health_check.interval) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)); let admin_addr = listener.local_addr()?; @@ -47,12 +46,12 @@ pub fn run( mgmt(socket, addr, pool.clone(), &handle, manager.clone()); } MergedItem::Second(()) => { - health::run(pool.clone(), &handle, &conf, manager.clone(), health.clone()); + health::run(pool.clone(), &handle, &config, manager.clone(), health.clone()); } MergedItem::Both((socket, addr), ()) => { mgmt(socket, addr, pool.clone(), &handle, manager.clone()); info!("health check"); - health::run(pool.clone(), &handle, &conf, manager.clone(), health.clone()); + health::run(pool.clone(), &handle, &config, manager.clone(), health.clone()); } } diff --git a/src/proxy.rs b/src/proxy.rs index 61dd425..773fd17 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -2,10 +2,8 @@ use std::io; use std::net::SocketAddr; use std::str::{self, FromStr}; -use net2::TcpBuilder; -use net2::unix::UnixTcpBuilderExt; -use futures::{future, Future, Stream}; -use tokio_core::reactor::{Core, Handle}; +use futures::{Future, Stream}; +use tokio_core::reactor::Handle; use tokio_core::net::{TcpListener, TcpStream}; use hyper::{self, Headers, Body, Client, HttpVersion}; use hyper::client::{self, HttpConnector, Service}; @@ -13,8 +11,10 @@ use hyper::header; use hyper::server::{self, Http}; use hyper_tls::HttpsConnector; use hyper::Uri; +use hyper_timeout::TimeoutConnector; use pool::Pool; +use config::Config; // testing here before sending PR upstream // TODO make this typed @@ -142,7 +142,7 @@ fn map_response(res: client::Response) -> server::Response { } struct Proxy { - client: Client, Body>, + client: Client>, Body>, pool: Pool, } @@ -198,56 +198,33 @@ impl Service for Proxy { } } -/// Run server with default Core -pub fn run(addr: SocketAddr, pool: Pool, core: Core) -> io::Result<()> { - let handle = core.handle(); - - let listener = TcpBuilder::new_v4()?; - listener.reuse_address(true)?; - listener.reuse_port(true)?; - let listener = listener.bind(&addr)?; - let listener = listener.listen(128)?; - let listener = TcpListener::from_listener(listener, &addr, &handle)?; - - run_with(core, listener, pool, future::empty()) -} - -/// Run server with specified Core, TcpListener, Pool -/// -/// This is useful for integration testing where the port is set to 0 and the test code needs to -/// determine the local addr. -pub fn run_with( - mut core: Core, - listener: TcpListener, - pool: Pool, - shutdown_signal: F, -) -> io::Result<()> -where - F: Future, +pub fn serve(listener: TcpListener, pool: Pool, handle: &Handle, config: &Config) -> io::Result>> { - let handle = core.handle(); - + let handle = handle.clone(); + let config = config.clone(); let local_addr = listener.local_addr()?; + info!("Listening on http://{}", &local_addr); let srv = listener.incoming().for_each(move |(socket, addr)| { - proxy(socket, addr, pool.clone(), &handle); + proxy(socket, addr, pool.clone(), &handle, &config); Ok(()) }); - info!("Listening on http://{}", &local_addr); - match core.run(shutdown_signal.select(srv.map_err(|e| e.into()))) { - Ok(((), _incoming)) => Ok(()), - Err((e, _other)) => return Err(io::Error::new(io::ErrorKind::Other, e)), - } + return Ok(Box::new(srv)); } -fn proxy(socket: TcpStream, addr: SocketAddr, pool: Pool, handle: &Handle) { +fn proxy(socket: TcpStream, addr: SocketAddr, pool: Pool, handle: &Handle, config: &Config) { // disable Nagle's algo // https://github.com/hyperium/hyper/issues/944 socket.set_nodelay(true).unwrap(); + let connector = HttpsConnector::new(4, handle).unwrap(); + let mut tm = TimeoutConnector::new(connector, &handle); + tm.set_connect_timeout(config.timeout.connect); + tm.set_read_timeout(config.timeout.read); + tm.set_write_timeout(config.timeout.write); let client = Client::configure() - .connector(HttpsConnector::new(4, handle).unwrap()) + .connector(tm) .build(&handle); let service = Proxy { client: client, diff --git a/src/weldr.rs b/src/weldr.rs index f6b47ee..10551d6 100644 --- a/src/weldr.rs +++ b/src/weldr.rs @@ -5,12 +5,17 @@ extern crate hyper; extern crate weldr; extern crate clap; extern crate tokio_core; +extern crate net2; +use std::io; use std::net::SocketAddr; use clap::{Arg, App, SubCommand}; +use net2::TcpBuilder; +use net2::unix::UnixTcpBuilderExt; -use tokio_core::reactor::Core; +use tokio_core::reactor::{Core, Handle}; +use tokio_core::net::TcpListener; use weldr::pool::Pool; use weldr::config::Config; @@ -49,7 +54,7 @@ fn main() { .get_matches(); - let core = Core::new().unwrap(); + let mut core = Core::new().unwrap(); let handle = core.handle(); // TODO make this dynamic and pass it down to workers @@ -62,15 +67,18 @@ fn main() { let ip = ip.parse::().unwrap(); let pool = Pool::default(); + let config = Config::default(); if let Some(matches) = matches.subcommand_matches("worker") { let id = matches.value_of("id").unwrap(); debug!("Spawned worker {}", id); let _result = worker::subscribe(internal_addr, handle, pool.clone()); - weldr::proxy::run(ip, pool, core).expect("Failed to start server"); + let listener = setup_listener(ip, &core.handle()).expect("Failed to setup listener"); + //weldr::proxy::run(ip, pool, core).expect("Failed to start server"); + let srv = weldr::proxy::serve(listener, pool, &core.handle(), &config).expect("Failed to create server future"); + core.run(srv).expect("Server failed"); } else { - let conf = Config::default(); let mut manager = manager::Manager::new(); manager.listen(internal_addr, handle.clone()); manager.start_workers(5).expect("Failed to start manager"); @@ -79,7 +87,18 @@ fn main() { let admin_ip = matches.value_of("worker").unwrap_or("0.0.0.0:8687"); let admin_ip = admin_ip.parse::().unwrap(); - weldr::mgmt::run(admin_ip, pool, core, manager.clone(), &conf, health.clone()) + weldr::mgmt::run(admin_ip, pool, core, manager.clone(), &config, health.clone()) .expect("Failed to start server"); } } + +fn setup_listener(addr: SocketAddr, handle: &Handle) -> io::Result { + let listener = TcpBuilder::new_v4()?; + listener.reuse_address(true)?; + listener.reuse_port(true)?; + let listener = listener.bind(&addr)?; + let listener = listener.listen(128)?; + let listener = TcpListener::from_listener(listener, &addr, &handle)?; + + Ok(listener) +} diff --git a/tests/test-proxy.rs b/tests/test-proxy.rs index e36d360..2cd678e 100644 --- a/tests/test-proxy.rs +++ b/tests/test-proxy.rs @@ -24,6 +24,7 @@ use hyper::header::{ContentLength, TransferEncoding}; use weldr::server::Server; use weldr::pool::Pool; +use weldr::config::Config; #[derive(Clone, Copy)] struct Origin; @@ -100,7 +101,7 @@ where let pool = Pool::default(); let addr = "127.0.0.1:0".parse::().unwrap(); - let core = Core::new().unwrap(); + let mut core = Core::new().unwrap(); let handle = core.handle(); let listener = TcpListener::bind(&addr, &handle).unwrap(); @@ -132,8 +133,12 @@ where ) }); - weldr::proxy::run_with(core, listener, pool.clone(), shutdown_signal) - .expect("Failed to start server"); + let config = Config::default(); + let srv = weldr::proxy::serve(listener, pool.clone(), &handle, &config).unwrap(); + match core.run(shutdown_signal.select(srv.map_err(|e| e.into()))) { + Ok(((), _incoming)) => {}, + Err((e, _other)) => panic!(e), + } } fn client_send_request(