Skip to content

Commit

Permalink
feat(client): support local bind for HttpConnector
Browse files Browse the repository at this point in the history
Add `set_local_address` to the `HttpConnector`.
This configures the client to bind the socket to a local address of
the host before it connects to the destination. This is useful on
hosts which have multiple network interfaces, to ensure the request is
issued over a specific interface.

Closes #1498
  • Loading branch information
kw217 committed May 1, 2018
1 parent 792b55f commit b6a3c85
Showing 1 changed file with 51 additions and 30 deletions.
81 changes: 51 additions & 30 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ impl Connected {
mod http {
use super::*;

use std::borrow::Cow;
use std::fmt;
use std::io;
use std::mem;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -146,30 +147,35 @@ mod http {
use self::http_connector::HttpConnectorBlockingTask;


fn connect(addr: &SocketAddr, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
if let Some(ref handle) = *handle {
let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};

if let Some(ref local_addr) = *local_addr {
// Caller has requested this socket be bound before calling connect
builder.bind(SocketAddr::new(local_addr.clone(), 0))?;
}
else if cfg!(windows) {
// Windows requires a socket be bound before calling connect
let any: SocketAddr = match addr {
&SocketAddr::V4(_) => {
([0, 0, 0, 0], 0).into()
},
&SocketAddr::V6(_) => {
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
}
};
builder.bind(any)?;
}

if cfg!(windows) {
// Windows requires a socket be bound before calling connect
let any: SocketAddr = match addr {
&SocketAddr::V4(_) => {
([0, 0, 0, 0], 0).into()
},
&SocketAddr::V6(_) => {
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
}
};
builder.bind(any)?;
}
let handle = match *handle {
Some(ref handle) => Cow::Borrowed(handle),
None => Cow::Owned(Handle::current()),
};

Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle))
} else {
Ok(TcpStream::connect(addr))
}
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle))
}

/// A connector for the `http` scheme.
Expand All @@ -182,6 +188,7 @@ mod http {
handle: Option<Handle>,
keep_alive_timeout: Option<Duration>,
nodelay: bool,
local_address: Option<IpAddr>,
}

impl HttpConnector {
Expand Down Expand Up @@ -218,6 +225,7 @@ mod http {
handle,
keep_alive_timeout: None,
nodelay: false,
local_address: None,
}
}

Expand Down Expand Up @@ -246,6 +254,16 @@ mod http {
pub fn set_nodelay(&mut self, nodelay: bool) {
self.nodelay = nodelay;
}

/// Set that all sockets are bound to the configured address before connection.
///
/// If `None`, the sockets will not be bound.
///
/// Default is `None`.
#[inline]
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
self.local_address = addr;
}
}

impl fmt::Debug for HttpConnector {
Expand Down Expand Up @@ -287,7 +305,7 @@ mod http {
};

HttpConnecting {
state: State::Lazy(self.executor.clone(), host.into(), port),
state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address),
handle: self.handle.clone(),
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
Expand Down Expand Up @@ -337,8 +355,8 @@ mod http {
}

enum State {
Lazy(HttpConnectExecutor, String, u16),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
Lazy(HttpConnectExecutor, String, u16, Option<IpAddr>),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>, Option<IpAddr>),
Connecting(ConnectingTcp),
Error(Option<io::Error>),
}
Expand All @@ -351,26 +369,28 @@ mod http {
loop {
let state;
match self.state {
State::Lazy(ref executor, ref mut host, port) => {
State::Lazy(ref executor, ref mut host, port, local_addr) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None
})
} else {
let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port);
state = State::Resolving(oneshot::spawn(work, executor));
state = State::Resolving(oneshot::spawn(work, executor), local_addr);
}
},
State::Resolving(ref mut future) => {
State::Resolving(ref mut future, local_addr) => {
match try!(future.poll()) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None,
})
}
Expand Down Expand Up @@ -402,6 +422,7 @@ mod http {

struct ConnectingTcp {
addrs: dns::IpAddrs,
local_addr: Option<IpAddr>,
current: Option<ConnectFuture>,
}

Expand All @@ -418,14 +439,14 @@ mod http {
err = Some(e);
if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
*current = connect(&addr, handle)?;
*current = connect(&addr, &self.local_addr, handle)?;
continue;
}
}
}
} else if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
self.current = Some(connect(&addr, handle)?);
self.current = Some(connect(&addr, &self.local_addr, handle)?);
continue;
}

Expand Down

0 comments on commit b6a3c85

Please sign in to comment.