From 11a30a98b5423c65082252f9d77e70efce0bbbb6 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 11 Sep 2017 21:38:28 -0700 Subject: [PATCH 1/2] fix: Start ping timeouts once pints are sent Instead of starting the timeout once we enqueue the ping to be sent, start the timeout once the websocket has actually accepted the ping to get sent. --- autopush_rs/src/server/mod.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 09ee633c..a32cb7df 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -462,15 +462,23 @@ impl Future for PingManager { fn poll(&mut self) -> Poll<(), Error> { // If it's time for us to send a ping, then queue up a ping to get sent - // and start the clock for that ping to time out. + // and start the clock for that ping to time out once the ping is + // actually sent out on the socket. while let Async::Ready(_) = self.ping_interval.poll()? { match self.timeout { TimeoutState::None => {} _ => continue, } + debug!("scheduling a ping to be sent"); self.socket.borrow_mut().ping = true; - let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, &self.srv.handle)?; - self.timeout = TimeoutState::Ping(timeout); + } + { + let mut socket = self.socket.borrow_mut(); + if socket.ping && socket.send_ping()?.is_ready() { + let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, + &self.srv.handle)?; + self.timeout = TimeoutState::Ping(timeout); + } } // If the client takes too long to respond to our websocket ping or too @@ -499,14 +507,12 @@ impl Future for PingManager { // Received pongs will clear our ping timeout, but not the close // timeout. if self.socket.borrow_mut().poll_pong().is_ready() { + debug!("clearing ping timeout"); if let TimeoutState::Ping(_) = self.timeout { self.timeout = TimeoutState::None; } } - // Ensure the scheduled ping is actually flushed out - self.socket.borrow_mut().poll_complete()?; - // At this point looks our state of ping management A-OK, so try to // make progress on our client, and when done with that execute the // closing handshake. @@ -554,6 +560,7 @@ impl WebpushSocket { Pong::Received => return Async::Ready(()), Pong::Waiting(_) => {} } + debug!("waiting for a pong"); self.pong = Pong::Waiting(task::current()); Async::NotReady } @@ -562,9 +569,16 @@ impl WebpushSocket { where T: Sink, Error: From { if self.ping { + debug!("sending a ping"); match self.inner.start_send(Message::Ping(Vec::new()))? { - AsyncSink::Ready => self.ping = false, - AsyncSink::NotReady(_) => return Ok(Async::NotReady), + AsyncSink::Ready => { + debug!("ping sent"); + self.ping = false; + } + AsyncSink::NotReady(_) => { + debug!("ping not ready to be sent"); + return Ok(Async::NotReady) + } } } Ok(Async::Ready(())) @@ -582,6 +596,7 @@ impl Stream for WebpushSocket loop { match try_ready!(self.inner.poll()) { Some(Message::Text(ref s)) => { + trace!("text message {}", s); let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; return Ok(Some(msg).into()) } @@ -600,6 +615,7 @@ impl Stream for WebpushSocket Pong::None => {} Pong::Received => {} Pong::Waiting(task) => { + debug!("notifying a task of a pong"); task.notify(); } } From 3b5075acda57d5471ea9d18a7e2c11380a19935e Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 11 Sep 2017 21:46:19 -0700 Subject: [PATCH 2/2] fix: Correctly schedule pong timeouts Previously if the client code were otherwise waiting on an unrelated future the pong timeout expiring would immediately kill the connection. Instead wait for the connection to start reading messages from the websocket again, and only then kill the connection if we for sure don't see a pong. --- autopush_rs/src/server/mod.rs | 53 ++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index a32cb7df..5c4b549a 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -496,23 +496,34 @@ impl Future for PingManager { } TimeoutState::Ping(ref mut timeout) => { if timeout.poll()?.is_ready() { - if let CloseState::Exchange(ref mut client) = self.client { - client.shutdown(); - } - return Err("pong not received within timeout".into()) + // We may not actually be reading messages from the + // websocket right now, could have been waiting on something + // else. Instead of immediately returning an error here wait + // for the stream to return `NotReady` when looking for + // messages, as then we're extra sure that no pong was + // received after this timeout elapsed. + debug!("ping timeout fired, scheduling error to maybe happen"); + self.socket.borrow_mut().pong_timeout = true; + // `timeout` is cleared in the clause below } } } // Received pongs will clear our ping timeout, but not the close // timeout. - if self.socket.borrow_mut().poll_pong().is_ready() { - debug!("clearing ping timeout"); - if let TimeoutState::Ping(_) = self.timeout { + if let TimeoutState::Ping(_) = self.timeout { + let mut socket = self.socket.borrow_mut(); + if socket.poll_pong().is_ready() || socket.pong_timeout { + debug!("clearing ping timeout"); self.timeout = TimeoutState::None; } } + // Be sure to always flush out any buffered messages/pings + self.socket.borrow_mut().poll_complete().chain_err(|| { + "failed routine `poll_complete` call" + })?; + // At this point looks our state of ping management A-OK, so try to // make progress on our client, and when done with that execute the // closing handshake. @@ -537,6 +548,7 @@ struct WebpushSocket { inner: T, pong: Pong, ping: bool, + pong_timeout: bool, } enum Pong { @@ -551,6 +563,7 @@ impl WebpushSocket { inner: t, pong: Pong::None, ping: false, + pong_timeout: false, } } @@ -594,23 +607,37 @@ impl Stream for WebpushSocket fn poll(&mut self) -> Poll, Error> { loop { - match try_ready!(self.inner.poll()) { - Some(Message::Text(ref s)) => { + let msg = match self.inner.poll()? { + Async::Ready(Some(msg)) => msg, + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => { + // If we don't have any more messages and our pong timeout + // elapsed (set above) then this is where we start + // triggering errors. + if self.pong_timeout { + return Err("failed to receive a pong in time".into()) + } + return Ok(Async::NotReady) + } + }; + match msg { + Message::Text(ref s) => { trace!("text message {}", s); let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; return Ok(Some(msg).into()) } - Some(Message::Binary(_)) => { + Message::Binary(_) => { return Err("binary messages not accepted".into()) } // sending a pong is already managed by lower layers, just go to // the next message - Some(Message::Ping(_)) => {} + Message::Ping(_) => {} // Wake up tasks waiting for a pong, if any. - Some(Message::Pong(_)) => { + Message::Pong(_) => { + self.pong_timeout = false; match mem::replace(&mut self.pong, Pong::Received) { Pong::None => {} Pong::Received => {} @@ -620,8 +647,6 @@ impl Stream for WebpushSocket } } } - - None => return Ok(None.into()), } } }