Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
refactor: Reorganize the Rust ping manager
Browse files Browse the repository at this point in the history
Focus on retaining just one `Timeout` instance instead of creating many, using
the `reset` function where possible. Additionally try to streamline the number
of timers that we hit during each pass to ensure that we're at most only hitting
one on the fast path.
  • Loading branch information
alexcrichton committed Sep 13, 2017
1 parent 4d5a44f commit 2a1ab5d
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 106 deletions.
20 changes: 10 additions & 10 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

178 changes: 82 additions & 96 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@ use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::ffi::CStr;
use std::io;
use std::mem;
use std::net::{IpAddr, ToSocketAddrs};
use std::panic;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::{Instant, Duration};

use futures::sync::oneshot;
use futures::task::{self, Task};
use futures::task;
use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend};
use libc::c_char;
use serde_json;
use time;
use tokio_core::net::TcpListener;
use tokio_core::reactor::{Core, Timeout, Handle, Interval};
use tokio_core::reactor::{Core, Timeout, Handle};
use tokio_io;
use tokio_tungstenite::{accept_async, WebSocketStream};
use tungstenite::Message;
Expand Down Expand Up @@ -386,7 +385,7 @@ impl Server {
if let Some(client) = uaids.get_mut(&uaid) {
debug!("Found a client to deliver a notification to");
// TODO: Don't unwrap, handle error properly
(&client.tx).send(ServerNotification::Notification(notif)).unwrap();
client.tx.unbounded_send(ServerNotification::Notification(notif)).unwrap();
info!("Dropped notification in queue");
return Ok(());
}
Expand All @@ -410,16 +409,16 @@ impl Drop for Server {

struct PingManager {
socket: RcObject<WebpushSocket<WebSocketStream<WebpushIo>>>,
ping_interval: Interval,
timeout: TimeoutState,
timeout: Timeout,
waiting: WaitingFor,
srv: Rc<Server>,
client: CloseState<Client<RcObject<WebpushSocket<WebSocketStream<WebpushIo>>>>>,
}

enum TimeoutState {
None,
Ping(Timeout),
Close(Timeout),
enum WaitingFor {
SendPing,
Pong,
Close,
}

enum CloseState<T> {
Expand Down Expand Up @@ -447,8 +446,8 @@ impl PingManager {
// management and message shuffling.
let socket = RcObject::new(WebpushSocket::new(socket));
Ok(PingManager {
ping_interval: Interval::new(srv.opts.auto_ping_interval, &srv.handle)?,
timeout: TimeoutState::None,
timeout: Timeout::new(srv.opts.auto_ping_interval, &srv.handle)?,
waiting: WaitingFor::SendPing,
socket: socket.clone(),
client: CloseState::Exchange(Client::new(socket, srv)),
srv: srv.clone(),
Expand All @@ -461,82 +460,91 @@ impl Future for PingManager {
type Error = Error;

fn poll(&mut self) -> Poll<(), Error> {
// If it's time for us to send a ping, then queue up a ping to get sent
// and start the clock for that ping to time out once the ping is
// actually sent out on the socket.
while let Async::Ready(_) = self.ping_interval.poll()? {
match self.timeout {
TimeoutState::None => {}
_ => continue,
}
debug!("scheduling a ping to be sent");
self.socket.borrow_mut().ping = true;
}
{
let mut socket = self.socket.borrow_mut();
if socket.ping && socket.send_ping()?.is_ready() {
let timeout = Timeout::new(self.srv.opts.auto_ping_timeout,
&self.srv.handle)?;
self.timeout = TimeoutState::Ping(timeout);
let mut socket = self.socket.borrow_mut();
loop {
if socket.ping {
if socket.send_ping()?.is_ready() {
let at = Instant::now() + self.srv.opts.auto_ping_timeout;
self.timeout.reset(at);
self.waiting = WaitingFor::Pong;
} else {
break
}
}
}

// If the client takes too long to respond to our websocket ping or too
// long to execute the closing handshake then we terminate the whole
// connection.
match self.timeout {
TimeoutState::None => {}
TimeoutState::Close(ref mut timeout) => {
if timeout.poll()?.is_ready() {
if let CloseState::Exchange(ref mut client) = self.client {
client.shutdown();
assert!(!socket.ping);
match self.waiting {
WaitingFor::SendPing => {
assert!(!socket.pong_timeout);
assert!(!socket.pong_received);
match self.timeout.poll()? {
Async::Ready(()) => {
debug!("scheduling a ping to get sent");
socket.ping = true;
}
Async::NotReady => break,
}
return Err("close handshake took too long".into())
}
}
TimeoutState::Ping(ref mut timeout) => {
if timeout.poll()?.is_ready() {
// We may not actually be reading messages from the
// websocket right now, could have been waiting on something
// else. Instead of immediately returning an error here wait
// for the stream to return `NotReady` when looking for
// messages, as then we're extra sure that no pong was
// received after this timeout elapsed.
debug!("ping timeout fired, scheduling error to maybe happen");
self.socket.borrow_mut().pong_timeout = true;
// `timeout` is cleared in the clause below
WaitingFor::Pong => {
if socket.pong_received {
// If we received a pong, then switch us back to waiting
// to send out a ping
debug!("pong received, going back to sending a ping");
assert!(!socket.pong_timeout);
let at = Instant::now() + self.srv.opts.auto_ping_interval;
self.timeout.reset(at);
self.waiting = WaitingFor::SendPing;
socket.pong_received = false;
} else if socket.pong_timeout {
// If our socket is waiting to deliver a pong timeout,
// then no need to keep checking the timer and we can
// keep going
debug!("waiting for socket to see pong timed out");
break
} else if self.timeout.poll()?.is_ready() {
// We may not actually be reading messages from the
// websocket right now, could have been waiting on
// something else. Instead of immediately returning an
// error here wait for the stream to return `NotReady`
// when looking for messages, as then we're extra sure
// that no pong was received after this timeout elapsed.
debug!("waited too long for a pong");
socket.pong_timeout = true;
} else {
break
}
}
WaitingFor::Close => {
assert!(!socket.pong_timeout);
if self.timeout.poll()?.is_ready() {
if let CloseState::Exchange(ref mut client) = self.client {
client.shutdown();
}
return Err("close handshake took too long".into())
}
}
}
}

// Received pongs will clear our ping timeout, but not the close
// timeout.
if let TimeoutState::Ping(_) = self.timeout {
let mut socket = self.socket.borrow_mut();
if socket.poll_pong().is_ready() || socket.pong_timeout {
debug!("clearing ping timeout");
self.timeout = TimeoutState::None;
}
}

// Be sure to always flush out any buffered messages/pings
self.socket.borrow_mut().poll_complete().chain_err(|| {
socket.poll_complete().chain_err(|| {
"failed routine `poll_complete` call"
})?;
drop(socket);

// At this point looks our state of ping management A-OK, so try to
// make progress on our client, and when done with that execute the
// closing handshake.
loop {
match self.client {
CloseState::Exchange(ref mut client) => try_ready!(client.poll()),
CloseState::Closing => return Ok(self.socket.close()?),
CloseState::Closing => return Ok(self.socket.borrow_mut().close()?),
}

self.client = CloseState::Closing;
if let Some(dur) = self.srv.opts.close_handshake_timeout {
let timeout = Timeout::new(dur, &self.srv.handle)?;
self.timeout = TimeoutState::Close(timeout);
let at = Instant::now() + dur;
self.timeout.reset(at);
self.waiting = WaitingFor::Close;
}
}
}
Expand All @@ -546,38 +554,21 @@ impl Future for PingManager {
// `ClientMessage` and `ServerMessage`.
struct WebpushSocket<T> {
inner: T,
pong: Pong,
pong_received: bool,
ping: bool,
pong_timeout: bool,
}

enum Pong {
None,
Received,
Waiting(Task),
}

impl<T> WebpushSocket<T> {
fn new(t: T) -> WebpushSocket<T> {
WebpushSocket {
inner: t,
pong: Pong::None,
pong_received: false,
ping: false,
pong_timeout: false,
}
}

fn poll_pong(&mut self) -> Async<()> {
match mem::replace(&mut self.pong, Pong::None) {
Pong::None => {}
Pong::Received => return Async::Ready(()),
Pong::Waiting(_) => {}
}
debug!("waiting for a pong");
self.pong = Pong::Waiting(task::current());
Async::NotReady
}

fn send_ping(&mut self) -> Poll<(), Error>
where T: Sink<SinkItem = Message>, Error: From<T::SinkError>
{
Expand Down Expand Up @@ -635,17 +626,12 @@ impl<T> Stream for WebpushSocket<T>
// the next message
Message::Ping(_) => {}

// Wake up tasks waiting for a pong, if any.
// Wake up ourselves to ensure the above ping logic eventually
// sees this pong.
Message::Pong(_) => {
self.pong_received = true;
self.pong_timeout = false;
match mem::replace(&mut self.pong, Pong::Received) {
Pong::None => {}
Pong::Received => {}
Pong::Waiting(task) => {
debug!("notifying a task of a pong");
task.notify();
}
}
task::current().notify();
}
}
}
Expand Down

0 comments on commit 2a1ab5d

Please sign in to comment.