Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
fix: Correctly schedule pong timeouts
Browse files Browse the repository at this point in the history
Previously if the client code were otherwise waiting on an unrelated future the
pong timeout expiring would immediately kill the connection. Instead wait for
the connection to start reading messages from the websocket again, and only then
kill the connection if we for sure don't see a pong.
  • Loading branch information
alexcrichton committed Sep 12, 2017
1 parent 11a30a9 commit 3b5075a
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,23 +496,34 @@ impl Future for PingManager {
}
TimeoutState::Ping(ref mut timeout) => {
if timeout.poll()?.is_ready() {
if let CloseState::Exchange(ref mut client) = self.client {
client.shutdown();
}
return Err("pong not received within timeout".into())
// We may not actually be reading messages from the
// websocket right now, could have been waiting on something
// else. Instead of immediately returning an error here wait
// for the stream to return `NotReady` when looking for
// messages, as then we're extra sure that no pong was
// received after this timeout elapsed.
debug!("ping timeout fired, scheduling error to maybe happen");
self.socket.borrow_mut().pong_timeout = true;
// `timeout` is cleared in the clause below
}
}
}

// Received pongs will clear our ping timeout, but not the close
// timeout.
if self.socket.borrow_mut().poll_pong().is_ready() {
debug!("clearing ping timeout");
if let TimeoutState::Ping(_) = self.timeout {
if let TimeoutState::Ping(_) = self.timeout {
let mut socket = self.socket.borrow_mut();
if socket.poll_pong().is_ready() || socket.pong_timeout {
debug!("clearing ping timeout");
self.timeout = TimeoutState::None;
}
}

// Be sure to always flush out any buffered messages/pings
self.socket.borrow_mut().poll_complete().chain_err(|| {
"failed routine `poll_complete` call"
})?;

// At this point looks our state of ping management A-OK, so try to
// make progress on our client, and when done with that execute the
// closing handshake.
Expand All @@ -537,6 +548,7 @@ struct WebpushSocket<T> {
inner: T,
pong: Pong,
ping: bool,
pong_timeout: bool,
}

enum Pong {
Expand All @@ -551,6 +563,7 @@ impl<T> WebpushSocket<T> {
inner: t,
pong: Pong::None,
ping: false,
pong_timeout: false,
}
}

Expand Down Expand Up @@ -594,23 +607,37 @@ impl<T> Stream for WebpushSocket<T>

fn poll(&mut self) -> Poll<Option<ClientMessage>, Error> {
loop {
match try_ready!(self.inner.poll()) {
Some(Message::Text(ref s)) => {
let msg = match self.inner.poll()? {
Async::Ready(Some(msg)) => msg,
Async::Ready(None) => return Ok(None.into()),
Async::NotReady => {
// If we don't have any more messages and our pong timeout
// elapsed (set above) then this is where we start
// triggering errors.
if self.pong_timeout {
return Err("failed to receive a pong in time".into())
}
return Ok(Async::NotReady)
}
};
match msg {
Message::Text(ref s) => {
trace!("text message {}", s);
let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?;
return Ok(Some(msg).into())
}

Some(Message::Binary(_)) => {
Message::Binary(_) => {
return Err("binary messages not accepted".into())
}

// sending a pong is already managed by lower layers, just go to
// the next message
Some(Message::Ping(_)) => {}
Message::Ping(_) => {}

// Wake up tasks waiting for a pong, if any.
Some(Message::Pong(_)) => {
Message::Pong(_) => {
self.pong_timeout = false;
match mem::replace(&mut self.pong, Pong::Received) {
Pong::None => {}
Pong::Received => {}
Expand All @@ -620,8 +647,6 @@ impl<T> Stream for WebpushSocket<T>
}
}
}

None => return Ok(None.into()),
}
}
}
Expand Down

0 comments on commit 3b5075a

Please sign in to comment.