From 1f0d3db9374dac4ed3f9ae05bc44cf2167dd70aa Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 23 Jan 2015 13:15:15 -0800 Subject: [PATCH] feat(client): add a Connection Pool This adds a connection pool to the Client that is used by default. It accepts any other NetworkConnector, and simply acts as a NetworkConnector itself. Other Pools can exist by simply providing a custom NetworkConnector. This Pool is only used by default if you also use the default connector, which is `HttpConnector`. If you wish to use the Pool with a custom connector, you'll need to create the Pool with your custom connector, and then pass that pool to the Client::with_connector. This also adds a method to `NetworkStream`, `close`, which can be used to know when the Stream should be put down, because a server requested that the connection close instead of be kept alive. Closes #363 Closes #41 --- src/client/mod.rs | 16 +++- src/client/pool.rs | 217 +++++++++++++++++++++++++++++++++++++++++++++ src/net.rs | 3 + 3 files changed, 233 insertions(+), 3 deletions(-) create mode 100644 src/client/pool.rs diff --git a/src/client/mod.rs b/src/client/mod.rs index fdc53413a5..5587edb2f0 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -46,15 +46,17 @@ use status::StatusClass::Redirection; use {Url, HttpResult}; use HttpError::HttpUriError; +pub use self::pool::Pool; pub use self::request::Request; pub use self::response::Response; +pub mod pool; pub mod request; pub mod response; /// A Client to use additional features with Requests. /// -/// Clients can handle things such as: redirect policy. +/// Clients can handle things such as: redirect policy, connection pooling. pub struct Client { connector: Connector, redirect_policy: RedirectPolicy, @@ -64,7 +66,12 @@ impl Client { /// Create a new Client. pub fn new() -> Client { - Client::with_connector(HttpConnector(None)) + Client::with_pool_config(Default::default()) + } + + /// Create a new Client with a configured Pool Config. + pub fn with_pool_config(config: pool::Config) -> Client { + Client::with_connector(Pool::new(config)) } /// Create a new client with a specific connector. @@ -78,7 +85,10 @@ impl Client { /// Set the SSL verifier callback for use with OpenSSL. pub fn set_ssl_verifier(&mut self, verifier: ContextVerifier) { - self.connector = with_connector(HttpConnector(Some(verifier))); + self.connector = with_connector(Pool::with_connector( + Default::default(), + HttpConnector(Some(verifier)) + )); } /// Set the RedirectPolicy. diff --git a/src/client/pool.rs b/src/client/pool.rs new file mode 100644 index 0000000000..a78067f05e --- /dev/null +++ b/src/client/pool.rs @@ -0,0 +1,217 @@ +//! Client Connection Pooling +use std::borrow::ToOwned; +use std::collections::HashMap; +use std::io::{self, Read, Write}; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +use net::{NetworkConnector, NetworkStream, HttpConnector}; + +/// The `NetworkConnector` that behaves as a connection pool used by hyper's `Client`. +pub struct Pool { + connector: C, + inner: Arc::Stream>>> +} + +/// Config options for the `Pool`. +#[derive(Debug)] +pub struct Config { + /// The maximum idle connections *per host*. + pub max_idle: usize, +} + +impl Default for Config { + #[inline] + fn default() -> Config { + Config { + max_idle: 5, + } + } +} + +#[derive(Debug)] +struct PoolImpl { + conns: HashMap>, + config: Config, +} + +type Key = (String, u16, Scheme); + +fn key>(host: &str, port: u16, scheme: T) -> Key { + (host.to_owned(), port, scheme.into()) +} + +#[derive(Clone, PartialEq, Eq, Debug, Hash)] +enum Scheme { + Http, + Https, + Other(String) +} + +impl<'a> From<&'a str> for Scheme { + fn from(s: &'a str) -> Scheme { + match s { + "http" => Scheme::Http, + "https" => Scheme::Https, + s => Scheme::Other(String::from(s)) + } + } +} + +impl Pool { + /// Creates a `Pool` with an `HttpConnector`. + #[inline] + pub fn new(config: Config) -> Pool { + Pool::with_connector(config, HttpConnector(None)) + } +} + +impl Pool { + /// Creates a `Pool` with a specified `NetworkConnector`. + #[inline] + pub fn with_connector(config: Config, connector: C) -> Pool { + Pool { + connector: connector, + inner: Arc::new(Mutex::new(PoolImpl { + conns: HashMap::new(), + config: config, + })) + } + } +} + +impl PoolImpl { + fn reuse(&mut self, key: Key, conn: S) { + trace!("reuse {:?}", key); + let conns = self.conns.entry(key).or_insert(vec![]); + if conns.len() < self.config.max_idle { + conns.push(conn); + } + } +} + +impl, S: NetworkStream + Send> NetworkConnector for Pool { + type Stream = PooledStream; + fn connect(&mut self, host: &str, port: u16, scheme: &str) -> io::Result> { + let key = key(host, port, scheme); + let conn = { + let mut locked = self.inner.lock().unwrap(); + match locked.conns.get_mut(&key) { + Some(ref mut vec) if vec.len() > 0 => vec.pop().unwrap(), + _ => try!(self.connector.connect(host, port, scheme)) + } + }; + Ok(PooledStream { + inner: Some((key, conn)), + is_closed: false, + is_drained: false, + pool: self.inner.clone() + }) + } +} + +/// A Stream that will try to be returned to the Pool when dropped. +pub struct PooledStream { + inner: Option<(Key, S)>, + is_closed: bool, + is_drained: bool, + pool: Arc>> +} + +impl Read for PooledStream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self.inner.as_mut().unwrap().1.read(buf) { + Ok(0) => { + self.is_drained = true; + Ok(0) + } + r => r + } + } +} + +impl Write for PooledStream { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.as_mut().unwrap().1.write(buf) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + self.inner.as_mut().unwrap().1.flush() + } +} + +impl NetworkStream for PooledStream { + #[inline] + fn peer_addr(&mut self) -> io::Result { + self.inner.as_mut().unwrap().1.peer_addr() + } + + #[inline] + fn close(&mut self) { + if !self.is_closed { + self.is_closed = true; + self.inner.as_mut().map(|inner| inner.1.close()); + } + } +} + +impl Drop for PooledStream { + fn drop(&mut self) { + trace!("PooledStream.drop, is_closed={}, is_drained={}", self.is_closed, self.is_drained); + if !self.is_closed && self.is_drained { + self.inner.take().map(|(key, conn)| { + if let Ok(mut pool) = self.pool.lock() { + pool.reuse(key, conn); + } + // else poisoned, give up + }); + } + } +} + +#[cfg(test)] +mod tests { + use mock::MockConnector; + use net::{NetworkConnector, NetworkStream}; + + use super::{Pool, key}; + + macro_rules! mocked { + () => ({ + Pool::with_connector(Default::default(), MockConnector) + }) + } + + #[test] + fn test_connect_and_drop() { + let mut pool = mocked!(); + let key = key("127.0.0.1", 3000, "http"); + pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; + { + let locked = pool.inner.lock().unwrap(); + assert_eq!(locked.conns.len(), 1); + assert_eq!(locked.conns.get(&key).unwrap().len(), 1); + } + pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; //reused + { + let locked = pool.inner.lock().unwrap(); + assert_eq!(locked.conns.len(), 1); + assert_eq!(locked.conns.get(&key).unwrap().len(), 1); + } + } + + #[test] + fn test_closed() { + let mut pool = mocked!(); + let mut stream = pool.connect("127.0.0.1", 3000, "http").unwrap(); + stream.close(); + drop(stream); + let locked = pool.inner.lock().unwrap(); + assert_eq!(locked.conns.len(), 0); + } + + +} diff --git a/src/net.rs b/src/net.rs index eb4ceca103..c8f3c2d2c8 100644 --- a/src/net.rs +++ b/src/net.rs @@ -57,6 +57,8 @@ impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> { pub trait NetworkStream: Read + Write + Any + Send + Typeable { /// Get the remote address of the underlying connection. fn peer_addr(&mut self) -> io::Result; + /// This will be called when Stream should no longer be kept alive. + fn close(&mut self) {} } /// A connector creates a NetworkStream. @@ -123,6 +125,7 @@ impl NetworkStream + Send { } /// If the underlying type is T, extract it. + #[inline] pub fn downcast(self: Box) -> Result, Box> { if self.is::() {