Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1219 from mozilla-services/refactor/check_storage
Browse files Browse the repository at this point in the history
refactor: some cleanup for #1202
  • Loading branch information
bbangert authored May 7, 2018
2 parents 2cfdf2b + bd0f37f commit 49ab486
Showing 1 changed file with 80 additions and 94 deletions.
174 changes: 80 additions & 94 deletions autopush_rs/src/util/ddb_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub struct NotificationHeaders {
encoding: Option<String>,
}

fn insert_to_map(map: &mut HashMap<String, String>, val: Option<String>, name: &str) {
fn insert_to_map(map: &mut HashMap<String, String>, name: &str, val: Option<String>) {
if let Some(val) = val {
map.insert(name.to_string(), val);
}
Expand All @@ -151,10 +151,10 @@ fn insert_to_map(map: &mut HashMap<String, String>, val: Option<String>, name: &
impl From<NotificationHeaders> for HashMap<String, String> {
fn from(val: NotificationHeaders) -> HashMap<String, String> {
let mut map = HashMap::new();
insert_to_map(&mut map, val.crypto_key, "crypto_key");
insert_to_map(&mut map, val.encryption, "encryption");
insert_to_map(&mut map, val.encryption_key, "encryption_key");
insert_to_map(&mut map, val.encoding, "encoding");
insert_to_map(&mut map, "crypto_key", val.crypto_key);
insert_to_map(&mut map, "encryption", val.encryption);
insert_to_map(&mut map, "encryption_key", val.encryption_key);
insert_to_map(&mut map, "encoding", val.encoding);
map
}
}
Expand Down Expand Up @@ -209,45 +209,48 @@ fn parse_sort_key(key: &str) -> Result<RangeKey> {
if !RE.is_match(key) {
return Err("Invalid chidmessageid".into()).into();
}
if key.starts_with("01:") {
let v: Vec<&str> = key.split(":").collect();
if v.len() != 3 {
return Err("Invalid topic key".into());

let v: Vec<&str> = key.split(":").collect();
match v[0] {
"01" => {
if v.len() != 3 {
return Err("Invalid topic key".into());
}
let (channel_id, topic) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: Some(topic.to_string()),
sortkey_timestamp: None,
legacy_version: None,
})
}
let (channel_id, topic) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: Some(topic.to_string()),
sortkey_timestamp: None,
legacy_version: None,
})
} else if key.starts_with("02:") {
let v: Vec<&str> = key.split(":").collect();
if v.len() != 3 {
return Err("Invalid topic key".into());
"02" => {
if v.len() != 3 {
return Err("Invalid topic key".into());
}
let (sortkey, channel_id) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: Some(sortkey.parse()?),
legacy_version: None,
})
}
let (sortkey, channel_id) = (v[1], v[2]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: Some(sortkey.parse()?),
legacy_version: None,
})
} else {
let v: Vec<&str> = key.split(":").collect();
if v.len() != 2 {
return Err("Invalid topic key".into());
_ => {
if v.len() != 2 {
return Err("Invalid topic key".into());
}
let (channel_id, legacy_version) = (v[0], v[1]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: None,
legacy_version: Some(legacy_version.to_string()),
})
}
let (channel_id, legacy_version) = (v[0], v[1]);
let channel_id = Uuid::parse_str(channel_id)?;
Ok(RangeKey {
channel_id,
topic: None,
sortkey_timestamp: None,
legacy_version: Some(legacy_version.to_string()),
})
}
}

Expand Down Expand Up @@ -290,7 +293,7 @@ pub struct CheckStorageResponse {
#[derive(Default)]
pub struct FetchMessageResponse {
pub timestamp: Option<u64>,
pub messages: Option<Vec<Notification>>,
pub messages: Vec<Notification>,
}

pub struct DynamoStorage {
Expand Down Expand Up @@ -400,45 +403,32 @@ impl DynamoStorage {
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
).map_err(|_| "Error fetching messages".into());
let response = response.and_then(|data| {
let mut notifs: Option<Vec<DynamoDbNotification>> = data.items.and_then(|items| {
let mut notifs: Vec<DynamoDbNotification> = data.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
Some(
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.collect(),
)
// TODO: Capture translation errors and report them as we shouldn't
// have corrupt data
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.collect()
});
if notifs.is_empty() {
return Ok(Default::default());
}

// Load the current_timestamp from the subscription registry entry which is
// the first DynamoDbNotification and remove it from the vec.
let mut timestamp = None;
if let Some(ref mut messages) = notifs {
if messages.len() == 0 {
return Ok(Default::default());
}
let first = messages.remove(0);
if let Some(ts) = first.current_timestamp {
timestamp = Some(ts);
}
}
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
let notifs = notifs.and_then(|items| {
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
let items: Vec<Notification> = items
.into_iter()
.filter_map(|ddb_notif| ddb_notif.to_notif().ok())
.collect();
if items.len() > 0 {
Some(items)
} else {
None
}
});
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
let messages = notifs
.into_iter()
.filter_map(|ddb_notif| ddb_notif.to_notif().ok())
.collect();
Ok(FetchMessageResponse {
timestamp,
messages: notifs,
messages,
})
});
Box::new(response)
Expand Down Expand Up @@ -489,27 +479,23 @@ impl DynamoStorage {
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
).map_err(|_| "Error fetching messages".into());
let response = response.and_then(|data| {
let notifs: Option<Vec<Notification>> = data.items.and_then(|items| {
let messages = data.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
Some(
items
.into_iter()
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.to_notif().ok())
.collect(),
)
items
.into_iter()
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.to_notif().ok())
.collect()
});
let mut timestamp = None;
if let Some(ref messages) = notifs {
if messages.len() == 0 {
return Ok(Default::default());
}
timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
if messages.is_empty() {
return Ok(Default::default());
}

let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Ok(FetchMessageResponse {
timestamp,
messages: notifs,
messages,
})
});
Box::new(response)
Expand All @@ -533,11 +519,11 @@ impl DynamoStorage {
let ddb2 = self.ddb.clone();
let response = response.and_then(move |resp| {
// Return now from this future if we have messages
if let Some(messages) = resp.messages {
debug!("Topic message returns: {:?}", messages);
if !resp.messages.is_empty() {
debug!("Topic message returns: {:?}", resp.messages);
return future::Either::A(future::ok(CheckStorageResponse {
include_topic: true,
messages,
messages: resp.messages,
timestamp: resp.timestamp,
}));
}
Expand All @@ -548,7 +534,7 @@ impl DynamoStorage {
timestamp
};
let next_query = {
if resp.messages.is_none() || resp.timestamp.is_some() {
if resp.messages.is_empty() || resp.timestamp.is_some() {
DynamoStorage::fetch_timestamp_messages(
ddb2,
table_name.as_ref(),
Expand All @@ -566,7 +552,7 @@ impl DynamoStorage {
let timestamp = resp.timestamp.or(timestamp);
Ok(CheckStorageResponse {
include_topic: false,
messages: resp.messages.unwrap_or(Vec::new()),
messages: resp.messages,
timestamp,
})
});
Expand Down

0 comments on commit 49ab486

Please sign in to comment.