diff --git a/autopush_rs/Cargo.lock b/autopush_rs/Cargo.lock index 9cf57255..852fc3b1 100644 --- a/autopush_rs/Cargo.lock +++ b/autopush_rs/Cargo.lock @@ -5,7 +5,7 @@ dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", @@ -123,7 +123,7 @@ dependencies = [ [[package]] name = "futures" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -131,7 +131,7 @@ name = "futures-cpupool" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -152,7 +152,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -446,7 +446,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -461,7 +461,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -470,7 +470,7 @@ name = "tokio-proto" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", @@ -487,7 +487,7 @@ name = "tokio-service" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -495,7 +495,7 @@ name = "tokio-tungstenite" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "tungstenite 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -603,7 +603,7 @@ dependencies = [ "checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90" "checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" -"checksum futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "4b63a4792d4f8f686defe3b39b92127fea6344de5d38202b2ee5a11bbbf29d6a" +"checksum futures 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a82bdc62350ca9d7974c760e9665102fc9d740992a528c2254aa930e53b783c4" "checksum futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a283c84501e92cade5ea673a2a7ca44f71f209ccdd302a3e0896f50083d2c5ff" "checksum gcc 0.3.53 (registry+https://github.com/rust-lang/crates.io-index)" = "e8310f7e9c890398b0e80e301c4f474e9918d2b27fca8f48486ca775fa9ffc5a" "checksum httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "af2f2dd97457e8fb1ae7c5a420db346af389926e36f43768b96f101546b04a07" diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 5c4b549a..878f3141 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -2,23 +2,22 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::ffi::CStr; use std::io; -use std::mem; use std::net::{IpAddr, ToSocketAddrs}; use std::panic; use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; use std::thread; -use std::time::Duration; +use std::time::{Instant, Duration}; use futures::sync::oneshot; -use futures::task::{self, Task}; +use futures::task; use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend}; use libc::c_char; use serde_json; use time; use tokio_core::net::TcpListener; -use tokio_core::reactor::{Core, Timeout, Handle, Interval}; +use tokio_core::reactor::{Core, Timeout, Handle}; use tokio_io; use tokio_tungstenite::{accept_async, WebSocketStream}; use tungstenite::Message; @@ -386,7 +385,7 @@ impl Server { if let Some(client) = uaids.get_mut(&uaid) { debug!("Found a client to deliver a notification to"); // TODO: Don't unwrap, handle error properly - (&client.tx).send(ServerNotification::Notification(notif)).unwrap(); + client.tx.unbounded_send(ServerNotification::Notification(notif)).unwrap(); info!("Dropped notification in queue"); return Ok(()); } @@ -410,16 +409,16 @@ impl Drop for Server { struct PingManager { socket: RcObject>>, - ping_interval: Interval, - timeout: TimeoutState, + timeout: Timeout, + waiting: WaitingFor, srv: Rc, client: CloseState>>>>, } -enum TimeoutState { - None, - Ping(Timeout), - Close(Timeout), +enum WaitingFor { + SendPing, + Pong, + Close, } enum CloseState { @@ -447,8 +446,8 @@ impl PingManager { // management and message shuffling. let socket = RcObject::new(WebpushSocket::new(socket)); Ok(PingManager { - ping_interval: Interval::new(srv.opts.auto_ping_interval, &srv.handle)?, - timeout: TimeoutState::None, + timeout: Timeout::new(srv.opts.auto_ping_interval, &srv.handle)?, + waiting: WaitingFor::SendPing, socket: socket.clone(), client: CloseState::Exchange(Client::new(socket, srv)), srv: srv.clone(), @@ -461,68 +460,76 @@ impl Future for PingManager { type Error = Error; fn poll(&mut self) -> Poll<(), Error> { - // If it's time for us to send a ping, then queue up a ping to get sent - // and start the clock for that ping to time out once the ping is - // actually sent out on the socket. - while let Async::Ready(_) = self.ping_interval.poll()? { - match self.timeout { - TimeoutState::None => {} - _ => continue, - } - debug!("scheduling a ping to be sent"); - self.socket.borrow_mut().ping = true; - } - { - let mut socket = self.socket.borrow_mut(); - if socket.ping && socket.send_ping()?.is_ready() { - let timeout = Timeout::new(self.srv.opts.auto_ping_timeout, - &self.srv.handle)?; - self.timeout = TimeoutState::Ping(timeout); + let mut socket = self.socket.borrow_mut(); + loop { + if socket.ping { + if socket.send_ping()?.is_ready() { + let at = Instant::now() + self.srv.opts.auto_ping_timeout; + self.timeout.reset(at); + self.waiting = WaitingFor::Pong; + } else { + break + } } - } - - // If the client takes too long to respond to our websocket ping or too - // long to execute the closing handshake then we terminate the whole - // connection. - match self.timeout { - TimeoutState::None => {} - TimeoutState::Close(ref mut timeout) => { - if timeout.poll()?.is_ready() { - if let CloseState::Exchange(ref mut client) = self.client { - client.shutdown(); + assert!(!socket.ping); + match self.waiting { + WaitingFor::SendPing => { + assert!(!socket.pong_timeout); + assert!(!socket.pong_received); + match self.timeout.poll()? { + Async::Ready(()) => { + debug!("scheduling a ping to get sent"); + socket.ping = true; + } + Async::NotReady => break, } - return Err("close handshake took too long".into()) } - } - TimeoutState::Ping(ref mut timeout) => { - if timeout.poll()?.is_ready() { - // We may not actually be reading messages from the - // websocket right now, could have been waiting on something - // else. Instead of immediately returning an error here wait - // for the stream to return `NotReady` when looking for - // messages, as then we're extra sure that no pong was - // received after this timeout elapsed. - debug!("ping timeout fired, scheduling error to maybe happen"); - self.socket.borrow_mut().pong_timeout = true; - // `timeout` is cleared in the clause below + WaitingFor::Pong => { + if socket.pong_received { + // If we received a pong, then switch us back to waiting + // to send out a ping + debug!("pong received, going back to sending a ping"); + assert!(!socket.pong_timeout); + let at = Instant::now() + self.srv.opts.auto_ping_interval; + self.timeout.reset(at); + self.waiting = WaitingFor::SendPing; + socket.pong_received = false; + } else if socket.pong_timeout { + // If our socket is waiting to deliver a pong timeout, + // then no need to keep checking the timer and we can + // keep going + debug!("waiting for socket to see pong timed out"); + break + } else if self.timeout.poll()?.is_ready() { + // We may not actually be reading messages from the + // websocket right now, could have been waiting on + // something else. Instead of immediately returning an + // error here wait for the stream to return `NotReady` + // when looking for messages, as then we're extra sure + // that no pong was received after this timeout elapsed. + debug!("waited too long for a pong"); + socket.pong_timeout = true; + } else { + break + } + } + WaitingFor::Close => { + assert!(!socket.pong_timeout); + if self.timeout.poll()?.is_ready() { + if let CloseState::Exchange(ref mut client) = self.client { + client.shutdown(); + } + return Err("close handshake took too long".into()) + } } - } - } - - // Received pongs will clear our ping timeout, but not the close - // timeout. - if let TimeoutState::Ping(_) = self.timeout { - let mut socket = self.socket.borrow_mut(); - if socket.poll_pong().is_ready() || socket.pong_timeout { - debug!("clearing ping timeout"); - self.timeout = TimeoutState::None; } } // Be sure to always flush out any buffered messages/pings - self.socket.borrow_mut().poll_complete().chain_err(|| { + socket.poll_complete().chain_err(|| { "failed routine `poll_complete` call" })?; + drop(socket); // At this point looks our state of ping management A-OK, so try to // make progress on our client, and when done with that execute the @@ -530,13 +537,14 @@ impl Future for PingManager { loop { match self.client { CloseState::Exchange(ref mut client) => try_ready!(client.poll()), - CloseState::Closing => return Ok(self.socket.close()?), + CloseState::Closing => return Ok(self.socket.borrow_mut().close()?), } self.client = CloseState::Closing; if let Some(dur) = self.srv.opts.close_handshake_timeout { - let timeout = Timeout::new(dur, &self.srv.handle)?; - self.timeout = TimeoutState::Close(timeout); + let at = Instant::now() + dur; + self.timeout.reset(at); + self.waiting = WaitingFor::Close; } } } @@ -546,38 +554,21 @@ impl Future for PingManager { // `ClientMessage` and `ServerMessage`. struct WebpushSocket { inner: T, - pong: Pong, + pong_received: bool, ping: bool, pong_timeout: bool, } -enum Pong { - None, - Received, - Waiting(Task), -} - impl WebpushSocket { fn new(t: T) -> WebpushSocket { WebpushSocket { inner: t, - pong: Pong::None, + pong_received: false, ping: false, pong_timeout: false, } } - fn poll_pong(&mut self) -> Async<()> { - match mem::replace(&mut self.pong, Pong::None) { - Pong::None => {} - Pong::Received => return Async::Ready(()), - Pong::Waiting(_) => {} - } - debug!("waiting for a pong"); - self.pong = Pong::Waiting(task::current()); - Async::NotReady - } - fn send_ping(&mut self) -> Poll<(), Error> where T: Sink, Error: From { @@ -635,17 +626,12 @@ impl Stream for WebpushSocket // the next message Message::Ping(_) => {} - // Wake up tasks waiting for a pong, if any. + // Wake up ourselves to ensure the above ping logic eventually + // sees this pong. Message::Pong(_) => { + self.pong_received = true; self.pong_timeout = false; - match mem::replace(&mut self.pong, Pong::Received) { - Pong::None => {} - Pong::Received => {} - Pong::Waiting(task) => { - debug!("notifying a task of a pong"); - task.notify(); - } - } + task::current().notify(); } } }