From cdfb169079d98cc0f2b76bed2a3eff0564a10ddc Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Wed, 20 Jun 2018 12:22:44 -0700 Subject: [PATCH 1/2] refactor: service -> broadcast Issue #14 --- src/client.rs | 52 ++++---- src/server/mod.rs | 42 +++---- src/util/megaphone.rs | 282 +++++++++++++++++++++--------------------- 3 files changed, 188 insertions(+), 188 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7b560075a..f67f4ae05 100644 --- a/src/client.rs +++ b/src/client.rs @@ -24,7 +24,7 @@ use errors::*; use protocol::{ClientMessage, Notification, ServerMessage, ServerNotification}; use server::Server; use db::{CheckStorageResponse, HelloResponse, RegisterResponse}; -use util::megaphone::{ClientServices, Service, ServiceClientInit}; +use util::megaphone::{ClientBroadcasts, Broadcast, BroadcastClientInit}; use util::{ms_since_epoch, parse_user_agent, sec_since_epoch}; // Created and handed to the AutopushServer @@ -42,7 +42,7 @@ pub struct Client { state_machine: UnAuthClientStateFuture, srv: Rc, - broadcast_services: Rc>, + broadcast_broadcasts: Rc>, tx: mpsc::UnboundedSender, } @@ -80,14 +80,14 @@ where } }; - let broadcast_services = Rc::new(RefCell::new(Default::default())); + let broadcast_broadcasts = Rc::new(RefCell::new(Default::default())); let sm = UnAuthClientState::start( UnAuthClientData { srv: srv.clone(), ws, user_agent: uastr, host, - broadcast_services: broadcast_services.clone(), + broadcast_broadcasts: broadcast_broadcasts.clone(), }, timeout, tx.clone(), @@ -97,14 +97,14 @@ where Self { state_machine: sm, srv: srv.clone(), - broadcast_services, + broadcast_broadcasts, tx, } } - pub fn broadcast_delta(&mut self) -> Option> { - let mut broadcast_services = self.broadcast_services.borrow_mut(); - self.srv.broadcast_delta(&mut broadcast_services) + pub fn broadcast_delta(&mut self) -> Option> { + let mut broadcast_broadcasts = self.broadcast_broadcasts.borrow_mut(); + self.srv.broadcast_delta(&mut broadcast_broadcasts) } pub fn shutdown(&mut self) { @@ -212,7 +212,7 @@ pub struct UnAuthClientData { ws: T, user_agent: String, host: String, - broadcast_services: Rc>, + broadcast_broadcasts: Rc>, } impl UnAuthClientData @@ -238,7 +238,7 @@ pub struct AuthClientData { srv: Rc, ws: T, webpush: Rc>, - broadcast_services: Rc>, + broadcast_broadcasts: Rc>, } impl AuthClientData @@ -282,7 +282,7 @@ where AwaitProcessHello { response: MyFuture, data: UnAuthClientData, - interested_broadcasts: Vec, + interested_broadcasts: Vec, tx: mpsc::UnboundedSender, rx: mpsc::UnboundedReceiver, }, @@ -312,7 +312,7 @@ where hello: &'a mut RentToOwn<'a, AwaitHello>, ) -> Poll, Error> { trace!("State: AwaitHello"); - let (uaid, services) = { + let (uaid, broadcasts) = { let AwaitHello { ref mut data, ref mut timeout, @@ -326,7 +326,7 @@ where .. } => ( uaid.and_then(|uaid| Uuid::parse_str(uaid.as_str()).ok()), - Service::from_hashmap(broadcasts.unwrap_or_default()), + Broadcast::from_hashmap(broadcasts.unwrap_or_default()), ), _ => return Err("Invalid message, must be hello".into()), } @@ -346,7 +346,7 @@ where transition!(AwaitProcessHello { response, data, - interested_broadcasts: services, + interested_broadcasts: broadcasts, tx, rx, }) @@ -393,7 +393,7 @@ where ws, user_agent, host, - broadcast_services, + broadcast_broadcasts, } = data; // Setup the objects and such needed for a WebPushClient @@ -401,9 +401,9 @@ where flags.check = check_storage; flags.reset_uaid = reset_uaid; flags.rotate_message_table = rotate_message_table; - let ServiceClientInit(client_services, broadcasts) = + let BroadcastClientInit(client_broadcasts, broadcasts) = srv.broadcast_init(&interested_broadcasts); - broadcast_services.replace(client_services); + broadcast_broadcasts.replace(client_broadcasts); let uid = Uuid::new_v4(); let webpush = Rc::new(RefCell::new(WebPushClient { uaid, @@ -428,7 +428,7 @@ where uaid: uaid.simple().to_string(), status: 200, use_webpush: Some(true), - broadcasts: Service::into_hashmap(broadcasts), + broadcasts: Broadcast::into_hashmap(broadcasts), }; let auth_state_machine = AuthClientState::start( vec![response], @@ -437,7 +437,7 @@ where srv: srv.clone(), ws, webpush: webpush.clone(), - broadcast_services: broadcast_services.clone(), + broadcast_broadcasts: broadcast_broadcasts.clone(), }, ); transition!(AwaitSessionComplete { @@ -719,18 +719,18 @@ where let mut webpush = webpush_rc.borrow_mut(); match input { Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => { - let service_delta = { - let mut broadcast_services = data.broadcast_services.borrow_mut(); - data.srv.client_service_add_service( - &mut broadcast_services, - &Service::from_hashmap(broadcasts), + let broadcast_delta = { + let mut broadcast_broadcasts = data.broadcast_broadcasts.borrow_mut(); + data.srv.client_broadcast_add_broadcast( + &mut broadcast_broadcasts, + &Broadcast::from_hashmap(broadcasts), ) }; - if let Some(delta) = service_delta { + if let Some(delta) = broadcast_delta { transition!(SendThenWait { remaining_data: vec![ ServerMessage::Broadcast { - broadcasts: Service::into_hashmap(delta), + broadcasts: Broadcast::into_hashmap(delta), }, ], poll_complete: false, diff --git a/src/server/mod.rs b/src/server/mod.rs index 8bafaf192..bd1eb5617 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -47,7 +47,7 @@ use server::metrics::metrics_from_opts; use server::webpush_io::WebpushIo; use settings::Settings; use util::megaphone::{ - ClientServices, MegaphoneAPIResponse, Service, ServiceChangeTracker, ServiceClientInit, + ClientBroadcasts, MegaphoneAPIResponse, Broadcast, BroadcastChangeTracker, BroadcastClientInit, }; use util::{timeout, RcObject}; @@ -210,7 +210,7 @@ impl ServerOptions { pub struct Server { uaids: RefCell>, - broadcaster: RefCell, + broadcaster: RefCell, pub ddb: DynamoStorage, open_connections: Cell, tls_acceptor: Option, @@ -307,10 +307,10 @@ impl Server { .megaphone_api_token .as_ref() .expect("Megaphone API requires a Megaphone API Token to be set"); - ServiceChangeTracker::with_api_services(megaphone_url, megaphone_token) + BroadcastChangeTracker::with_api_broadcasts(megaphone_url, megaphone_token) .expect("Unable to initialize megaphone with provided URL") } else { - ServiceChangeTracker::new(Vec::new()) + BroadcastChangeTracker::new(Vec::new()) }; let srv = Rc::new(Server { opts: opts.clone(), @@ -529,28 +529,28 @@ impl Server { } } - /// Generate a new service client list for a newly connected client - pub fn broadcast_init(&self, services: &[Service]) -> ServiceClientInit { - debug!("Initialized broadcast services"); - self.broadcaster.borrow().service_delta(services) + /// Generate a new broadcast client list for a newly connected client + pub fn broadcast_init(&self, broadcasts: &[Broadcast]) -> BroadcastClientInit { + debug!("Initialized broadcast broadcasts"); + self.broadcaster.borrow().broadcast_delta(broadcasts) } - /// Calculate whether there's new service versions to go out - pub fn broadcast_delta(&self, client_services: &mut ClientServices) -> Option> { + /// Calculate whether there's new broadcast versions to go out + pub fn broadcast_delta(&self, client_broadcasts: &mut ClientBroadcasts) -> Option> { self.broadcaster .borrow() - .change_count_delta(client_services) + .change_count_delta(client_broadcasts) } - /// Add services to be tracked by a client - pub fn client_service_add_service( + /// Add broadcasts to be tracked by a client + pub fn client_broadcast_add_broadcast( &self, - client_services: &mut ClientServices, - services: &[Service], - ) -> Option> { + client_broadcasts: &mut ClientBroadcasts, + broadcasts: &[Broadcast], + ) -> Option> { self.broadcaster .borrow() - .client_service_add_service(client_services, services) + .client_broadcast_add_broadcast(client_broadcasts, broadcasts) } } @@ -618,8 +618,8 @@ impl Future for MegaphoneUpdater { Ok(Async::Ready(MegaphoneAPIResponse { broadcasts })) => { debug!("Fetched broadcasts: {:?}", broadcasts); let mut broadcaster = self.srv.broadcaster.borrow_mut(); - for srv in Service::from_hashmap(broadcasts) { - broadcaster.add_service(srv); + for srv in Broadcast::from_hashmap(broadcasts) { + broadcaster.add_broadcast(srv); } } Ok(Async::NotReady) => return Ok(Async::NotReady), @@ -809,7 +809,7 @@ struct WebpushSocket { pong_received: bool, ping: bool, pong_timeout: bool, - broadcast_delta: Option>, + broadcast_delta: Option>, } impl WebpushSocket { @@ -832,7 +832,7 @@ impl WebpushSocket { let msg = if let Some(broadcasts) = self.broadcast_delta.clone() { debug!("sending a broadcast delta"); let server_msg = ServerMessage::Broadcast { - broadcasts: Service::into_hashmap(broadcasts), + broadcasts: Broadcast::into_hashmap(broadcasts), }; let s = serde_json::to_string(&server_msg).chain_err(|| "failed to serialize")?; Message::Text(s) diff --git a/src/util/megaphone.rs b/src/util/megaphone.rs index 7011c5c1a..0963f6f75 100644 --- a/src/util/megaphone.rs +++ b/src/util/megaphone.rs @@ -4,103 +4,103 @@ use std::time::Duration; use reqwest; -// A Service entry Key in a ServiceRegistry +// A Broadcast entry Key in a BroadcastRegistry #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash)] -struct ServiceKey(u32); +struct BroadcastKey(u32); -// A list of services that a client is interested in and the last change seen +// A list of broadcasts that a client is interested in and the last change seen #[derive(Debug, Default)] -pub struct ClientServices { - service_list: Vec, +pub struct ClientBroadcasts { + broadcast_list: Vec, change_count: u32, } #[derive(Debug)] -struct ServiceRegistry { +struct BroadcastRegistry { lookup: HashMap, table: Vec, } -// Return result of the first delta call for a client given a full list of service id's and versions +// Return result of the first delta call for a client given a full list of broadcast id's and versions #[derive(Debug)] -pub struct ServiceClientInit(pub ClientServices, pub Vec); +pub struct BroadcastClientInit(pub ClientBroadcasts, pub Vec); -impl ServiceRegistry { - fn new() -> ServiceRegistry { - ServiceRegistry { +impl BroadcastRegistry { + fn new() -> BroadcastRegistry { + BroadcastRegistry { lookup: HashMap::new(), table: Vec::new(), } } - // Add's a new service to the lookup table, returns the existing key if the service already + // Add's a new broadcast to the lookup table, returns the existing key if the broadcast already // exists - fn add_service(&mut self, service_id: String) -> ServiceKey { - if let Some(v) = self.lookup.get(&service_id) { - return ServiceKey(*v); + fn add_broadcast(&mut self, broadcast_id: String) -> BroadcastKey { + if let Some(v) = self.lookup.get(&broadcast_id) { + return BroadcastKey(*v); } let i = self.table.len(); - self.table.push(service_id.clone()); - self.lookup.insert(service_id, i as u32); - ServiceKey(i as u32) + self.table.push(broadcast_id.clone()); + self.lookup.insert(broadcast_id, i as u32); + BroadcastKey(i as u32) } - fn lookup_id(&self, key: ServiceKey) -> Option { + fn lookup_id(&self, key: BroadcastKey) -> Option { self.table.get(key.0 as usize).cloned() } - fn lookup_key(&self, service_id: &str) -> Option { - self.lookup.get(service_id).cloned().map(ServiceKey) + fn lookup_key(&self, broadcast_id: &str) -> Option { + self.lookup.get(broadcast_id).cloned().map(BroadcastKey) } } -// An individual service and the current change count +// An individual broadcast and the current change count #[derive(Debug)] -struct ServiceRevision { +struct BroadcastRevision { change_count: u32, - service: ServiceKey, + broadcast: BroadcastKey, } -// A provided Service/Version used for `ChangeList` initialization, client comparisons, and +// A provided Broadcast/Version used for `ChangeList` initialization, client comparisons, and // outgoing deltas #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] -pub struct Service { - service_id: String, +pub struct Broadcast { + broadcast_id: String, version: String, } // Handy From impls for common hashmap to/from conversions -impl From<(String, String)> for Service { - fn from(val: (String, String)) -> Service { - Service { - service_id: val.0, +impl From<(String, String)> for Broadcast { + fn from(val: (String, String)) -> Broadcast { + Broadcast { + broadcast_id: val.0, version: val.1, } } } -impl From for (String, String) { - fn from(svc: Service) -> (String, String) { - (svc.service_id, svc.version) +impl From for (String, String) { + fn from(svc: Broadcast) -> (String, String) { + (svc.broadcast_id, svc.version) } } -impl Service { - pub fn from_hashmap(val: HashMap) -> Vec { +impl Broadcast { + pub fn from_hashmap(val: HashMap) -> Vec { val.into_iter().map(|v| v.into()).collect() } - pub fn into_hashmap(service_vec: Vec) -> HashMap { - service_vec.into_iter().map(|v| v.into()).collect() + pub fn into_hashmap(broadcast_vec: Vec) -> HashMap { + broadcast_vec.into_iter().map(|v| v.into()).collect() } } -// ServiceChangeTracker tracks the services, their change_count, and the service lookup registry +// BroadcastChangeTracker tracks the broadcasts, their change_count, and the broadcast lookup registry #[derive(Debug)] -pub struct ServiceChangeTracker { - service_list: Vec, - service_registry: ServiceRegistry, - service_versions: HashMap, +pub struct BroadcastChangeTracker { + broadcast_list: Vec, + broadcast_registry: BroadcastRegistry, + broadcast_versions: HashMap, change_count: u32, } @@ -109,29 +109,29 @@ pub struct MegaphoneAPIResponse { pub broadcasts: HashMap, } -impl ServiceChangeTracker { - /// Creates a new `ServiceChangeTracker` initialized with the provided `services`. - pub fn new(services: Vec) -> ServiceChangeTracker { - let mut svc_change_tracker = ServiceChangeTracker { - service_list: Vec::new(), - service_registry: ServiceRegistry::new(), - service_versions: HashMap::new(), +impl BroadcastChangeTracker { + /// Creates a new `BroadcastChangeTracker` initialized with the provided `broadcasts`. + pub fn new(broadcasts: Vec) -> BroadcastChangeTracker { + let mut svc_change_tracker = BroadcastChangeTracker { + broadcast_list: Vec::new(), + broadcast_registry: BroadcastRegistry::new(), + broadcast_versions: HashMap::new(), change_count: 0, }; - for srv in services { + for srv in broadcasts { let key = svc_change_tracker - .service_registry - .add_service(srv.service_id); - svc_change_tracker.service_versions.insert(key, srv.version); + .broadcast_registry + .add_broadcast(srv.broadcast_id); + svc_change_tracker.broadcast_versions.insert(key, srv.version); } svc_change_tracker } - /// Creates a new `ServiceChangeTracker` initialized from a Megaphone API server version set + /// Creates a new `BroadcastChangeTracker` initialized from a Megaphone API server version set /// as provided as the fetch URL. /// /// This method uses a synchronous HTTP call. - pub fn with_api_services(url: &str, token: &str) -> reqwest::Result { + pub fn with_api_broadcasts(url: &str, token: &str) -> reqwest::Result { let client = reqwest::Client::builder() .timeout(Duration::from_secs(1)) .build()?; @@ -141,80 +141,80 @@ impl ServiceChangeTracker { .send()? .error_for_status()? .json()?; - let services = Service::from_hashmap(broadcasts); - Ok(ServiceChangeTracker::new(services)) + let broadcasts = Broadcast::from_hashmap(broadcasts); + Ok(BroadcastChangeTracker::new(broadcasts)) } - /// Add a new service to the ServiceChangeTracker, triggering a change_count increase. - /// Note: If the service already exists, it will be updated instead. - pub fn add_service(&mut self, service: Service) -> u32 { - if let Ok(change_count) = self.update_service(service.clone()) { + /// Add a new broadcast to the BroadcastChangeTracker, triggering a change_count increase. + /// Note: If the broadcast already exists, it will be updated instead. + pub fn add_broadcast(&mut self, broadcast: Broadcast) -> u32 { + if let Ok(change_count) = self.update_broadcast(broadcast.clone()) { return change_count; } self.change_count += 1; - let key = self.service_registry.add_service(service.service_id); - self.service_versions.insert(key, service.version); - self.service_list.push(ServiceRevision { + let key = self.broadcast_registry.add_broadcast(broadcast.broadcast_id); + self.broadcast_versions.insert(key, broadcast.version); + self.broadcast_list.push(BroadcastRevision { change_count: self.change_count, - service: key, + broadcast: key, }); self.change_count } - /// Update a `service` to a new revision, triggering a change_count increase. + /// Update a `broadcast` to a new revision, triggering a change_count increase. /// - /// Returns an error if the `service` was never initialized/added. - pub fn update_service(&mut self, service: Service) -> Result { - let key = self.service_registry - .lookup_key(&service.service_id) - .ok_or("Service not found")?; - - if let Some(ver) = self.service_versions.get_mut(&key) { - if *ver == service.version { + /// Returns an error if the `broadcast` was never initialized/added. + pub fn update_broadcast(&mut self, broadcast: Broadcast) -> Result { + let key = self.broadcast_registry + .lookup_key(&broadcast.broadcast_id) + .ok_or("Broadcast not found")?; + + if let Some(ver) = self.broadcast_versions.get_mut(&key) { + if *ver == broadcast.version { return Ok(self.change_count); } - *ver = service.version; + *ver = broadcast.version; } else { - return Err("Service not found".into()); + return Err("Broadcast not found".into()); } - // Check to see if this service has been updated since initialization - let svc_index = self.service_list + // Check to see if this broadcast has been updated since initialization + let svc_index = self.broadcast_list .iter() .enumerate() - .filter_map(|(i, svc)| if svc.service == key { Some(i) } else { None }) + .filter_map(|(i, svc)| if svc.broadcast == key { Some(i) } else { None }) .nth(0); self.change_count += 1; if let Some(svc_index) = svc_index { - let mut svc = self.service_list.remove(svc_index); + let mut svc = self.broadcast_list.remove(svc_index); svc.change_count = self.change_count; - self.service_list.push(svc); + self.broadcast_list.push(svc); } else { - self.service_list.push(ServiceRevision { + self.broadcast_list.push(BroadcastRevision { change_count: self.change_count, - service: key, + broadcast: key, }) } Ok(self.change_count) } - /// Returns the new service versions since the provided `client_set`. - pub fn change_count_delta(&self, client_set: &mut ClientServices) -> Option> { + /// Returns the new broadcast versions since the provided `client_set`. + pub fn change_count_delta(&self, client_set: &mut ClientBroadcasts) -> Option> { if self.change_count <= client_set.change_count { return None; } let mut svc_delta = Vec::new(); - for svc in self.service_list.iter().rev() { + for svc in self.broadcast_list.iter().rev() { if svc.change_count <= client_set.change_count { break; } - if !client_set.service_list.contains(&svc.service) { + if !client_set.broadcast_list.contains(&svc.broadcast) { continue; } - if let Some(ver) = self.service_versions.get(&svc.service) { - if let Some(svc_id) = self.service_registry.lookup_id(svc.service) { - svc_delta.push(Service { - service_id: svc_id, + if let Some(ver) = self.broadcast_versions.get(&svc.broadcast) { + if let Some(svc_id) = self.broadcast_registry.lookup_id(svc.broadcast) { + svc_delta.push(Broadcast { + broadcast_id: svc_id, version: (*ver).clone(), }); } @@ -228,17 +228,17 @@ impl ServiceChangeTracker { } } - /// Returns a delta for `services` that are out of date with the latest version and a new + /// Returns a delta for `broadcasts` that are out of date with the latest version and a new /// `ClientSet``. - pub fn service_delta(&self, services: &[Service]) -> ServiceClientInit { + pub fn broadcast_delta(&self, broadcasts: &[Broadcast]) -> BroadcastClientInit { let mut svc_list = Vec::new(); let mut svc_delta = Vec::new(); - for svc in services.iter() { - if let Some(svc_key) = self.service_registry.lookup_key(&svc.service_id) { - if let Some(ver) = self.service_versions.get(&svc_key) { + for svc in broadcasts.iter() { + if let Some(svc_key) = self.broadcast_registry.lookup_key(&svc.broadcast_id) { + if let Some(ver) = self.broadcast_versions.get(&svc_key) { if *ver != svc.version { - svc_delta.push(Service { - service_id: svc.service_id.clone(), + svc_delta.push(Broadcast { + broadcast_id: svc.broadcast_id.clone(), version: (*ver).clone(), }); } @@ -246,36 +246,36 @@ impl ServiceChangeTracker { svc_list.push(svc_key); } } - ServiceClientInit( - ClientServices { - service_list: svc_list, + BroadcastClientInit( + ClientBroadcasts { + broadcast_list: svc_list, change_count: self.change_count, }, svc_delta, ) } - /// Update a `ClientServices` to account for a new service. + /// Update a `ClientBroadcasts` to account for a new broadcast. /// - /// Returns services that have changed. - pub fn client_service_add_service( + /// Returns broadcasts that have changed. + pub fn client_broadcast_add_broadcast( &self, - client_service: &mut ClientServices, - services: &[Service], - ) -> Option> { - let mut svc_delta = self.change_count_delta(client_service) + client_broadcast: &mut ClientBroadcasts, + broadcasts: &[Broadcast], + ) -> Option> { + let mut svc_delta = self.change_count_delta(client_broadcast) .unwrap_or_default(); - for svc in services.iter() { - if let Some(svc_key) = self.service_registry.lookup_key(&svc.service_id) { - if let Some(ver) = self.service_versions.get(&svc_key) { + for svc in broadcasts.iter() { + if let Some(svc_key) = self.broadcast_registry.lookup_key(&svc.broadcast_id) { + if let Some(ver) = self.broadcast_versions.get(&svc_key) { if *ver != svc.version { - svc_delta.push(Service { - service_id: svc.service_id.clone(), + svc_delta.push(Broadcast { + broadcast_id: svc.broadcast_id.clone(), version: (*ver).clone(), }); } } - client_service.service_list.push(svc_key) + client_broadcast.broadcast_list.push(svc_key) } } if svc_delta.is_empty() { @@ -290,33 +290,33 @@ impl ServiceChangeTracker { mod tests { use super::*; - fn make_service_base() -> Vec { + fn make_broadcast_base() -> Vec { vec![ - Service { - service_id: String::from("svca"), + Broadcast { + broadcast_id: String::from("svca"), version: String::from("rev1"), }, - Service { - service_id: String::from("svcb"), + Broadcast { + broadcast_id: String::from("svcb"), version: String::from("revalha"), }, ] } #[test] - fn test_service_change_tracker() { - let services = make_service_base(); - let client_services = services.clone(); - let mut svc_chg_tracker = ServiceChangeTracker::new(services); - let ServiceClientInit(mut client_svc, delta) = - svc_chg_tracker.service_delta(&client_services); + fn test_broadcast_change_tracker() { + let broadcasts = make_broadcast_base(); + let client_broadcasts = broadcasts.clone(); + let mut svc_chg_tracker = BroadcastChangeTracker::new(broadcasts); + let BroadcastClientInit(mut client_svc, delta) = + svc_chg_tracker.broadcast_delta(&client_broadcasts); assert_eq!(delta.len(), 0); assert_eq!(client_svc.change_count, 0); - assert_eq!(client_svc.service_list.len(), 2); + assert_eq!(client_svc.broadcast_list.len(), 2); svc_chg_tracker - .update_service(Service { - service_id: String::from("svca"), + .update_broadcast(Broadcast { + broadcast_id: String::from("svca"), version: String::from("rev2"), }) .ok(); @@ -327,25 +327,25 @@ mod tests { } #[test] - fn test_service_change_handles_new_services() { - let services = make_service_base(); - let client_services = services.clone(); - let mut svc_chg_tracker = ServiceChangeTracker::new(services); - let ServiceClientInit(mut client_svc, _) = svc_chg_tracker.service_delta(&client_services); - - svc_chg_tracker.add_service(Service { - service_id: String::from("svcc"), + fn test_broadcast_change_handles_new_broadcasts() { + let broadcasts = make_broadcast_base(); + let client_broadcasts = broadcasts.clone(); + let mut svc_chg_tracker = BroadcastChangeTracker::new(broadcasts); + let BroadcastClientInit(mut client_svc, _) = svc_chg_tracker.broadcast_delta(&client_broadcasts); + + svc_chg_tracker.add_broadcast(Broadcast { + broadcast_id: String::from("svcc"), version: String::from("revmega"), }); let delta = svc_chg_tracker.change_count_delta(&mut client_svc); assert!(delta.is_none()); let delta = svc_chg_tracker - .client_service_add_service( + .client_broadcast_add_broadcast( &mut client_svc, &vec![ - Service { - service_id: String::from("svcc"), + Broadcast { + broadcast_id: String::from("svcc"), version: String::from("revision_alpha"), }, ], @@ -354,6 +354,6 @@ mod tests { assert_eq!(delta.len(), 1); assert_eq!(delta[0].version, String::from("revmega")); assert_eq!(client_svc.change_count, 1); - assert_eq!(svc_chg_tracker.service_list.len(), 1); + assert_eq!(svc_chg_tracker.broadcast_list.len(), 1); } } From 1d5e718849c19b882ec777c95f2c199cbb97851f Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Wed, 20 Jun 2018 12:31:52 -0700 Subject: [PATCH 2/2] refactor: some more renaming svc -> bcast, 'broadcast subscriptions', etc. Closes #14 --- src/client.rs | 42 +++++------ src/server/mod.rs | 26 +++---- src/util/megaphone.rs | 170 +++++++++++++++++++++--------------------- 3 files changed, 120 insertions(+), 118 deletions(-) diff --git a/src/client.rs b/src/client.rs index f67f4ae05..2f53c74e5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -24,7 +24,7 @@ use errors::*; use protocol::{ClientMessage, Notification, ServerMessage, ServerNotification}; use server::Server; use db::{CheckStorageResponse, HelloResponse, RegisterResponse}; -use util::megaphone::{ClientBroadcasts, Broadcast, BroadcastClientInit}; +use util::megaphone::{Broadcast, BroadcastSubs, BroadcastSubsInit}; use util::{ms_since_epoch, parse_user_agent, sec_since_epoch}; // Created and handed to the AutopushServer @@ -42,7 +42,7 @@ pub struct Client { state_machine: UnAuthClientStateFuture, srv: Rc, - broadcast_broadcasts: Rc>, + broadcast_subs: Rc>, tx: mpsc::UnboundedSender, } @@ -80,14 +80,14 @@ where } }; - let broadcast_broadcasts = Rc::new(RefCell::new(Default::default())); + let broadcast_subs = Rc::new(RefCell::new(Default::default())); let sm = UnAuthClientState::start( UnAuthClientData { srv: srv.clone(), ws, user_agent: uastr, host, - broadcast_broadcasts: broadcast_broadcasts.clone(), + broadcast_subs: broadcast_subs.clone(), }, timeout, tx.clone(), @@ -97,14 +97,14 @@ where Self { state_machine: sm, srv: srv.clone(), - broadcast_broadcasts, + broadcast_subs, tx, } } pub fn broadcast_delta(&mut self) -> Option> { - let mut broadcast_broadcasts = self.broadcast_broadcasts.borrow_mut(); - self.srv.broadcast_delta(&mut broadcast_broadcasts) + let mut broadcast_subs = self.broadcast_subs.borrow_mut(); + self.srv.broadcast_delta(&mut broadcast_subs) } pub fn shutdown(&mut self) { @@ -212,7 +212,7 @@ pub struct UnAuthClientData { ws: T, user_agent: String, host: String, - broadcast_broadcasts: Rc>, + broadcast_subs: Rc>, } impl UnAuthClientData @@ -238,7 +238,7 @@ pub struct AuthClientData { srv: Rc, ws: T, webpush: Rc>, - broadcast_broadcasts: Rc>, + broadcast_subs: Rc>, } impl AuthClientData @@ -282,7 +282,7 @@ where AwaitProcessHello { response: MyFuture, data: UnAuthClientData, - interested_broadcasts: Vec, + desired_broadcasts: Vec, tx: mpsc::UnboundedSender, rx: mpsc::UnboundedReceiver, }, @@ -312,7 +312,7 @@ where hello: &'a mut RentToOwn<'a, AwaitHello>, ) -> Poll, Error> { trace!("State: AwaitHello"); - let (uaid, broadcasts) = { + let (uaid, desired_broadcasts) = { let AwaitHello { ref mut data, ref mut timeout, @@ -346,7 +346,7 @@ where transition!(AwaitProcessHello { response, data, - interested_broadcasts: broadcasts, + desired_broadcasts, tx, rx, }) @@ -381,7 +381,7 @@ where let AwaitProcessHello { data, - interested_broadcasts, + desired_broadcasts, tx, rx, .. @@ -393,7 +393,7 @@ where ws, user_agent, host, - broadcast_broadcasts, + broadcast_subs, } = data; // Setup the objects and such needed for a WebPushClient @@ -401,9 +401,9 @@ where flags.check = check_storage; flags.reset_uaid = reset_uaid; flags.rotate_message_table = rotate_message_table; - let BroadcastClientInit(client_broadcasts, broadcasts) = - srv.broadcast_init(&interested_broadcasts); - broadcast_broadcasts.replace(client_broadcasts); + let BroadcastSubsInit(initialized_subs, broadcasts) = + srv.broadcast_init(&desired_broadcasts); + broadcast_subs.replace(initialized_subs); let uid = Uuid::new_v4(); let webpush = Rc::new(RefCell::new(WebPushClient { uaid, @@ -437,7 +437,7 @@ where srv: srv.clone(), ws, webpush: webpush.clone(), - broadcast_broadcasts: broadcast_broadcasts.clone(), + broadcast_subs: broadcast_subs.clone(), }, ); transition!(AwaitSessionComplete { @@ -720,9 +720,9 @@ where match input { Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => { let broadcast_delta = { - let mut broadcast_broadcasts = data.broadcast_broadcasts.borrow_mut(); - data.srv.client_broadcast_add_broadcast( - &mut broadcast_broadcasts, + let mut broadcast_subs = data.broadcast_subs.borrow_mut(); + data.srv.subscribe_to_broadcasts( + &mut broadcast_subs, &Broadcast::from_hashmap(broadcasts), ) }; diff --git a/src/server/mod.rs b/src/server/mod.rs index bd1eb5617..7eb7a48e9 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -47,7 +47,7 @@ use server::metrics::metrics_from_opts; use server::webpush_io::WebpushIo; use settings::Settings; use util::megaphone::{ - ClientBroadcasts, MegaphoneAPIResponse, Broadcast, BroadcastChangeTracker, BroadcastClientInit, + Broadcast, BroadcastChangeTracker, BroadcastSubs, BroadcastSubsInit, MegaphoneAPIResponse, }; use util::{timeout, RcObject}; @@ -529,28 +529,28 @@ impl Server { } } - /// Generate a new broadcast client list for a newly connected client - pub fn broadcast_init(&self, broadcasts: &[Broadcast]) -> BroadcastClientInit { - debug!("Initialized broadcast broadcasts"); - self.broadcaster.borrow().broadcast_delta(broadcasts) + /// Initialize broadcasts for a newly connected client + pub fn broadcast_init(&self, desired_broadcasts: &[Broadcast]) -> BroadcastSubsInit { + debug!("Initialized broadcasts"); + self.broadcaster + .borrow() + .broadcast_delta(desired_broadcasts) } /// Calculate whether there's new broadcast versions to go out - pub fn broadcast_delta(&self, client_broadcasts: &mut ClientBroadcasts) -> Option> { - self.broadcaster - .borrow() - .change_count_delta(client_broadcasts) + pub fn broadcast_delta(&self, broadcast_subs: &mut BroadcastSubs) -> Option> { + self.broadcaster.borrow().change_count_delta(broadcast_subs) } - /// Add broadcasts to be tracked by a client - pub fn client_broadcast_add_broadcast( + /// Add new broadcasts to be tracked by a client + pub fn subscribe_to_broadcasts( &self, - client_broadcasts: &mut ClientBroadcasts, + broadcast_subs: &mut BroadcastSubs, broadcasts: &[Broadcast], ) -> Option> { self.broadcaster .borrow() - .client_broadcast_add_broadcast(client_broadcasts, broadcasts) + .subscribe_to_broadcasts(broadcast_subs, broadcasts) } } diff --git a/src/util/megaphone.rs b/src/util/megaphone.rs index 0963f6f75..df1bcae5f 100644 --- a/src/util/megaphone.rs +++ b/src/util/megaphone.rs @@ -8,9 +8,9 @@ use reqwest; #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash)] struct BroadcastKey(u32); -// A list of broadcasts that a client is interested in and the last change seen +// Broadcasts a client is subscribed to and the last change seen #[derive(Debug, Default)] -pub struct ClientBroadcasts { +pub struct BroadcastSubs { broadcast_list: Vec, change_count: u32, } @@ -21,9 +21,10 @@ struct BroadcastRegistry { table: Vec, } -// Return result of the first delta call for a client given a full list of broadcast id's and versions +// Return result of the first delta call for a client given a full list of broadcast id's and +// versions #[derive(Debug)] -pub struct BroadcastClientInit(pub ClientBroadcasts, pub Vec); +pub struct BroadcastSubsInit(pub BroadcastSubs, pub Vec); impl BroadcastRegistry { fn new() -> BroadcastRegistry { @@ -61,8 +62,8 @@ struct BroadcastRevision { broadcast: BroadcastKey, } -// A provided Broadcast/Version used for `ChangeList` initialization, client comparisons, and -// outgoing deltas +// A provided Broadcast/Version used for `BroadcastSubsInit`, client comparisons, and outgoing +// deltas #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct Broadcast { broadcast_id: String, @@ -80,8 +81,8 @@ impl From<(String, String)> for Broadcast { } impl From for (String, String) { - fn from(svc: Broadcast) -> (String, String) { - (svc.broadcast_id, svc.version) + fn from(bcast: Broadcast) -> (String, String) { + (bcast.broadcast_id, bcast.version) } } @@ -90,12 +91,13 @@ impl Broadcast { val.into_iter().map(|v| v.into()).collect() } - pub fn into_hashmap(broadcast_vec: Vec) -> HashMap { - broadcast_vec.into_iter().map(|v| v.into()).collect() + pub fn into_hashmap(broadcasts: Vec) -> HashMap { + broadcasts.into_iter().map(|v| v.into()).collect() } } -// BroadcastChangeTracker tracks the broadcasts, their change_count, and the broadcast lookup registry +// BroadcastChangeTracker tracks the broadcasts, their change_count, and the broadcast lookup +// registry #[derive(Debug)] pub struct BroadcastChangeTracker { broadcast_list: Vec, @@ -112,19 +114,19 @@ pub struct MegaphoneAPIResponse { impl BroadcastChangeTracker { /// Creates a new `BroadcastChangeTracker` initialized with the provided `broadcasts`. pub fn new(broadcasts: Vec) -> BroadcastChangeTracker { - let mut svc_change_tracker = BroadcastChangeTracker { + let mut tracker = BroadcastChangeTracker { broadcast_list: Vec::new(), broadcast_registry: BroadcastRegistry::new(), broadcast_versions: HashMap::new(), change_count: 0, }; for srv in broadcasts { - let key = svc_change_tracker + let key = tracker .broadcast_registry .add_broadcast(srv.broadcast_id); - svc_change_tracker.broadcast_versions.insert(key, srv.version); + tracker.broadcast_versions.insert(key, srv.version); } - svc_change_tracker + tracker } /// Creates a new `BroadcastChangeTracker` initialized from a Megaphone API server version set @@ -179,16 +181,16 @@ impl BroadcastChangeTracker { } // Check to see if this broadcast has been updated since initialization - let svc_index = self.broadcast_list + let bcast_index = self.broadcast_list .iter() .enumerate() - .filter_map(|(i, svc)| if svc.broadcast == key { Some(i) } else { None }) + .filter_map(|(i, bcast)| if bcast.broadcast == key { Some(i) } else { None }) .nth(0); self.change_count += 1; - if let Some(svc_index) = svc_index { - let mut svc = self.broadcast_list.remove(svc_index); - svc.change_count = self.change_count; - self.broadcast_list.push(svc); + if let Some(bcast_index) = bcast_index { + let mut bcast = self.broadcast_list.remove(bcast_index); + bcast.change_count = self.change_count; + self.broadcast_list.push(bcast); } else { self.broadcast_list.push(BroadcastRevision { change_count: self.change_count, @@ -199,89 +201,89 @@ impl BroadcastChangeTracker { } /// Returns the new broadcast versions since the provided `client_set`. - pub fn change_count_delta(&self, client_set: &mut ClientBroadcasts) -> Option> { + pub fn change_count_delta(&self, client_set: &mut BroadcastSubs) -> Option> { if self.change_count <= client_set.change_count { return None; } - let mut svc_delta = Vec::new(); - for svc in self.broadcast_list.iter().rev() { - if svc.change_count <= client_set.change_count { + let mut bcast_delta = Vec::new(); + for bcast in self.broadcast_list.iter().rev() { + if bcast.change_count <= client_set.change_count { break; } - if !client_set.broadcast_list.contains(&svc.broadcast) { + if !client_set.broadcast_list.contains(&bcast.broadcast) { continue; } - if let Some(ver) = self.broadcast_versions.get(&svc.broadcast) { - if let Some(svc_id) = self.broadcast_registry.lookup_id(svc.broadcast) { - svc_delta.push(Broadcast { - broadcast_id: svc_id, + if let Some(ver) = self.broadcast_versions.get(&bcast.broadcast) { + if let Some(bcast_id) = self.broadcast_registry.lookup_id(bcast.broadcast) { + bcast_delta.push(Broadcast { + broadcast_id: bcast_id, version: (*ver).clone(), }); } } } client_set.change_count = self.change_count; - if svc_delta.is_empty() { + if bcast_delta.is_empty() { None } else { - Some(svc_delta) + Some(bcast_delta) } } - /// Returns a delta for `broadcasts` that are out of date with the latest version and a new - /// `ClientSet``. - pub fn broadcast_delta(&self, broadcasts: &[Broadcast]) -> BroadcastClientInit { - let mut svc_list = Vec::new(); - let mut svc_delta = Vec::new(); - for svc in broadcasts.iter() { - if let Some(svc_key) = self.broadcast_registry.lookup_key(&svc.broadcast_id) { - if let Some(ver) = self.broadcast_versions.get(&svc_key) { - if *ver != svc.version { - svc_delta.push(Broadcast { - broadcast_id: svc.broadcast_id.clone(), + /// Returns a delta for `broadcasts` that are out of date with the latest version and a + /// the collection of broadcast subscriptions. + pub fn broadcast_delta(&self, broadcasts: &[Broadcast]) -> BroadcastSubsInit { + let mut bcast_list = Vec::new(); + let mut bcast_delta = Vec::new(); + for bcast in broadcasts.iter() { + if let Some(bcast_key) = self.broadcast_registry.lookup_key(&bcast.broadcast_id) { + if let Some(ver) = self.broadcast_versions.get(&bcast_key) { + if *ver != bcast.version { + bcast_delta.push(Broadcast { + broadcast_id: bcast.broadcast_id.clone(), version: (*ver).clone(), }); } } - svc_list.push(svc_key); + bcast_list.push(bcast_key); } } - BroadcastClientInit( - ClientBroadcasts { - broadcast_list: svc_list, + BroadcastSubsInit( + BroadcastSubs { + broadcast_list: bcast_list, change_count: self.change_count, }, - svc_delta, + bcast_delta, ) } - /// Update a `ClientBroadcasts` to account for a new broadcast. + /// Update a `BroadcastSubs` to account for new broadcasts. /// /// Returns broadcasts that have changed. - pub fn client_broadcast_add_broadcast( + pub fn subscribe_to_broadcasts( &self, - client_broadcast: &mut ClientBroadcasts, + broadcast_subs: &mut BroadcastSubs, broadcasts: &[Broadcast], ) -> Option> { - let mut svc_delta = self.change_count_delta(client_broadcast) + let mut bcast_delta = self.change_count_delta(broadcast_subs) .unwrap_or_default(); - for svc in broadcasts.iter() { - if let Some(svc_key) = self.broadcast_registry.lookup_key(&svc.broadcast_id) { - if let Some(ver) = self.broadcast_versions.get(&svc_key) { - if *ver != svc.version { - svc_delta.push(Broadcast { - broadcast_id: svc.broadcast_id.clone(), + for bcast in broadcasts.iter() { + if let Some(bcast_key) = self.broadcast_registry.lookup_key(&bcast.broadcast_id) { + if let Some(ver) = self.broadcast_versions.get(&bcast_key) { + if *ver != bcast.version { + bcast_delta.push(Broadcast { + broadcast_id: bcast.broadcast_id.clone(), version: (*ver).clone(), }); } } - client_broadcast.broadcast_list.push(svc_key) + broadcast_subs.broadcast_list.push(bcast_key) } } - if svc_delta.is_empty() { + if bcast_delta.is_empty() { None } else { - Some(svc_delta) + Some(bcast_delta) } } } @@ -293,11 +295,11 @@ mod tests { fn make_broadcast_base() -> Vec { vec![ Broadcast { - broadcast_id: String::from("svca"), + broadcast_id: String::from("bcasta"), version: String::from("rev1"), }, Broadcast { - broadcast_id: String::from("svcb"), + broadcast_id: String::from("bcastb"), version: String::from("revalha"), }, ] @@ -306,21 +308,21 @@ mod tests { #[test] fn test_broadcast_change_tracker() { let broadcasts = make_broadcast_base(); - let client_broadcasts = broadcasts.clone(); - let mut svc_chg_tracker = BroadcastChangeTracker::new(broadcasts); - let BroadcastClientInit(mut client_svc, delta) = - svc_chg_tracker.broadcast_delta(&client_broadcasts); + let desired_broadcasts = broadcasts.clone(); + let mut tracker = BroadcastChangeTracker::new(broadcasts); + let BroadcastSubsInit(mut broadcast_subs, delta) = + tracker.broadcast_delta(&desired_broadcasts); assert_eq!(delta.len(), 0); - assert_eq!(client_svc.change_count, 0); - assert_eq!(client_svc.broadcast_list.len(), 2); + assert_eq!(broadcast_subs.change_count, 0); + assert_eq!(broadcast_subs.broadcast_list.len(), 2); - svc_chg_tracker + tracker .update_broadcast(Broadcast { - broadcast_id: String::from("svca"), + broadcast_id: String::from("bcasta"), version: String::from("rev2"), }) .ok(); - let delta = svc_chg_tracker.change_count_delta(&mut client_svc); + let delta = tracker.change_count_delta(&mut broadcast_subs); assert!(delta.is_some()); let delta = delta.unwrap(); assert_eq!(delta.len(), 1); @@ -329,23 +331,23 @@ mod tests { #[test] fn test_broadcast_change_handles_new_broadcasts() { let broadcasts = make_broadcast_base(); - let client_broadcasts = broadcasts.clone(); - let mut svc_chg_tracker = BroadcastChangeTracker::new(broadcasts); - let BroadcastClientInit(mut client_svc, _) = svc_chg_tracker.broadcast_delta(&client_broadcasts); + let desired_broadcasts = broadcasts.clone(); + let mut tracker = BroadcastChangeTracker::new(broadcasts); + let BroadcastSubsInit(mut broadcast_subs, _) = tracker.broadcast_delta(&desired_broadcasts); - svc_chg_tracker.add_broadcast(Broadcast { - broadcast_id: String::from("svcc"), + tracker.add_broadcast(Broadcast { + broadcast_id: String::from("bcastc"), version: String::from("revmega"), }); - let delta = svc_chg_tracker.change_count_delta(&mut client_svc); + let delta = tracker.change_count_delta(&mut broadcast_subs); assert!(delta.is_none()); - let delta = svc_chg_tracker - .client_broadcast_add_broadcast( - &mut client_svc, + let delta = tracker + .subscribe_to_broadcasts( + &mut broadcast_subs, &vec![ Broadcast { - broadcast_id: String::from("svcc"), + broadcast_id: String::from("bcastc"), version: String::from("revision_alpha"), }, ], @@ -353,7 +355,7 @@ mod tests { .unwrap(); assert_eq!(delta.len(), 1); assert_eq!(delta[0].version, String::from("revmega")); - assert_eq!(client_svc.change_count, 1); - assert_eq!(svc_chg_tracker.broadcast_list.len(), 1); + assert_eq!(broadcast_subs.change_count, 1); + assert_eq!(tracker.broadcast_list.len(), 1); } }