diff --git a/autoconnect/autoconnect-settings/src/app_state.rs b/autoconnect/autoconnect-settings/src/app_state.rs index 8b5e91cf4..f36c4e3ed 100644 --- a/autoconnect/autoconnect-settings/src/app_state.rs +++ b/autoconnect/autoconnect-settings/src/app_state.rs @@ -51,7 +51,7 @@ impl AppState { ); } let crypto_key = &crypto_key[1..crypto_key.len() - 1]; - debug!("Fernet keys: {:?}", &crypto_key); + debug!("🔐 Fernet keys: {:?}", &crypto_key); let fernets: Vec = crypto_key .split(',') .map(|s| s.trim().to_string()) diff --git a/autoendpoint/src/extractors/subscription.rs b/autoendpoint/src/extractors/subscription.rs index 56493fc9c..8d29831a2 100644 --- a/autoendpoint/src/extractors/subscription.rs +++ b/autoendpoint/src/extractors/subscription.rs @@ -74,7 +74,7 @@ impl FromRequest for Subscription { async move { // Collect token info and server state let token_info = TokenInfo::extract(&req).await?; - trace!("Token info: {:?}", &token_info); + trace!("🔐 Token info: {:?}", &token_info); let app_state: Data = Data::extract(&req).await.expect("No server state found"); let metrics = Metrics::from(&app_state); @@ -84,7 +84,7 @@ impl FromRequest for Subscription { .fernet .decrypt(&repad_base64(&token_info.token)) .map_err(|e| { - error!("fernet: {:?}", e); + error!("🔐 fernet: {:?}", e); ApiErrorKind::InvalidToken })?; diff --git a/autoendpoint/src/settings.rs b/autoendpoint/src/settings.rs index 8cd59d91a..f3fa34659 100644 --- a/autoendpoint/src/settings.rs +++ b/autoendpoint/src/settings.rs @@ -134,7 +134,7 @@ impl Settings { let keys = &self.crypto_keys.replace(['"', ' '], ""); let fernets = Self::read_list_from_str(keys, "Invalid AUTOEND_CRYPTO_KEYS") .map(|key| { - debug!("Fernet keys: {:?}", &key); + debug!("🔐 Fernet keys: {:?}", &key); Fernet::new(key).expect("Invalid AUTOEND_CRYPTO_KEYS") }) .collect(); diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 7451fb792..ae40dd4b7 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -635,6 +635,7 @@ impl BigtableDb { let (v, _stream) = r.into_future().await; // Since this should return no rows (with the row key set to a value that shouldn't exist) // the first component of the tuple should be None. + debug!("🉑 health check"); Ok(v.is_none()) } } diff --git a/autopush-common/src/db/dual/mod.rs b/autopush-common/src/db/dual/mod.rs index 1429f193a..42e670385 100644 --- a/autopush-common/src/db/dual/mod.rs +++ b/autopush-common/src/db/dual/mod.rs @@ -10,7 +10,7 @@ use std::collections::HashSet; use std::sync::Arc; use async_trait::async_trait; -use cadence::StatsdClient; +use cadence::{CountedExt, StatsdClient}; use serde::Deserialize; use serde_json::from_str; use uuid::Uuid; @@ -27,18 +27,35 @@ use super::StorageType; #[derive(Clone)] pub struct DualClientImpl { + /// The primary data store, which will always be Bigtable. primary: BigTableClientImpl, + /// The secondary data store, which will always be DynamoDB. secondary: DdbClientImpl, + /// Write changes to the secondary, including messages and updates + /// as well as account and channel additions/deletions. write_to_secondary: bool, + /// Hex value to use to specify the first byte of the median offset. + /// e.g. "0a" will start from include all UUIDs upto and including "0a" median: Option, + metrics: Arc, +} + +fn default_true() -> bool { + true } #[derive(Clone, Debug, Deserialize)] pub struct DualDbSettings { + /// The primary data store, which will always be Bigtable. primary: DbSettings, + /// The secondary data store, which will always be DynamoDB. secondary: DbSettings, - #[serde(default)] + /// Write changes to the secondary, including messages and updates + /// as well as account and channel additions/deletions. + #[serde(default = "default_true")] write_to_secondary: bool, + /// Hex value to use to specify the first byte of the median offset. + /// e.g. "0a" will start from include all UUIDs upto and including "0a" #[serde(default)] median: Option, } @@ -46,7 +63,7 @@ pub struct DualDbSettings { impl DualClientImpl { pub fn new(metrics: Arc, settings: &DbSettings) -> DbResult { // Not really sure we need the dsn here. - info!("Trying: {:?}", settings.db_settings); + info!("⚖ Trying: {:?}", settings.db_settings); let db_settings: DualDbSettings = from_str(&settings.db_settings).map_err(|e| { DbError::General(format!("Could not parse DualDBSettings string {:?}", e)) })?; @@ -87,6 +104,7 @@ impl DualClientImpl { secondary: secondary.clone(), median, write_to_secondary: db_settings.write_to_secondary, + metrics, }) } } @@ -110,6 +128,10 @@ impl DualClientImpl { } else { (Box::new(&self.primary), true) }; + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("target", &target.0.name()) + .send(); debug!("⚖ alloting to {}", target.0.name()); Ok(target) } @@ -119,11 +141,18 @@ impl DualClientImpl { impl DbClient for DualClientImpl { async fn add_user(&self, user: &User) -> DbResult<()> { let (target, is_primary) = self.allot(&user.uaid).await?; - if is_primary && self.write_to_secondary { - let _ = self.secondary.add_user(user).await?; - } debug!("⚖ adding user to {}...", target.name()); let result = target.add_user(user).await?; + if is_primary && self.write_to_secondary { + let _ = self.secondary.add_user(user).await.map_err(|e| { + error!("⚖ Error: {:?}", e); + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "add_user") + .send(); + e + }); + } debug!("⚖ User added..."); Ok(result) } @@ -131,10 +160,18 @@ impl DbClient for DualClientImpl { async fn update_user(&self, user: &User) -> DbResult { // If the UAID is in the allowance, move them to the new data store let (target, is_primary) = self.allot(&user.uaid).await?; + let result = target.update_user(user).await?; if is_primary && self.write_to_secondary { - let _ = self.secondary.add_user(user).await?; + let _ = self.secondary.add_user(user).await.map_err(|e| { + error!("⚡ Error: {:?}", e); + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "update_user") + .send(); + e + }); } - target.update_user(user).await + Ok(result) } async fn get_user(&self, uaid: &Uuid) -> DbResult> { @@ -165,38 +202,81 @@ impl DbClient for DualClientImpl { if is_primary { // try removing the user from the old store, just in case. // leaving them could cause false reporting later. - let _ = self.secondary.remove_user(uaid).await; + let _ = self.secondary.remove_user(uaid).await.map_err(|e| { + debug!("⚖ Secondary remove_user error {:?}", e); + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "remove_user") + .send(); + e + }); } Ok(result) } async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> { debug!("⚖ getting target"); - let (target, _) = self.allot(uaid).await?; + let (target, is_primary) = self.allot(uaid).await?; debug!("⚖ Adding channel to {}", target.name()); - target.add_channel(uaid, channel_id).await + let result = target.add_channel(uaid, channel_id).await; + if is_primary && self.write_to_secondary { + let _ = self + .secondary + .add_channel(uaid, channel_id) + .await + .map_err(|e| { + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "add_channel") + .send(); + e + }); + } + result } async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { - let (target, _) = self.allot(uaid).await?; - target.add_channels(uaid, channels).await + let (target, is_primary) = self.allot(uaid).await?; + let result = target.add_channels(uaid, channels.clone()).await; + if is_primary && self.write_to_secondary { + let _ = self + .secondary + .add_channels(uaid, channels) + .await + .map_err(|e| { + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "add_channels") + .send(); + e + }); + } + result } async fn get_channels(&self, uaid: &Uuid) -> DbResult> { - let (target, is_primary) = self.allot(uaid).await?; - let mut channels = target.get_channels(uaid).await?; - // check to see if we need to copy over channels from the secondary - if channels.is_empty() && is_primary { - channels = self.secondary.get_channels(uaid).await?; - } - Ok(channels) + let (target, _is_primary) = self.allot(uaid).await?; + target.get_channels(uaid).await } async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult { let (target, is_primary) = self.allot(uaid).await?; let result = target.remove_channel(uaid, channel_id).await?; + // Always remove the channel if is_primary { - let _ = self.secondary.remove_channel(uaid, channel_id).await?; + let _ = self + .secondary + .remove_channel(uaid, channel_id) + .await + .map_err(|e| { + debug!("⚖ Secondary remove_channel error: {:?}", e); + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "remove_channel") + .send(); + + e + }); } Ok(result) } @@ -212,11 +292,21 @@ impl DbClient for DualClientImpl { let mut result = target .remove_node_id(uaid, node_id, connected_at, version) .await?; + // Always remove the node_id. if is_primary { result = self .secondary .remove_node_id(uaid, node_id, connected_at, version) - .await? + .await + .unwrap_or_else(|e| { + debug!("⚖ Secondary remove_node_id error: {:?}", e); + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "remove_node_id") + .send(); + + false + }) || result; } Ok(result) @@ -233,8 +323,21 @@ impl DbClient for DualClientImpl { async fn remove_message(&self, uaid: &Uuid, sort_key: &str) -> DbResult<()> { let (target, is_primary) = self.allot(uaid).await?; let result = target.remove_message(uaid, sort_key).await?; + // Always remove the message if is_primary { - let _ = self.primary.remove_message(uaid, sort_key).await?; + // this will be increasingly chatty as we wind down dynamodb. + let _ = self + .secondary + .remove_message(uaid, sort_key) + .await + .map_err(|e| { + debug!("⚖ Secondary remove_message error: {:?}", e); + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "remove_message") + .send(); + e + }); } Ok(result) } @@ -281,8 +384,19 @@ impl DbClient for DualClientImpl { async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { let (target, is_primary) = self.allot(uaid).await?; - if is_primary && self.write_to_secondary { - let _ = self.secondary.increment_storage(uaid, timestamp).await?; + if is_primary { + let _ = self + .secondary + .increment_storage(uaid, timestamp) + .await + .map_err(|e| { + debug!("⚖ Secondary increment_storage error: {:?}", e); + self.metrics + .incr_with_tags("database.dual.error") + .with_tag("func", "increment_storage") + .send(); + e + }); } target.increment_storage(uaid, timestamp).await } diff --git a/autopush-common/src/db/dynamodb/mod.rs b/autopush-common/src/db/dynamodb/mod.rs index 9ab51ab19..28bf128e2 100644 --- a/autopush-common/src/db/dynamodb/mod.rs +++ b/autopush-common/src/db/dynamodb/mod.rs @@ -632,6 +632,7 @@ impl DbClient for DdbClientImpl { .map_err(|e| DbError::General(format!("DynamoDB health check failure: {:?}", e)))?; if let Some(names) = result.table_names { // We found at least one table that matches the message_table + debug!("dynamodb ok"); return Ok(!names.is_empty()); } // Huh, we couldn't find a message table? That's a failure.