Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1152 from mozilla-services/feat/issue-1129.2
Browse files Browse the repository at this point in the history
feat: add megaphone broadcast handling, ping check, server tracking
  • Loading branch information
bbangert authored Mar 13, 2018
2 parents 1b0be54 + 1fe4eeb commit a98c045
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 20 deletions.
65 changes: 60 additions & 5 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! of connected clients. Note that it's expected there'll be a lot of connected
//! clients, so this may appears relatively heavily optimized!
use std::collections::HashMap;
use std::rc::Rc;

use cadence::prelude::*;
Expand All @@ -23,6 +24,7 @@ use errors::*;
use protocol::{ClientAck, ClientMessage, ServerMessage, ServerNotification, Notification};
use server::Server;
use util::parse_user_agent;
use util::megaphone::{ClientServices, Service, ServiceClientInit};

pub struct RegisteredClient {
pub uaid: Uuid,
Expand Down Expand Up @@ -68,6 +70,7 @@ pub struct WebPushClient {
uaid: Uuid,
rx: mpsc::UnboundedReceiver<ServerNotification>,
flags: ClientFlags,
broadcast_services: ClientServices,
message_month: String,
unacked_direct_notifs: Vec<Notification>,
unacked_stored_notifs: Vec<Notification>,
Expand Down Expand Up @@ -120,7 +123,7 @@ impl ClientFlags {

pub enum ClientState {
WaitingForHello(Timeout),
WaitingForProcessHello(MyFuture<call::HelloResponse>),
WaitingForProcessHello(MyFuture<call::HelloResponse>, Vec<Service>),
WaitingForRegister(Uuid, MyFuture<call::RegisterResponse>),
WaitingForUnRegister(Uuid, MyFuture<call::UnRegisterResponse>),
WaitingForCheckStorage(MyFuture<call::CheckStorageResponse>),
Expand Down Expand Up @@ -187,6 +190,14 @@ where
self.data.shutdown();
}

pub fn broadcast_delta(&mut self) -> Option<Vec<Service>> {
if let Some(ref mut webpush) = self.data.webpush {
self.data.srv.broadcast_delta(&mut webpush.broadcast_services)
} else {
None
}
}

fn transition(&mut self) -> Poll<ClientState, Error> {
let host = self.data.host.clone();
let next_state = match self.state {
Expand Down Expand Up @@ -271,20 +282,25 @@ where
}
ClientState::WaitingForHello(ref mut timeout) => {
debug!("State: WaitingForHello");
let uaid = match try_ready!(self.data.input_with_timeout(timeout)) {
let (uaid, services) = match try_ready!(self.data.input_with_timeout(timeout)) {
ClientMessage::Hello {
uaid,
use_webpush: Some(true),
broadcasts,
..
} => uaid.and_then(|uaid| Uuid::parse_str(uaid.as_str()).ok()),
} => (
uaid.and_then(|uaid| Uuid::parse_str(uaid.as_str()).ok()),
Service::from_hashmap(broadcasts.unwrap_or(HashMap::new()))
),
_ => return Err("Invalid message, must be hello".into()),
};
let connected_at = time::precise_time_ns() / 1000;
ClientState::WaitingForProcessHello(
self.data.srv.hello(&connected_at, uaid.as_ref()),
services,
)
}
ClientState::WaitingForProcessHello(ref mut response) => {
ClientState::WaitingForProcessHello(ref mut response, ref services) => {
debug!("State: WaitingForProcessHello");
match try_ready!(response.poll()) {
call::HelloResponse {
Expand All @@ -302,6 +318,7 @@ where
rotate_message_table,
check_storage,
connected_at,
services,
)
}
call::HelloResponse { uaid: None, .. } => {
Expand Down Expand Up @@ -422,6 +439,23 @@ where
return Ok(next_state.into());
}
match try_ready!(self.data.input_or_notif()) {
Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => {
let webpush = self.data.webpush.as_mut().unwrap();
let service_delta = self.data.srv.client_service_add_service(
&mut webpush.broadcast_services,
&Service::from_hashmap(broadcasts),
);
if let Some(delta) = service_delta {
ClientState::FinishSend(
Some(ServerMessage::Broadcast {
broadcasts: Service::into_hashmap(delta)
}),
Some(Box::new(ClientState::WaitingForAcks)),
)
} else {
ClientState::WaitingForAcks
}
}
Either::A(ClientMessage::Register { channel_id, key }) => {
self.data.process_register(channel_id, key)
}
Expand Down Expand Up @@ -470,6 +504,23 @@ where
return Ok(ClientState::CheckStorage.into());
}
match try_ready!(self.data.input_or_notif()) {
Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => {
let webpush = self.data.webpush.as_mut().unwrap();
let service_delta = self.data.srv.client_service_add_service(
&mut webpush.broadcast_services,
&Service::from_hashmap(broadcasts),
);
if let Some(delta) = service_delta {
ClientState::FinishSend(
Some(ServerMessage::Broadcast {
broadcasts: Service::into_hashmap(delta)
}),
Some(Box::new(ClientState::Await)),
)
} else {
ClientState::Await
}
}
Either::A(ClientMessage::Register { channel_id, key }) => {
self.data.process_register(channel_id, key)
}
Expand All @@ -479,7 +530,7 @@ where
Either::A(ClientMessage::Nack { .. }) => {
self.data.srv.metrics.incr("ua.command.nack").ok();
self.data.webpush.as_mut().unwrap().stats.nacks += 1;
ClientState::WaitingForAcks
ClientState::Await
}
Either::B(ServerNotification::Notification(notif)) => {
let webpush = self.data.webpush.as_mut().unwrap();
Expand Down Expand Up @@ -570,15 +621,18 @@ where
rotate_message_table: bool,
check_storage: bool,
connected_at: u64,
services: &Vec<Service>,
) -> ClientState {
let (tx, rx) = mpsc::unbounded();
let mut flags = ClientFlags::new();
flags.check = check_storage;
flags.reset_uaid = reset_uaid;
flags.rotate_message_table = rotate_message_table;

let ServiceClientInit(client_services, broadcasts) = self.srv.broadcast_init(services);
self.webpush = Some(WebPushClient {
uaid,
broadcast_services: client_services,
flags,
rx,
message_month,
Expand Down Expand Up @@ -608,6 +662,7 @@ where
uaid: uaid.hyphenated().to_string(),
status: 200,
use_webpush: Some(true),
broadcasts: Service::into_hashmap(broadcasts),
};
ClientState::FinishSend(Some(response), Some(Box::new(ClientState::Await)))
}
Expand Down
15 changes: 13 additions & 2 deletions autopush_rs/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ pub enum ServerNotification {
}

#[derive(Deserialize)]
#[serde(tag = "messageType", rename_all = "lowercase")]
#[serde(tag = "messageType", rename_all = "snake_case")]
pub enum ClientMessage {
Hello {
uaid: Option<String>,
#[serde(rename = "channelIDs", skip_serializing_if = "Option::is_none")]
channel_ids: Option<Vec<Uuid>>,
#[serde(skip_serializing_if = "Option::is_none")]
use_webpush: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
broadcasts: Option<HashMap<String, String>>,
},

Register {
Expand All @@ -39,6 +41,10 @@ pub enum ClientMessage {
code: Option<i32>,
},

BroadcastSubscribe {
broadcasts: HashMap<String, String>,
},

Ack { updates: Vec<ClientAck> },

Nack {
Expand All @@ -56,13 +62,14 @@ pub struct ClientAck {
}

#[derive(Serialize)]
#[serde(tag = "messageType", rename_all = "lowercase")]
#[serde(tag = "messageType", rename_all = "snake_case")]
pub enum ServerMessage {
Hello {
uaid: String,
status: u32,
#[serde(skip_serializing_if = "Option::is_none")]
use_webpush: Option<bool>,
broadcasts: HashMap<String, String>,
},

Register {
Expand All @@ -79,6 +86,10 @@ pub enum ServerMessage {
status: u32,
},

Broadcast {
broadcasts: HashMap<String, String>,
},

Notification(Notification),
}

Expand Down
65 changes: 60 additions & 5 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use server::dispatch::{Dispatch, RequestType};
use server::metrics::metrics_from_opts;
use server::webpush_io::WebpushIo;
use util::{self, RcObject, timeout};
use util::megaphone::{ClientServices, Service, ServiceClientInit, ServiceChangeTracker};

mod dispatch;
mod metrics;
Expand Down Expand Up @@ -84,6 +85,7 @@ pub struct AutopushServerOptions {

pub struct Server {
uaids: RefCell<HashMap<Uuid, RegisteredClient>>,
broadcaster: ServiceChangeTracker,
open_connections: Cell<u32>,
tls_acceptor: Option<SslAcceptor>,
pub tx: queue::Sender,
Expand Down Expand Up @@ -317,6 +319,7 @@ impl Server {
let core = Core::new()?;
let srv = Rc::new(Server {
opts: opts.clone(),
broadcaster: ServiceChangeTracker::new(Vec::new()),
uaids: RefCell::new(HashMap::new()),
open_connections: Cell::new(0),
handle: core.handle(),
Expand Down Expand Up @@ -478,6 +481,26 @@ impl Server {
let mut uaids = self.uaids.borrow_mut();
uaids.remove(uaid).expect("uaid not registered");
}

/// Generate a new service client list for a newly connected client
pub fn broadcast_init(&self, services: &[Service]) -> ServiceClientInit {
debug!("Initialized broadcast services");
self.broadcaster.service_delta(services)
}

/// Calculate whether there's new service versions to go out
pub fn broadcast_delta(&self, client_services: &mut ClientServices) -> Option<Vec<Service>> {
self.broadcaster.change_count_delta(client_services)
}

/// Add services to be tracked by a client
pub fn client_service_add_service(
&self,
client_services: &mut ClientServices,
services: &[Service],
) -> Option<Vec<Service>> {
self.broadcaster.client_service_add_service(client_services, services)
}
}

impl Drop for Server {
Expand Down Expand Up @@ -546,10 +569,28 @@ impl Future for PingManager {
let mut socket = self.socket.borrow_mut();
loop {
if socket.ping {
// Don't check if we already have a delta to broadcast
if socket.broadcast_delta.is_none() {
// Determine if we can do a broadcast check, we need a connected webpush client
if let CloseState::Exchange(ref mut client) = self.client {
if let Some(delta) = client.broadcast_delta() {
socket.broadcast_delta = Some(delta);
}
}
}

if socket.send_ping()?.is_ready() {
let at = Instant::now() + self.srv.opts.auto_ping_timeout;
self.timeout.reset(at);
self.waiting = WaitingFor::Pong;
// If we just sent a broadcast, reset the ping interval and clear the delta
if socket.broadcast_delta.is_some() {
let at = Instant::now() + self.srv.opts.auto_ping_interval;
self.timeout.reset(at);
socket.broadcast_delta = None;
self.waiting = WaitingFor::SendPing
} else {
let at = Instant::now() + self.srv.opts.auto_ping_timeout;
self.timeout.reset(at);
self.waiting = WaitingFor::Pong
}
} else {
break;
}
Expand Down Expand Up @@ -641,6 +682,7 @@ struct WebpushSocket<T> {
pong_received: bool,
ping: bool,
pong_timeout: bool,
broadcast_delta: Option<Vec<Service>>,
}

impl<T> WebpushSocket<T> {
Expand All @@ -650,6 +692,7 @@ impl<T> WebpushSocket<T> {
pong_received: false,
ping: false,
pong_timeout: false,
broadcast_delta: None,
}
}

Expand All @@ -659,8 +702,20 @@ impl<T> WebpushSocket<T> {
Error: From<T::SinkError>,
{
if self.ping {
debug!("sending a ping");
match self.inner.start_send(Message::Ping(Vec::new()))? {
let msg = if let Some(broadcasts) = self.broadcast_delta.clone() {
debug!("sending a broadcast delta");
let server_msg = ServerMessage::Broadcast {
broadcasts: Service::into_hashmap(broadcasts)
};
let s = serde_json::to_string(&server_msg).chain_err(
|| "failed to serialize",
)?;
Message::Text(s)
} else {
debug!("sending a ping");
Message::Ping(Vec::new())
};
match self.inner.start_send(msg)? {
AsyncSink::Ready => {
debug!("ping sent");
self.ping = false;
Expand Down
Loading

0 comments on commit a98c045

Please sign in to comment.