Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

Support timeouts with backend #122

Merged
merged 1 commit into from
Jul 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ env_logger = "0.3.1"
futures = "0.1.11"
hyper = "0.11.0"
hyper-tls = "0.1.1"
hyper-timeout = "0.1"
native-tls = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-service = "0.1.0"
Expand Down
37 changes: 33 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#[derive(Default)]
use std::time::Duration;

#[derive(Debug, Default, Clone)]
pub struct Config {
pub health_check: HealthCheck,
pub timeout: Timeout,
}

#[derive(Debug, Clone)]
pub struct HealthCheck {
/// The time (in seconds) between two consecutive health checks
pub interval: u64,
pub interval: Duration,

/// The URI path to health check
pub uri_path: String,
Expand All @@ -20,17 +24,42 @@ pub struct HealthCheck {
impl Default for HealthCheck {
fn default() -> HealthCheck {
HealthCheck {
interval: 10,
interval: Duration::from_secs(10),
uri_path: "/".to_string(),
failures: 3,
passes: 2,
}
}
}

#[derive(Debug, Clone)]
pub struct Timeout {
/// Amount of time to wait connecting
pub connect: Option<Duration>,

/// Amount of time to wait writing request
pub write: Option<Duration>,

/// Amount of time to wait reading response
pub read: Option<Duration>,
}

impl Default for Timeout {
fn default() -> Timeout {
Timeout {
connect: Some(Duration::from_millis(200)),
write: Some(Duration::from_secs(2)),
read: Some(Duration::from_secs(2)),
}
}
}

#[test]
fn test_config() {
let conf = Config::default();
assert_eq!(10, conf.health_check.interval);
assert_eq!(Duration::from_secs(10), conf.health_check.interval);
assert_eq!("/", conf.health_check.uri_path);
assert_eq!(Some(Duration::from_millis(200)), conf.timeout.connect);
assert_eq!(Some(Duration::from_secs(2)), conf.timeout.write);
assert_eq!(Some(Duration::from_secs(2)), conf.timeout.read);
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ extern crate env_logger;
#[macro_use]
extern crate hyper;
extern crate hyper_tls;
extern crate hyper_timeout;
extern crate native_tls;
extern crate serde;
extern crate serde_json;
#[macro_use]
Expand Down
8 changes: 4 additions & 4 deletions src/mgmt/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl BackendHealth {
}
}

pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health: BackendHealth) {
pub fn run(pool: Pool, handle: &Handle, config: &Config, manager: Manager, health: BackendHealth) {
let client = Client::configure()
.connector(HttpsConnector::new(4, &handle).unwrap())
.build(&handle);
Expand All @@ -118,7 +118,7 @@ pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health:
let handle1 = handle1.clone();
let server = backend.server();
let health = health.clone();
let url = format!("{}{}", server.url(), &conf.health_check.uri_path);
let url = format!("{}{}", server.url(), &config.health_check.uri_path);
let url = match Uri::from_str(&url) {
Ok(url) => url,
Err(e) => {
Expand All @@ -133,8 +133,8 @@ pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health:
}
};

let allowed_failures = conf.health_check.failures;
let allowed_successes = conf.health_check.passes;
let allowed_failures = config.health_check.failures;
let allowed_successes = config.health_check.passes;
debug!("Health check {:?}", url);
let req = client.get(url).then(move |res| match res {
Ok(res) => {
Expand Down
9 changes: 4 additions & 5 deletions src/mgmt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::io;
use std::net::SocketAddr;
use std::time::Duration;

use futures::Stream;
use futures::stream::MergedItem;
Expand All @@ -26,14 +25,14 @@ pub fn run(
pool: Pool,
mut core: Core,
manager: Manager,
conf: &Config,
config: &Config,
health: BackendHealth,
) -> io::Result<()> {
let handle = core.handle();
let listener = TcpListener::bind(&sock, &handle)?;
let timer = Timer::default();
let health_timer = timer
.interval(Duration::from_secs(conf.health_check.interval))
.interval(config.health_check.interval)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));

let admin_addr = listener.local_addr()?;
Expand All @@ -47,12 +46,12 @@ pub fn run(
mgmt(socket, addr, pool.clone(), &handle, manager.clone());
}
MergedItem::Second(()) => {
health::run(pool.clone(), &handle, &conf, manager.clone(), health.clone());
health::run(pool.clone(), &handle, &config, manager.clone(), health.clone());
}
MergedItem::Both((socket, addr), ()) => {
mgmt(socket, addr, pool.clone(), &handle, manager.clone());
info!("health check");
health::run(pool.clone(), &handle, &conf, manager.clone(), health.clone());
health::run(pool.clone(), &handle, &config, manager.clone(), health.clone());
}
}

Expand Down
59 changes: 18 additions & 41 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ use std::io;
use std::net::SocketAddr;
use std::str::{self, FromStr};

use net2::TcpBuilder;
use net2::unix::UnixTcpBuilderExt;
use futures::{future, Future, Stream};
use tokio_core::reactor::{Core, Handle};
use futures::{Future, Stream};
use tokio_core::reactor::Handle;
use tokio_core::net::{TcpListener, TcpStream};
use hyper::{self, Headers, Body, Client, HttpVersion};
use hyper::client::{self, HttpConnector, Service};
use hyper::header;
use hyper::server::{self, Http};
use hyper_tls::HttpsConnector;
use hyper::Uri;
use hyper_timeout::TimeoutConnector;

use pool::Pool;
use config::Config;

// testing here before sending PR upstream
// TODO make this typed
Expand Down Expand Up @@ -142,7 +142,7 @@ fn map_response(res: client::Response) -> server::Response {
}

struct Proxy {
client: Client<HttpsConnector<HttpConnector>, Body>,
client: Client<TimeoutConnector<HttpsConnector<HttpConnector>>, Body>,
pool: Pool,
}

Expand Down Expand Up @@ -198,56 +198,33 @@ impl Service for Proxy {
}
}

/// Run server with default Core
pub fn run(addr: SocketAddr, pool: Pool, core: Core) -> io::Result<()> {
let handle = core.handle();

let listener = TcpBuilder::new_v4()?;
listener.reuse_address(true)?;
listener.reuse_port(true)?;
let listener = listener.bind(&addr)?;
let listener = listener.listen(128)?;
let listener = TcpListener::from_listener(listener, &addr, &handle)?;

run_with(core, listener, pool, future::empty())
}

/// Run server with specified Core, TcpListener, Pool
///
/// This is useful for integration testing where the port is set to 0 and the test code needs to
/// determine the local addr.
pub fn run_with<F>(
mut core: Core,
listener: TcpListener,
pool: Pool,
shutdown_signal: F,
) -> io::Result<()>
where
F: Future<Item = (), Error = hyper::Error>,
pub fn serve(listener: TcpListener, pool: Pool, handle: &Handle, config: &Config) -> io::Result<Box<Future<Item = (), Error = io::Error>>>
{
let handle = core.handle();

let handle = handle.clone();
let config = config.clone();
let local_addr = listener.local_addr()?;
info!("Listening on http://{}", &local_addr);
let srv = listener.incoming().for_each(move |(socket, addr)| {
proxy(socket, addr, pool.clone(), &handle);
proxy(socket, addr, pool.clone(), &handle, &config);

Ok(())
});

info!("Listening on http://{}", &local_addr);
match core.run(shutdown_signal.select(srv.map_err(|e| e.into()))) {
Ok(((), _incoming)) => Ok(()),
Err((e, _other)) => return Err(io::Error::new(io::ErrorKind::Other, e)),
}
return Ok(Box::new(srv));
}

fn proxy(socket: TcpStream, addr: SocketAddr, pool: Pool, handle: &Handle) {
fn proxy(socket: TcpStream, addr: SocketAddr, pool: Pool, handle: &Handle, config: &Config) {

// disable Nagle's algo
// https://github.com/hyperium/hyper/issues/944
socket.set_nodelay(true).unwrap();
let connector = HttpsConnector::new(4, handle).unwrap();
let mut tm = TimeoutConnector::new(connector, &handle);
tm.set_connect_timeout(config.timeout.connect);
tm.set_read_timeout(config.timeout.read);
tm.set_write_timeout(config.timeout.write);
let client = Client::configure()
.connector(HttpsConnector::new(4, handle).unwrap())
.connector(tm)
.build(&handle);
let service = Proxy {
client: client,
Expand Down
29 changes: 24 additions & 5 deletions src/weldr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ extern crate hyper;
extern crate weldr;
extern crate clap;
extern crate tokio_core;
extern crate net2;

use std::io;
use std::net::SocketAddr;

use clap::{Arg, App, SubCommand};
use net2::TcpBuilder;
use net2::unix::UnixTcpBuilderExt;

use tokio_core::reactor::Core;
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::TcpListener;

use weldr::pool::Pool;
use weldr::config::Config;
Expand Down Expand Up @@ -49,7 +54,7 @@ fn main() {
.get_matches();


let core = Core::new().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();

// TODO make this dynamic and pass it down to workers
Expand All @@ -62,15 +67,18 @@ fn main() {
let ip = ip.parse::<SocketAddr>().unwrap();

let pool = Pool::default();
let config = Config::default();

if let Some(matches) = matches.subcommand_matches("worker") {
let id = matches.value_of("id").unwrap();
debug!("Spawned worker {}", id);
let _result = worker::subscribe(internal_addr, handle, pool.clone());

weldr::proxy::run(ip, pool, core).expect("Failed to start server");
let listener = setup_listener(ip, &core.handle()).expect("Failed to setup listener");
//weldr::proxy::run(ip, pool, core).expect("Failed to start server");
let srv = weldr::proxy::serve(listener, pool, &core.handle(), &config).expect("Failed to create server future");
core.run(srv).expect("Server failed");
} else {
let conf = Config::default();
let mut manager = manager::Manager::new();
manager.listen(internal_addr, handle.clone());
manager.start_workers(5).expect("Failed to start manager");
Expand All @@ -79,7 +87,18 @@ fn main() {

let admin_ip = matches.value_of("worker").unwrap_or("0.0.0.0:8687");
let admin_ip = admin_ip.parse::<SocketAddr>().unwrap();
weldr::mgmt::run(admin_ip, pool, core, manager.clone(), &conf, health.clone())
weldr::mgmt::run(admin_ip, pool, core, manager.clone(), &config, health.clone())
.expect("Failed to start server");
}
}

fn setup_listener(addr: SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
let listener = TcpBuilder::new_v4()?;
listener.reuse_address(true)?;
listener.reuse_port(true)?;
let listener = listener.bind(&addr)?;
let listener = listener.listen(128)?;
let listener = TcpListener::from_listener(listener, &addr, &handle)?;

Ok(listener)
}
Loading