Skip to content

Commit

Permalink
fix(client): properly close idle connections after timeout
Browse files Browse the repository at this point in the history
Additionally fixes if there were idle connections when a `Client` is
dropped.

Only fixes with the no-proto dispatcher, as changing internals for the
tokio-proto dispatcher would be much harder, and it will replace it very
soon.

Closes #1397
  • Loading branch information
seanmonstar committed Dec 14, 2017
1 parent 16aa92c commit 139dc7a
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 13 deletions.
37 changes: 32 additions & 5 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,12 @@ where C: Connect,
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = mpsc::channel(1);
let pooled = pool.pooled(pool_key, RefCell::new(tx));
let (tx, rx) = mpsc::channel(0);
let tx = HyperClient {
tx: RefCell::new(tx),
should_close: true,
};
let pooled = pool.pooled(pool_key, tx);
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
Expand All @@ -269,9 +273,10 @@ where C: Connect,
e.into()
});

let resp = race.and_then(move |client| {
let resp = race.and_then(move |mut client| {
let (callback, rx) = oneshot::channel();
client.borrow_mut().start_send((head, body, callback)).unwrap();
client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap();
client.should_close = false;
rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Expand Down Expand Up @@ -309,7 +314,29 @@ impl<C, B> fmt::Debug for Client<C, B> {
}

type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;
type HyperClient<B> = RefCell<::futures::sync::mpsc::Sender<(RequestHead, Option<B>, ::futures::sync::oneshot::Sender<::Result<::Response>>)>>;

struct HyperClient<B> {
tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>,
should_close: bool,
}

impl<B> Clone for HyperClient<B> {
fn clone(&self) -> HyperClient<B> {
HyperClient {
tx: self.tx.clone(),
should_close: self.should_close,
}
}
}

impl<B> Drop for HyperClient<B> {
fn drop(&mut self) {
if self.should_close {
self.should_close = false;
let _ = self.tx.borrow_mut().try_send(proto::dispatch::ClientMsg::Close);
}
}
}

enum Dispatch<B> {
Proto(Pool<ProtoClient<B>>),
Expand Down
8 changes: 6 additions & 2 deletions src/proto/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ where I: AsyncRead + AsyncWrite,

}

pub fn close_and_shutdown(&mut self) -> Poll<(), io::Error> {
try_ready!(self.flush());
self.shutdown()
}

pub fn shutdown(&mut self) -> Poll<(), io::Error> {
match self.io.io_mut().shutdown() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Expand Down Expand Up @@ -625,8 +630,7 @@ where I: AsyncRead + AsyncWrite,

#[inline]
fn close(&mut self) -> Poll<(), Self::SinkError> {
try_ready!(self.poll_complete());
self.shutdown()
self.close_and_shutdown()
}
}

Expand Down
40 changes: 35 additions & 5 deletions src/proto/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct Dispatcher<D, Bs, I, B, T, K> {
dispatch: D,
body_tx: Option<super::body::BodySender>,
body_rx: Option<Bs>,
is_closing: bool,
}

pub trait Dispatch {
Expand All @@ -34,7 +35,12 @@ pub struct Client<B> {
rx: ClientRx<B>,
}

type ClientRx<B> = mpsc::Receiver<(RequestHead, Option<B>, oneshot::Sender<::Result<::Response>>)>;
pub enum ClientMsg<B> {
Request(RequestHead, Option<B>, oneshot::Sender<::Result<::Response>>),
Close,
}

type ClientRx<B> = mpsc::Receiver<ClientMsg<B>>;

impl<D, Bs, I, B, T, K> Dispatcher<D, Bs, I, B, T, K>
where
Expand All @@ -51,6 +57,7 @@ where
dispatch: dispatch,
body_tx: None,
body_rx: None,
is_closing: false,
}
}

Expand All @@ -60,7 +67,9 @@ where

fn poll_read(&mut self) -> Poll<(), ::Error> {
loop {
if self.conn.can_read_head() {
if self.is_closing {
return Ok(Async::Ready(()));
} else if self.conn.can_read_head() {
match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body {
Expand Down Expand Up @@ -149,12 +158,16 @@ where

fn poll_write(&mut self) -> Poll<(), ::Error> {
loop {
if self.body_rx.is_none() && self.dispatch.should_poll() {
if self.is_closing {
return Ok(Async::Ready(()));
} else if self.body_rx.is_none() && self.dispatch.should_poll() {
if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) {
self.conn.write_head(head, body.is_some());
self.body_rx = body;
} else {
self.conn.close_write();
self.is_closing = true;
//self.conn.close_read();
//self.conn.close_write();
return Ok(Async::Ready(()));
}
} else if self.conn.has_queued_body() {
Expand Down Expand Up @@ -190,6 +203,16 @@ where
})
}

fn poll_close(&mut self) -> Poll<(), ::Error> {
debug_assert!(self.is_closing);

try_ready!(self.conn.close_and_shutdown());
self.conn.close_read();
self.conn.close_write();
self.is_closing = false;
Ok(Async::Ready(()))
}

fn is_done(&self) -> bool {
let read_done = self.conn.is_read_closed();

Expand Down Expand Up @@ -224,6 +247,10 @@ where
self.poll_write()?;
self.poll_flush()?;

if self.is_closing {
self.poll_close()?;
}

if self.is_done() {
try_ready!(self.conn.shutdown());
trace!("Dispatch::poll done");
Expand Down Expand Up @@ -285,6 +312,7 @@ where

// ===== impl Client =====


impl<B> Client<B> {
pub fn new(rx: ClientRx<B>) -> Client<B> {
Client {
Expand All @@ -305,11 +333,13 @@ where

fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
match self.rx.poll() {
Ok(Async::Ready(Some((head, body, cb)))) => {
Ok(Async::Ready(Some(ClientMsg::Request(head, body, cb)))) => {
self.callback = Some(cb);
Ok(Async::Ready(Some((head, body))))
},
Ok(Async::Ready(Some(ClientMsg::Close))) |
Ok(Async::Ready(None)) => {
trace!("client tx closed");
// user has dropped sender handle
Ok(Async::Ready(None))
},
Expand Down
53 changes: 52 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ mod dispatch_impl {
}

#[test]
fn drop_client_closes_connection() {
fn dropped_client_closes_connection() {
// https://github.com/hyperium/hyper/issues/1353
let _ = pretty_env_logger::init();

Expand Down Expand Up @@ -653,6 +653,57 @@ mod dispatch_impl {
assert_eq!(closes.load(Ordering::Relaxed), 1);
}


#[test]
fn drop_client_closes_idle_connections() {
let _ = pretty_env_logger::init();

let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let closes = Arc::new(AtomicUsize::new(0));

let (tx1, rx1) = oneshot::channel();
let (_client_drop_tx, client_drop_rx) = oneshot::channel::<()>();

thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
let body =[b'x'; 64];
write!(sock, "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len()).expect("write head");
let _ = sock.write_all(&body);
let _ = tx1.send(());

// prevent this thread from closing until end of test, so the connection
// stays open and idle until Client is dropped
let _ = client_drop_rx.wait();
});

let uri = format!("http://{}/a", addr).parse().unwrap();

let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.build(&handle);
let res = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
res.body().concat2()
});
let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
core.run(res.join(rx).map(|r| r.0)).unwrap();

// not closed yet, just idle
assert_eq!(closes.load(Ordering::Relaxed), 0);
drop(client);
core.run(Timeout::new(Duration::from_millis(100), &handle).unwrap()).unwrap();

assert_eq!(closes.load(Ordering::Relaxed), 1);
}

#[test]
fn no_keep_alive_closes_connection() {
// https://github.com/hyperium/hyper/issues/1383
Expand Down

0 comments on commit 139dc7a

Please sign in to comment.