Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #122 from hjr3/issue-18
Browse files Browse the repository at this point in the history
Support timeouts with backend
  • Loading branch information
hjr3 authored Jul 15, 2017
2 parents de23b8f + bf42f0e commit 611053e
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 62 deletions.
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ env_logger = "0.3.1"
futures = "0.1.11"
hyper = "0.11.0"
hyper-tls = "0.1.1"
hyper-timeout = "0.1"
native-tls = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-service = "0.1.0"
Expand Down
37 changes: 33 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#[derive(Default)]
use std::time::Duration;

#[derive(Debug, Default, Clone)]
pub struct Config {
pub health_check: HealthCheck,
pub timeout: Timeout,
}

#[derive(Debug, Clone)]
pub struct HealthCheck {
/// The time (in seconds) between two consecutive health checks
pub interval: u64,
pub interval: Duration,

/// The URI path to health check
pub uri_path: String,
Expand All @@ -20,17 +24,42 @@ pub struct HealthCheck {
impl Default for HealthCheck {
fn default() -> HealthCheck {
HealthCheck {
interval: 10,
interval: Duration::from_secs(10),
uri_path: "/".to_string(),
failures: 3,
passes: 2,
}
}
}

#[derive(Debug, Clone)]
pub struct Timeout {
/// Amount of time to wait connecting
pub connect: Option<Duration>,

/// Amount of time to wait writing request
pub write: Option<Duration>,

/// Amount of time to wait reading response
pub read: Option<Duration>,
}

impl Default for Timeout {
fn default() -> Timeout {
Timeout {
connect: Some(Duration::from_millis(200)),
write: Some(Duration::from_secs(2)),
read: Some(Duration::from_secs(2)),
}
}
}

#[test]
fn test_config() {
let conf = Config::default();
assert_eq!(10, conf.health_check.interval);
assert_eq!(Duration::from_secs(10), conf.health_check.interval);
assert_eq!("/", conf.health_check.uri_path);
assert_eq!(Some(Duration::from_millis(200)), conf.timeout.connect);
assert_eq!(Some(Duration::from_secs(2)), conf.timeout.write);
assert_eq!(Some(Duration::from_secs(2)), conf.timeout.read);
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ extern crate env_logger;
#[macro_use]
extern crate hyper;
extern crate hyper_tls;
extern crate hyper_timeout;
extern crate native_tls;
extern crate serde;
extern crate serde_json;
#[macro_use]
Expand Down
8 changes: 4 additions & 4 deletions src/mgmt/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl BackendHealth {
}
}

pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health: BackendHealth) {
pub fn run(pool: Pool, handle: &Handle, config: &Config, manager: Manager, health: BackendHealth) {
let client = Client::configure()
.connector(HttpsConnector::new(4, &handle).unwrap())
.build(&handle);
Expand All @@ -118,7 +118,7 @@ pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health:
let handle1 = handle1.clone();
let server = backend.server();
let health = health.clone();
let url = format!("{}{}", server.url(), &conf.health_check.uri_path);
let url = format!("{}{}", server.url(), &config.health_check.uri_path);
let url = match Uri::from_str(&url) {
Ok(url) => url,
Err(e) => {
Expand All @@ -133,8 +133,8 @@ pub fn run(pool: Pool, handle: &Handle, conf: &Config, manager: Manager, health:
}
};

let allowed_failures = conf.health_check.failures;
let allowed_successes = conf.health_check.passes;
let allowed_failures = config.health_check.failures;
let allowed_successes = config.health_check.passes;
debug!("Health check {:?}", url);
let req = client.get(url).then(move |res| match res {
Ok(res) => {
Expand Down
9 changes: 4 additions & 5 deletions src/mgmt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::io;
use std::net::SocketAddr;
use std::time::Duration;

use futures::Stream;
use futures::stream::MergedItem;
Expand All @@ -26,14 +25,14 @@ pub fn run(
pool: Pool,
mut core: Core,
manager: Manager,
conf: &Config,
config: &Config,
health: BackendHealth,
) -> io::Result<()> {
let handle = core.handle();
let listener = TcpListener::bind(&sock, &handle)?;
let timer = Timer::default();
let health_timer = timer
.interval(Duration::from_secs(conf.health_check.interval))
.interval(config.health_check.interval)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));

let admin_addr = listener.local_addr()?;
Expand All @@ -47,12 +46,12 @@ pub fn run(
mgmt(socket, addr, pool.clone(), &handle, manager.clone());
}
MergedItem::Second(()) => {
health::run(pool.clone(), &handle, &conf, manager.clone(), health.clone());
health::run(pool.clone(), &handle, &config, manager.clone(), health.clone());
}
MergedItem::Both((socket, addr), ()) => {
mgmt(socket, addr, pool.clone(), &handle, manager.clone());
info!("health check");
health::run(pool.clone(), &handle, &conf, manager.clone(), health.clone());
health::run(pool.clone(), &handle, &config, manager.clone(), health.clone());
}
}

Expand Down
59 changes: 18 additions & 41 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ use std::io;
use std::net::SocketAddr;
use std::str::{self, FromStr};

use net2::TcpBuilder;
use net2::unix::UnixTcpBuilderExt;
use futures::{future, Future, Stream};
use tokio_core::reactor::{Core, Handle};
use futures::{Future, Stream};
use tokio_core::reactor::Handle;
use tokio_core::net::{TcpListener, TcpStream};
use hyper::{self, Headers, Body, Client, HttpVersion};
use hyper::client::{self, HttpConnector, Service};
use hyper::header;
use hyper::server::{self, Http};
use hyper_tls::HttpsConnector;
use hyper::Uri;
use hyper_timeout::TimeoutConnector;

use pool::Pool;
use config::Config;

// testing here before sending PR upstream
// TODO make this typed
Expand Down Expand Up @@ -142,7 +142,7 @@ fn map_response(res: client::Response) -> server::Response {
}

struct Proxy {
client: Client<HttpsConnector<HttpConnector>, Body>,
client: Client<TimeoutConnector<HttpsConnector<HttpConnector>>, Body>,
pool: Pool,
}

Expand Down Expand Up @@ -198,56 +198,33 @@ impl Service for Proxy {
}
}

/// Run server with default Core
pub fn run(addr: SocketAddr, pool: Pool, core: Core) -> io::Result<()> {
let handle = core.handle();

let listener = TcpBuilder::new_v4()?;
listener.reuse_address(true)?;
listener.reuse_port(true)?;
let listener = listener.bind(&addr)?;
let listener = listener.listen(128)?;
let listener = TcpListener::from_listener(listener, &addr, &handle)?;

run_with(core, listener, pool, future::empty())
}

/// Run server with specified Core, TcpListener, Pool
///
/// This is useful for integration testing where the port is set to 0 and the test code needs to
/// determine the local addr.
pub fn run_with<F>(
mut core: Core,
listener: TcpListener,
pool: Pool,
shutdown_signal: F,
) -> io::Result<()>
where
F: Future<Item = (), Error = hyper::Error>,
pub fn serve(listener: TcpListener, pool: Pool, handle: &Handle, config: &Config) -> io::Result<Box<Future<Item = (), Error = io::Error>>>
{
let handle = core.handle();

let handle = handle.clone();
let config = config.clone();
let local_addr = listener.local_addr()?;
info!("Listening on http://{}", &local_addr);
let srv = listener.incoming().for_each(move |(socket, addr)| {
proxy(socket, addr, pool.clone(), &handle);
proxy(socket, addr, pool.clone(), &handle, &config);

Ok(())
});

info!("Listening on http://{}", &local_addr);
match core.run(shutdown_signal.select(srv.map_err(|e| e.into()))) {
Ok(((), _incoming)) => Ok(()),
Err((e, _other)) => return Err(io::Error::new(io::ErrorKind::Other, e)),
}
return Ok(Box::new(srv));
}

fn proxy(socket: TcpStream, addr: SocketAddr, pool: Pool, handle: &Handle) {
fn proxy(socket: TcpStream, addr: SocketAddr, pool: Pool, handle: &Handle, config: &Config) {

// disable Nagle's algo
// https://github.com/hyperium/hyper/issues/944
socket.set_nodelay(true).unwrap();
let connector = HttpsConnector::new(4, handle).unwrap();
let mut tm = TimeoutConnector::new(connector, &handle);
tm.set_connect_timeout(config.timeout.connect);
tm.set_read_timeout(config.timeout.read);
tm.set_write_timeout(config.timeout.write);
let client = Client::configure()
.connector(HttpsConnector::new(4, handle).unwrap())
.connector(tm)
.build(&handle);
let service = Proxy {
client: client,
Expand Down
29 changes: 24 additions & 5 deletions src/weldr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ extern crate hyper;
extern crate weldr;
extern crate clap;
extern crate tokio_core;
extern crate net2;

use std::io;
use std::net::SocketAddr;

use clap::{Arg, App, SubCommand};
use net2::TcpBuilder;
use net2::unix::UnixTcpBuilderExt;

use tokio_core::reactor::Core;
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::TcpListener;

use weldr::pool::Pool;
use weldr::config::Config;
Expand Down Expand Up @@ -49,7 +54,7 @@ fn main() {
.get_matches();


let core = Core::new().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();

// TODO make this dynamic and pass it down to workers
Expand All @@ -62,15 +67,18 @@ fn main() {
let ip = ip.parse::<SocketAddr>().unwrap();

let pool = Pool::default();
let config = Config::default();

if let Some(matches) = matches.subcommand_matches("worker") {
let id = matches.value_of("id").unwrap();
debug!("Spawned worker {}", id);
let _result = worker::subscribe(internal_addr, handle, pool.clone());

weldr::proxy::run(ip, pool, core).expect("Failed to start server");
let listener = setup_listener(ip, &core.handle()).expect("Failed to setup listener");
//weldr::proxy::run(ip, pool, core).expect("Failed to start server");
let srv = weldr::proxy::serve(listener, pool, &core.handle(), &config).expect("Failed to create server future");
core.run(srv).expect("Server failed");
} else {
let conf = Config::default();
let mut manager = manager::Manager::new();
manager.listen(internal_addr, handle.clone());
manager.start_workers(5).expect("Failed to start manager");
Expand All @@ -79,7 +87,18 @@ fn main() {

let admin_ip = matches.value_of("worker").unwrap_or("0.0.0.0:8687");
let admin_ip = admin_ip.parse::<SocketAddr>().unwrap();
weldr::mgmt::run(admin_ip, pool, core, manager.clone(), &conf, health.clone())
weldr::mgmt::run(admin_ip, pool, core, manager.clone(), &config, health.clone())
.expect("Failed to start server");
}
}

fn setup_listener(addr: SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
let listener = TcpBuilder::new_v4()?;
listener.reuse_address(true)?;
listener.reuse_port(true)?;
let listener = listener.bind(&addr)?;
let listener = listener.listen(128)?;
let listener = TcpListener::from_listener(listener, &addr, &handle)?;

Ok(listener)
}
Loading

0 comments on commit 611053e

Please sign in to comment.