Skip to content

Commit

Permalink
bug: add jitter to retry
Browse files Browse the repository at this point in the history
Closes: #318
  • Loading branch information
jrconlin committed Jun 27, 2022
1 parent f40a14a commit 2cf1244
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions autoendpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jsonwebtoken = "8.0"
lazy_static = "1.4.0"
log = "0.4"
openssl = "0.10"
rand = "0.8"
regex = "1.4"
reqwest = "0.10.6" # 0.11+ requires futures 0.3+
rusoto_core = "0.45.0" # 0.46+ requires futures 0.3+
Expand Down
25 changes: 14 additions & 11 deletions autoendpoint/src/db/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ pub struct DbClientImpl {
metrics: Arc<StatsdClient>,
router_table: String,
message_table: String,
jitter: u64,
}

impl DbClientImpl {
pub fn new(
metrics: Arc<StatsdClient>,
router_table: String,
message_table: String,
jitter: u64,
) -> DbResult<Self> {
let ddb = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
DynamoDbClient::new_with(
Expand All @@ -111,14 +113,15 @@ impl DbClientImpl {
metrics,
router_table,
message_table,
jitter,
})
}

/// Check if a table exists
async fn table_exists(&self, table_name: String) -> DbResult<bool> {
let input = DescribeTableInput { table_name };

let output = match retry_policy()
let output = match retry_policy(self.jitter)
.retry_if(
|| self.ddb.describe_table(input.clone()),
retryable_describe_table_error(self.metrics.clone()),
Expand Down Expand Up @@ -153,7 +156,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.put_item(input.clone()),
retryable_putitem_error(self.metrics.clone()),
Expand Down Expand Up @@ -195,7 +198,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.update_item(input.clone()),
retryable_updateitem_error(self.metrics.clone()),
Expand All @@ -212,7 +215,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.get_item(input.clone()),
retryable_getitem_error(self.metrics.clone()),
Expand All @@ -231,7 +234,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.delete_item(input.clone()),
retryable_delete_error(self.metrics.clone()),
Expand All @@ -255,7 +258,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.update_item(input.clone()),
retryable_updateitem_error(self.metrics.clone()),
Expand All @@ -277,7 +280,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

let output = retry_policy()
let output = retry_policy(self.jitter)
.retry_if(
|| self.ddb.get_item(input.clone()),
retryable_getitem_error(self.metrics.clone()),
Expand Down Expand Up @@ -319,7 +322,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

let output = retry_policy()
let output = retry_policy(self.jitter)
.retry_if(
|| self.ddb.update_item(input.clone()),
retryable_updateitem_error(self.metrics.clone()),
Expand Down Expand Up @@ -349,7 +352,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.update_item(input.clone()),
retryable_updateitem_error(self.metrics.clone()),
Expand All @@ -366,7 +369,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.put_item(input.clone()),
retryable_putitem_error(self.metrics.clone()),
Expand All @@ -386,7 +389,7 @@ impl DbClient for DbClientImpl {
..Default::default()
};

retry_policy()
retry_policy(self.jitter)
.retry_if(
|| self.ddb.delete_item(input.clone()),
retryable_delete_error(self.metrics.clone()),
Expand Down
8 changes: 6 additions & 2 deletions autoendpoint/src/db/retry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use again::RetryPolicy;
use cadence::{CountedExt, StatsdClient};
use rand::distributions::{Distribution, Uniform};
use rusoto_core::RusotoError;
use rusoto_dynamodb::{
DeleteItemError, DescribeTableError, GetItemError, PutItemError, UpdateItemError,
Expand Down Expand Up @@ -48,6 +49,9 @@ pub fn retryable_describe_table_error(
}

/// Build an exponential retry policy
pub fn retry_policy() -> RetryPolicy {
RetryPolicy::exponential(Duration::from_millis(100))
pub fn retry_policy(jitter: u64) -> RetryPolicy {
let dist = Uniform::from(100..100 + jitter);
let mut rng = rand::thread_rng();
let jitter_val = dist.sample(&mut rng);
RetryPolicy::exponential(Duration::from_millis(jitter_val))
}
1 change: 1 addition & 0 deletions autoendpoint/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl Server {
metrics.clone(),
settings.router_table_name.clone(),
settings.message_table_name.clone(),
settings.jitter,
)?);
let http = reqwest::Client::new();
let fcm_router = Arc::new(
Expand Down
2 changes: 2 additions & 0 deletions autoendpoint/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct Settings {

pub router_table_name: String,
pub message_table_name: String,
pub jitter: u64,

pub max_data_bytes: usize,
pub crypto_keys: String,
Expand All @@ -45,6 +46,7 @@ impl Default for Settings {
port: 8000,
router_table_name: "router".to_string(),
message_table_name: "message".to_string(),
jitter: 100,
max_data_bytes: 4096,
crypto_keys: format!("[{}]", Fernet::generate_key()),
auth_keys: r#"["AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB="]"#.to_string(),
Expand Down

0 comments on commit 2cf1244

Please sign in to comment.