Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
refactor: prefer impl trait when possible
Browse files Browse the repository at this point in the history
also kill some unnecessary boxing

Issue #1238
  • Loading branch information
pjenvey committed May 18, 2018
1 parent 1b2fdcf commit 04cae08
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 153 deletions.
223 changes: 103 additions & 120 deletions autopush_rs/src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,52 +40,52 @@ pub fn fetch_messages(
table_name: &str,
uaid: &Uuid,
limit: u32,
) -> MyFuture<FetchMessageResponse> {
) -> impl Future<Item = FetchMessageResponse, Error = Error> {
let attr_values = hashmap! {
":uaid".to_string() => val!(S => uaid.simple().to_string()),
":cmi".to_string() => val!(S => "02"),
};
let query_input = QueryInput {
let input = QueryInput {
key_condition_expression: Some("uaid = :uaid AND chidmessageid < :cmi".to_string()),
expression_attribute_values: Some(attr_values),
table_name: table_name.to_string(),
limit: Some(limit as i64),
..Default::default()
};
let response = retry_if(
move || ddb.query(&query_input),
retry_if(
move || ddb.query(&input),
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error fetching messages");
let response = response.and_then(|data| {
let mut notifs: Vec<DynamoDbNotification> = data.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't
// have corrupt data
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.collect()
});
if notifs.is_empty() {
return Ok(Default::default());
}
).chain_err(|| "Error fetching messages")
.and_then(|output| {
let mut notifs: Vec<DynamoDbNotification> =
output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't
// have corrupt data
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.collect()
});
if notifs.is_empty() {
return Ok(Default::default());
}

// Load the current_timestamp from the subscription registry entry which is
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
let messages = notifs
.into_iter()
.filter_map(|ddb_notif| ddb_notif.into_notif().ok())
.collect();
Ok(FetchMessageResponse {
timestamp,
messages,
// Load the current_timestamp from the subscription registry entry which is
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
let messages = notifs
.into_iter()
.filter_map(|ddb_notif| ddb_notif.into_notif().ok())
.collect();
Ok(FetchMessageResponse {
timestamp,
messages,
})
})
});
Box::new(response)
}

pub fn fetch_timestamp_messages(
Expand All @@ -94,7 +94,7 @@ pub fn fetch_timestamp_messages(
uaid: &Uuid,
timestamp: Option<u64>,
limit: u32,
) -> MyFuture<FetchMessageResponse> {
) -> impl Future<Item = FetchMessageResponse, Error = Error> {
let range_key = if let Some(ts) = timestamp {
format!("02:{}:z", ts)
} else {
Expand All @@ -104,123 +104,116 @@ pub fn fetch_timestamp_messages(
":uaid".to_string() => val!(S => uaid.simple().to_string()),
":cmi".to_string() => val!(S => range_key),
};
let query_input = QueryInput {
let input = QueryInput {
key_condition_expression: Some("uaid = :uaid AND chidmessageid > :cmi".to_string()),
expression_attribute_values: Some(attr_values),
table_name: table_name.to_string(),
limit: Some(limit as i64),
..Default::default()
};
let response = retry_if(
move || ddb.query(&query_input),
retry_if(
move || ddb.query(&input),
|err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error fetching messages");
let response = response.and_then(|data| {
let messages = data.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
items
.into_iter()
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.into_notif().ok())
.collect()
});
if messages.is_empty() {
return Ok(Default::default());
}
).chain_err(|| "Error fetching messages")
.and_then(|output| {
let messages = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
items
.into_iter()
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.into_notif().ok())
.collect()
});
if messages.is_empty() {
return Ok(Default::default());
}

let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Ok(FetchMessageResponse {
timestamp,
messages,
let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Ok(FetchMessageResponse {
timestamp,
messages,
})
})
});
Box::new(response)
}

pub fn drop_user(
ddb: Rc<Box<DynamoDb>>,
uaid: &Uuid,
router_table_name: &str,
) -> MyFuture<DeleteItemOutput> {
let delete_input = DeleteItemInput {
) -> impl Future<Item = DeleteItemOutput, Error = Error> {
let input = DeleteItemInput {
table_name: router_table_name.to_string(),
key: ddb_item! { uaid: s => uaid.simple().to_string() },
..Default::default()
};
let response = retry_if(
move || ddb.delete_item(&delete_input),
retry_if(
move || ddb.delete_item(&input),
|err: &DeleteItemError| matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error dropping user");
Box::new(response)
).chain_err(|| "Error dropping user")
}

fn get_uaid(
ddb: Rc<Box<DynamoDb>>,
uaid: &Uuid,
router_table_name: &str,
) -> MyFuture<GetItemOutput> {
let get_input = GetItemInput {
) -> impl Future<Item = GetItemOutput, Error = Error> {
let input = GetItemInput {
table_name: router_table_name.to_string(),
consistent_read: Some(true),
key: ddb_item! { uaid: s => uaid.simple().to_string() },
..Default::default()
};
let response = retry_if(
move || ddb.get_item(&get_input),
retry_if(
move || ddb.get_item(&input),
|err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error fetching user");
Box::new(response)
).chain_err(|| "Error fetching user")
}

pub fn register_user(
ddb: Rc<Box<DynamoDb>>,
user: &DynamoDbUser,
router_table: &str,
) -> MyFuture<PutItemOutput> {
) -> impl Future<Item = PutItemOutput, Error = Error> {
let item = match serde_dynamodb::to_hashmap(user) {
Ok(item) => item,
Err(e) => return Box::new(future::err(e)).chain_err(|| "Failed to serialize item"),
Err(e) => return future::err(e).chain_err(|| "Failed to serialize item"),
};
let router_table = router_table.to_string();
let attr_values = hashmap! {
":router_type".to_string() => val!(S => user.router_type),
":connected_at".to_string() => val!(N => user.connected_at),
};
let response: MyFuture<PutItemOutput> = {
let ddb_response = retry_if(
move || {
debug!("Registering user: {:?}", item);
ddb.put_item(&PutItemInput {
item: item.clone(),
table_name: router_table.clone(),
expression_attribute_values: Some(attr_values.clone()),
condition_expression: Some(
r#"(
retry_if(
move || {
debug!("Registering user: {:?}", item);
ddb.put_item(&PutItemInput {
item: item.clone(),
table_name: router_table.clone(),
expression_attribute_values: Some(attr_values.clone()),
condition_expression: Some(
r#"(
attribute_not_exists(router_type) or
(router_type = :router_type)
) and (
attribute_not_exists(node_id) or
(connected_at < :connected_at)
)"#.to_string(),
),
return_values: Some("ALL_OLD".to_string()),
..Default::default()
})
},
|err: &PutItemError| matches!(err, &PutItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error storing user record");
Box::new(ddb_response)
};
Box::new(response)
),
return_values: Some("ALL_OLD".to_string()),
..Default::default()
})
},
|err: &PutItemError| matches!(err, &PutItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error storing user record")
}

pub fn update_user_message_month(
ddb: Rc<Box<DynamoDb>>,
uaid: &Uuid,
router_table_name: &str,
message_month: &str,
) -> MyFuture<()> {
) -> impl Future<Item = (), Error = Error> {
let attr_values = hashmap! {
":curmonth".to_string() => val!(S => message_month.to_string()),
":lastconnect".to_string() => val!(N => generate_last_connect().to_string()),
Expand All @@ -234,22 +227,18 @@ pub fn update_user_message_month(
table_name: router_table_name.to_string(),
..Default::default()
};
let ddb_response = retry_if(
move || {
ddb.update_item(&update_item)
.and_then(|_| Box::new(future::ok(())))
},
retry_if(
move || ddb.update_item(&update_item).and_then(|_| future::ok(())),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error updating user message month");
Box::new(ddb_response)
).chain_err(|| "Error updating user message month")
}

pub fn all_channels(
ddb: Rc<Box<DynamoDb>>,
uaid: &Uuid,
message_table_name: &str,
) -> MyFuture<HashSet<String>> {
let get_input = GetItemInput {
) -> impl Future<Item = HashSet<String>, Error = Error> {
let input = GetItemInput {
table_name: message_table_name.to_string(),
consistent_read: Some(true),
key: ddb_item! {
Expand All @@ -258,30 +247,29 @@ pub fn all_channels(
},
..Default::default()
};
let response = retry_if(
move || ddb.get_item(&get_input),
retry_if(
move || ddb.get_item(&input),
|err: &GetItemError| matches!(err, &GetItemError::ProvisionedThroughputExceeded(_)),
).and_then(|get_item_output| {
let channels = get_item_output
).and_then(|output| {
let channels = output
.item
.and_then(|item| {
serde_dynamodb::from_hashmap::<DynamoDbNotification>(item)
.ok()
.and_then(|notif| notif.chids)
})
.unwrap_or_else(HashSet::new);
Box::new(future::ok(channels))
future::ok(channels)
})
.or_else(|_err| Box::new(future::ok(HashSet::new())));
Box::new(response)
.or_else(|_err| future::ok(HashSet::new()))
}

pub fn save_channels(
ddb: Rc<Box<DynamoDb>>,
uaid: &Uuid,
channels: HashSet<String>,
message_table_name: &str,
) -> MyFuture<()> {
) -> impl Future<Item = (), Error = Error> {
let chids: Vec<String> = channels.into_iter().collect();
let expiry = sec_since_epoch() + 2 * MAX_EXPIRY;
let attr_values = hashmap! {
Expand All @@ -298,22 +286,18 @@ pub fn save_channels(
table_name: message_table_name.to_string(),
..Default::default()
};
let ddb_response = retry_if(
move || {
ddb.update_item(&update_item)
.and_then(|_| Box::new(future::ok(())))
},
retry_if(
move || ddb.update_item(&update_item).and_then(|_| future::ok(())),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error saving channels");
Box::new(ddb_response)
).chain_err(|| "Error saving channels")
}

pub fn unregister_channel_id(
ddb: Rc<Box<DynamoDb>>,
uaid: &Uuid,
channel_id: &Uuid,
message_table_name: &str,
) -> MyFuture<UpdateItemOutput> {
) -> impl Future<Item = UpdateItemOutput, Error = Error> {
let chid = channel_id.hyphenated().to_string();
let attr_values = hashmap! {
":channel_id".to_string() => val!(SS => vec![chid]),
Expand All @@ -328,11 +312,10 @@ pub fn unregister_channel_id(
table_name: message_table_name.to_string(),
..Default::default()
};
let ddb_response = retry_if(
retry_if(
move || ddb.update_item(&update_item),
|err: &UpdateItemError| matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)),
).chain_err(|| "Error unregistering channel");
Box::new(ddb_response)
).chain_err(|| "Error unregistering channel")
}

pub fn lookup_user(
Expand Down
Loading

0 comments on commit 04cae08

Please sign in to comment.