diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 09ee633c..a32cb7df 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -462,15 +462,23 @@ impl Future for PingManager { fn poll(&mut self) -> Poll<(), Error> { // If it's time for us to send a ping, then queue up a ping to get sent - // and start the clock for that ping to time out. + // and start the clock for that ping to time out once the ping is + // actually sent out on the socket. while let Async::Ready(_) = self.ping_interval.poll()? { match self.timeout { TimeoutState::None => {} _ => continue, } + debug!("scheduling a ping to be sent"); self.socket.borrow_mut().ping = true; - let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, &self.srv.handle)?; - self.timeout = TimeoutState::Ping(timeout); + } + { + let mut socket = self.socket.borrow_mut(); + if socket.ping && socket.send_ping()?.is_ready() { + let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, + &self.srv.handle)?; + self.timeout = TimeoutState::Ping(timeout); + } } // If the client takes too long to respond to our websocket ping or too @@ -499,14 +507,12 @@ impl Future for PingManager { // Received pongs will clear our ping timeout, but not the close // timeout. if self.socket.borrow_mut().poll_pong().is_ready() { + debug!("clearing ping timeout"); if let TimeoutState::Ping(_) = self.timeout { self.timeout = TimeoutState::None; } } - // Ensure the scheduled ping is actually flushed out - self.socket.borrow_mut().poll_complete()?; - // At this point looks our state of ping management A-OK, so try to // make progress on our client, and when done with that execute the // closing handshake. @@ -554,6 +560,7 @@ impl WebpushSocket { Pong::Received => return Async::Ready(()), Pong::Waiting(_) => {} } + debug!("waiting for a pong"); self.pong = Pong::Waiting(task::current()); Async::NotReady } @@ -562,9 +569,16 @@ impl WebpushSocket { where T: Sink, Error: From { if self.ping { + debug!("sending a ping"); match self.inner.start_send(Message::Ping(Vec::new()))? { - AsyncSink::Ready => self.ping = false, - AsyncSink::NotReady(_) => return Ok(Async::NotReady), + AsyncSink::Ready => { + debug!("ping sent"); + self.ping = false; + } + AsyncSink::NotReady(_) => { + debug!("ping not ready to be sent"); + return Ok(Async::NotReady) + } } } Ok(Async::Ready(())) @@ -582,6 +596,7 @@ impl Stream for WebpushSocket loop { match try_ready!(self.inner.poll()) { Some(Message::Text(ref s)) => { + trace!("text message {}", s); let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; return Ok(Some(msg).into()) } @@ -600,6 +615,7 @@ impl Stream for WebpushSocket Pong::None => {} Pong::Received => {} Pong::Waiting(task) => { + debug!("notifying a task of a pong"); task.notify(); } }