Skip to content

Commit

Permalink
Merge pull request #916 from jwilm/full-slab-evict-idle
Browse files Browse the repository at this point in the history
Evict idle connections when client is full
  • Loading branch information
seanmonstar authored Sep 22, 2016
2 parents 8c6e6f5 + 27cab37 commit bed4815
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 47 deletions.
182 changes: 135 additions & 47 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! The HTTP `Client` uses asynchronous IO, and utilizes the `Handler` trait
//! to convey when IO events are available for a given request.
use std::collections::HashMap;
use std::collections::{VecDeque, HashMap};
use std::fmt;
use std::io;
use std::marker::PhantomData;
Expand Down Expand Up @@ -107,6 +107,7 @@ impl<H: Send> Client<H> {
keep_alive: keep_alive,
idle_conns: HashMap::new(),
queue: HashMap::new(),
awaiting_slot: VecDeque::new(),
}).unwrap()
}));

Expand Down Expand Up @@ -328,19 +329,52 @@ impl<H: Handler<T>, T: Transport> http::MessageHandler<T> for Message<H, T> {
}
}

struct Context<K, H> {
struct Context<K, H, C: Connect> {
connect_timeout: Duration,
keep_alive: bool,
idle_conns: HashMap<K, Vec<http::Control>>,
queue: HashMap<K, Vec<Queued<H>>>,
idle_conns: HashMap<K, VecDeque<http::Control>>,
queue: HashMap<K, VecDeque<Queued<H>>>,
awaiting_slot: VecDeque<(C::Key, C::Output)>,
}

impl<K: http::Key, H> Context<K, H> {
/// Macro for advancing state of a ClientFsm::Socket
///
/// This was previously a method on Context, but due to eviction needs, this
/// block now needs access to the registration APIs on rotor::Scope.
macro_rules! conn_response {
($scope:expr, $conn:expr, $time:expr) => {{
match $conn {
Some((conn, timeout)) => {
//TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream
if conn.is_idle() {
$scope.idle_conns.entry(conn.key().clone()).or_insert_with(VecDeque::new)
.push_back(conn.control());
}
match timeout {
Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn))
.deadline($time + dur),
None => rotor::Response::ok(ClientFsm::Socket(conn)),
}

}
None => {
if let Some((key, socket)) = $scope.awaiting_slot.pop_front() {
rotor_try!($scope.register(&socket, EventSet::writable(), PollOpt::level()));
rotor::Response::ok(ClientFsm::Connecting((key, socket)))
} else {
rotor::Response::done()
}
}
}
}}
}

impl<K: http::Key, H, C: Connect> Context<K, H, C> {
fn pop_queue(&mut self, key: &K) -> Option<Queued<H>> {
let mut should_remove = false;
let queued = {
self.queue.get_mut(key).map(|vec| {
let queued = vec.remove(0);
self.queue.get_mut(key).and_then(|vec| {
let queued = vec.pop_front();
if vec.is_empty() {
should_remove = true;
}
Expand All @@ -350,32 +384,17 @@ impl<K: http::Key, H> Context<K, H> {
if should_remove {
self.queue.remove(key);
}
queued
}

fn conn_response<C>(&mut self, conn: Option<(http::Conn<K, C::Output, Message<H, C::Output>>, Option<Duration>)>, time: rotor::Time)
-> rotor::Response<ClientFsm<C, H>, (C::Key, C::Output)>
where C: Connect<Key=K>, H: Handler<C::Output> {
match conn {
Some((conn, timeout)) => {
//TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream
if conn.is_idle() {
self.idle_conns.entry(conn.key().clone()).or_insert_with(Vec::new)
.push(conn.control());
}
match timeout {
Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn))
.deadline(time + dur),
None => rotor::Response::ok(ClientFsm::Socket(conn)),
}

}
None => rotor::Response::done()
}
queued
}
}

impl<K: http::Key, H: Handler<T>, T: Transport> http::MessageHandlerFactory<K, T> for Context<K, H> {
impl<K, H, T, C> http::MessageHandlerFactory<K, T> for Context<K, H, C>
where K: http::Key,
H: Handler<T>,
T: Transport,
C: Connect
{
type Output = Message<H, T>;

fn create(&mut self, seed: http::Seed<K>) -> Option<Self::Output> {
Expand Down Expand Up @@ -424,7 +443,7 @@ where C: Connect,
C::Key: fmt::Debug,
C::Output: Transport,
H: Handler<C::Output> {
type Context = Context<C::Key, H>;
type Context = Context<C::Key, H, C>;
type Seed = (C::Key, C::Output);

fn create(seed: Self::Seed, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, rotor::Void> {
Expand All @@ -437,7 +456,7 @@ where C: Connect,
ClientFsm::Socket(conn) => {
let res = conn.ready(events, scope);
let now = scope.now();
scope.conn_response(res, now)
conn_response!(scope, res, now)
},
ClientFsm::Connecting(mut seed) => {
if events.is_error() || events.is_hup() {
Expand Down Expand Up @@ -480,6 +499,70 @@ where C: Connect,
}
}

fn spawn_error(
self,
scope: &mut Scope<Self::Context>,
error: rotor::SpawnError<Self::Seed>
) -> rotor::Response<Self, Self::Seed> {
// see if there's an idle connections that can be terminated. If yes, put this seed on a
// list waiting for empty slot.
if let rotor::SpawnError::NoSlabSpace((key, socket)) = error {
if let Some(mut queued) = scope.pop_queue(&key) {
trace!("attempting to remove an idle socket");
// Remove an idle connection. Any connection. Just make some space
// for the new request.
let mut remove_keys = Vec::new();
let mut found_idle = false;

// Check all idle connections regardless of origin
for (key, idle) in scope.idle_conns.iter_mut() {
while let Some(ctrl) = idle.pop_front() {
// Signal connection to close. An err here means the
// socket is already dead can should be tossed.
if ctrl.ready(Next::remove()).is_ok() {
found_idle = true;
break;
}
}

// This list is empty, mark it for removal
if idle.is_empty() {
remove_keys.push(key.to_owned());
}

// if found, stop looking for an idle connection.
if found_idle {
break;
}
}

trace!("idle conns: {:?}", scope.idle_conns);

// Remove empty idle lists.
for key in &remove_keys {
scope.idle_conns.remove(&key);
}

if found_idle {
// A socket should be evicted soon; put it on a queue to
// consume newly freed slot. Also need to put the Queued<H>
// back onto front of queue.
scope.awaiting_slot.push_back((key.clone(), socket));
scope.queue
.entry(key)
.or_insert_with(VecDeque::new)
.push_back(queued);
} else {
// Couldn't evict a socket, just run the error handler.
debug!("Error spawning state machine; slab full and no sockets idle");
let _ = queued.handler.on_error(::Error::Full);
}
}
}

self.connect(scope)
}

fn timeout(self, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, Self::Seed> {
trace!("timeout now = {:?}", scope.now());
match self {
Expand All @@ -489,8 +572,8 @@ where C: Connect,
{
for (key, mut vec) in &mut scope.queue {
while !vec.is_empty() && vec[0].deadline <= now {
let mut queued = vec.remove(0);
let _ = queued.handler.on_error(::Error::Timeout);
vec.pop_front()
.map(|mut queued| queued.handler.on_error(::Error::Timeout));
}
if vec.is_empty() {
empty_keys.push(key.clone());
Expand All @@ -511,7 +594,7 @@ where C: Connect,
ClientFsm::Socket(conn) => {
let res = conn.timeout(scope);
let now = scope.now();
scope.conn_response(res, now)
conn_response!(scope, res, now)
}
}
}
Expand All @@ -524,7 +607,7 @@ where C: Connect,
ClientFsm::Socket(conn) => {
let res = conn.wakeup(scope);
let now = scope.now();
scope.conn_response(res, now)
conn_response!(scope, res, now)
},
ClientFsm::Connecting(..) => unreachable!("connecting sockets should not be woken up")
}
Expand Down Expand Up @@ -559,8 +642,7 @@ where C: Connect,
let mut remove_idle = false;
let mut woke_up = false;
if let Some(mut idle) = scope.idle_conns.get_mut(&key) {
while !idle.is_empty() {
let ctrl = idle.remove(0);
while let Some(ctrl) = idle.pop_front() {
// err means the socket has since died
if ctrl.ready(Next::write()).is_ok() {
woke_up = true;
Expand All @@ -576,11 +658,14 @@ where C: Connect,
if woke_up {
trace!("woke up idle conn for '{}'", url);
let deadline = scope.now() + scope.connect_timeout;
scope.queue.entry(key).or_insert_with(Vec::new).push(Queued {
deadline: deadline,
handler: handler,
url: url
});
scope.queue
.entry(key)
.or_insert_with(VecDeque::new)
.push_back(Queued {
deadline: deadline,
handler: handler,
url: url
});
continue;
}
} else {
Expand All @@ -592,11 +677,14 @@ where C: Connect,
match connector.connect(&url) {
Ok(key) => {
let deadline = scope.now() + scope.connect_timeout;
scope.queue.entry(key).or_insert_with(Vec::new).push(Queued {
deadline: deadline,
handler: handler,
url: url
});
scope.queue
.entry(key)
.or_insert_with(VecDeque::new)
.push_back(Queued {
deadline: deadline,
handler: handler,
url: url
});
}
Err(e) => {
let _todo = handler.on_error(e.into());
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub enum Error {
Status,
/// A timeout occurred waiting for an IO event.
Timeout,
/// Event loop is full and cannot process request
Full,
/// An `io::Error` that occurred while trying to read or write to a network stream.
Io(IoError),
/// An error from a SSL library.
Expand Down Expand Up @@ -90,6 +92,7 @@ impl StdError for Error {
Status => "Invalid Status provided",
Incomplete => "Message is incomplete",
Timeout => "Timeout",
Error::Full => "Event loop is full",
Uri(ref e) => e.description(),
Io(ref e) => e.description(),
Ssl(ref e) => e.description(),
Expand Down

0 comments on commit bed4815

Please sign in to comment.