From 3b5075acda57d5471ea9d18a7e2c11380a19935e Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 11 Sep 2017 21:46:19 -0700 Subject: [PATCH] fix: Correctly schedule pong timeouts Previously if the client code were otherwise waiting on an unrelated future the pong timeout expiring would immediately kill the connection. Instead wait for the connection to start reading messages from the websocket again, and only then kill the connection if we for sure don't see a pong. --- autopush_rs/src/server/mod.rs | 53 ++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index a32cb7df..5c4b549a 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -496,23 +496,34 @@ impl Future for PingManager { } TimeoutState::Ping(ref mut timeout) => { if timeout.poll()?.is_ready() { - if let CloseState::Exchange(ref mut client) = self.client { - client.shutdown(); - } - return Err("pong not received within timeout".into()) + // We may not actually be reading messages from the + // websocket right now, could have been waiting on something + // else. Instead of immediately returning an error here wait + // for the stream to return `NotReady` when looking for + // messages, as then we're extra sure that no pong was + // received after this timeout elapsed. + debug!("ping timeout fired, scheduling error to maybe happen"); + self.socket.borrow_mut().pong_timeout = true; + // `timeout` is cleared in the clause below } } } // Received pongs will clear our ping timeout, but not the close // timeout. - if self.socket.borrow_mut().poll_pong().is_ready() { - debug!("clearing ping timeout"); - if let TimeoutState::Ping(_) = self.timeout { + if let TimeoutState::Ping(_) = self.timeout { + let mut socket = self.socket.borrow_mut(); + if socket.poll_pong().is_ready() || socket.pong_timeout { + debug!("clearing ping timeout"); self.timeout = TimeoutState::None; } } + // Be sure to always flush out any buffered messages/pings + self.socket.borrow_mut().poll_complete().chain_err(|| { + "failed routine `poll_complete` call" + })?; + // At this point looks our state of ping management A-OK, so try to // make progress on our client, and when done with that execute the // closing handshake. @@ -537,6 +548,7 @@ struct WebpushSocket { inner: T, pong: Pong, ping: bool, + pong_timeout: bool, } enum Pong { @@ -551,6 +563,7 @@ impl WebpushSocket { inner: t, pong: Pong::None, ping: false, + pong_timeout: false, } } @@ -594,23 +607,37 @@ impl Stream for WebpushSocket fn poll(&mut self) -> Poll, Error> { loop { - match try_ready!(self.inner.poll()) { - Some(Message::Text(ref s)) => { + let msg = match self.inner.poll()? { + Async::Ready(Some(msg)) => msg, + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => { + // If we don't have any more messages and our pong timeout + // elapsed (set above) then this is where we start + // triggering errors. + if self.pong_timeout { + return Err("failed to receive a pong in time".into()) + } + return Ok(Async::NotReady) + } + }; + match msg { + Message::Text(ref s) => { trace!("text message {}", s); let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; return Ok(Some(msg).into()) } - Some(Message::Binary(_)) => { + Message::Binary(_) => { return Err("binary messages not accepted".into()) } // sending a pong is already managed by lower layers, just go to // the next message - Some(Message::Ping(_)) => {} + Message::Ping(_) => {} // Wake up tasks waiting for a pong, if any. - Some(Message::Pong(_)) => { + Message::Pong(_) => { + self.pong_timeout = false; match mem::replace(&mut self.pong, Pong::Received) { Pong::None => {} Pong::Received => {} @@ -620,8 +647,6 @@ impl Stream for WebpushSocket } } } - - None => return Ok(None.into()), } } }