From 2366efe4e1132aec66e0992e98616084937c49e7 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Fri, 3 Aug 2018 13:57:28 -0700 Subject: [PATCH] feat: support webpush API pings Closes #55 --- src/client.rs | 17 +++++++++ src/protocol.rs | 30 +++++++++++++++- src/server/mod.rs | 74 +++++++++++++++++++-------------------- tests/test_integration.py | 14 ++++++++ 4 files changed, 97 insertions(+), 38 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0882af1d6..9796dccfb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -160,6 +160,7 @@ pub struct WebPushClient { unacked_stored_highest: Option, connected_at: u64, sent_from_storage: u32, + last_ping: u64, stats: SessionStatistics, } @@ -177,6 +178,7 @@ impl Default for WebPushClient { unacked_stored_highest: Default::default(), connected_at: Default::default(), sent_from_storage: Default::default(), + last_ping: Default::default(), stats: Default::default(), } } @@ -855,6 +857,21 @@ where transition!(DetermineAck { data }); } } + Either::A(ClientMessage::Ping) => { + // Clients shouldn't ping > than once per minute or we + // disconnect them + if sec_since_epoch() - webpush.last_ping >= 55 { + debug!("Got a ping, sending pong"); + webpush.last_ping = sec_since_epoch(); + transition!(Send { + smessages: vec![ServerMessage::Ping], + data + }) + } else { + debug!("Got a ping too quickly, disconnecting"); + Err("code=4774".into()) + } + } Either::B(ServerNotification::Notification(notif)) => { if notif.ttl != 0 { webpush.unacked_direct_notifs.push(notif.clone()); diff --git a/src/protocol.rs b/src/protocol.rs index 5f4e4611d..368f4e6ab 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -6,8 +6,10 @@ //! are used to generate the ability to serialize these structures to JSON, //! using the `serde` crate. More docs for serde can be found at //! https://serde.rs - use std::collections::HashMap; +use std::str::FromStr; + +use serde_json; use uuid::Uuid; use util::ms_since_epoch; @@ -62,6 +64,19 @@ pub enum ClientMessage { code: Option, version: String, }, + + Ping, +} + +impl FromStr for ClientMessage { + type Err = serde_json::error::Error; + + fn from_str(s: &str) -> Result { + // parse empty object "{}" as a Ping + serde_json::from_str::>(s) + .map(|_| ClientMessage::Ping) + .or_else(|_| serde_json::from_str(s)) + } } #[derive(Deserialize)] @@ -101,6 +116,19 @@ pub enum ServerMessage { }, Notification(Notification), + + Ping, +} + +impl ServerMessage { + pub fn to_json(&self) -> Result { + match self { + // clients recognize {"messageType": "ping"} but traditionally both + // client/server send the empty object version + ServerMessage::Ping => Ok("{}".to_owned()), + _ => serde_json::to_string(self), + } + } } #[derive(Serialize, Default, Deserialize, Clone, Debug)] diff --git a/src/server/mod.rs b/src/server/mod.rs index 6fec9ed5f..1594a3b24 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -685,7 +685,7 @@ impl Future for PingManager { fn poll(&mut self) -> Poll<(), Error> { let mut socket = self.socket.borrow_mut(); loop { - if socket.ping { + if socket.ws_ping { // Don't check if we already have a delta to broadcast if socket.broadcast_delta.is_none() { // Determine if we can do a broadcast check, we need a connected webpush client @@ -696,7 +696,7 @@ impl Future for PingManager { } } - if socket.send_ping()?.is_ready() { + if socket.send_ws_ping()?.is_ready() { // If we just sent a broadcast, reset the ping interval and clear the delta if socket.broadcast_delta.is_some() { let at = Instant::now() + self.srv.opts.auto_ping_interval; @@ -712,34 +712,34 @@ impl Future for PingManager { break; } } - debug_assert!(!socket.ping); + debug_assert!(!socket.ws_ping); match self.waiting { WaitingFor::SendPing => { - debug_assert!(!socket.pong_timeout); - debug_assert!(!socket.pong_received); + debug_assert!(!socket.ws_pong_timeout); + debug_assert!(!socket.ws_pong_received); match self.timeout.poll()? { Async::Ready(()) => { - debug!("scheduling a ping to get sent"); - socket.ping = true; + debug!("scheduling a ws ping to get sent"); + socket.ws_ping = true; } Async::NotReady => break, } } WaitingFor::Pong => { - if socket.pong_received { + if socket.ws_pong_received { // If we received a pong, then switch us back to waiting // to send out a ping - debug!("pong received, going back to sending a ping"); - debug_assert!(!socket.pong_timeout); + debug!("ws pong received, going back to sending a ping"); + debug_assert!(!socket.ws_pong_timeout); let at = Instant::now() + self.srv.opts.auto_ping_interval; self.timeout.reset(at); self.waiting = WaitingFor::SendPing; - socket.pong_received = false; - } else if socket.pong_timeout { + socket.ws_pong_received = false; + } else if socket.ws_pong_timeout { // If our socket is waiting to deliver a pong timeout, // then no need to keep checking the timer and we can // keep going - debug!("waiting for socket to see pong timed out"); + debug!("waiting for socket to see ws pong timed out"); break; } else if self.timeout.poll()?.is_ready() { // We may not actually be reading messages from the @@ -748,14 +748,14 @@ impl Future for PingManager { // error here wait for the stream to return `NotReady` // when looking for messages, as then we're extra sure // that no pong was received after this timeout elapsed. - debug!("waited too long for a pong"); - socket.pong_timeout = true; + debug!("waited too long for a ws pong"); + socket.ws_pong_timeout = true; } else { break; } } WaitingFor::Close => { - debug_assert!(!socket.pong_timeout); + debug_assert!(!socket.ws_pong_timeout); if self.timeout.poll()?.is_ready() { if let CloseState::Exchange(ref mut client) = self.client { client.shutdown(); @@ -796,9 +796,9 @@ impl Future for PingManager { // `ClientMessage` and `ServerMessage`. struct WebpushSocket { inner: T, - pong_received: bool, - ping: bool, - pong_timeout: bool, + ws_pong_received: bool, + ws_ping: bool, + ws_pong_timeout: bool, broadcast_delta: Option>, } @@ -806,37 +806,37 @@ impl WebpushSocket { fn new(t: T) -> WebpushSocket { WebpushSocket { inner: t, - pong_received: false, - ping: false, - pong_timeout: false, + ws_pong_received: false, + ws_ping: false, + ws_pong_timeout: false, broadcast_delta: None, } } - fn send_ping(&mut self) -> Poll<(), Error> + fn send_ws_ping(&mut self) -> Poll<(), Error> where T: Sink, Error: From, { - if self.ping { + if self.ws_ping { let msg = if let Some(broadcasts) = self.broadcast_delta.clone() { debug!("sending a broadcast delta"); let server_msg = ServerMessage::Broadcast { broadcasts: Broadcast::into_hashmap(broadcasts), }; - let s = serde_json::to_string(&server_msg).chain_err(|| "failed to serialize")?; + let s = server_msg.to_json().chain_err(|| "failed to serialize")?; Message::Text(s) } else { - debug!("sending a ping"); + debug!("sending a ws ping"); Message::Ping(Vec::new()) }; match self.inner.start_send(msg)? { AsyncSink::Ready => { - debug!("ping sent"); - self.ping = false; + debug!("ws ping sent"); + self.ws_ping = false; } AsyncSink::NotReady(_) => { - debug!("ping not ready to be sent"); + debug!("ws ping not ready to be sent"); return Ok(Async::NotReady); } } @@ -862,8 +862,8 @@ where // If we don't have any more messages and our pong timeout // elapsed (set above) then this is where we start // triggering errors. - if self.pong_timeout { - return Err("failed to receive a pong in time".into()); + if self.ws_pong_timeout { + return Err("failed to receive a ws pong in time".into()); } return Ok(Async::NotReady); } @@ -871,7 +871,7 @@ where match msg { Message::Text(ref s) => { trace!("text message {}", s); - let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?; + let msg = s.parse().chain_err(|| "invalid json text")?; return Ok(Some(msg).into()); } @@ -884,8 +884,8 @@ where // Wake up ourselves to ensure the above ping logic eventually // sees this pong. Message::Pong(_) => { - self.pong_received = true; - self.pong_timeout = false; + self.ws_pong_received = true; + self.ws_pong_timeout = false; task::current().notify(); } } @@ -902,10 +902,10 @@ where type SinkError = Error; fn start_send(&mut self, msg: ServerMessage) -> StartSend { - if self.send_ping()?.is_not_ready() { + if self.send_ws_ping()?.is_not_ready() { return Ok(AsyncSink::NotReady(msg)); } - let s = serde_json::to_string(&msg).chain_err(|| "failed to serialize")?; + let s = msg.to_json().chain_err(|| "failed to serialize")?; match self.inner.start_send(Message::Text(s))? { AsyncSink::Ready => Ok(AsyncSink::Ready), AsyncSink::NotReady(_) => Ok(AsyncSink::NotReady(msg)), @@ -913,7 +913,7 @@ where } fn poll_complete(&mut self) -> Poll<(), Error> { - try_ready!(self.send_ping()); + try_ready!(self.send_ws_ping()); Ok(self.inner.poll_complete()?) } diff --git a/tests/test_integration.py b/tests/test_integration.py index be520b704..d862d0464 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -884,6 +884,20 @@ def test_msg_limit(self): assert client.uaid != uaid yield self.shut_down(client) + @inlineCallbacks + def test_can_ping(self): + client = yield self.quick_register() + yield client.ping() + assert client.ws.connected + try: + yield client.ping() + except AssertionError: + # pinging too quickly should disconnect without a valid ping + # repsonse + pass + assert not client.ws.connected + yield self.shut_down(client) + class TestRustWebPushBroadcast(unittest.TestCase): _endpoint_defaults = dict(