Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support webpush API pings #56

Merged
merged 1 commit into from
Aug 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ pub struct WebPushClient {
unacked_stored_highest: Option<u64>,
connected_at: u64,
sent_from_storage: u32,
last_ping: u64,
stats: SessionStatistics,
}

Expand All @@ -177,6 +178,7 @@ impl Default for WebPushClient {
unacked_stored_highest: Default::default(),
connected_at: Default::default(),
sent_from_storage: Default::default(),
last_ping: Default::default(),
stats: Default::default(),
}
}
Expand Down Expand Up @@ -855,6 +857,21 @@ where
transition!(DetermineAck { data });
}
}
Either::A(ClientMessage::Ping) => {
// Clients shouldn't ping > than once per minute or we
// disconnect them
if sec_since_epoch() - webpush.last_ping >= 55 {
debug!("Got a ping, sending pong");
webpush.last_ping = sec_since_epoch();
transition!(Send {
smessages: vec![ServerMessage::Ping],
data
})
} else {
debug!("Got a ping too quickly, disconnecting");
Err("code=4774".into())
}
}
Either::B(ServerNotification::Notification(notif)) => {
if notif.ttl != 0 {
webpush.unacked_direct_notifs.push(notif.clone());
Expand Down
30 changes: 29 additions & 1 deletion src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
//! are used to generate the ability to serialize these structures to JSON,
//! using the `serde` crate. More docs for serde can be found at
//! https://serde.rs

use std::collections::HashMap;
use std::str::FromStr;

use serde_json;
use uuid::Uuid;

use util::ms_since_epoch;
Expand Down Expand Up @@ -62,6 +64,19 @@ pub enum ClientMessage {
code: Option<i32>,
version: String,
},

Ping,
}

impl FromStr for ClientMessage {
type Err = serde_json::error::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
// parse empty object "{}" as a Ping
serde_json::from_str::<HashMap<(), ()>>(s)
.map(|_| ClientMessage::Ping)
.or_else(|_| serde_json::from_str(s))
}
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -101,6 +116,19 @@ pub enum ServerMessage {
},

Notification(Notification),

Ping,
}

impl ServerMessage {
pub fn to_json(&self) -> Result<String, serde_json::error::Error> {
match self {
// clients recognize {"messageType": "ping"} but traditionally both
// client/server send the empty object version
ServerMessage::Ping => Ok("{}".to_owned()),
_ => serde_json::to_string(self),
}
}
}

#[derive(Serialize, Default, Deserialize, Clone, Debug)]
Expand Down
74 changes: 37 additions & 37 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ impl Future for PingManager {
fn poll(&mut self) -> Poll<(), Error> {
let mut socket = self.socket.borrow_mut();
loop {
if socket.ping {
if socket.ws_ping {
// Don't check if we already have a delta to broadcast
if socket.broadcast_delta.is_none() {
// Determine if we can do a broadcast check, we need a connected webpush client
Expand All @@ -696,7 +696,7 @@ impl Future for PingManager {
}
}

if socket.send_ping()?.is_ready() {
if socket.send_ws_ping()?.is_ready() {
// If we just sent a broadcast, reset the ping interval and clear the delta
if socket.broadcast_delta.is_some() {
let at = Instant::now() + self.srv.opts.auto_ping_interval;
Expand All @@ -712,34 +712,34 @@ impl Future for PingManager {
break;
}
}
debug_assert!(!socket.ping);
debug_assert!(!socket.ws_ping);
match self.waiting {
WaitingFor::SendPing => {
debug_assert!(!socket.pong_timeout);
debug_assert!(!socket.pong_received);
debug_assert!(!socket.ws_pong_timeout);
debug_assert!(!socket.ws_pong_received);
match self.timeout.poll()? {
Async::Ready(()) => {
debug!("scheduling a ping to get sent");
socket.ping = true;
debug!("scheduling a ws ping to get sent");
socket.ws_ping = true;
}
Async::NotReady => break,
}
}
WaitingFor::Pong => {
if socket.pong_received {
if socket.ws_pong_received {
// If we received a pong, then switch us back to waiting
// to send out a ping
debug!("pong received, going back to sending a ping");
debug_assert!(!socket.pong_timeout);
debug!("ws pong received, going back to sending a ping");
debug_assert!(!socket.ws_pong_timeout);
let at = Instant::now() + self.srv.opts.auto_ping_interval;
self.timeout.reset(at);
self.waiting = WaitingFor::SendPing;
socket.pong_received = false;
} else if socket.pong_timeout {
socket.ws_pong_received = false;
} else if socket.ws_pong_timeout {
// If our socket is waiting to deliver a pong timeout,
// then no need to keep checking the timer and we can
// keep going
debug!("waiting for socket to see pong timed out");
debug!("waiting for socket to see ws pong timed out");
break;
} else if self.timeout.poll()?.is_ready() {
// We may not actually be reading messages from the
Expand All @@ -748,14 +748,14 @@ impl Future for PingManager {
// error here wait for the stream to return `NotReady`
// when looking for messages, as then we're extra sure
// that no pong was received after this timeout elapsed.
debug!("waited too long for a pong");
socket.pong_timeout = true;
debug!("waited too long for a ws pong");
socket.ws_pong_timeout = true;
} else {
break;
}
}
WaitingFor::Close => {
debug_assert!(!socket.pong_timeout);
debug_assert!(!socket.ws_pong_timeout);
if self.timeout.poll()?.is_ready() {
if let CloseState::Exchange(ref mut client) = self.client {
client.shutdown();
Expand Down Expand Up @@ -796,47 +796,47 @@ impl Future for PingManager {
// `ClientMessage` and `ServerMessage`.
struct WebpushSocket<T> {
inner: T,
pong_received: bool,
ping: bool,
pong_timeout: bool,
ws_pong_received: bool,
ws_ping: bool,
ws_pong_timeout: bool,
broadcast_delta: Option<Vec<Broadcast>>,
}

impl<T> WebpushSocket<T> {
fn new(t: T) -> WebpushSocket<T> {
WebpushSocket {
inner: t,
pong_received: false,
ping: false,
pong_timeout: false,
ws_pong_received: false,
ws_ping: false,
ws_pong_timeout: false,
broadcast_delta: None,
}
}

fn send_ping(&mut self) -> Poll<(), Error>
fn send_ws_ping(&mut self) -> Poll<(), Error>
where
T: Sink<SinkItem = Message>,
Error: From<T::SinkError>,
{
if self.ping {
if self.ws_ping {
let msg = if let Some(broadcasts) = self.broadcast_delta.clone() {
debug!("sending a broadcast delta");
let server_msg = ServerMessage::Broadcast {
broadcasts: Broadcast::into_hashmap(broadcasts),
};
let s = serde_json::to_string(&server_msg).chain_err(|| "failed to serialize")?;
let s = server_msg.to_json().chain_err(|| "failed to serialize")?;
Message::Text(s)
} else {
debug!("sending a ping");
debug!("sending a ws ping");
Message::Ping(Vec::new())
};
match self.inner.start_send(msg)? {
AsyncSink::Ready => {
debug!("ping sent");
self.ping = false;
debug!("ws ping sent");
self.ws_ping = false;
}
AsyncSink::NotReady(_) => {
debug!("ping not ready to be sent");
debug!("ws ping not ready to be sent");
return Ok(Async::NotReady);
}
}
Expand All @@ -862,16 +862,16 @@ where
// If we don't have any more messages and our pong timeout
// elapsed (set above) then this is where we start
// triggering errors.
if self.pong_timeout {
return Err("failed to receive a pong in time".into());
if self.ws_pong_timeout {
return Err("failed to receive a ws pong in time".into());
}
return Ok(Async::NotReady);
}
};
match msg {
Message::Text(ref s) => {
trace!("text message {}", s);
let msg = serde_json::from_str(s).chain_err(|| "invalid json text")?;
let msg = s.parse().chain_err(|| "invalid json text")?;
return Ok(Some(msg).into());
}

Expand All @@ -884,8 +884,8 @@ where
// Wake up ourselves to ensure the above ping logic eventually
// sees this pong.
Message::Pong(_) => {
self.pong_received = true;
self.pong_timeout = false;
self.ws_pong_received = true;
self.ws_pong_timeout = false;
task::current().notify();
}
}
Expand All @@ -902,18 +902,18 @@ where
type SinkError = Error;

fn start_send(&mut self, msg: ServerMessage) -> StartSend<ServerMessage, Error> {
if self.send_ping()?.is_not_ready() {
if self.send_ws_ping()?.is_not_ready() {
return Ok(AsyncSink::NotReady(msg));
}
let s = serde_json::to_string(&msg).chain_err(|| "failed to serialize")?;
let s = msg.to_json().chain_err(|| "failed to serialize")?;
match self.inner.start_send(Message::Text(s))? {
AsyncSink::Ready => Ok(AsyncSink::Ready),
AsyncSink::NotReady(_) => Ok(AsyncSink::NotReady(msg)),
}
}

fn poll_complete(&mut self) -> Poll<(), Error> {
try_ready!(self.send_ping());
try_ready!(self.send_ws_ping());
Ok(self.inner.poll_complete()?)
}

Expand Down
14 changes: 14 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,20 @@ def test_msg_limit(self):
assert client.uaid != uaid
yield self.shut_down(client)

@inlineCallbacks
def test_can_ping(self):
client = yield self.quick_register()
yield client.ping()
assert client.ws.connected
try:
yield client.ping()
except AssertionError:
# pinging too quickly should disconnect without a valid ping
# repsonse
pass
assert not client.ws.connected
yield self.shut_down(client)


class TestRustWebPushBroadcast(unittest.TestCase):
_endpoint_defaults = dict(
Expand Down