From 2d5af177c1f0cfa3f592eec56f3a971fd9770f72 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 17 Oct 2018 16:06:49 -0700 Subject: [PATCH] feat(client): add `Resolve`, used by `HttpConnector` This introduces a `Resolve` trait to describe asynchronous DNS resolution. The `HttpConnector` can be configured with a resolver, allowing a user to still use all the functionality of the `HttpConnector`, while customizing the DNS resolution. To prevent a breaking change, the `HttpConnector` has its `Resolve` generic set by default to `GaiResolver`. This is same as the existing resolver, which uses `getaddrinfo` inside a thread pool. Closes #1517 --- src/client/connect/dns.rs | 196 +++++++++++++++++++++++++++++++++---- src/client/connect/http.rs | 153 ++++++++++++++--------------- src/client/connect/mod.rs | 1 + tests/client.rs | 3 +- 4 files changed, 253 insertions(+), 100 deletions(-) diff --git a/src/client/connect/dns.rs b/src/client/connect/dns.rs index 866b0e5e9c..6628e63eff 100644 --- a/src/client/connect/dns.rs +++ b/src/client/connect/dns.rs @@ -1,45 +1,183 @@ -use std::io; +use std::{fmt, io, vec}; use std::net::{ - Ipv4Addr, Ipv6Addr, + IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, SocketAddrV4, SocketAddrV6, }; -use std::vec; +use std::sync::Arc; -use ::futures::{Async, Future, Poll}; +use futures::{Async, Future, Poll}; +use futures::future::{Executor, ExecuteError}; +use futures::sync::oneshot; +use futures_cpupool::{Builder as CpuPoolBuilder}; -pub struct Work { +use self::sealed::GaiTask; + +/// Resolve a hostname to a set of IP addresses. +pub trait Resolve { + /// The set of IP addresses to try to connect to. + type Addrs: Iterator; + /// A Future of the resolved set of addresses. + type Future: Future; + /// Resolve a hostname. + fn resolve(&self, name: Name) -> Self::Future; +} + +/// A domain name to resolve into IP addresses. +pub struct Name { + host: String, +} + +/// A resolver using blocking `getaddrinfo` calls in a threadpool. +#[derive(Clone)] +pub struct GaiResolver { + executor: GaiExecutor, +} + +pub struct GaiAddrs { + inner: IpAddrs, +} + +pub struct GaiFuture { + rx: oneshot::SpawnHandle, +} + +impl Name { + pub(super) fn new(host: String) -> Name { + Name { + host, + } + } + + /// View the hostname as a string slice. + pub fn as_str(&self) -> &str { + &self.host + } +} + +impl fmt::Debug for Name { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.host, f) + } +} + +impl GaiResolver { + /// Construct a new `GaiResolver`. + /// + /// Takes number of DNS worker threads. + pub fn new(threads: usize) -> Self { + let pool = CpuPoolBuilder::new() + .name_prefix("hyper-dns") + .pool_size(threads) + .create(); + GaiResolver::new_with_executor(pool) + } + + /// Construct a new `GaiResolver` with a shared thread pool executor. + /// + /// Takes an executor to run blocking `getaddrinfo` tasks on. + pub fn new_with_executor(executor: E) -> Self + where + E: Executor + Send + Sync, + { + GaiResolver { + executor: GaiExecutor(Arc::new(executor)), + } + } +} + +impl Resolve for GaiResolver { + type Addrs = GaiAddrs; + type Future = GaiFuture; + + fn resolve(&self, name: Name) -> Self::Future { + let blocking = GaiBlocking::new(name.host); + let rx = oneshot::spawn(blocking, &self.executor); + GaiFuture { + rx, + } + } +} + +impl fmt::Debug for GaiResolver { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("GaiResolver") + } +} + +impl Future for GaiFuture { + type Item = GaiAddrs; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let addrs = try_ready!(self.rx.poll()); + Ok(Async::Ready(GaiAddrs { + inner: addrs, + })) + } +} + +impl fmt::Debug for GaiFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("GaiFuture") + } +} + +impl Iterator for GaiAddrs { + type Item = IpAddr; + + fn next(&mut self) -> Option { + self.inner.next().map(|sa| sa.ip()) + } +} + +impl fmt::Debug for GaiAddrs { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("GaiAddrs") + } +} + +#[derive(Clone)] +struct GaiExecutor(Arc + Send + Sync>); + +impl Executor> for GaiExecutor { + fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { + self.0.execute(GaiTask { work: future }) + .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) + } +} + +pub(super) struct GaiBlocking { host: String, - port: u16 } -impl Work { - pub fn new(host: String, port: u16) -> Work { - Work { host: host, port: port } +impl GaiBlocking { + pub(super) fn new(host: String) -> GaiBlocking { + GaiBlocking { host } } } -impl Future for Work { +impl Future for GaiBlocking { type Item = IpAddrs; type Error = io::Error; fn poll(&mut self) -> Poll { - debug!("resolving host={:?}, port={:?}", self.host, self.port); - (&*self.host, self.port).to_socket_addrs() + debug!("resolving host={:?}", self.host); + (&*self.host, 0).to_socket_addrs() .map(|i| Async::Ready(IpAddrs { iter: i })) } } -pub struct IpAddrs { +pub(super) struct IpAddrs { iter: vec::IntoIter, } impl IpAddrs { - pub fn new(addrs: Vec) -> Self { + pub(super) fn new(addrs: Vec) -> Self { IpAddrs { iter: addrs.into_iter() } } - pub fn try_parse(host: &str, port: u16) -> Option { + pub(super) fn try_parse(host: &str, port: u16) -> Option { if let Ok(addr) = host.parse::() { let addr = SocketAddrV4::new(addr, port); return Some(IpAddrs { iter: vec![SocketAddr::V4(addr)].into_iter() }) @@ -51,7 +189,7 @@ impl IpAddrs { None } - pub fn split_by_preference(self) -> (IpAddrs, IpAddrs) { + pub(super) fn split_by_preference(self) -> (IpAddrs, IpAddrs) { let preferring_v6 = self.iter .as_slice() .first() @@ -64,7 +202,7 @@ impl IpAddrs { (IpAddrs::new(preferred), IpAddrs::new(fallback)) } - pub fn is_empty(&self) -> bool { + pub(super) fn is_empty(&self) -> bool { self.iter.as_slice().is_empty() } } @@ -77,6 +215,30 @@ impl Iterator for IpAddrs { } } +// Make this Future unnameable outside of this crate. +pub(super) mod sealed { + use super::*; + // Blocking task to be executed on a thread pool. + pub struct GaiTask { + pub(super) work: oneshot::Execute + } + + impl fmt::Debug for GaiTask { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("GaiTask") + } + } + + impl Future for GaiTask { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + self.work.poll() + } + } +} + #[cfg(test)] mod tests { use std::net::{Ipv4Addr, Ipv6Addr}; diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 31e865fcf7..3ad245a630 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -4,22 +4,18 @@ use std::error::Error as StdError; use std::io; use std::mem; use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; use std::time::{Duration, Instant}; use futures::{Async, Future, Poll}; -use futures::future::{Executor, ExecuteError}; -use futures::sync::oneshot; -use futures_cpupool::{Builder as CpuPoolBuilder}; +use futures::future::{Executor}; use http::uri::Scheme; use net2::TcpBuilder; use tokio_reactor::Handle; use tokio_tcp::{TcpStream, ConnectFuture}; use tokio_timer::Delay; -use super::{dns, Connect, Connected, Destination}; - -use self::sealed::HttpConnectorBlockingTask; +use super::{Connect, Connected, Destination}; +use super::dns::{self, GaiResolver, Resolve}; /// A connector for the `http` scheme. /// @@ -30,14 +26,14 @@ use self::sealed::HttpConnectorBlockingTask; /// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes /// transport information such as the remote socket address used. #[derive(Clone)] -pub struct HttpConnector { - executor: HttpConnectExecutor, +pub struct HttpConnector { enforce_http: bool, handle: Option, + happy_eyeballs_timeout: Option, keep_alive_timeout: Option, - nodelay: bool, local_address: Option, - happy_eyeballs_timeout: Option, + nodelay: bool, + resolver: R, reuse_address: bool, } @@ -78,36 +74,45 @@ impl HttpConnector { /// Takes number of DNS worker threads. #[inline] pub fn new(threads: usize) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, None) + HttpConnector::new_with_resolver(GaiResolver::new(threads)) } - /// Construct a new HttpConnector with a specific Tokio handle. + #[doc(hidden)] + #[deprecated(note = "Use HttpConnector::set_reactor to set a reactor handle")] pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, Some(handle)) - } - - fn new_with_handle_opt(threads: usize, handle: Option) -> HttpConnector { - let pool = CpuPoolBuilder::new() - .name_prefix("hyper-dns") - .pool_size(threads) - .create(); - HttpConnector::new_with_executor(pool, handle) + let resolver = GaiResolver::new(threads); + let mut http = HttpConnector::new_with_resolver(resolver); + http.set_reactor(Some(handle)); + http } /// Construct a new HttpConnector. /// - /// Takes an executor to run blocking tasks on. + /// Takes an executor to run blocking `getaddrinfo` tasks on. pub fn new_with_executor(executor: E, handle: Option) -> HttpConnector - where E: Executor + Send + Sync + where E: Executor + Send + Sync { + let resolver = GaiResolver::new_with_executor(executor); + let mut http = HttpConnector::new_with_resolver(resolver); + http.set_reactor(handle); + http + } +} + + +impl HttpConnector { + /// Construct a new HttpConnector. + /// + /// Takes a `Resolve` to handle DNS lookups. + pub fn new_with_resolver(resolver: R) -> HttpConnector { HttpConnector { - executor: HttpConnectExecutor(Arc::new(executor)), enforce_http: true, - handle, + handle: None, + happy_eyeballs_timeout: Some(Duration::from_millis(300)), keep_alive_timeout: None, - nodelay: false, local_address: None, - happy_eyeballs_timeout: Some(Duration::from_millis(300)), + nodelay: false, + resolver, reuse_address: false, } } @@ -120,6 +125,14 @@ impl HttpConnector { self.enforce_http = is_enforced; } + /// Set a handle to a `Reactor` to register connections to. + /// + /// If `None`, the implicit default reactor will be used. + #[inline] + pub fn set_reactor(&mut self, handle: Option) { + self.handle = handle; + } + /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. /// /// If `None`, the option will not be set. @@ -175,17 +188,22 @@ impl HttpConnector { } } -impl fmt::Debug for HttpConnector { +// R: Debug required for now to allow adding it to debug output later... +impl fmt::Debug for HttpConnector { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("HttpConnector") .finish() } } -impl Connect for HttpConnector { +impl Connect for HttpConnector +where + R: Resolve + Clone + Send + Sync, + R::Future: Send, +{ type Transport = TcpStream; type Error = io::Error; - type Future = HttpConnecting; + type Future = HttpConnecting; fn connect(&self, dst: Destination) -> Self::Future { trace!( @@ -213,11 +231,12 @@ impl Connect for HttpConnector { }; HttpConnecting { - state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address), + state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), handle: self.handle.clone(), + happy_eyeballs_timeout: self.happy_eyeballs_timeout, keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, - happy_eyeballs_timeout: self.happy_eyeballs_timeout, + port, reuse_address: self.reuse_address, } } @@ -231,12 +250,13 @@ impl HttpInfo { } #[inline] -fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConnecting { +fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConnecting { HttpConnecting { state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), handle: handle.clone(), keep_alive_timeout: None, nodelay: false, + port: 0, happy_eyeballs_timeout: None, reuse_address: false, } @@ -266,23 +286,24 @@ impl StdError for InvalidUrl { } /// A Future representing work to connect to a URL. #[must_use = "futures do nothing unless polled"] -pub struct HttpConnecting { - state: State, +pub struct HttpConnecting { + state: State, handle: Option, + happy_eyeballs_timeout: Option, keep_alive_timeout: Option, nodelay: bool, - happy_eyeballs_timeout: Option, + port: u16, reuse_address: bool, } -enum State { - Lazy(HttpConnectExecutor, String, u16, Option), - Resolving(oneshot::SpawnHandle, Option), +enum State { + Lazy(R, String, Option), + Resolving(R::Future, Option), Connecting(ConnectingTcp), Error(Option), } -impl Future for HttpConnecting { +impl Future for HttpConnecting { type Item = (TcpStream, Connected); type Error = io::Error; @@ -290,22 +311,26 @@ impl Future for HttpConnecting { loop { let state; match self.state { - State::Lazy(ref executor, ref mut host, port, local_addr) => { + State::Lazy(ref resolver, ref mut host, local_addr) => { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. - if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { + if let Some(addrs) = dns::IpAddrs::try_parse(host, self.port) { state = State::Connecting(ConnectingTcp::new( local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); } else { - let host = mem::replace(host, String::new()); - let work = dns::Work::new(host, port); - state = State::Resolving(oneshot::spawn(work, executor), local_addr); + let name = dns::Name::new(mem::replace(host, String::new())); + state = State::Resolving(resolver.resolve(name), local_addr); } }, State::Resolving(ref mut future, local_addr) => { match try!(future.poll()) { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { + let port = self.port; + let addrs = addrs + .map(|addr| SocketAddr::new(addr, port)) + .collect(); + let addrs = dns::IpAddrs::new(addrs); state = State::Connecting(ConnectingTcp::new( local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); } @@ -335,7 +360,7 @@ impl Future for HttpConnecting { } } -impl fmt::Debug for HttpConnecting { +impl fmt::Debug for HttpConnecting { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.pad("HttpConnecting") } @@ -522,40 +547,6 @@ impl ConnectingTcp { } } -// Make this Future unnameable outside of this crate. -mod sealed { - use super::*; - // Blocking task to be executed on a thread pool. - pub struct HttpConnectorBlockingTask { - pub(super) work: oneshot::Execute - } - - impl fmt::Debug for HttpConnectorBlockingTask { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("HttpConnectorBlockingTask") - } - } - - impl Future for HttpConnectorBlockingTask { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - self.work.poll() - } - } -} - -#[derive(Clone)] -struct HttpConnectExecutor(Arc + Send + Sync>); - -impl Executor> for HttpConnectExecutor { - fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { - self.0.execute(HttpConnectorBlockingTask { work: future }) - .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) - } -} - #[cfg(test)] mod tests { use std::io; diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index a67d6d1079..ec0aeac163 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -15,6 +15,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] mod dns; #[cfg(feature = "runtime")] mod http; +#[cfg(feature = "runtime")] pub use self::dns::{GaiResolver, Name, Resolve}; #[cfg(feature = "runtime")] pub use self::http::{HttpConnector, HttpInfo}; /// Connect to a destination, returning an IO transport. diff --git a/tests/client.rs b/tests/client.rs index fa48d89919..dae5fb9864 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -17,7 +17,6 @@ use hyper::{Body, Client, Method, Request, StatusCode}; use futures::{Future, Stream}; use futures::sync::oneshot; -use tokio::reactor::Handle; use tokio::runtime::current_thread::Runtime; use tokio::net::tcp::{ConnectFuture, TcpStream}; @@ -226,7 +225,7 @@ macro_rules! test { let addr = server.local_addr().expect("local_addr"); let mut rt = $runtime; - let connector = ::hyper::client::HttpConnector::new_with_handle(1, Handle::default()); + let connector = ::hyper::client::HttpConnector::new(1); let client = Client::builder() .set_host($set_host) .http1_title_case_headers($title_case_headers)