diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index a32cb7df..5c4b549a 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -496,23 +496,34 @@ impl Future for PingManager { } TimeoutState::Ping(ref mut timeout) => { if timeout.poll()?.is_ready() { - if let CloseState::Exchange(ref mut client) = self.client { - client.shutdown(); - } - return Err("pong not received within timeout".into()) + // We may not actually be reading messages from the + // websocket right now, could have been waiting on something + // else. Instead of immediately returning an error here wait + // for the stream to return `NotReady` when looking for + // messages, as then we're extra sure that no pong was + // received after this timeout elapsed. + debug!("ping timeout fired, scheduling error to maybe happen"); + self.socket.borrow_mut().pong_timeout = true; + // `timeout` is cleared in the clause below } } } // Received pongs will clear our ping timeout, but not the close // timeout. - if self.socket.borrow_mut().poll_pong().is_ready() { - debug!("clearing ping timeout"); - if let TimeoutState::Ping(_) = self.timeout { + if let TimeoutState::Ping(_) = self.timeout { + let mut socket = self.socket.borrow_mut(); + if socket.poll_pong().is_ready() || socket.pong_timeout { + debug!("clearing ping timeout"); self.timeout = TimeoutState::None; } } + // Be sure to always flush out any buffered messages/pings + self.socket.borrow_mut().poll_complete().chain_err(|| { + "failed routine `poll_complete` call" + })?; + // At this point looks our state of ping management A-OK, so try to // make progress on our client, and when done with that execute the // closing handshake. @@ -537,6 +548,7 @@ struct WebpushSocket { inner: T, pong: Pong, ping: bool, + pong_timeout: bool, } enum Pong { @@ -551,6 +563,7 @@ impl WebpushSocket { inner: t, pong: Pong::None, ping: false, + pong_timeout: false, } } @@ -594,23 +607,37 @@ impl Stream for WebpushSocket fn poll(&mut self) -> Poll, Error> { loop { - match try_ready!(self.inner.poll()) { - Some(Message::Text(ref s)) => { + let msg = match self.inner.poll()? { + Async::Ready(Some(msg)) => msg, + Async::Ready(None) => return Ok(None.into()), + Async::NotReady => { + // If we don't have any more messages and our pong timeout + // elapsed (set above) then this is where we start + // triggering errors. + if self.pong_timeout { + return Err("failed to receive a pong in time".into()) + } + return Ok(Async::NotReady) + } + }; + match msg { + Message::Text(ref s) => { trace!("text message {}", s); let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; return Ok(Some(msg).into()) } - Some(Message::Binary(_)) => { + Message::Binary(_) => { return Err("binary messages not accepted".into()) } // sending a pong is already managed by lower layers, just go to // the next message - Some(Message::Ping(_)) => {} + Message::Ping(_) => {} // Wake up tasks waiting for a pong, if any. - Some(Message::Pong(_)) => { + Message::Pong(_) => { + self.pong_timeout = false; match mem::replace(&mut self.pong, Pong::Received) { Pong::None => {} Pong::Received => {} @@ -620,8 +647,6 @@ impl Stream for WebpushSocket } } } - - None => return Ok(None.into()), } } }