Skip to content

Commit

Permalink
feat(client): add a Connection Pool
Browse files Browse the repository at this point in the history
This adds a connection pool to the Client that is used by default. It
accepts any other NetworkConnector, and simply acts as a
NetworkConnector itself. Other Pools can exist by simply providing a
custom NetworkConnector. This Pool is only used by default if you also
use the default connector, which is `HttpConnector`. If you wish to use
the Pool with a custom connector, you'll need to create the Pool with
your custom connector, and then pass that pool to the
Client::with_connector.

This also adds a method to `NetworkStream`, `close`, which can be used
to know when the Stream should be put down, because a server requested
that the connection close instead of be kept alive.

Closes #363
Closes #41
  • Loading branch information
seanmonstar committed Apr 28, 2015
1 parent 9d83ed6 commit 1f0d3db
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@ use status::StatusClass::Redirection;
use {Url, HttpResult};
use HttpError::HttpUriError;

pub use self::pool::Pool;
pub use self::request::Request;
pub use self::response::Response;

pub mod pool;
pub mod request;
pub mod response;

/// A Client to use additional features with Requests.
///
/// Clients can handle things such as: redirect policy.
/// Clients can handle things such as: redirect policy, connection pooling.
pub struct Client {
connector: Connector,
redirect_policy: RedirectPolicy,
Expand All @@ -64,7 +66,12 @@ impl Client {

/// Create a new Client.
pub fn new() -> Client {
Client::with_connector(HttpConnector(None))
Client::with_pool_config(Default::default())
}

/// Create a new Client with a configured Pool Config.
pub fn with_pool_config(config: pool::Config) -> Client {
Client::with_connector(Pool::new(config))
}

/// Create a new client with a specific connector.
Expand All @@ -78,7 +85,10 @@ impl Client {

/// Set the SSL verifier callback for use with OpenSSL.
pub fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.connector = with_connector(HttpConnector(Some(verifier)));
self.connector = with_connector(Pool::with_connector(
Default::default(),
HttpConnector(Some(verifier))
));
}

/// Set the RedirectPolicy.
Expand Down
217 changes: 217 additions & 0 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
//! Client Connection Pooling
use std::borrow::ToOwned;
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};

use net::{NetworkConnector, NetworkStream, HttpConnector};

/// The `NetworkConnector` that behaves as a connection pool used by hyper's `Client`.
pub struct Pool<C: NetworkConnector> {
connector: C,
inner: Arc<Mutex<PoolImpl<<C as NetworkConnector>::Stream>>>
}

/// Config options for the `Pool`.
#[derive(Debug)]
pub struct Config {
/// The maximum idle connections *per host*.
pub max_idle: usize,
}

impl Default for Config {
#[inline]
fn default() -> Config {
Config {
max_idle: 5,
}
}
}

#[derive(Debug)]
struct PoolImpl<S> {
conns: HashMap<Key, Vec<S>>,
config: Config,
}

type Key = (String, u16, Scheme);

fn key<T: Into<Scheme>>(host: &str, port: u16, scheme: T) -> Key {
(host.to_owned(), port, scheme.into())
}

#[derive(Clone, PartialEq, Eq, Debug, Hash)]
enum Scheme {
Http,
Https,
Other(String)
}

impl<'a> From<&'a str> for Scheme {
fn from(s: &'a str) -> Scheme {
match s {
"http" => Scheme::Http,
"https" => Scheme::Https,
s => Scheme::Other(String::from(s))
}
}
}

impl Pool<HttpConnector> {
/// Creates a `Pool` with an `HttpConnector`.
#[inline]
pub fn new(config: Config) -> Pool<HttpConnector> {
Pool::with_connector(config, HttpConnector(None))
}
}

impl<C: NetworkConnector> Pool<C> {
/// Creates a `Pool` with a specified `NetworkConnector`.
#[inline]
pub fn with_connector(config: Config, connector: C) -> Pool<C> {
Pool {
connector: connector,
inner: Arc::new(Mutex::new(PoolImpl {
conns: HashMap::new(),
config: config,
}))
}
}
}

impl<S> PoolImpl<S> {
fn reuse(&mut self, key: Key, conn: S) {
trace!("reuse {:?}", key);
let conns = self.conns.entry(key).or_insert(vec![]);
if conns.len() < self.config.max_idle {
conns.push(conn);
}
}
}

impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector for Pool<C> {
type Stream = PooledStream<S>;
fn connect(&mut self, host: &str, port: u16, scheme: &str) -> io::Result<PooledStream<S>> {
let key = key(host, port, scheme);
let conn = {
let mut locked = self.inner.lock().unwrap();
match locked.conns.get_mut(&key) {
Some(ref mut vec) if vec.len() > 0 => vec.pop().unwrap(),
_ => try!(self.connector.connect(host, port, scheme))
}
};
Ok(PooledStream {
inner: Some((key, conn)),
is_closed: false,
is_drained: false,
pool: self.inner.clone()
})
}
}

/// A Stream that will try to be returned to the Pool when dropped.
pub struct PooledStream<S> {
inner: Option<(Key, S)>,
is_closed: bool,
is_drained: bool,
pool: Arc<Mutex<PoolImpl<S>>>
}

impl<S: NetworkStream> Read for PooledStream<S> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.inner.as_mut().unwrap().1.read(buf) {
Ok(0) => {
self.is_drained = true;
Ok(0)
}
r => r
}
}
}

impl<S: NetworkStream> Write for PooledStream<S> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.as_mut().unwrap().1.write(buf)
}

#[inline]
fn flush(&mut self) -> io::Result<()> {
self.inner.as_mut().unwrap().1.flush()
}
}

impl<S: NetworkStream> NetworkStream for PooledStream<S> {
#[inline]
fn peer_addr(&mut self) -> io::Result<SocketAddr> {
self.inner.as_mut().unwrap().1.peer_addr()
}

#[inline]
fn close(&mut self) {
if !self.is_closed {
self.is_closed = true;
self.inner.as_mut().map(|inner| inner.1.close());
}
}
}

impl<S> Drop for PooledStream<S> {
fn drop(&mut self) {
trace!("PooledStream.drop, is_closed={}, is_drained={}", self.is_closed, self.is_drained);
if !self.is_closed && self.is_drained {
self.inner.take().map(|(key, conn)| {
if let Ok(mut pool) = self.pool.lock() {
pool.reuse(key, conn);
}
// else poisoned, give up
});
}
}
}

#[cfg(test)]
mod tests {
use mock::MockConnector;
use net::{NetworkConnector, NetworkStream};

use super::{Pool, key};

macro_rules! mocked {
() => ({
Pool::with_connector(Default::default(), MockConnector)
})
}

#[test]
fn test_connect_and_drop() {
let mut pool = mocked!();
let key = key("127.0.0.1", 3000, "http");
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true;
{
let locked = pool.inner.lock().unwrap();
assert_eq!(locked.conns.len(), 1);
assert_eq!(locked.conns.get(&key).unwrap().len(), 1);
}
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; //reused
{
let locked = pool.inner.lock().unwrap();
assert_eq!(locked.conns.len(), 1);
assert_eq!(locked.conns.get(&key).unwrap().len(), 1);
}
}

#[test]
fn test_closed() {
let mut pool = mocked!();
let mut stream = pool.connect("127.0.0.1", 3000, "http").unwrap();
stream.close();
drop(stream);
let locked = pool.inner.lock().unwrap();
assert_eq!(locked.conns.len(), 0);
}


}
3 changes: 3 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
pub trait NetworkStream: Read + Write + Any + Send + Typeable {
/// Get the remote address of the underlying connection.
fn peer_addr(&mut self) -> io::Result<SocketAddr>;
/// This will be called when Stream should no longer be kept alive.
fn close(&mut self) {}
}

/// A connector creates a NetworkStream.
Expand Down Expand Up @@ -123,6 +125,7 @@ impl NetworkStream + Send {
}

/// If the underlying type is T, extract it.
#[inline]
pub fn downcast<T: Any>(self: Box<NetworkStream + Send>)
-> Result<Box<T>, Box<NetworkStream + Send>> {
if self.is::<T>() {
Expand Down

0 comments on commit 1f0d3db

Please sign in to comment.