Skip to content

Commit

Permalink
Add support for local bind to HttpConnector.
Browse files Browse the repository at this point in the history
Resolves hyperium#1498.
  • Loading branch information
kw217 committed Apr 26, 2018
1 parent f44fa0e commit 01727df
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 30 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ runtime = [
"tokio-tcp",
]

[[example]]
name = "bound_client"
path = "examples/bound_client.rs"
required-features = ["runtime"]

[[example]]
name = "client"
path = "examples/client.rs"
Expand Down
56 changes: 56 additions & 0 deletions examples/bound_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#![deny(warnings)]
extern crate hyper;
extern crate pretty_env_logger;

use std::env;
use std::net::IpAddr;
use std::io::{self, Write};

use hyper::{Body, Client, Request};
use hyper::rt::{self, Future, Stream};
use hyper::client::connect::HttpConnector;

fn main() {
pretty_env_logger::init();

let url = match env::args().nth(1) {
Some(url) => url,
None => {
println!("Usage: client <url> [<bind_addr>]");
return;
}
};

let url = url.parse::<hyper::Uri>().unwrap();
if url.scheme_part().map(|s| s.as_ref()) != Some("http") {
println!("This example only works with 'http' URLs.");
return;
}

let bind_addr = env::args().nth(2);

let bind_addr: Option<IpAddr> = bind_addr.map(|s| s.parse::<IpAddr>().unwrap());

rt::run(rt::lazy(move || {
let mut connector = HttpConnector::new(4);
connector.set_local_address(bind_addr);
let client = Client::builder().build(connector);

let mut req = Request::new(Body::empty());
*req.uri_mut() = url;

client.request(req).and_then(|res| {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());

res.into_body().for_each(|chunk| {
io::stdout().write_all(&chunk)
.map_err(|e| panic!("example expects stdout is open, error={}", e))
})
}).map(|_| {
println!("\n\nDone.");
}).map_err(|err| {
eprintln!("Error {}", err);
})
}));
}
81 changes: 51 additions & 30 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! establishes connections over TCP.
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
use std::error::Error as StdError;
use std::borrow::Cow;

use futures::Future;
use http::Uri;
Expand Down Expand Up @@ -128,7 +129,7 @@ mod http {
use std::fmt;
use std::io;
use std::mem;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -146,30 +147,35 @@ mod http {
use self::http_connector::HttpConnectorBlockingTask;


fn connect(addr: &SocketAddr, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
if let Some(ref handle) = *handle {
let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};

if let Some(ref local_addr) = *local_addr {
// Caller has requested this socket be bound before calling connect
builder.bind(SocketAddr::new(local_addr.clone(), 0))?;
}
else if cfg!(windows) {
// Windows requires a socket be bound before calling connect
let any: SocketAddr = match addr {
&SocketAddr::V4(_) => {
([0, 0, 0, 0], 0).into()
},
&SocketAddr::V6(_) => {
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
}
};
builder.bind(any)?;
}

if cfg!(windows) {
// Windows requires a socket be bound before calling connect
let any: SocketAddr = match addr {
&SocketAddr::V4(_) => {
([0, 0, 0, 0], 0).into()
},
&SocketAddr::V6(_) => {
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
}
};
builder.bind(any)?;
}
let handle = match *handle {
Some(ref handle) => Cow::Borrowed(handle),
None => Cow::Owned(Handle::current()),
};

Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle))
} else {
Ok(TcpStream::connect(addr))
}
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle))
}

/// A connector for the `http` scheme.
Expand All @@ -182,6 +188,7 @@ mod http {
handle: Option<Handle>,
keep_alive_timeout: Option<Duration>,
nodelay: bool,
local_address: Option<IpAddr>,
}

impl HttpConnector {
Expand Down Expand Up @@ -218,6 +225,7 @@ mod http {
handle,
keep_alive_timeout: None,
nodelay: false,
local_address: None,
}
}

Expand Down Expand Up @@ -246,6 +254,16 @@ mod http {
pub fn set_nodelay(&mut self, nodelay: bool) {
self.nodelay = nodelay;
}

/// Set that all sockets are bound to the configured address before connection.
///
/// If `None`, the sockets will not be bound.
///
/// Default is `None`.
#[inline]
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
self.local_address = addr;
}
}

impl fmt::Debug for HttpConnector {
Expand Down Expand Up @@ -287,7 +305,7 @@ mod http {
};

HttpConnecting {
state: State::Lazy(self.executor.clone(), host.into(), port),
state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address),
handle: self.handle.clone(),
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
Expand Down Expand Up @@ -337,8 +355,8 @@ mod http {
}

enum State {
Lazy(HttpConnectExecutor, String, u16),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
Lazy(HttpConnectExecutor, String, u16, Option<IpAddr>),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>, Option<IpAddr>),
Connecting(ConnectingTcp),
Error(Option<io::Error>),
}
Expand All @@ -351,26 +369,28 @@ mod http {
loop {
let state;
match self.state {
State::Lazy(ref executor, ref mut host, port) => {
State::Lazy(ref executor, ref mut host, port, local_addr) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None
})
} else {
let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port);
state = State::Resolving(oneshot::spawn(work, executor));
state = State::Resolving(oneshot::spawn(work, executor), local_addr);
}
},
State::Resolving(ref mut future) => {
State::Resolving(ref mut future, local_addr) => {
match try!(future.poll()) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None,
})
}
Expand Down Expand Up @@ -402,6 +422,7 @@ mod http {

struct ConnectingTcp {
addrs: dns::IpAddrs,
local_addr: Option<IpAddr>,
current: Option<ConnectFuture>,
}

Expand All @@ -418,14 +439,14 @@ mod http {
err = Some(e);
if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
*current = connect(&addr, handle)?;
*current = connect(&addr, &self.local_addr, handle)?;
continue;
}
}
}
} else if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
self.current = Some(connect(&addr, handle)?);
self.current = Some(connect(&addr, &self.local_addr, handle)?);
continue;
}

Expand Down

0 comments on commit 01727df

Please sign in to comment.