Skip to content

Commit

Permalink
fix: Copy and upgrade parts of DynamoStorage into autoendpoint (#174)
Browse files Browse the repository at this point in the history
* Copy and upgrade parts of DynamoStorage into autoendpoint

Autoendpoint no longer uses any Tokio 0.1 code (and consequently doesn't
crash).

Some of the autopush_common macros are exported so they can be used in
the autoendpoint database client.

The autoendpoint database client returns an option for get_user instead
of an error if it doesn't exist.

* Don't expose internal database error messages in responses

* Fix hashmap macro example

* Enable the log feature of "again"

* Handle both serialization AND deserialization errors in DbClient

Closes #172
  • Loading branch information
AzureMarker authored Jul 21, 2020
1 parent 147aed8 commit 120a46b
Show file tree
Hide file tree
Showing 14 changed files with 540 additions and 56 deletions.
247 changes: 247 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions autoendpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ actix-http = "1.0"
actix-web = "2.0"
actix-rt = "1.0"
actix-cors = "0.2.0"
again = { version = "0.1.2", default-features = false, features = ["log"] }
async-trait = "0.1.36"
autopush_common = { path = "../autopush-common" }
backtrace = "0.3"
Expand All @@ -24,8 +25,11 @@ lazy_static = "1.4.0"
openssl = "0.10"
regex = "1.3"
reqwest = "0.10.6"
rusoto_core = "0.44.0"
rusoto_dynamodb = "0.44.0"
sentry = { version = "0.18", features = ["with_curl_transport"] }
serde = { version = "1.0", features = ["derive"] }
serde_dynamodb = "0.5.1"
serde_json = "1.0"
slog = { version = "2.5", features = ["max_level_trace", "release_max_level_error", "dynamic-keys"] }
slog-async = "2.4"
Expand Down
183 changes: 183 additions & 0 deletions autoendpoint/src/db/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use crate::db::error::{DbError, DbResult};
use crate::db::retry::{
retry_policy, retryable_delete_error, retryable_getitem_error, retryable_putitem_error,
retryable_updateitem_error,
};
use autopush_common::db::{DynamoDbNotification, DynamoDbUser};
use autopush_common::notification::Notification;
use autopush_common::{ddb_item, hashmap, val};
use cadence::StatsdClient;
use rusoto_core::credential::StaticProvider;
use rusoto_core::{HttpClient, Region};
use rusoto_dynamodb::{
AttributeValue, DeleteItemInput, DynamoDb, DynamoDbClient, GetItemInput, PutItemInput,
UpdateItemInput,
};
use std::collections::HashSet;
use std::env;
use uuid::Uuid;

/// Provides high-level operations over the DynamoDB database
#[derive(Clone)]
pub struct DbClient {
ddb: DynamoDbClient,
metrics: StatsdClient,
router_table: String,
pub message_table: String,
}

impl DbClient {
pub fn new(
metrics: StatsdClient,
router_table: String,
message_table: String,
) -> DbResult<Self> {
let ddb = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
DynamoDbClient::new_with(
HttpClient::new().expect("TLS initialization error"),
StaticProvider::new_minimal("BogusKey".to_string(), "BogusKey".to_string()),
Region::Custom {
name: "us-east-1".to_string(),
endpoint,
},
)
} else {
DynamoDbClient::new(Region::default())
};

Ok(Self {
ddb,
metrics,
router_table,
message_table,
})
}

/// Read a user from the database
pub async fn get_user(&self, uaid: Uuid) -> DbResult<Option<DynamoDbUser>> {
let input = GetItemInput {
table_name: self.router_table.clone(),
consistent_read: Some(true),
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.get_item(input.clone()),
retryable_getitem_error(self.metrics.clone()),
)
.await?
.item
.map(serde_dynamodb::from_hashmap)
.transpose()
.map_err(DbError::from)
}

/// Delete a user from the router table
pub async fn drop_user(&self, uaid: Uuid) -> DbResult<()> {
let input = DeleteItemInput {
table_name: self.router_table.clone(),
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.delete_item(input.clone()),
retryable_delete_error(self.metrics.clone()),
)
.await?;
Ok(())
}

/// Get the set of channel IDs for a user
pub async fn get_user_channels(&self, uaid: Uuid) -> DbResult<HashSet<Uuid>> {
// Channel IDs are stored in a special row in the message table, where
// chidmessageid = " "
let input = GetItemInput {
table_name: self.message_table.clone(),
consistent_read: Some(true),
key: ddb_item! {
uaid: s => uaid.to_simple().to_string(),
chidmessageid: s => " ".to_string()
},
..Default::default()
};

let output = retry_policy()
.retry_if(
|| self.ddb.get_item(input.clone()),
retryable_getitem_error(self.metrics.clone()),
)
.await?;

// The channel IDs are in the notification's `chids` field
let channels = output
.item
// Deserialize the notification
.map(serde_dynamodb::from_hashmap::<DynamoDbNotification, _>)
.transpose()?
// Extract the channel IDs
.and_then(|n| n.chids)
.unwrap_or_default();

// Convert the IDs from String to Uuid
let channels = channels
.into_iter()
.map(|s| Uuid::parse_str(&s))
.filter_map(Result::ok)
.collect();

Ok(channels)
}

/// Remove the node ID from a user in the router table.
/// The node ID will only be cleared if `connected_at` matches up with the
/// item's `connected_at`.
pub async fn remove_node_id(
&self,
uaid: Uuid,
node_id: String,
connected_at: u64,
) -> DbResult<()> {
let update_item = UpdateItemInput {
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
update_expression: Some("REMOVE node_id".to_string()),
condition_expression: Some("(node_id = :node) and (connected_at = :conn)".to_string()),
expression_attribute_values: Some(hashmap! {
":node".to_string() => val!(S => node_id),
":conn".to_string() => val!(N => connected_at.to_string())
}),
table_name: self.router_table.clone(),
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.update_item(update_item.clone()),
retryable_updateitem_error(self.metrics.clone()),
)
.await?;

Ok(())
}

/// Store a single message
pub async fn store_message(&self, uaid: Uuid, message: Notification) -> DbResult<()> {
let put_item = PutItemInput {
item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(&uaid, message))?,
table_name: self.message_table.clone(),
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.put_item(put_item.clone()),
retryable_putitem_error(self.metrics.clone()),
)
.await?;

Ok(())
}
}
24 changes: 24 additions & 0 deletions autoendpoint/src/db/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use thiserror::Error;

use rusoto_core::RusotoError;
use rusoto_dynamodb::{DeleteItemError, GetItemError, PutItemError, UpdateItemError};

pub type DbResult<T> = Result<T, DbError>;

#[derive(Debug, Error)]
pub enum DbError {
#[error("Database error while performing GetItem")]
GetItem(#[from] RusotoError<GetItemError>),

#[error("Database error while performing UpdateItem")]
UpdateItem(#[from] RusotoError<UpdateItemError>),

#[error("Database error while performing PutItem")]
PutItem(#[from] RusotoError<PutItemError>),

#[error("Database error while performing DeleteItem")]
DeleteItem(#[from] RusotoError<DeleteItemError>),

#[error("Error while performing (de)serialization: {0}")]
Serialization(#[from] serde_dynamodb::Error),
}
7 changes: 7 additions & 0 deletions autoendpoint/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! This DynamoDB client is a selectively upgraded version of `DynamoStorage` in `autopush_common`.
//! Due to #172, autoendpoint cannot use any Tokio 0.1 code, so for now we have to copy and update
//! pieces of `DynamoStorage` as needed.
pub mod client;
pub mod error;
mod retry;
34 changes: 34 additions & 0 deletions autoendpoint/src/db/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use again::RetryPolicy;
use cadence::{Counted, StatsdClient};
use rusoto_core::RusotoError;
use rusoto_dynamodb::{DeleteItemError, GetItemError, PutItemError, UpdateItemError};
use std::time::Duration;

/// Create a retry function for the given error
macro_rules! retryable_error {
($name:ident, $error:tt, $error_tag:expr) => {
pub fn $name(metrics: StatsdClient) -> impl Fn(&RusotoError<$error>) -> bool {
move |err| match err {
RusotoError::Service($error::InternalServerError(_))
| RusotoError::Service($error::ProvisionedThroughputExceeded(_)) => {
metrics
.incr_with_tags("database.retry")
.with_tag("error", $error_tag)
.send();
true
}
_ => false,
}
}
};
}

retryable_error!(retryable_getitem_error, GetItemError, "get_item");
retryable_error!(retryable_updateitem_error, UpdateItemError, "update_item");
retryable_error!(retryable_putitem_error, PutItemError, "put_item");
retryable_error!(retryable_delete_error, DeleteItemError, "delete_item");

/// Build an exponential retry policy
pub fn retry_policy() -> RetryPolicy {
RetryPolicy::exponential(Duration::from_millis(100))
}
3 changes: 2 additions & 1 deletion autoendpoint/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Error types and transformations
use crate::db::error::DbError;
use crate::headers::vapid::VapidError;
use crate::routers::RouterError;
use actix_web::{
Expand Down Expand Up @@ -71,7 +72,7 @@ pub enum ApiErrorKind {
TokenHashValidation(#[source] openssl::error::ErrorStack),

#[error("Database error: {0}")]
Database(#[source] autopush_common::errors::Error),
Database(#[from] DbError),

#[error("Invalid token")]
InvalidToken,
Expand Down
9 changes: 3 additions & 6 deletions autoendpoint/src/extractors/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use actix_web::{FromRequest, HttpRequest};
use autopush_common::db::DynamoDbUser;
use autopush_common::util::sec_since_epoch;
use cadence::{Counted, StatsdClient};
use futures::compat::Future01CompatExt;
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use jsonwebtoken::{Algorithm, DecodingKey, Validation};
Expand Down Expand Up @@ -65,11 +64,9 @@ impl FromRequest for Subscription {
let channel_id = Uuid::from_slice(&token[16..32]).unwrap();
let user = state
.ddb
.get_user(&uaid)
.compat()
.await
.map_err(ApiErrorKind::Database)?
.ok_or(ApiErrorKind::NoUser)?;
.get_user(uaid)
.await?
.ok_or(ApiErrorKind::NoSubscription)?;
let router_type = validate_user(&user, &channel_id, &state).await?;

// Validate the VAPID JWT token and record the version
Expand Down
27 changes: 10 additions & 17 deletions autoendpoint/src/extractors/user.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! User validations
use crate::db::client::DbClient;
use crate::error::{ApiErrorKind, ApiResult};
use crate::extractors::routers::RouterType;
use crate::server::ServerState;
use autopush_common::db::{DynamoDbUser, DynamoStorage};
use autopush_common::db::DynamoDbUser;
use cadence::{Counted, StatsdClient};
use futures::compat::Future01CompatExt;
use uuid::Uuid;

/// Perform some validations on the user, including:
Expand All @@ -23,7 +23,7 @@ pub async fn validate_user(
Ok(router_type) => router_type,
Err(_) => {
debug!("Unknown router type, dropping user"; "user" => ?user);
drop_user(&user.uaid, &state.ddb, &state.metrics).await?;
drop_user(user.uaid, &state.ddb, &state.metrics).await?;
return Err(ApiErrorKind::NoSubscription.into());
}
};
Expand All @@ -39,31 +39,27 @@ pub async fn validate_user(
async fn validate_webpush_user(
user: &DynamoDbUser,
channel_id: &Uuid,
ddb: &DynamoStorage,
ddb: &DbClient,
metrics: &StatsdClient,
) -> ApiResult<()> {
// Make sure the user is active (has a valid message table)
let message_table = match user.current_month.as_ref() {
Some(table) => table,
None => {
debug!("Missing `current_month` value, dropping user"; "user" => ?user);
drop_user(&user.uaid, ddb, metrics).await?;
drop_user(user.uaid, ddb, metrics).await?;
return Err(ApiErrorKind::NoSubscription.into());
}
};

if !ddb.message_table_names.contains(message_table) {
if ddb.message_table.as_str() != message_table {
debug!("User is inactive, dropping user"; "user" => ?user);
drop_user(&user.uaid, ddb, metrics).await?;
drop_user(user.uaid, ddb, metrics).await?;
return Err(ApiErrorKind::NoSubscription.into());
}

// Make sure the subscription channel exists
let channel_ids = ddb
.get_user_channels(&user.uaid, message_table)
.compat()
.await
.map_err(ApiErrorKind::Database)?;
let channel_ids = ddb.get_user_channels(user.uaid).await?;

if !channel_ids.contains(channel_id) {
return Err(ApiErrorKind::NoSubscription.into());
Expand All @@ -73,16 +69,13 @@ async fn validate_webpush_user(
}

/// Drop a user and increment associated metric
async fn drop_user(uaid: &Uuid, ddb: &DynamoStorage, metrics: &StatsdClient) -> ApiResult<()> {
async fn drop_user(uaid: Uuid, ddb: &DbClient, metrics: &StatsdClient) -> ApiResult<()> {
metrics
.incr_with_tags("updates.drop_user")
.with_tag("errno", "102")
.send();

ddb.drop_uaid(uaid)
.compat()
.await
.map_err(ApiErrorKind::Database)?;
ddb.drop_user(uaid).await?;

Ok(())
}
1 change: 1 addition & 0 deletions autoendpoint/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#[macro_use]
extern crate slog_scope;

mod db;
mod error;
mod extractors;
mod headers;
Expand Down
Loading

0 comments on commit 120a46b

Please sign in to comment.