From b9f462484dd0ea014b071fb246e60a69870e322d Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Tue, 15 May 2018 09:02:18 -0700 Subject: [PATCH] feat: port store messages command to Rust Add's Rust integration test to verify the direct message is stored that was not present before. Closes #1208 --- autopush/tests/test_rs_integration.py | 15 +++++ autopush/tests/test_webpush_server.py | 27 -------- autopush/webpush_server.py | 29 --------- autopush_rs/src/call.rs | 28 -------- autopush_rs/src/client.rs | 10 +-- autopush_rs/src/util/ddb_helpers.rs | 92 +++++++++++++++++++++++---- 6 files changed, 101 insertions(+), 100 deletions(-) diff --git a/autopush/tests/test_rs_integration.py b/autopush/tests/test_rs_integration.py index 676ba00d..be7b388a 100644 --- a/autopush/tests/test_rs_integration.py +++ b/autopush/tests/test_rs_integration.py @@ -399,6 +399,21 @@ def test_delivery_repeat_without_ack(self): assert result["data"] == base64url_encode(data) yield self.shut_down(client) + @inlineCallbacks + def test_repeat_delivery_with_disconnect_without_ack(self): + data = str(uuid.uuid4()) + client = yield self.quick_register() + result = yield client.send_notification(data=data) + assert result != {} + assert result["data"] == base64url_encode(data) + yield client.disconnect() + yield client.connect() + yield client.hello() + result = yield client.get_notification() + assert result != {} + assert result["data"] == base64url_encode(data) + yield self.shut_down(client) + @inlineCallbacks def test_multiple_delivery_repeat_without_ack(self): data = str(uuid.uuid4()) diff --git a/autopush/tests/test_webpush_server.py b/autopush/tests/test_webpush_server.py index 5dc5145d..33715a01 100644 --- a/autopush/tests/test_webpush_server.py +++ b/autopush/tests/test_webpush_server.py @@ -24,7 +24,6 @@ from autopush.websocket import USER_RECORD_VERSION from autopush.webpush_server import ( MigrateUser, - StoreMessages, WebPushMessage, ) import autopush.tests @@ -114,20 +113,6 @@ def webpush_messages(obj): for _ in range(obj.message_count)] -class StoreMessageFactory(factory.Factory): - class Meta: - model = StoreMessages - - messages = factory.LazyAttribute(webpush_messages) - message_month = factory.LazyFunction( - lambda: make_rotating_tablename("message") - ) - - class Params: - message_count = 20 - uaid = factory.LazyFunction(lambda: uuid4().hex) - - class BaseSetup(unittest.TestCase): def setUp(self): self.conf = AutopushConfig( @@ -239,15 +224,3 @@ def test_no_migrate(self): prefix="message_int_test" ) assert db.message.tablename == tablename - - -class TestStoreMessagesProcessor(BaseSetup): - def _makeFUT(self): - from autopush.webpush_server import StoreMessagesUserCommand - return StoreMessagesUserCommand(self.conf, self.db) - - def test_store_messages(self): - cmd = self._makeFUT() - store_message = StoreMessageFactory() - response = cmd.process(store_message) - assert response.success is True diff --git a/autopush/webpush_server.py b/autopush/webpush_server.py index fa328b06..1d3eb057 100644 --- a/autopush/webpush_server.py +++ b/autopush/webpush_server.py @@ -128,14 +128,6 @@ class MigrateUser(InputCommand): message_month = attrib() # type: str -@attrs(slots=True) -class StoreMessages(InputCommand): - message_month = attrib() # type: str - messages = attrib( - default=attr.Factory(list) - ) # type: List[WebPushMessage] - - ############################################################################### # Output messages serialized to the outgoing queue ############################################################################### @@ -149,11 +141,6 @@ class MigrateUserResponse(OutputCommand): message_month = attrib() # type: str -@attrs(slots=True) -class StoreMessagesResponse(OutputCommand): - success = attrib(default=True) # type: bool - - ############################################################################### # Main push server class ############################################################################### @@ -222,14 +209,11 @@ def __init__(self, conf, db): self.conf = conf self.db = db self.migrate_user_proocessor = MigrateUserCommand(conf, db) - self.store_messages_process = StoreMessagesUserCommand(conf, db) self.deserialize = dict( migrate_user=MigrateUser, - store_messages=StoreMessages, ) self.command_dict = dict( migrate_user=self.migrate_user_proocessor, - store_messages=self.store_messages_process, ) # type: Dict[str, ProcessorCommand] def process_message(self, input): @@ -293,19 +277,6 @@ def process(self, command): return MigrateUserResponse(message_month=cur_month) -class StoreMessagesUserCommand(ProcessorCommand): - def process(self, command): - # type: (StoreMessages) -> StoreMessagesResponse - message = Message(command.message_month, - boto_resource=self.db.resource) - for m in command.messages: - if "topic" not in m: - m["topic"] = None - notif = WebPushMessage(**m).to_WebPushNotification() - message.store_message(notif) - return StoreMessagesResponse() - - def _validate_chid(chid): # type: (str) -> Tuple[bool, Optional[str]] """Ensure valid channel id format for register/unregister""" diff --git a/autopush_rs/src/call.rs b/autopush_rs/src/call.rs index 0e3a9f70..8d24486e 100644 --- a/autopush_rs/src/call.rs +++ b/autopush_rs/src/call.rs @@ -24,7 +24,6 @@ use serde::ser; use serde_json; use errors::*; -use protocol; use rt::{self, AutopushError, UnwindGuard}; use server::Server; @@ -119,11 +118,6 @@ enum Call { uaid: String, message_month: String, }, - - StoreMessages { - message_month: String, - messages: Vec, - }, } #[derive(Deserialize)] @@ -137,11 +131,6 @@ pub struct MigrateUserResponse { pub message_month: String, } -#[derive(Deserialize)] -pub struct StoreMessagesResponse { - pub success: bool, -} - impl Server { pub fn migrate_user( &self, @@ -156,23 +145,6 @@ impl Server { return fut; } - pub fn store_messages( - &self, - uaid: String, - message_month: String, - mut messages: Vec, - ) -> MyFuture { - for message in messages.iter_mut() { - message.uaid = Some(uaid.clone()); - } - let (call, fut) = PythonCall::new(&Call::StoreMessages { - message_month, - messages, - }); - self.send_to_python(call); - return fut; - } - fn send_to_python(&self, call: PythonCall) { self.tx.send(Some(call)).expect("python went away?"); } diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index 4e65c417..eadb66e2 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -499,6 +499,7 @@ where let mut stats = webpush.stats.clone(); let unacked_direct_notifs = webpush.unacked_direct_notifs.len(); if unacked_direct_notifs > 0 { + debug!("Writing direct notifications to storage"); stats.direct_storage += unacked_direct_notifs as i32; let mut notifs = mem::replace(&mut webpush.unacked_direct_notifs, Vec::new()); // Ensure we don't store these as legacy by setting a 0 as the sortkey_timestamp @@ -507,15 +508,14 @@ where for notif in notifs.iter_mut() { notif.sortkey_timestamp = Some(0); } - - srv.handle.spawn(srv.store_messages( - webpush.uaid.simple().to_string(), - webpush.message_month.clone(), + srv.handle.spawn(srv.ddb.store_messages( + &webpush.uaid, + &webpush.message_month, notifs, ).then(|_| { debug!("Finished saving unacked direct notifications"); Ok(()) - })) + })); } // Log out the final stats message diff --git a/autopush_rs/src/util/ddb_helpers.rs b/autopush_rs/src/util/ddb_helpers.rs index a460cf66..251741cf 100644 --- a/autopush_rs/src/util/ddb_helpers.rs +++ b/autopush_rs/src/util/ddb_helpers.rs @@ -1,3 +1,4 @@ +use std::cmp::min; /// DynamoDB Client helpers use std::collections::{HashMap, HashSet}; use std::env; @@ -16,10 +17,11 @@ use regex::RegexSet; use rusoto_core::Region; use rusoto_core::reactor::RequestDispatcher; use rusoto_credential::StaticProvider; -use rusoto_dynamodb::{AttributeValue, DeleteItemError, DeleteItemInput, DeleteItemOutput, - DynamoDb, DynamoDbClient, GetItemError, GetItemInput, GetItemOutput, - PutItemError, PutItemInput, PutItemOutput, QueryError, QueryInput, - UpdateItemError, UpdateItemInput, UpdateItemOutput}; +use rusoto_dynamodb::{AttributeValue, BatchWriteItemError, BatchWriteItemInput, DeleteItemError, + DeleteItemInput, DeleteItemOutput, DynamoDb, DynamoDbClient, GetItemError, + GetItemInput, GetItemOutput, PutItemError, PutItemInput, PutItemOutput, + PutRequest, QueryError, QueryInput, UpdateItemError, UpdateItemInput, + UpdateItemOutput, WriteRequest}; use serde::Serializer; use serde_dynamodb; @@ -182,6 +184,17 @@ impl From for HashMap { } } +impl From> for NotificationHeaders { + fn from(val: HashMap) -> NotificationHeaders { + NotificationHeaders { + crypto_key: val.get("crypto_key").map(|v| v.to_string()), + encryption: val.get("encryption").map(|v| v.to_string()), + encryption_key: val.get("encryption_key").map(|v| v.to_string()), + encoding: val.get("encoding").map(|v| v.to_string()), + } + } +} + /// Generate a last_connect /// /// This intentionally generates a limited set of keys for each month in a @@ -344,6 +357,7 @@ impl DynamoDbNotification { } } + // TODO: Implement as TryFrom whenever that lands fn to_notif(self) -> Result { let key = DynamoDbNotification::parse_sort_key(&self.chidmessageid)?; let version = key.legacy_version @@ -362,9 +376,25 @@ impl DynamoDbNotification { sortkey_timestamp: key.sortkey_timestamp, }) } -} -/// + // TODO: Implement as TryFrom when that lands in case uaid wasn't set + fn from_notif(val: Notification) -> Result { + let sort_key = val.sort_key(); + let uaid = val.uaid.ok_or("No uaid found")?; + let uaid = Uuid::parse_str(&uaid)?; + Ok(DynamoDbNotification { + uaid, + chidmessageid: sort_key, + timestamp: Some(val.timestamp), + expiry: sec_since_epoch() as u32 + min(val.ttl, MAX_EXPIRY as u32), + ttl: Some(val.ttl), + data: val.data, + headers: val.headers.map(|h| h.into()), + updateid: Some(val.version), + ..Default::default() + }) + } +} /// Basic requirements for notification content to deliver to websocket client // - channelID (the subscription website intended for) @@ -899,6 +929,47 @@ impl DynamoStorage { Box::new(response) } + /// Store a batch of messages when shutting down + pub fn store_messages( + &self, + uaid: &Uuid, + message_month: &str, + messages: Vec, + ) -> MyFuture<()> { + let ddb = self.ddb.clone(); + let put_items: Vec = messages + .into_iter() + .filter_map(|mut n| { + n.uaid = Some(uaid.simple().to_string()); + DynamoDbNotification::from_notif(n) + .map(|notif| serde_dynamodb::to_hashmap(¬if).ok()) + .unwrap_or_default() + }) + .map(|hm| WriteRequest { + put_request: Some(PutRequest { item: hm }), + delete_request: None, + }) + .collect(); + let batch_input = BatchWriteItemInput { + request_items: hashmap! { message_month.to_string() => put_items }, + ..Default::default() + }; + let response = retry_if( + move || ddb.batch_write_item(&batch_input), + |err: &BatchWriteItemError| { + matches!(err, &BatchWriteItemError::ProvisionedThroughputExceeded(_)) + }, + ) + .and_then(|_| Box::new(future::ok(()))) + .map_err(|err| { + debug!("Error saving notification: {:?}", err); + err + }) + // TODO: Use Sentry to capture/report this error + .chain_err(|| "Error saving notifications"); + Box::new(response) + } + /// Delete a given notification from the database /// /// No checks are done to see that this message came from the database or has @@ -994,8 +1065,7 @@ impl DynamoStorage { #[cfg(test)] mod tests { - use super::parse_sort_key; - use chrono::prelude::*; + use super::DynamoDbNotification; use util::us_since_epoch; use uuid::Uuid; @@ -1003,7 +1073,7 @@ mod tests { fn test_parse_sort_key_ver1() { let chid = Uuid::new_v4(); let chidmessageid = format!("01:{}:mytopic", chid.hyphenated().to_string()); - let key = parse_sort_key(&chidmessageid).unwrap(); + let key = DynamoDbNotification::parse_sort_key(&chidmessageid).unwrap(); assert_eq!(key.topic, Some("mytopic".to_string())); assert_eq!(key.channel_id, chid); assert_eq!(key.sortkey_timestamp, None); @@ -1014,7 +1084,7 @@ mod tests { let chid = Uuid::new_v4(); let sortkey_timestamp = us_since_epoch(); let chidmessageid = format!("02:{}:{}", sortkey_timestamp, chid.hyphenated().to_string()); - let key = parse_sort_key(&chidmessageid).unwrap(); + let key = DynamoDbNotification::parse_sort_key(&chidmessageid).unwrap(); assert_eq!(key.topic, None); assert_eq!(key.channel_id, chid); assert_eq!(key.sortkey_timestamp, Some(sortkey_timestamp)); @@ -1023,7 +1093,7 @@ mod tests { #[test] fn test_parse_sort_key_bad_values() { for val in vec!["02j3i2o", "03:ffas:wef", "01::mytopic", "02:oops:ohnoes"] { - let key = parse_sort_key(val); + let key = DynamoDbNotification::parse_sort_key(val); assert!(key.is_err()); } }