Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: refactor client to remove duplicate logic
Browse files Browse the repository at this point in the history
WaitingForAck's was essentially identical to Await, except it allowed
for Ack's to come in. Combining these reduces duplication and errors
that may occur as a result while simplifying command handling.

Closes #1165
  • Loading branch information
bbangert committed Mar 31, 2018
1 parent c1a74e9 commit 4782084
Showing 1 changed file with 14 additions and 69 deletions.
83 changes: 14 additions & 69 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ pub enum ClientState {
SendMessages(Option<Vec<Notification>>),
CheckStorage,
IncrementStorage,
WaitingForAcks,
Await,
Done,
ShutdownCleanup(Option<Error>),
Expand Down Expand Up @@ -257,7 +256,7 @@ where
})
}
} else {
ClientState::WaitingForAcks
ClientState::Await
}
}
ClientState::CheckStorage => {
Expand Down Expand Up @@ -359,7 +358,7 @@ where
// No messages remaining
ClientState::FinishSend(
None,
Some(Box::new(ClientState::WaitingForAcks))
Some(Box::new(ClientState::Await))
)
}
} else {
Expand All @@ -371,7 +370,7 @@ where
debug!("State: WaitingForIncrementStorage");
try_ready!(response.poll());
self.data.webpush.as_mut().unwrap().flags.increment_storage = false;
ClientState::WaitingForAcks
ClientState::Await
}
ClientState::WaitingForMigrateUser(ref mut response) => {
debug!("State: WaitingForMigrateUser");
Expand Down Expand Up @@ -404,7 +403,7 @@ where
}
};
let next_state = if self.data.unacked_messages() {
ClientState::WaitingForAcks
ClientState::Await
} else {
ClientState::Await
};
Expand All @@ -427,71 +426,16 @@ where
}
};
let next_state = if self.data.unacked_messages() {
ClientState::WaitingForAcks
ClientState::Await
} else {
ClientState::Await
};
ClientState::FinishSend(Some(msg), Some(Box::new(next_state)))
}
ClientState::WaitingForAcks => {
debug!("State: WaitingForAcks");
if let Some(next_state) = self.data.determine_acked_state() {
return Ok(next_state.into());
}
match try_ready!(self.data.input_or_notif()) {
Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => {
let webpush = self.data.webpush.as_mut().unwrap();
let service_delta = self.data.srv.client_service_add_service(
&mut webpush.broadcast_services,
&Service::from_hashmap(broadcasts),
);
if let Some(delta) = service_delta {
ClientState::FinishSend(
Some(ServerMessage::Broadcast {
broadcasts: Service::into_hashmap(delta)
}),
Some(Box::new(ClientState::WaitingForAcks)),
)
} else {
ClientState::WaitingForAcks
}
}
Either::A(ClientMessage::Register { channel_id, key }) => {
self.data.process_register(channel_id, key)
}
Either::A(ClientMessage::Unregister { channel_id, code }) => {
self.data.process_unregister(channel_id, code)
}
Either::A(ClientMessage::Nack { .. }) => {
self.data.srv.metrics.incr("ua.command.nack").ok();
self.data.webpush.as_mut().unwrap().stats.nacks += 1;
ClientState::WaitingForAcks
}
Either::A(ClientMessage::Ack { updates }) => self.data.process_acks(updates),
Either::B(ServerNotification::Notification(notif)) => {
let webpush = self.data.webpush.as_mut().unwrap();
if notif.ttl != 0 {
webpush.unacked_direct_notifs.push(notif.clone());
}
debug!("Got a notification to send, sending!");
ClientState::FinishSend(
Some(ServerMessage::Notification(notif)),
Some(Box::new(ClientState::WaitingForAcks)),
)
}
Either::B(ServerNotification::CheckStorage) => {
let webpush = self.data.webpush.as_mut().unwrap();
webpush.flags.include_topic = true;
webpush.flags.check = true;
ClientState::Await
}
_ => return Err("Invalid state transition".into()),
}
}
ClientState::WaitingForDelete(ref mut response) => {
debug!("State: WaitingForDelete");
try_ready!(response.poll());
ClientState::WaitingForAcks
ClientState::Await
}
ClientState::WaitingForDropUser(ref mut response) => {
debug!("State: WaitingForDropUser");
Expand All @@ -500,8 +444,8 @@ where
}
ClientState::Await => {
debug!("State: Await");
if self.data.webpush.as_ref().unwrap().flags.check {
return Ok(ClientState::CheckStorage.into());
if let Some(next_state) = self.data.determine_acked_state() {
return Ok(next_state.into());
}
match try_ready!(self.data.input_or_notif()) {
Either::A(ClientMessage::BroadcastSubscribe { broadcasts }) => {
Expand Down Expand Up @@ -532,6 +476,7 @@ where
self.data.webpush.as_mut().unwrap().stats.nacks += 1;
ClientState::Await
}
Either::A(ClientMessage::Ack { updates }) => self.data.process_acks(updates),
Either::B(ServerNotification::Notification(notif)) => {
let webpush = self.data.webpush.as_mut().unwrap();
if notif.ttl != 0 {
Expand All @@ -540,7 +485,7 @@ where
debug!("Got a notification to send, sending!");
ClientState::FinishSend(
Some(ServerMessage::Notification(notif)),
Some(Box::new(ClientState::WaitingForAcks)),
Some(Box::new(ClientState::Await)),
)
}
Either::B(ServerNotification::CheckStorage) => {
Expand Down Expand Up @@ -732,12 +677,11 @@ where
if let Some(my_fut) = fut {
ClientState::WaitingForDelete(my_fut)
} else {
ClientState::WaitingForAcks
ClientState::Await
}
}

// Called from WaitingForAcks to determine if we're in fact done waiting for acks
// and to determine where we might go next
// Called from Await to determine any needed state changes
fn determine_acked_state(&mut self) -> Option<ClientState> {
let webpush = self.webpush.as_ref().unwrap();
let all_acked = !self.unacked_messages();
Expand All @@ -755,7 +699,8 @@ where
self.srv.drop_user(webpush.uaid.simple().to_string()),
))
} else if all_acked && webpush.flags.none() {
Some(ClientState::Await)
// Explicit call-out that this condition results in no state change.
None
} else {
None
}
Expand Down

0 comments on commit 4782084

Please sign in to comment.