Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: port store messages command to Rust
Browse files Browse the repository at this point in the history
Add's Rust integration test to verify the direct message is stored that
was not present before.

Closes #1208
  • Loading branch information
bbangert committed May 15, 2018
1 parent 8849b83 commit a83cfc6
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 116 deletions.
15 changes: 15 additions & 0 deletions autopush/tests/test_rs_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,21 @@ def test_delivery_repeat_without_ack(self):
assert result["data"] == base64url_encode(data)
yield self.shut_down(client)

@inlineCallbacks
def test_repeat_delivery_with_disconnect_without_ack(self):
data = str(uuid.uuid4())
client = yield self.quick_register()
result = yield client.send_notification(data=data)
assert result != {}
assert result["data"] == base64url_encode(data)
yield client.disconnect()
yield client.connect()
yield client.hello()
result = yield client.get_notification()
assert result != {}
assert result["data"] == base64url_encode(data)
yield self.shut_down(client)

@inlineCallbacks
def test_multiple_delivery_repeat_without_ack(self):
data = str(uuid.uuid4())
Expand Down
27 changes: 0 additions & 27 deletions autopush/tests/test_webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from autopush.websocket import USER_RECORD_VERSION
from autopush.webpush_server import (
MigrateUser,
StoreMessages,
WebPushMessage,
)
import autopush.tests
Expand Down Expand Up @@ -114,20 +113,6 @@ def webpush_messages(obj):
for _ in range(obj.message_count)]


class StoreMessageFactory(factory.Factory):
class Meta:
model = StoreMessages

messages = factory.LazyAttribute(webpush_messages)
message_month = factory.LazyFunction(
lambda: make_rotating_tablename("message")
)

class Params:
message_count = 20
uaid = factory.LazyFunction(lambda: uuid4().hex)


class BaseSetup(unittest.TestCase):
def setUp(self):
self.conf = AutopushConfig(
Expand Down Expand Up @@ -239,15 +224,3 @@ def test_no_migrate(self):
prefix="message_int_test"
)
assert db.message.tablename == tablename


class TestStoreMessagesProcessor(BaseSetup):
def _makeFUT(self):
from autopush.webpush_server import StoreMessagesUserCommand
return StoreMessagesUserCommand(self.conf, self.db)

def test_store_messages(self):
cmd = self._makeFUT()
store_message = StoreMessageFactory()
response = cmd.process(store_message)
assert response.success is True
29 changes: 0 additions & 29 deletions autopush/webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,6 @@ class MigrateUser(InputCommand):
message_month = attrib() # type: str


@attrs(slots=True)
class StoreMessages(InputCommand):
message_month = attrib() # type: str
messages = attrib(
default=attr.Factory(list)
) # type: List[WebPushMessage]


###############################################################################
# Output messages serialized to the outgoing queue
###############################################################################
Expand All @@ -149,11 +141,6 @@ class MigrateUserResponse(OutputCommand):
message_month = attrib() # type: str


@attrs(slots=True)
class StoreMessagesResponse(OutputCommand):
success = attrib(default=True) # type: bool


###############################################################################
# Main push server class
###############################################################################
Expand Down Expand Up @@ -222,14 +209,11 @@ def __init__(self, conf, db):
self.conf = conf
self.db = db
self.migrate_user_proocessor = MigrateUserCommand(conf, db)
self.store_messages_process = StoreMessagesUserCommand(conf, db)
self.deserialize = dict(
migrate_user=MigrateUser,
store_messages=StoreMessages,
)
self.command_dict = dict(
migrate_user=self.migrate_user_proocessor,
store_messages=self.store_messages_process,
) # type: Dict[str, ProcessorCommand]

def process_message(self, input):
Expand Down Expand Up @@ -293,19 +277,6 @@ def process(self, command):
return MigrateUserResponse(message_month=cur_month)


class StoreMessagesUserCommand(ProcessorCommand):
def process(self, command):
# type: (StoreMessages) -> StoreMessagesResponse
message = Message(command.message_month,
boto_resource=self.db.resource)
for m in command.messages:
if "topic" not in m:
m["topic"] = None
notif = WebPushMessage(**m).to_WebPushNotification()
message.store_message(notif)
return StoreMessagesResponse()


def _validate_chid(chid):
# type: (str) -> Tuple[bool, Optional[str]]
"""Ensure valid channel id format for register/unregister"""
Expand Down
16 changes: 0 additions & 16 deletions autopush_rs/build.rs

This file was deleted.

28 changes: 0 additions & 28 deletions autopush_rs/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use serde::ser;
use serde_json;

use errors::*;
use protocol;
use rt::{self, AutopushError, UnwindGuard};
use server::Server;

Expand Down Expand Up @@ -119,11 +118,6 @@ enum Call {
uaid: String,
message_month: String,
},

StoreMessages {
message_month: String,
messages: Vec<protocol::Notification>,
},
}

#[derive(Deserialize)]
Expand All @@ -137,11 +131,6 @@ pub struct MigrateUserResponse {
pub message_month: String,
}

#[derive(Deserialize)]
pub struct StoreMessagesResponse {
pub success: bool,
}

impl Server {
pub fn migrate_user(
&self,
Expand All @@ -156,23 +145,6 @@ impl Server {
return fut;
}

pub fn store_messages(
&self,
uaid: String,
message_month: String,
mut messages: Vec<protocol::Notification>,
) -> MyFuture<StoreMessagesResponse> {
for message in messages.iter_mut() {
message.uaid = Some(uaid.clone());
}
let (call, fut) = PythonCall::new(&Call::StoreMessages {
message_month,
messages,
});
self.send_to_python(call);
return fut;
}

fn send_to_python(&self, call: PythonCall) {
self.tx.send(Some(call)).expect("python went away?");
}
Expand Down
10 changes: 5 additions & 5 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ where
let mut stats = webpush.stats.clone();
let unacked_direct_notifs = webpush.unacked_direct_notifs.len();
if unacked_direct_notifs > 0 {
debug!("Writing direct notifications to storage");
stats.direct_storage += unacked_direct_notifs as i32;
let mut notifs = mem::replace(&mut webpush.unacked_direct_notifs, Vec::new());
// Ensure we don't store these as legacy by setting a 0 as the sortkey_timestamp
Expand All @@ -507,15 +508,14 @@ where
for notif in notifs.iter_mut() {
notif.sortkey_timestamp = Some(0);
}

srv.handle.spawn(srv.store_messages(
webpush.uaid.simple().to_string(),
webpush.message_month.clone(),
srv.handle.spawn(srv.ddb.store_messages(
&webpush.uaid,
&webpush.message_month,
notifs,
).then(|_| {
debug!("Finished saving unacked direct notifications");
Ok(())
}))
}));
}

// Log out the final stats message
Expand Down
94 changes: 83 additions & 11 deletions autopush_rs/src/util/ddb_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::min;
/// DynamoDB Client helpers
use std::collections::{HashMap, HashSet};
use std::env;
Expand All @@ -16,10 +17,11 @@ use regex::RegexSet;
use rusoto_core::Region;
use rusoto_core::reactor::RequestDispatcher;
use rusoto_credential::StaticProvider;
use rusoto_dynamodb::{AttributeValue, DeleteItemError, DeleteItemInput, DeleteItemOutput,
DynamoDb, DynamoDbClient, GetItemError, GetItemInput, GetItemOutput,
PutItemError, PutItemInput, PutItemOutput, QueryError, QueryInput,
UpdateItemError, UpdateItemInput, UpdateItemOutput};
use rusoto_dynamodb::{AttributeValue, BatchWriteItemError, BatchWriteItemInput, DeleteItemError,
DeleteItemInput, DeleteItemOutput, DynamoDb, DynamoDbClient, GetItemError,
GetItemInput, GetItemOutput, PutItemError, PutItemInput, PutItemOutput,
PutRequest, QueryError, QueryInput, UpdateItemError, UpdateItemInput,
UpdateItemOutput, WriteRequest};
use serde::Serializer;
use serde_dynamodb;

Expand Down Expand Up @@ -182,6 +184,17 @@ impl From<NotificationHeaders> for HashMap<String, String> {
}
}

impl From<HashMap<String, String>> for NotificationHeaders {
fn from(val: HashMap<String, String>) -> NotificationHeaders {
NotificationHeaders {
crypto_key: val.get("crypto_key").map(|v| v.to_string()),
encryption: val.get("encryption").map(|v| v.to_string()),
encryption_key: val.get("encryption_key").map(|v| v.to_string()),
encoding: val.get("encoding").map(|v| v.to_string()),
}
}
}

/// Generate a last_connect
///
/// This intentionally generates a limited set of keys for each month in a
Expand Down Expand Up @@ -344,6 +357,7 @@ impl DynamoDbNotification {
}
}

// TODO: Implement as TryFrom whenever that lands
fn to_notif(self) -> Result<Notification> {
let key = DynamoDbNotification::parse_sort_key(&self.chidmessageid)?;
let version = key.legacy_version
Expand All @@ -362,9 +376,25 @@ impl DynamoDbNotification {
sortkey_timestamp: key.sortkey_timestamp,
})
}
}

///
// TODO: Implement as TryFrom when that lands in case uaid wasn't set
fn from_notif(val: Notification) -> Result<DynamoDbNotification> {
let sort_key = val.sort_key();
let uaid = val.uaid.ok_or("No uaid found")?;
let uaid = Uuid::parse_str(&uaid)?;
Ok(DynamoDbNotification {
uaid,
chidmessageid: sort_key,
timestamp: Some(val.timestamp),
expiry: sec_since_epoch() as u32 + min(val.ttl, MAX_EXPIRY as u32),
ttl: Some(val.ttl),
data: val.data,
headers: val.headers.map(|h| h.into()),
updateid: Some(val.version),
..Default::default()
})
}
}

/// Basic requirements for notification content to deliver to websocket client
// - channelID (the subscription website intended for)
Expand Down Expand Up @@ -899,6 +929,49 @@ impl DynamoStorage {
Box::new(response)
}

/// Store a batch of messages when shutting down
pub fn store_messages(
&self,
uaid: &Uuid,
message_month: &str,
messages: Vec<Notification>,
) -> MyFuture<()> {
let ddb = self.ddb.clone();
let put_items: Vec<WriteRequest> = messages
.into_iter()
.filter_map(|mut n| {
n.uaid = Some(uaid.simple().to_string());
DynamoDbNotification::from_notif(n)
.map(|notif| serde_dynamodb::to_hashmap(&notif).ok())
.unwrap_or_default()
})
.map(|hm| WriteRequest {
put_request: Some(PutRequest { item: hm }),
delete_request: None,
})
.collect();
let mut request_items = HashMap::new();
request_items.insert(message_month.to_string(), put_items);
let batch_input = BatchWriteItemInput {
request_items: request_items,
..Default::default()
};
let response = retry_if(
move || ddb.batch_write_item(&batch_input),
|err: &BatchWriteItemError| {
matches!(err, &BatchWriteItemError::ProvisionedThroughputExceeded(_))
},
)
.and_then(|_| Box::new(future::ok(())))
.map_err(|err| {
debug!("Error saving notification: {:?}", err);
err
})
// TODO: Use Sentry to capture/report this error
.chain_err(|| "Error saving notifications");
Box::new(response)
}

/// Delete a given notification from the database
///
/// No checks are done to see that this message came from the database or has
Expand Down Expand Up @@ -994,16 +1067,15 @@ impl DynamoStorage {

#[cfg(test)]
mod tests {
use super::parse_sort_key;
use chrono::prelude::*;
use super::DynamoDbNotification;
use util::us_since_epoch;
use uuid::Uuid;

#[test]
fn test_parse_sort_key_ver1() {
let chid = Uuid::new_v4();
let chidmessageid = format!("01:{}:mytopic", chid.hyphenated().to_string());
let key = parse_sort_key(&chidmessageid).unwrap();
let key = DynamoDbNotification::parse_sort_key(&chidmessageid).unwrap();
assert_eq!(key.topic, Some("mytopic".to_string()));
assert_eq!(key.channel_id, chid);
assert_eq!(key.sortkey_timestamp, None);
Expand All @@ -1014,7 +1086,7 @@ mod tests {
let chid = Uuid::new_v4();
let sortkey_timestamp = us_since_epoch();
let chidmessageid = format!("02:{}:{}", sortkey_timestamp, chid.hyphenated().to_string());
let key = parse_sort_key(&chidmessageid).unwrap();
let key = DynamoDbNotification::parse_sort_key(&chidmessageid).unwrap();
assert_eq!(key.topic, None);
assert_eq!(key.channel_id, chid);
assert_eq!(key.sortkey_timestamp, Some(sortkey_timestamp));
Expand All @@ -1023,7 +1095,7 @@ mod tests {
#[test]
fn test_parse_sort_key_bad_values() {
for val in vec!["02j3i2o", "03:ffas:wef", "01::mytopic", "02:oops:ohnoes"] {
let key = parse_sort_key(val);
let key = DynamoDbNotification::parse_sort_key(val);
assert!(key.is_err());
}
}
Expand Down

0 comments on commit a83cfc6

Please sign in to comment.