Skip to content

Commit

Permalink
Merge branch 'master' into release/1.70
Browse files Browse the repository at this point in the history
  • Loading branch information
pjenvey committed Feb 22, 2024
2 parents 455e903 + 87ed38c commit c0b81e2
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 7 deletions.
2 changes: 2 additions & 0 deletions autoendpoint/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ impl ApiErrorKind {

ApiErrorKind::LogCheck => StatusCode::IM_A_TEAPOT,

ApiErrorKind::Database(DbError::Backoff(_)) => StatusCode::SERVICE_UNAVAILABLE,

ApiErrorKind::General(_)
| ApiErrorKind::Io(_)
| ApiErrorKind::Metrics(_)
Expand Down
4 changes: 3 additions & 1 deletion autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,9 @@ impl DbClient for BigTableClientImpl {
async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
let row_key = uaid.as_simple().to_string();
let mut req = self.read_row_request(&row_key);
req.set_filter(family_filter(format!("^{ROUTER_FAMILY}$")));
let mut filters = vec![router_gc_policy_filter()];
filters.push(family_filter(format!("^{ROUTER_FAMILY}$")));
req.set_filter(filter_chain(filters));
let Some(mut row) = self.read_row(req).await? else {
return Ok(None);
};
Expand Down
29 changes: 27 additions & 2 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use async_trait::async_trait;
use cadence::{CountedExt, StatsdClient};
use cadence::{CountedExt, StatsdClient, Timed};
use serde::Deserialize;
use serde_json::from_str;
use uuid::Uuid;
Expand Down Expand Up @@ -182,21 +182,46 @@ impl DbClient for DualClientImpl {
Ok(None) => {
if is_primary {
// The user wasn't in the current primary, so fetch them from the secondary.
let start = std::time::Instant::now();
if let Ok(Some(mut user)) = self.secondary.get_user(uaid).await {
// copy the user record over to the new data store.
debug!("⚖ Found user record in secondary, moving to primary");
// Users read from DynamoDB lack a version field needed
// for Bigtable
debug_assert!(user.version.is_none());
user.version = Some(Uuid::new_v4());
self.primary.add_user(&user).await?;
if let Err(e) = self.primary.add_user(&user).await {
if matches!(e, DbError::Conditional) {
// User is being migrated underneath us.
// Try fetching the record from primary again,
// and back off if still not there.
let user = self.primary.get_user(uaid).await?;
// Possibly a higher number of these occur than
// expected, so sanity check that a user now
// exists
self.metrics
.incr_with_tags("database.already_migrated")
.with_tag("exists", &user.is_some().to_string())
.send();
if user.is_none() {
return Err(DbError::Backoff("Move in progress".to_owned()));
};
return Ok(user);
}
};
self.metrics.incr_with_tags("database.migrate").send();
let channels = self.secondary.get_channels(uaid).await?;
if !channels.is_empty() {
// NOTE: add_channels doesn't write a new version:
// user.version is still valid
self.primary.add_channels(uaid, channels).await?;
}
self.metrics
.time_with_tags(
"database.migrate.time",
(std::time::Instant::now() - start).as_millis() as u64,
)
.send();
return Ok(Some(user));
}
}
Expand Down
8 changes: 8 additions & 0 deletions autopush-common/src/db/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ pub enum DbError {

#[error("Unknown Database Error {0}")]
General(String),

// Return a 503 error
#[error("Process pending, please wait.")]
Backoff(String),
}

impl ReportableError for DbError {
Expand Down Expand Up @@ -106,6 +110,7 @@ impl ReportableError for DbError {
match &self {
#[cfg(feature = "bigtable")]
DbError::BTError(e) => e.metric_label(),
DbError::Backoff(_) => Some("storage.error.backoff"),
_ => None,
}
}
Expand All @@ -114,6 +119,9 @@ impl ReportableError for DbError {
match &self {
#[cfg(feature = "bigtable")]
DbError::BTError(e) => e.extras(),
DbError::Backoff(e) => {
vec![("raw", e.to_string())]
}
_ => vec![],
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/src/architecture.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Architecture

![image](assets/push_architecture2.svg)
![image](assets/push_architecture.svg)

## Overview

Expand Down
6 changes: 3 additions & 3 deletions docs/src/assets/push_architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit c0b81e2

Please sign in to comment.