Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Copy and upgrade parts of DynamoStorage into autoendpoint #174

Merged
merged 5 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 247 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions autoendpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ actix-http = "1.0"
actix-web = "2.0"
actix-rt = "1.0"
actix-cors = "0.2.0"
again = { version = "0.1.2", default-features = false, features = ["log"] }
async-trait = "0.1.36"
autopush_common = { path = "../autopush-common" }
backtrace = "0.3"
Expand All @@ -24,8 +25,11 @@ lazy_static = "1.4.0"
openssl = "0.10"
regex = "1.3"
reqwest = "0.10.6"
rusoto_core = "0.44.0"
rusoto_dynamodb = "0.44.0"
sentry = { version = "0.18", features = ["with_curl_transport"] }
serde = { version = "1.0", features = ["derive"] }
serde_dynamodb = "0.5.1"
serde_json = "1.0"
slog = { version = "2.5", features = ["max_level_trace", "release_max_level_error", "dynamic-keys"] }
slog-async = "2.4"
Expand Down
183 changes: 183 additions & 0 deletions autoendpoint/src/db/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use crate::db::error::{DbError, DbResult};
use crate::db::retry::{
retry_policy, retryable_delete_error, retryable_getitem_error, retryable_putitem_error,
retryable_updateitem_error,
};
use autopush_common::db::{DynamoDbNotification, DynamoDbUser};
use autopush_common::notification::Notification;
use autopush_common::{ddb_item, hashmap, val};
use cadence::StatsdClient;
use rusoto_core::credential::StaticProvider;
use rusoto_core::{HttpClient, Region};
use rusoto_dynamodb::{
AttributeValue, DeleteItemInput, DynamoDb, DynamoDbClient, GetItemInput, PutItemInput,
UpdateItemInput,
};
use std::collections::HashSet;
use std::env;
use uuid::Uuid;

/// Provides high-level operations over the DynamoDB database
#[derive(Clone)]
pub struct DbClient {
ddb: DynamoDbClient,
metrics: StatsdClient,
router_table: String,
pub message_table: String,
}

impl DbClient {
pub fn new(
metrics: StatsdClient,
router_table: String,
message_table: String,
) -> DbResult<Self> {
let ddb = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
DynamoDbClient::new_with(
HttpClient::new().expect("TLS initialization error"),
StaticProvider::new_minimal("BogusKey".to_string(), "BogusKey".to_string()),
Region::Custom {
name: "us-east-1".to_string(),
endpoint,
},
)
} else {
DynamoDbClient::new(Region::default())
};

Ok(Self {
ddb,
metrics,
router_table,
message_table,
})
}

/// Read a user from the database
pub async fn get_user(&self, uaid: Uuid) -> DbResult<Option<DynamoDbUser>> {
let input = GetItemInput {
table_name: self.router_table.clone(),
consistent_read: Some(true),
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.get_item(input.clone()),
retryable_getitem_error(self.metrics.clone()),
)
.await?
.item
.map(serde_dynamodb::from_hashmap)
.transpose()
.map_err(DbError::from)
}

/// Delete a user from the router table
pub async fn drop_user(&self, uaid: Uuid) -> DbResult<()> {
let input = DeleteItemInput {
table_name: self.router_table.clone(),
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.delete_item(input.clone()),
retryable_delete_error(self.metrics.clone()),
)
.await?;
Ok(())
}

/// Get the set of channel IDs for a user
pub async fn get_user_channels(&self, uaid: Uuid) -> DbResult<HashSet<Uuid>> {
// Channel IDs are stored in a special row in the message table, where
// chidmessageid = " "
let input = GetItemInput {
table_name: self.message_table.clone(),
consistent_read: Some(true),
key: ddb_item! {
uaid: s => uaid.to_simple().to_string(),
chidmessageid: s => " ".to_string()
},
..Default::default()
};

let output = retry_policy()
.retry_if(
|| self.ddb.get_item(input.clone()),
retryable_getitem_error(self.metrics.clone()),
)
.await?;

// The channel IDs are in the notification's `chids` field
let channels = output
.item
// Deserialize the notification
.map(serde_dynamodb::from_hashmap::<DynamoDbNotification, _>)
.transpose()?
// Extract the channel IDs
.and_then(|n| n.chids)
.unwrap_or_default();

// Convert the IDs from String to Uuid
let channels = channels
.into_iter()
.map(|s| Uuid::parse_str(&s))
.filter_map(Result::ok)
.collect();

Ok(channels)
}

/// Remove the node ID from a user in the router table.
/// The node ID will only be cleared if `connected_at` matches up with the
/// item's `connected_at`.
pub async fn remove_node_id(
&self,
uaid: Uuid,
node_id: String,
connected_at: u64,
) -> DbResult<()> {
let update_item = UpdateItemInput {
key: ddb_item! { uaid: s => uaid.to_simple().to_string() },
update_expression: Some("REMOVE node_id".to_string()),
condition_expression: Some("(node_id = :node) and (connected_at = :conn)".to_string()),
expression_attribute_values: Some(hashmap! {
":node".to_string() => val!(S => node_id),
":conn".to_string() => val!(N => connected_at.to_string())
}),
table_name: self.router_table.clone(),
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.update_item(update_item.clone()),
retryable_updateitem_error(self.metrics.clone()),
)
.await?;

Ok(())
}

/// Store a single message
pub async fn store_message(&self, uaid: Uuid, message: Notification) -> DbResult<()> {
let put_item = PutItemInput {
item: serde_dynamodb::to_hashmap(&DynamoDbNotification::from_notif(&uaid, message))?,
table_name: self.message_table.clone(),
..Default::default()
};

retry_policy()
.retry_if(
|| self.ddb.put_item(put_item.clone()),
retryable_putitem_error(self.metrics.clone()),
)
.await?;

Ok(())
}
}
24 changes: 24 additions & 0 deletions autoendpoint/src/db/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use thiserror::Error;

use rusoto_core::RusotoError;
use rusoto_dynamodb::{DeleteItemError, GetItemError, PutItemError, UpdateItemError};

pub type DbResult<T> = Result<T, DbError>;

#[derive(Debug, Error)]
pub enum DbError {
#[error("Database error while performing GetItem")]
GetItem(#[from] RusotoError<GetItemError>),

#[error("Database error while performing UpdateItem")]
UpdateItem(#[from] RusotoError<UpdateItemError>),

#[error("Database error while performing PutItem")]
PutItem(#[from] RusotoError<PutItemError>),

#[error("Database error while performing DeleteItem")]
DeleteItem(#[from] RusotoError<DeleteItemError>),

#[error("Error while performing (de)serialization: {0}")]
Serialization(#[from] serde_dynamodb::Error),
}
7 changes: 7 additions & 0 deletions autoendpoint/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! This DynamoDB client is a selectively upgraded version of `DynamoStorage` in `autopush_common`.
//! Due to #172, autoendpoint cannot use any Tokio 0.1 code, so for now we have to copy and update
//! pieces of `DynamoStorage` as needed.

pub mod client;
pub mod error;
mod retry;
34 changes: 34 additions & 0 deletions autoendpoint/src/db/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use again::RetryPolicy;
use cadence::{Counted, StatsdClient};
use rusoto_core::RusotoError;
use rusoto_dynamodb::{DeleteItemError, GetItemError, PutItemError, UpdateItemError};
use std::time::Duration;

/// Create a retry function for the given error
macro_rules! retryable_error {
($name:ident, $error:tt, $error_tag:expr) => {
pub fn $name(metrics: StatsdClient) -> impl Fn(&RusotoError<$error>) -> bool {
move |err| match err {
RusotoError::Service($error::InternalServerError(_))
| RusotoError::Service($error::ProvisionedThroughputExceeded(_)) => {
metrics
AzureMarker marked this conversation as resolved.
Show resolved Hide resolved
.incr_with_tags("database.retry")
.with_tag("error", $error_tag)
.send();
true
}
_ => false,
}
}
};
}

retryable_error!(retryable_getitem_error, GetItemError, "get_item");
retryable_error!(retryable_updateitem_error, UpdateItemError, "update_item");
retryable_error!(retryable_putitem_error, PutItemError, "put_item");
retryable_error!(retryable_delete_error, DeleteItemError, "delete_item");

/// Build an exponential retry policy
pub fn retry_policy() -> RetryPolicy {
RetryPolicy::exponential(Duration::from_millis(100))
}
3 changes: 2 additions & 1 deletion autoendpoint/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Error types and transformations

use crate::db::error::DbError;
use crate::headers::vapid::VapidError;
use crate::routers::RouterError;
use actix_web::{
Expand Down Expand Up @@ -71,7 +72,7 @@ pub enum ApiErrorKind {
TokenHashValidation(#[source] openssl::error::ErrorStack),

#[error("Database error: {0}")]
Database(#[source] autopush_common::errors::Error),
Database(#[from] DbError),

#[error("Invalid token")]
InvalidToken,
Expand Down
9 changes: 3 additions & 6 deletions autoendpoint/src/extractors/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use actix_web::{FromRequest, HttpRequest};
use autopush_common::db::DynamoDbUser;
use autopush_common::util::sec_since_epoch;
use cadence::{Counted, StatsdClient};
use futures::compat::Future01CompatExt;
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use jsonwebtoken::{Algorithm, DecodingKey, Validation};
Expand Down Expand Up @@ -65,11 +64,9 @@ impl FromRequest for Subscription {
let channel_id = Uuid::from_slice(&token[16..32]).unwrap();
let user = state
.ddb
.get_user(&uaid)
.compat()
.await
.map_err(ApiErrorKind::Database)?
.ok_or(ApiErrorKind::NoUser)?;
.get_user(uaid)
.await?
.ok_or(ApiErrorKind::NoSubscription)?;
let router_type = validate_user(&user, &channel_id, &state).await?;

// Validate the VAPID JWT token and record the version
Expand Down
27 changes: 10 additions & 17 deletions autoendpoint/src/extractors/user.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! User validations

use crate::db::client::DbClient;
use crate::error::{ApiErrorKind, ApiResult};
use crate::extractors::routers::RouterType;
use crate::server::ServerState;
use autopush_common::db::{DynamoDbUser, DynamoStorage};
use autopush_common::db::DynamoDbUser;
use cadence::{Counted, StatsdClient};
use futures::compat::Future01CompatExt;
use uuid::Uuid;

/// Perform some validations on the user, including:
Expand All @@ -23,7 +23,7 @@ pub async fn validate_user(
Ok(router_type) => router_type,
Err(_) => {
debug!("Unknown router type, dropping user"; "user" => ?user);
drop_user(&user.uaid, &state.ddb, &state.metrics).await?;
drop_user(user.uaid, &state.ddb, &state.metrics).await?;
return Err(ApiErrorKind::NoSubscription.into());
}
};
Expand All @@ -39,31 +39,27 @@ pub async fn validate_user(
async fn validate_webpush_user(
user: &DynamoDbUser,
channel_id: &Uuid,
ddb: &DynamoStorage,
ddb: &DbClient,
metrics: &StatsdClient,
) -> ApiResult<()> {
// Make sure the user is active (has a valid message table)
let message_table = match user.current_month.as_ref() {
Some(table) => table,
None => {
debug!("Missing `current_month` value, dropping user"; "user" => ?user);
drop_user(&user.uaid, ddb, metrics).await?;
drop_user(user.uaid, ddb, metrics).await?;
return Err(ApiErrorKind::NoSubscription.into());
}
};

if !ddb.message_table_names.contains(message_table) {
if ddb.message_table.as_str() != message_table {
debug!("User is inactive, dropping user"; "user" => ?user);
drop_user(&user.uaid, ddb, metrics).await?;
drop_user(user.uaid, ddb, metrics).await?;
return Err(ApiErrorKind::NoSubscription.into());
}

// Make sure the subscription channel exists
let channel_ids = ddb
.get_user_channels(&user.uaid, message_table)
.compat()
.await
.map_err(ApiErrorKind::Database)?;
let channel_ids = ddb.get_user_channels(user.uaid).await?;

if !channel_ids.contains(channel_id) {
return Err(ApiErrorKind::NoSubscription.into());
Expand All @@ -73,16 +69,13 @@ async fn validate_webpush_user(
}

/// Drop a user and increment associated metric
async fn drop_user(uaid: &Uuid, ddb: &DynamoStorage, metrics: &StatsdClient) -> ApiResult<()> {
async fn drop_user(uaid: Uuid, ddb: &DbClient, metrics: &StatsdClient) -> ApiResult<()> {
metrics
.incr_with_tags("updates.drop_user")
.with_tag("errno", "102")
.send();

ddb.drop_uaid(uaid)
.compat()
.await
.map_err(ApiErrorKind::Database)?;
ddb.drop_user(uaid).await?;

Ok(())
}
1 change: 1 addition & 0 deletions autoendpoint/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#[macro_use]
extern crate slog_scope;

mod db;
mod error;
mod extractors;
mod headers;
Expand Down
Loading