Skip to content

Commit

Permalink
Merge branch 'master' into release/1.48
Browse files Browse the repository at this point in the history
  • Loading branch information
bbangert authored Jun 27, 2018
2 parents c40dd0f + 68d654c commit 296c460
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 217 deletions.
56 changes: 28 additions & 28 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use errors::*;
use protocol::{ClientMessage, Notification, ServerMessage, ServerNotification};
use server::Server;
use db::{CheckStorageResponse, HelloResponse, RegisterResponse};
use util::megaphone::{ClientServices, Service, ServiceClientInit};
use util::megaphone::{Broadcast, BroadcastSubs, BroadcastSubsInit};
use util::{ms_since_epoch, parse_user_agent, sec_since_epoch};

// Created and handed to the AutopushServer
Expand All @@ -42,7 +42,7 @@ pub struct Client<T>
{
state_machine: UnAuthClientStateFuture<T>,
srv: Rc<Server>,
broadcast_services: Rc<RefCell<ClientServices>>,
broadcast_subs: Rc<RefCell<BroadcastSubs>>,
tx: mpsc::UnboundedSender<ServerNotification>,
}

Expand Down Expand Up @@ -80,14 +80,14 @@ where
}
};

let broadcast_services = Rc::new(RefCell::new(Default::default()));
let broadcast_subs = Rc::new(RefCell::new(Default::default()));
let sm = UnAuthClientState::start(
UnAuthClientData {
srv: srv.clone(),
ws,
user_agent: uastr,
host,
broadcast_services: broadcast_services.clone(),
broadcast_subs: broadcast_subs.clone(),
},
timeout,
tx.clone(),
Expand All @@ -97,14 +97,14 @@ where
Self {
state_machine: sm,
srv: srv.clone(),
broadcast_services,
broadcast_subs,
tx,
}
}

pub fn broadcast_delta(&mut self) -> Option<Vec<Service>> {
let mut broadcast_services = self.broadcast_services.borrow_mut();
self.srv.broadcast_delta(&mut broadcast_services)
pub fn broadcast_delta(&mut self) -> Option<Vec<Broadcast>> {
let mut broadcast_subs = self.broadcast_subs.borrow_mut();
self.srv.broadcast_delta(&mut broadcast_subs)
}

pub fn shutdown(&mut self) {
Expand Down Expand Up @@ -212,7 +212,7 @@ pub struct UnAuthClientData<T> {
ws: T,
user_agent: String,
host: String,
broadcast_services: Rc<RefCell<ClientServices>>,
broadcast_subs: Rc<RefCell<BroadcastSubs>>,
}

impl<T> UnAuthClientData<T>
Expand All @@ -238,7 +238,7 @@ pub struct AuthClientData<T> {
srv: Rc<Server>,
ws: T,
webpush: Rc<RefCell<WebPushClient>>,
broadcast_services: Rc<RefCell<ClientServices>>,
broadcast_subs: Rc<RefCell<BroadcastSubs>>,
}

impl<T> AuthClientData<T>
Expand Down Expand Up @@ -282,7 +282,7 @@ where
AwaitProcessHello {
response: MyFuture<HelloResponse>,
data: UnAuthClientData<T>,
interested_broadcasts: Vec<Service>,
desired_broadcasts: Vec<Broadcast>,
tx: mpsc::UnboundedSender<ServerNotification>,
rx: mpsc::UnboundedReceiver<ServerNotification>,
},
Expand Down Expand Up @@ -312,7 +312,7 @@ where
hello: &'a mut RentToOwn<'a, AwaitHello<T>>,
) -> Poll<AfterAwaitHello<T>, Error> {
trace!("State: AwaitHello");
let (uaid, services) = {
let (uaid, desired_broadcasts) = {
let AwaitHello {
ref mut data,
ref mut timeout,
Expand All @@ -326,7 +326,7 @@ where
..
} => (
uaid.and_then(|uaid| Uuid::parse_str(uaid.as_str()).ok()),
Service::from_hashmap(broadcasts.unwrap_or_default()),
Broadcast::from_hashmap(broadcasts.unwrap_or_default()),
),
_ => return Err("Invalid message, must be hello".into()),
}
Expand All @@ -346,7 +346,7 @@ where
transition!(AwaitProcessHello {
response,
data,
interested_broadcasts: services,
desired_broadcasts,
tx,
rx,
})
Expand Down Expand Up @@ -381,7 +381,7 @@ where

let AwaitProcessHello {
data,
interested_broadcasts,
desired_broadcasts,
tx,
rx,
..
Expand All @@ -393,17 +393,17 @@ where
ws,
user_agent,
host,
broadcast_services,
broadcast_subs,
} = data;

// Setup the objects and such needed for a WebPushClient
let mut flags = ClientFlags::new();
flags.check = check_storage;
flags.reset_uaid = reset_uaid;
flags.rotate_message_table = rotate_message_table;
let ServiceClientInit(client_services, broadcasts) =
srv.broadcast_init(&interested_broadcasts);
broadcast_services.replace(client_services);
let BroadcastSubsInit(initialized_subs, broadcasts) =
srv.broadcast_init(&desired_broadcasts);
broadcast_subs.replace(initialized_subs);
let uid = Uuid::new_v4();
let webpush = Rc::new(RefCell::new(WebPushClient {
uaid,
Expand All @@ -428,7 +428,7 @@ where
uaid: uaid.simple().to_string(),
status: 200,
use_webpush: Some(true),
broadcasts: Service::into_hashmap(broadcasts),
broadcasts: Broadcast::into_hashmap(broadcasts),
};
let auth_state_machine = AuthClientState::start(
vec![response],
Expand All @@ -437,7 +437,7 @@ where
srv: srv.clone(),
ws,
webpush: webpush.clone(),
broadcast_services: broadcast_services.clone(),
broadcast_subs: broadcast_subs.clone(),
},
);
transition!(AwaitSessionComplete {
Expand Down Expand Up @@ -719,18 +719,18 @@ where
let mut webpush = webpush_rc.borrow_mut();
match input {
Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => {
let service_delta = {
let mut broadcast_services = data.broadcast_services.borrow_mut();
data.srv.client_service_add_service(
&mut broadcast_services,
&Service::from_hashmap(broadcasts),
let broadcast_delta = {
let mut broadcast_subs = data.broadcast_subs.borrow_mut();
data.srv.subscribe_to_broadcasts(
&mut broadcast_subs,
&Broadcast::from_hashmap(broadcasts),
)
};
if let Some(delta) = service_delta {
if let Some(delta) = broadcast_delta {
transition!(SendThenWait {
remaining_data: vec![
ServerMessage::Broadcast {
broadcasts: Service::into_hashmap(delta),
broadcasts: Broadcast::into_hashmap(delta),
},
],
poll_complete: false,
Expand Down
46 changes: 23 additions & 23 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use server::metrics::metrics_from_opts;
use server::webpush_io::WebpushIo;
use settings::Settings;
use util::megaphone::{
ClientServices, MegaphoneAPIResponse, Service, ServiceChangeTracker, ServiceClientInit,
Broadcast, BroadcastChangeTracker, BroadcastSubs, BroadcastSubsInit, MegaphoneAPIResponse,
};
use util::{timeout, RcObject};

Expand Down Expand Up @@ -210,7 +210,7 @@ impl ServerOptions {

pub struct Server {
uaids: RefCell<HashMap<Uuid, RegisteredClient>>,
broadcaster: RefCell<ServiceChangeTracker>,
broadcaster: RefCell<BroadcastChangeTracker>,
pub ddb: DynamoStorage,
open_connections: Cell<u32>,
tls_acceptor: Option<SslAcceptor>,
Expand Down Expand Up @@ -307,10 +307,10 @@ impl Server {
.megaphone_api_token
.as_ref()
.expect("Megaphone API requires a Megaphone API Token to be set");
ServiceChangeTracker::with_api_services(megaphone_url, megaphone_token)
BroadcastChangeTracker::with_api_broadcasts(megaphone_url, megaphone_token)
.expect("Unable to initialize megaphone with provided URL")
} else {
ServiceChangeTracker::new(Vec::new())
BroadcastChangeTracker::new(Vec::new())
};
let srv = Rc::new(Server {
opts: opts.clone(),
Expand Down Expand Up @@ -529,28 +529,28 @@ impl Server {
}
}

/// Generate a new service client list for a newly connected client
pub fn broadcast_init(&self, services: &[Service]) -> ServiceClientInit {
debug!("Initialized broadcast services");
self.broadcaster.borrow().service_delta(services)
}

/// Calculate whether there's new service versions to go out
pub fn broadcast_delta(&self, client_services: &mut ClientServices) -> Option<Vec<Service>> {
/// Initialize broadcasts for a newly connected client
pub fn broadcast_init(&self, desired_broadcasts: &[Broadcast]) -> BroadcastSubsInit {
debug!("Initialized broadcasts");
self.broadcaster
.borrow()
.change_count_delta(client_services)
.broadcast_delta(desired_broadcasts)
}

/// Calculate whether there's new broadcast versions to go out
pub fn broadcast_delta(&self, broadcast_subs: &mut BroadcastSubs) -> Option<Vec<Broadcast>> {
self.broadcaster.borrow().change_count_delta(broadcast_subs)
}

/// Add services to be tracked by a client
pub fn client_service_add_service(
/// Add new broadcasts to be tracked by a client
pub fn subscribe_to_broadcasts(
&self,
client_services: &mut ClientServices,
services: &[Service],
) -> Option<Vec<Service>> {
broadcast_subs: &mut BroadcastSubs,
broadcasts: &[Broadcast],
) -> Option<Vec<Broadcast>> {
self.broadcaster
.borrow()
.client_service_add_service(client_services, services)
.subscribe_to_broadcasts(broadcast_subs, broadcasts)
}
}

Expand Down Expand Up @@ -618,8 +618,8 @@ impl Future for MegaphoneUpdater {
Ok(Async::Ready(MegaphoneAPIResponse { broadcasts })) => {
debug!("Fetched broadcasts: {:?}", broadcasts);
let mut broadcaster = self.srv.broadcaster.borrow_mut();
for srv in Service::from_hashmap(broadcasts) {
broadcaster.add_service(srv);
for srv in Broadcast::from_hashmap(broadcasts) {
broadcaster.add_broadcast(srv);
}
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Expand Down Expand Up @@ -809,7 +809,7 @@ struct WebpushSocket<T> {
pong_received: bool,
ping: bool,
pong_timeout: bool,
broadcast_delta: Option<Vec<Service>>,
broadcast_delta: Option<Vec<Broadcast>>,
}

impl<T> WebpushSocket<T> {
Expand All @@ -832,7 +832,7 @@ impl<T> WebpushSocket<T> {
let msg = if let Some(broadcasts) = self.broadcast_delta.clone() {
debug!("sending a broadcast delta");
let server_msg = ServerMessage::Broadcast {
broadcasts: Service::into_hashmap(broadcasts),
broadcasts: Broadcast::into_hashmap(broadcasts),
};
let s = serde_json::to_string(&server_msg).chain_err(|| "failed to serialize")?;
Message::Text(s)
Expand Down
Loading

0 comments on commit 296c460

Please sign in to comment.