diff --git a/autoendpoint/src/error.rs b/autoendpoint/src/error.rs index 28d61ad68..671fd9f31 100644 --- a/autoendpoint/src/error.rs +++ b/autoendpoint/src/error.rs @@ -150,6 +150,8 @@ impl ApiErrorKind { ApiErrorKind::LogCheck => StatusCode::IM_A_TEAPOT, + ApiErrorKind::Database(DbError::Backoff(_)) => StatusCode::SERVICE_UNAVAILABLE, + ApiErrorKind::General(_) | ApiErrorKind::Io(_) | ApiErrorKind::Metrics(_) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 3bed0fada..a5b30ddd0 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -728,7 +728,9 @@ impl DbClient for BigTableClientImpl { async fn get_user(&self, uaid: &Uuid) -> DbResult> { let row_key = uaid.as_simple().to_string(); let mut req = self.read_row_request(&row_key); - req.set_filter(family_filter(format!("^{ROUTER_FAMILY}$"))); + let mut filters = vec![router_gc_policy_filter()]; + filters.push(family_filter(format!("^{ROUTER_FAMILY}$"))); + req.set_filter(filter_chain(filters)); let Some(mut row) = self.read_row(req).await? else { return Ok(None); }; diff --git a/autopush-common/src/db/dual/mod.rs b/autopush-common/src/db/dual/mod.rs index 086c07f26..7adf26988 100644 --- a/autopush-common/src/db/dual/mod.rs +++ b/autopush-common/src/db/dual/mod.rs @@ -10,7 +10,7 @@ use std::collections::HashSet; use std::sync::Arc; use async_trait::async_trait; -use cadence::{CountedExt, StatsdClient}; +use cadence::{CountedExt, StatsdClient, Timed}; use serde::Deserialize; use serde_json::from_str; use uuid::Uuid; @@ -182,6 +182,7 @@ impl DbClient for DualClientImpl { Ok(None) => { if is_primary { // The user wasn't in the current primary, so fetch them from the secondary. + let start = std::time::Instant::now(); if let Ok(Some(mut user)) = self.secondary.get_user(uaid).await { // copy the user record over to the new data store. debug!("⚖ Found user record in secondary, moving to primary"); @@ -189,7 +190,25 @@ impl DbClient for DualClientImpl { // for Bigtable debug_assert!(user.version.is_none()); user.version = Some(Uuid::new_v4()); - self.primary.add_user(&user).await?; + if let Err(e) = self.primary.add_user(&user).await { + if matches!(e, DbError::Conditional) { + // User is being migrated underneath us. + // Try fetching the record from primary again, + // and back off if still not there. + let user = self.primary.get_user(uaid).await?; + // Possibly a higher number of these occur than + // expected, so sanity check that a user now + // exists + self.metrics + .incr_with_tags("database.already_migrated") + .with_tag("exists", &user.is_some().to_string()) + .send(); + if user.is_none() { + return Err(DbError::Backoff("Move in progress".to_owned())); + }; + return Ok(user); + } + }; self.metrics.incr_with_tags("database.migrate").send(); let channels = self.secondary.get_channels(uaid).await?; if !channels.is_empty() { @@ -197,6 +216,12 @@ impl DbClient for DualClientImpl { // user.version is still valid self.primary.add_channels(uaid, channels).await?; } + self.metrics + .time_with_tags( + "database.migrate.time", + (std::time::Instant::now() - start).as_millis() as u64, + ) + .send(); return Ok(Some(user)); } } diff --git a/autopush-common/src/db/error.rs b/autopush-common/src/db/error.rs index 64fb3082c..13f3a697b 100644 --- a/autopush-common/src/db/error.rs +++ b/autopush-common/src/db/error.rs @@ -79,6 +79,10 @@ pub enum DbError { #[error("Unknown Database Error {0}")] General(String), + + // Return a 503 error + #[error("Process pending, please wait.")] + Backoff(String), } impl ReportableError for DbError { @@ -106,6 +110,7 @@ impl ReportableError for DbError { match &self { #[cfg(feature = "bigtable")] DbError::BTError(e) => e.metric_label(), + DbError::Backoff(_) => Some("storage.error.backoff"), _ => None, } } @@ -114,6 +119,9 @@ impl ReportableError for DbError { match &self { #[cfg(feature = "bigtable")] DbError::BTError(e) => e.extras(), + DbError::Backoff(e) => { + vec![("raw", e.to_string())] + } _ => vec![], } }