From 4782084111e3ad638c804e05d3f92cd74d718128 Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Fri, 30 Mar 2018 17:47:43 -0700 Subject: [PATCH] feat: refactor client to remove duplicate logic WaitingForAck's was essentially identical to Await, except it allowed for Ack's to come in. Combining these reduces duplication and errors that may occur as a result while simplifying command handling. Closes #1165 --- autopush_rs/src/client.rs | 83 +++++++-------------------------------- 1 file changed, 14 insertions(+), 69 deletions(-) diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index 789178b0..7dc76dbc 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -135,7 +135,6 @@ pub enum ClientState { SendMessages(Option>), CheckStorage, IncrementStorage, - WaitingForAcks, Await, Done, ShutdownCleanup(Option), @@ -257,7 +256,7 @@ where }) } } else { - ClientState::WaitingForAcks + ClientState::Await } } ClientState::CheckStorage => { @@ -359,7 +358,7 @@ where // No messages remaining ClientState::FinishSend( None, - Some(Box::new(ClientState::WaitingForAcks)) + Some(Box::new(ClientState::Await)) ) } } else { @@ -371,7 +370,7 @@ where debug!("State: WaitingForIncrementStorage"); try_ready!(response.poll()); self.data.webpush.as_mut().unwrap().flags.increment_storage = false; - ClientState::WaitingForAcks + ClientState::Await } ClientState::WaitingForMigrateUser(ref mut response) => { debug!("State: WaitingForMigrateUser"); @@ -404,7 +403,7 @@ where } }; let next_state = if self.data.unacked_messages() { - ClientState::WaitingForAcks + ClientState::Await } else { ClientState::Await }; @@ -427,71 +426,16 @@ where } }; let next_state = if self.data.unacked_messages() { - ClientState::WaitingForAcks + ClientState::Await } else { ClientState::Await }; ClientState::FinishSend(Some(msg), Some(Box::new(next_state))) } - ClientState::WaitingForAcks => { - debug!("State: WaitingForAcks"); - if let Some(next_state) = self.data.determine_acked_state() { - return Ok(next_state.into()); - } - match try_ready!(self.data.input_or_notif()) { - Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => { - let webpush = self.data.webpush.as_mut().unwrap(); - let service_delta = self.data.srv.client_service_add_service( - &mut webpush.broadcast_services, - &Service::from_hashmap(broadcasts), - ); - if let Some(delta) = service_delta { - ClientState::FinishSend( - Some(ServerMessage::Broadcast { - broadcasts: Service::into_hashmap(delta) - }), - Some(Box::new(ClientState::WaitingForAcks)), - ) - } else { - ClientState::WaitingForAcks - } - } - Either::A(ClientMessage::Register { channel_id, key }) => { - self.data.process_register(channel_id, key) - } - Either::A(ClientMessage::Unregister { channel_id, code }) => { - self.data.process_unregister(channel_id, code) - } - Either::A(ClientMessage::Nack { .. }) => { - self.data.srv.metrics.incr("ua.command.nack").ok(); - self.data.webpush.as_mut().unwrap().stats.nacks += 1; - ClientState::WaitingForAcks - } - Either::A(ClientMessage::Ack { updates }) => self.data.process_acks(updates), - Either::B(ServerNotification::Notification(notif)) => { - let webpush = self.data.webpush.as_mut().unwrap(); - if notif.ttl != 0 { - webpush.unacked_direct_notifs.push(notif.clone()); - } - debug!("Got a notification to send, sending!"); - ClientState::FinishSend( - Some(ServerMessage::Notification(notif)), - Some(Box::new(ClientState::WaitingForAcks)), - ) - } - Either::B(ServerNotification::CheckStorage) => { - let webpush = self.data.webpush.as_mut().unwrap(); - webpush.flags.include_topic = true; - webpush.flags.check = true; - ClientState::Await - } - _ => return Err("Invalid state transition".into()), - } - } ClientState::WaitingForDelete(ref mut response) => { debug!("State: WaitingForDelete"); try_ready!(response.poll()); - ClientState::WaitingForAcks + ClientState::Await } ClientState::WaitingForDropUser(ref mut response) => { debug!("State: WaitingForDropUser"); @@ -500,8 +444,8 @@ where } ClientState::Await => { debug!("State: Await"); - if self.data.webpush.as_ref().unwrap().flags.check { - return Ok(ClientState::CheckStorage.into()); + if let Some(next_state) = self.data.determine_acked_state() { + return Ok(next_state.into()); } match try_ready!(self.data.input_or_notif()) { Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => { @@ -532,6 +476,7 @@ where self.data.webpush.as_mut().unwrap().stats.nacks += 1; ClientState::Await } + Either::A(ClientMessage::Ack { updates }) => self.data.process_acks(updates), Either::B(ServerNotification::Notification(notif)) => { let webpush = self.data.webpush.as_mut().unwrap(); if notif.ttl != 0 { @@ -540,7 +485,7 @@ where debug!("Got a notification to send, sending!"); ClientState::FinishSend( Some(ServerMessage::Notification(notif)), - Some(Box::new(ClientState::WaitingForAcks)), + Some(Box::new(ClientState::Await)), ) } Either::B(ServerNotification::CheckStorage) => { @@ -732,12 +677,11 @@ where if let Some(my_fut) = fut { ClientState::WaitingForDelete(my_fut) } else { - ClientState::WaitingForAcks + ClientState::Await } } - // Called from WaitingForAcks to determine if we're in fact done waiting for acks - // and to determine where we might go next + // Called from Await to determine any needed state changes fn determine_acked_state(&mut self) -> Option { let webpush = self.webpush.as_ref().unwrap(); let all_acked = !self.unacked_messages(); @@ -755,7 +699,8 @@ where self.srv.drop_user(webpush.uaid.simple().to_string()), )) } else if all_acked && webpush.flags.none() { - Some(ClientState::Await) + // Explicit call-out that this condition results in no state change. + None } else { None }