Skip to content

Commit

Permalink
feat: retry all dynamodb errors correctly, hide invalid chids
Browse files Browse the repository at this point in the history
Closes #96, #95
  • Loading branch information
bbangert committed Feb 7, 2019
1 parent e5672bf commit 5815223
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 23 deletions.
10 changes: 7 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ where
connected_at,
),
HelloResponse { uaid: None, .. } => {
return Err("Already connected elsewhere".into())
return Err("Already connected elsewhere".into());
}
}
};
Expand Down Expand Up @@ -845,8 +845,12 @@ where
}) => {
debug!("Got a register command";
"channel_id" => &channel_id_str);
let channel_id =
Uuid::parse_str(&channel_id_str).chain_err(|| "Invalid channelID")?;
let channel_id = Uuid::parse_str(&channel_id_str).chain_err(|| {
ErrorKind::InvalidClientMessage(format!(
"Invalid channelID: {}",
channel_id_str
))
})?;
if channel_id.to_hyphenated().to_string() != channel_id_str {
return Err(ErrorKind::InvalidClientMessage(format!(
"Invalid UUID format, not lower-case/dashed: {}",
Expand Down
14 changes: 10 additions & 4 deletions src/db/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use chrono::Utc;
use futures::{future, Future};
use futures_backoff::retry_if;
use rusoto_dynamodb::{
AttributeValue, DeleteItemError, DeleteItemInput, DeleteItemOutput, DynamoDb, GetItemError,
GetItemInput, GetItemOutput, ListTablesInput, ListTablesOutput, PutItemError, PutItemInput,
PutItemOutput, QueryError, QueryInput, UpdateItemError, UpdateItemInput, UpdateItemOutput,
AttributeValue, BatchWriteItemError, DeleteItemError, DeleteItemInput, DeleteItemOutput,
DynamoDb, GetItemError, GetItemInput, GetItemOutput, ListTablesInput, ListTablesOutput,
PutItemError, PutItemInput, PutItemOutput, QueryError, QueryInput, UpdateItemError,
UpdateItemInput, UpdateItemOutput,
};
use serde_dynamodb;

Expand All @@ -24,7 +25,7 @@ use crate::util::timing::sec_since_epoch;

macro_rules! retryable_error {
($name:ident, $type:ty, $property:ident) => {
fn $name(err: &$type) -> bool {
pub fn $name(err: &$type) -> bool {
match err {
$property::InternalServerError(_) | $property::ProvisionedThroughputExceeded(_) => {
true
Expand All @@ -35,6 +36,11 @@ macro_rules! retryable_error {
};
}

retryable_error!(
retryable_batchwriteitem_error,
BatchWriteItemError,
BatchWriteItemError
);
retryable_error!(retryable_query_error, QueryError, QueryError);
retryable_error!(retryable_delete_error, DeleteItemError, DeleteItemError);
retryable_error!(retryable_getitem_error, GetItemError, GetItemError);
Expand Down
25 changes: 10 additions & 15 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ use uuid::Uuid;
use cadence::StatsdClient;
use futures::{future, Future};
use futures_backoff::retry_if;
use matches::matches;
use rusoto_core::{HttpClient, Region};
use rusoto_credential::StaticProvider;
use rusoto_dynamodb::{
AttributeValue, BatchWriteItemError, BatchWriteItemInput, DeleteItemError, DeleteItemInput,
DynamoDb, DynamoDbClient, PutRequest, UpdateItemError, UpdateItemInput, UpdateItemOutput,
WriteRequest,
AttributeValue, BatchWriteItemInput, DeleteItemInput, DynamoDb, DynamoDbClient, PutRequest,
UpdateItemInput, UpdateItemOutput, WriteRequest,
};
use serde_dynamodb;

Expand All @@ -27,7 +25,10 @@ use crate::protocol::Notification;
use crate::server::{Server, ServerOptions};
use crate::util::timing::sec_since_epoch;

use self::commands::FetchMessageResponse;
use self::commands::{
retryable_batchwriteitem_error, retryable_delete_error, retryable_updateitem_error,
FetchMessageResponse,
};
use self::models::{DynamoDbNotification, DynamoDbUser};

const MAX_EXPIRY: u64 = 2_592_000;
Expand Down Expand Up @@ -130,9 +131,7 @@ impl DynamoStorage {

retry_if(
move || ddb.update_item(update_input.clone()),
|err: &UpdateItemError| {
matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_))
},
retryable_updateitem_error,
)
.chain_err(|| "Error incrementing storage")
}
Expand Down Expand Up @@ -208,7 +207,7 @@ impl DynamoStorage {
return Box::new(future::ok(RegisterResponse::Error {
error_msg: "Failed to generate endpoint".to_string(),
status: 400,
}))
}));
}
};
let mut chids = HashSet::new();
Expand Down Expand Up @@ -295,9 +294,7 @@ impl DynamoStorage {

retry_if(
move || ddb.batch_write_item(batch_input.clone()),
|err: &BatchWriteItemError| {
matches!(err, &BatchWriteItemError::ProvisionedThroughputExceeded(_))
},
retryable_batchwriteitem_error,
)
.and_then(|_| future::ok(()))
.map_err(|err| {
Expand Down Expand Up @@ -331,9 +328,7 @@ impl DynamoStorage {

retry_if(
move || ddb.delete_item(delete_input.clone()),
|err: &DeleteItemError| {
matches!(err, &DeleteItemError::ProvisionedThroughputExceeded(_))
},
retryable_delete_error,
)
.and_then(|_| future::ok(()))
.chain_err(|| "Error deleting notification")
Expand Down
4 changes: 3 additions & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,9 @@ where
}

Message::Binary(_) => {
return Err(ErrorKind::InvalidClientMessage("binary content".to_string()).into())
return Err(
ErrorKind::InvalidClientMessage("binary content".to_string()).into(),
);
}

// sending a pong is already managed by lower layers, just go to
Expand Down

0 comments on commit 5815223

Please sign in to comment.