Skip to content

Commit

Permalink
Merge pull request #62 from mozilla-services/feat/issue-58
Browse files Browse the repository at this point in the history
feat: notify other nodes if user has reconnected for missed messages
  • Loading branch information
pjenvey authored Aug 24, 2018
2 parents 0a47aa4 + 10152fc commit 22bc4dd
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 25 deletions.
81 changes: 57 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
use std::cell::RefCell;
use std::mem;
use std::rc::Rc;
use std::time::Duration;

use cadence::{prelude::*, StatsdClient};
use error_chain::ChainedError;
use futures::future;
use futures::future::Either;
use futures::sync::mpsc;
use futures::sync::oneshot::Receiver;
use futures::AsyncSink;
use futures::{Async, Future, Poll, Sink, Stream};
use reqwest::unstable::async::Client as AsyncClient;
use rusoto_dynamodb::UpdateItemOutput;
use state_machine_future::RentToOwn;
use tokio_core::reactor::Timeout;
Expand Down Expand Up @@ -506,14 +509,8 @@ where
for notif in &mut notifs {
notif.sortkey_timestamp = Some(0);
}
srv.handle.spawn(
srv.ddb
.store_messages(&webpush.uaid, &webpush.message_month, notifs)
.then(|_| {
debug!("Finished saving unacked direct notifications");
Ok(())
}),
);
let srv1 = srv.clone();
save_and_notify_undelivered_messages(&webpush, srv1, notifs);
}

// Log out the final stats message
Expand Down Expand Up @@ -542,6 +539,55 @@ where
}
}

fn save_and_notify_undelivered_messages(
webpush: &WebPushClient,
srv: Rc<Server>,
notifs: Vec<Notification>,
) {
let srv2 = srv.clone();
let srv3 = srv.clone();
let uaid = webpush.uaid.clone();
let connected_at = webpush.connected_at.clone();
srv.handle.spawn(
srv.ddb
.store_messages(&webpush.uaid, &webpush.message_month, notifs)
.and_then(move |_| {
debug!("Finished saving unacked direct notifications, checking for reconnect");
srv2.ddb.get_user(&uaid)
})
.and_then(move |user| {
// Return an err to stop processing if the user hasn't reconnected yet, otherwise
// attempt to construct a client to make the request
if user.connected_at == connected_at {
future::err("No notify needed".into())
} else if let Some(node_id) = user.node_id {
let result = AsyncClient::builder()
.timeout(Duration::from_secs(1))
.build(&srv3.handle);
if let Ok(client) = result {
future::ok((client, user.uaid, node_id))
} else {
future::err("Unable to build http client".into())
}
} else {
future::err("No new node_id, notify not needed".into())
}
})
.and_then(|(client, uaid, node_id)| {
// Send the notify to the user
let notify_url = format!("{}/notif/{}", node_id, uaid.simple());
client
.put(&notify_url)
.send()
.map_err(|_| "Failed to send".into())
})
.then(|_| {
debug!("Finished cleanup");
Ok(())
}),
);
}

#[derive(StateMachineFuture)]
pub enum AuthClientState<T>
where
Expand All @@ -562,25 +608,12 @@ where
},

#[state_machine_future(
transitions(
IncrementStorage,
CheckStorage,
AwaitDropUser,
AwaitMigrateUser,
AwaitInput
)
transitions(IncrementStorage, CheckStorage, AwaitDropUser, AwaitMigrateUser, AwaitInput)
)]
DetermineAck { data: AuthClientData<T> },

#[state_machine_future(
transitions(
DetermineAck,
Send,
AwaitInput,
AwaitRegister,
AwaitUnregister,
AwaitDelete
)
transitions(DetermineAck, Send, AwaitInput, AwaitRegister, AwaitUnregister, AwaitDelete)
)]
AwaitInput { data: AuthClientData<T> },

Expand Down Expand Up @@ -865,7 +898,7 @@ where
webpush.last_ping = sec_since_epoch();
transition!(Send {
smessages: vec![ServerMessage::Ping],
data
data,
})
} else {
debug!("Got a ping too quickly, disconnecting");
Expand Down
2 changes: 1 addition & 1 deletion src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ pub fn drop_user(
).chain_err(|| "Error dropping user")
}

fn get_uaid(
pub fn get_uaid(
ddb: Rc<Box<DynamoDb>>,
uaid: &Uuid,
router_table_name: &str,
Expand Down
17 changes: 17 additions & 0 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,23 @@ impl DynamoStorage {
Box::new(next_query)
})
}

pub fn get_user(&self, uaid: &Uuid) -> impl Future<Item = DynamoDbUser, Error = Error> {
let ddb = self.ddb.clone();
let result = commands::get_uaid(ddb, uaid, &self.router_table_name)
.and_then(|result| {
future::result(
result
.item
.ok_or_else(|| "No user record found".into())
.and_then(|item| {
let user = serde_dynamodb::from_hashmap(item);
user.chain_err(|| "Error deserializing")
}),
)
});
Box::new(result)
}
}

pub fn list_message_tables(ddb: &Rc<Box<DynamoDb>>, prefix: &str) -> Result<Vec<String>> {
Expand Down

0 comments on commit 22bc4dd

Please sign in to comment.