From f1879c3aed1c50cf07594c76e3a3b65dda339a95 Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Fri, 11 May 2018 11:03:15 -0700 Subject: [PATCH] feat: port delete command to Rust Closes #1204 --- autopush/tests/test_webpush_server.py | 40 -------- autopush/webpush_server.py | 87 ----------------- autopush_rs/src/call.rs | 23 ----- autopush_rs/src/client.rs | 8 +- autopush_rs/src/protocol.rs | 32 +++++- autopush_rs/src/util/ddb_helpers.rs | 134 +++++++++++++++++--------- 6 files changed, 122 insertions(+), 202 deletions(-) diff --git a/autopush/tests/test_webpush_server.py b/autopush/tests/test_webpush_server.py index 86088cb8..5dc5145d 100644 --- a/autopush/tests/test_webpush_server.py +++ b/autopush/tests/test_webpush_server.py @@ -23,8 +23,6 @@ from autopush.utils import WebPushNotification, ns_time from autopush.websocket import USER_RECORD_VERSION from autopush.webpush_server import ( - CheckStorage, - DeleteMessage, MigrateUser, StoreMessages, WebPushMessage, @@ -111,14 +109,6 @@ class Meta: version = factory.LazyAttribute(generate_version) -class CheckStorageFactory(factory.Factory): - class Meta: - model = CheckStorage - - uaid = factory.LazyFunction(lambda: uuid4().hex) - include_topic = True - - def webpush_messages(obj): return [attr.asdict(WebPushMessageFactory(uaid=obj.uaid)) for _ in range(obj.message_count)] @@ -198,36 +188,6 @@ def test_start_stop(self): ws.stop() -class TestDeleteMessageProcessor(BaseSetup): - def _makeFUT(self): - from autopush.webpush_server import DeleteMessageCommand - return DeleteMessageCommand(self.conf, self.db) - - def test_delete_message(self): - from autopush.webpush_server import CheckStorageCommand - check_command = CheckStorageCommand(self.conf, self.db) - check = CheckStorageFactory(message_month=self.db.current_msg_month) - delete_command = self._makeFUT() - - # Store some topic messages - self._store_messages(check.uaid, topic=True, num=7) - - # Fetch them - results = check_command.process(check) - assert len(results.messages) == 7 - - # Delete 2 of them - for notif in results.messages[:2]: - delete_command.process(DeleteMessage( - message_month=self.db.current_msg_month, - message=notif, - )) - - # Fetch messages again - results = check_command.process(check) - assert len(results.messages) == 5 - - class TestMigrateUserProcessor(BaseSetup): def _makeFUT(self): from autopush.webpush_server import MigrateUserCommand diff --git a/autopush/webpush_server.py b/autopush/webpush_server.py index 8d1f072b..fa328b06 100644 --- a/autopush/webpush_server.py +++ b/autopush/webpush_server.py @@ -122,20 +122,6 @@ class InputCommand(object): pass -@attrs(slots=True) -class CheckStorage(InputCommand): - uaid = attrib(convert=uaid_from_str) # type: UUID - message_month = attrib() # type: str - include_topic = attrib() # type: bool - timestamp = attrib(default=None) # type: Optional[int] - - -@attrs(slots=True) -class DeleteMessage(InputCommand): - message_month = attrib() # type: str - message = attrib(convert=dict_to_webpush_message) # type: WebPushMessage - - @attrs(slots=True) class MigrateUser(InputCommand): uaid = attrib(convert=uaid_from_str) # type: UUID @@ -158,20 +144,6 @@ class OutputCommand(object): pass -@attrs(slots=True) -class CheckStorageResponse(OutputCommand): - include_topic = attrib() # type: bool - messages = attrib( - default=attr.Factory(list) - ) # type: List[WebPushMessage] - timestamp = attrib(default=None) # type: Optional[int] - - -@attrs(slots=True) -class DeleteMessageResponse(OutputCommand): - success = attrib(default=True) # type: bool - - @attrs(slots=True) class MigrateUserResponse(OutputCommand): message_month = attrib() # type: str @@ -249,17 +221,13 @@ def __init__(self, conf, db): # type: (AutopushConfig, DatabaseManager) -> None self.conf = conf self.db = db - self.check_storage_processor = CheckStorageCommand(conf, db) - self.delete_message_processor = DeleteMessageCommand(conf, db) self.migrate_user_proocessor = MigrateUserCommand(conf, db) self.store_messages_process = StoreMessagesUserCommand(conf, db) self.deserialize = dict( - delete_message=DeleteMessage, migrate_user=MigrateUser, store_messages=StoreMessages, ) self.command_dict = dict( - delete_message=self.delete_message_processor, migrate_user=self.migrate_user_proocessor, store_messages=self.store_messages_process, ) # type: Dict[str, ProcessorCommand] @@ -302,61 +270,6 @@ def process(self, command): raise NotImplementedError() -class CheckStorageCommand(ProcessorCommand): - def process(self, command): - # type: (CheckStorage) -> CheckStorageResponse - timestamp, messages, include_topic = self._check_storage(command) - return CheckStorageResponse( - timestamp=timestamp, - messages=messages, - include_topic=include_topic, - ) - - def _check_storage(self, command): - timestamp = None - messages = [] - message = Message(command.message_month, - boto_resource=self.db.resource) - if command.include_topic: - timestamp, messages = message.fetch_messages( - uaid=command.uaid, limit=11 - ) - - # If we have topic messages, return them immediately - messages = [WebPushMessage.from_WebPushNotification(m) - for m in messages] - if messages: - return timestamp, messages, True - - # No messages, update the command to include the last timestamp - # that was ack'd - command.timestamp = timestamp - - if not messages or command.timestamp: - timestamp, messages = message.fetch_timestamp_messages( - uaid=command.uaid, - timestamp=command.timestamp, - ) - messages = [WebPushMessage.from_WebPushNotification(m) - for m in messages] - - # If we're out of messages, timestamp is set to None, so we return - # the last timestamp supplied - if not timestamp: - timestamp = command.timestamp - return timestamp, messages, False - - -class DeleteMessageCommand(ProcessorCommand): - def process(self, command): - # type: (DeleteMessage) -> DeleteMessageResponse - notif = command.message.to_WebPushNotification() - message = Message(command.message_month, - boto_resource=self.db.resource) - message.delete_message(notif) - return DeleteMessageResponse() - - class MigrateUserCommand(ProcessorCommand): def process(self, command): # type: (MigrateUser) -> MigrateUserResponse diff --git a/autopush_rs/src/call.rs b/autopush_rs/src/call.rs index 3cdc9c91..0dfe286d 100644 --- a/autopush_rs/src/call.rs +++ b/autopush_rs/src/call.rs @@ -116,11 +116,6 @@ impl FnBox for F { #[derive(Serialize)] #[serde(tag = "command", rename_all = "snake_case")] enum Call { - DeleteMessage { - message: protocol::Notification, - message_month: String, - }, - MigrateUser { uaid: String, message_month: String, @@ -138,11 +133,6 @@ struct PythonError { pub error_msg: String, } -#[derive(Deserialize)] -pub struct DeleteMessageResponse { - pub success: bool, -} - #[derive(Deserialize)] pub struct MigrateUserResponse { pub message_month: String, @@ -154,19 +144,6 @@ pub struct StoreMessagesResponse { } impl Server { - pub fn delete_message( - &self, - message_month: String, - notif: protocol::Notification, - ) -> MyFuture { - let (call, fut) = PythonCall::new(&Call::DeleteMessage { - message: notif, - message_month: message_month, - }); - self.send_to_python(call); - return fut; - } - pub fn migrate_user( &self, uaid: String, diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index b366ee31..8f2de42c 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -612,7 +612,7 @@ where #[state_machine_future(transitions(DetermineAck))] AwaitDelete { - response: MyFuture, + response: MyFuture<()>, data: AuthClientData, }, @@ -772,7 +772,7 @@ where } Either::A(ClientMessage::Ack { updates }) => { data.srv.metrics.incr("ua.command.ack").ok(); - let mut fut: Option> = None; + let mut fut: Option> = None; for notif in updates.iter() { if let Some(pos) = webpush.unacked_direct_notifs.iter().position(|v| { v.channel_id == notif.channel_id && v.version == notif.version @@ -790,10 +790,10 @@ where // Topic/legacy messages have no sortkey_timestamp if n.sortkey_timestamp.is_none() { fut = if let Some(call) = fut { - let my_fut = data.srv.delete_message(message_month, n); + let my_fut = data.srv.ddb.delete_message(&message_month, n); Some(Box::new(call.and_then(move |_| my_fut))) } else { - Some(data.srv.delete_message(message_month, n)) + Some(data.srv.ddb.delete_message(&message_month, n)) } } continue; diff --git a/autopush_rs/src/protocol.rs b/autopush_rs/src/protocol.rs index 5df9050c..48ede961 100644 --- a/autopush_rs/src/protocol.rs +++ b/autopush_rs/src/protocol.rs @@ -10,6 +10,8 @@ use std::collections::HashMap; use uuid::Uuid; +use util::ms_since_epoch; + // Used for the server to flag a webpush client to deliver a Notification or Check storage pub enum ServerNotification { CheckStorage, @@ -103,11 +105,12 @@ pub enum ServerMessage { #[derive(Serialize, Default, Deserialize, Clone, Debug)] pub struct Notification { + #[serde(skip_serializing)] pub uaid: Option, #[serde(rename = "channelID")] pub channel_id: Uuid, pub version: String, - #[serde(default = "default_ttl")] + #[serde(default = "default_ttl", skip_serializing)] pub ttl: u32, #[serde(skip_serializing_if = "Option::is_none")] pub topic: Option, @@ -120,6 +123,33 @@ pub struct Notification { pub headers: Option>, } +impl Notification { + /// Return an appropriate sort_key to use for the chidmessageid + /// + /// For new messages: + /// 02:{sortkey_timestamp}:{chid} + /// + /// For topic messages: + /// 01:{chid}:{topic} + /// + /// Old format for non-topic messages that is no longer returned: + /// {chid}:{message_id} + pub fn sort_key(&self) -> String { + let chid = self.channel_id.hyphenated(); + if let Some(ref topic) = self.topic { + format!("01:{}:{}", chid, topic) + } else if let Some(ref sortkey_timestamp) = self.sortkey_timestamp { + format!("02:{}:{}", + if *sortkey_timestamp == 0 { ms_since_epoch() } else { *sortkey_timestamp }, + chid + ) + } else { + // Legacy messages which we should never get anymore + format!("{}:{}", chid, self.version) + } + } +} + fn default_ttl() -> u32 { 0 } diff --git a/autopush_rs/src/util/ddb_helpers.rs b/autopush_rs/src/util/ddb_helpers.rs index a582792a..7f0c6d0e 100644 --- a/autopush_rs/src/util/ddb_helpers.rs +++ b/autopush_rs/src/util/ddb_helpers.rs @@ -290,62 +290,65 @@ struct RangeKey { legacy_version: Option, } -fn parse_sort_key(key: &str) -> Result { - lazy_static! { - static ref RE: RegexSet = - RegexSet::new(&[r"^01:\S+:\S+$", r"^02:\d+:\S+$", r"^\S{3,}:\S+$",]).unwrap(); - } - if !RE.is_match(key) { - return Err("Invalid chidmessageid".into()).into(); +impl DynamoDbNotification { + fn parse_sort_key(key: &str) -> Result { + lazy_static! { + static ref RE: RegexSet = RegexSet::new(&[ + r"^01:\S+:\S+$", + r"^02:\d+:\S+$", + r"^\S{3,}:\S+$", + ]).unwrap(); } + if !RE.is_match(key) { + return Err("Invalid chidmessageid".into()).into(); + } - let v: Vec<&str> = key.split(":").collect(); - match v[0] { - "01" => { - if v.len() != 3 { - return Err("Invalid topic key".into()); + let v: Vec<&str> = key.split(":").collect(); + match v[0] { + "01" => { + if v.len() != 3 { + return Err("Invalid topic key".into()); + } + let (channel_id, topic) = (v[1], v[2]); + let channel_id = Uuid::parse_str(channel_id)?; + Ok(RangeKey { + channel_id, + topic: Some(topic.to_string()), + sortkey_timestamp: None, + legacy_version: None, + }) } - let (channel_id, topic) = (v[1], v[2]); - let channel_id = Uuid::parse_str(channel_id)?; - Ok(RangeKey { - channel_id, - topic: Some(topic.to_string()), - sortkey_timestamp: None, - legacy_version: None, - }) - } - "02" => { - if v.len() != 3 { - return Err("Invalid topic key".into()); + "02" => { + if v.len() != 3 { + return Err("Invalid topic key".into()); + } + let (sortkey, channel_id) = (v[1], v[2]); + let channel_id = Uuid::parse_str(channel_id)?; + Ok(RangeKey { + channel_id, + topic: None, + sortkey_timestamp: Some(sortkey.parse()?), + legacy_version: None, + }) } - let (sortkey, channel_id) = (v[1], v[2]); - let channel_id = Uuid::parse_str(channel_id)?; - Ok(RangeKey { - channel_id, - topic: None, - sortkey_timestamp: Some(sortkey.parse()?), - legacy_version: None, - }) - } - _ => { - if v.len() != 2 { - return Err("Invalid topic key".into()); + _ => { + if v.len() != 2 { + return Err("Invalid topic key".into()); + } + let (channel_id, legacy_version) = (v[0], v[1]); + let channel_id = Uuid::parse_str(channel_id)?; + Ok(RangeKey { + channel_id, + topic: None, + sortkey_timestamp: None, + legacy_version: Some(legacy_version.to_string()), + }) } - let (channel_id, legacy_version) = (v[0], v[1]); - let channel_id = Uuid::parse_str(channel_id)?; - Ok(RangeKey { - channel_id, - topic: None, - sortkey_timestamp: None, - legacy_version: Some(legacy_version.to_string()), - }) } } -} -impl DynamoDbNotification { fn to_notif(self) -> Result { - let key = parse_sort_key(&self.chidmessageid)?; + let key = DynamoDbNotification::parse_sort_key(&self.chidmessageid)?; let version = key.legacy_version .or(self.updateid) .ok_or("No valid updateid/version found")?; @@ -905,6 +908,43 @@ impl DynamoStorage { Box::new(response) } + /// Delete a given notification from the database + /// + /// No checks are done to see that this message came from the database or has + /// sufficient properties for a delete as that is expected to have been done + /// before this is called. In the event information is missing, a future::ok + /// is returned. + pub fn delete_message( + &self, + table_name: &str, + notif: Notification, + ) -> MyFuture<()> { + let ddb = self.ddb.clone(); + let uaid = if let Some(ref uaid) = notif.uaid { + uaid.clone() + } else { + return Box::new(future::ok(())); + }; + let chidmessageid = notif.sort_key(); + let delete_input = DeleteItemInput { + table_name: table_name.to_string(), + key: ddb_item! { + uaid: s => uaid, + chidmessageid: s => chidmessageid + }, + ..Default::default() + }; + let response = retry_if( + move || ddb.delete_item(&delete_input), + |err: &DeleteItemError| { + matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)) + }, + ) + .and_then(|_| Box::new(future::ok(()))) + .chain_err(|| "Error deleting notification"); + Box::new(response) + } + pub fn check_storage( &self, table_name: &str,