Skip to content

Commit

Permalink
bug: add secondary_write for channels (#597)
Browse files Browse the repository at this point in the history
* also set the default `write_to_secondary` to true

Closes SYNC-4121

---------

Co-authored-by: Philip Jenvey <[email protected]>
  • Loading branch information
jrconlin and pjenvey authored Feb 13, 2024
1 parent 158e01c commit 37b377a
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 29 deletions.
2 changes: 1 addition & 1 deletion autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl AppState {
);
}
let crypto_key = &crypto_key[1..crypto_key.len() - 1];
debug!("Fernet keys: {:?}", &crypto_key);
debug!("🔐 Fernet keys: {:?}", &crypto_key);
let fernets: Vec<Fernet> = crypto_key
.split(',')
.map(|s| s.trim().to_string())
Expand Down
4 changes: 2 additions & 2 deletions autoendpoint/src/extractors/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl FromRequest for Subscription {
async move {
// Collect token info and server state
let token_info = TokenInfo::extract(&req).await?;
trace!("Token info: {:?}", &token_info);
trace!("🔐 Token info: {:?}", &token_info);
let app_state: Data<AppState> =
Data::extract(&req).await.expect("No server state found");
let metrics = Metrics::from(&app_state);
Expand All @@ -84,7 +84,7 @@ impl FromRequest for Subscription {
.fernet
.decrypt(&repad_base64(&token_info.token))
.map_err(|e| {
error!("fernet: {:?}", e);
error!("🔐 fernet: {:?}", e);
ApiErrorKind::InvalidToken
})?;

Expand Down
2 changes: 1 addition & 1 deletion autoendpoint/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl Settings {
let keys = &self.crypto_keys.replace(['"', ' '], "");
let fernets = Self::read_list_from_str(keys, "Invalid AUTOEND_CRYPTO_KEYS")
.map(|key| {
debug!("Fernet keys: {:?}", &key);
debug!("🔐 Fernet keys: {:?}", &key);
Fernet::new(key).expect("Invalid AUTOEND_CRYPTO_KEYS")
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ impl BigtableDb {
let (v, _stream) = r.into_future().await;
// Since this should return no rows (with the row key set to a value that shouldn't exist)
// the first component of the tuple should be None.
debug!("🉑 health check");
Ok(v.is_none())
}
}
Expand Down
164 changes: 139 additions & 25 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashSet;
use std::sync::Arc;

use async_trait::async_trait;
use cadence::StatsdClient;
use cadence::{CountedExt, StatsdClient};
use serde::Deserialize;
use serde_json::from_str;
use uuid::Uuid;
Expand All @@ -27,26 +27,43 @@ use super::StorageType;

#[derive(Clone)]
pub struct DualClientImpl {
/// The primary data store, which will always be Bigtable.
primary: BigTableClientImpl,
/// The secondary data store, which will always be DynamoDB.
secondary: DdbClientImpl,
/// Write changes to the secondary, including messages and updates
/// as well as account and channel additions/deletions.
write_to_secondary: bool,
/// Hex value to use to specify the first byte of the median offset.
/// e.g. "0a" will start from include all UUIDs upto and including "0a"
median: Option<u8>,
metrics: Arc<StatsdClient>,
}

fn default_true() -> bool {
true
}

#[derive(Clone, Debug, Deserialize)]
pub struct DualDbSettings {
/// The primary data store, which will always be Bigtable.
primary: DbSettings,
/// The secondary data store, which will always be DynamoDB.
secondary: DbSettings,
#[serde(default)]
/// Write changes to the secondary, including messages and updates
/// as well as account and channel additions/deletions.
#[serde(default = "default_true")]
write_to_secondary: bool,
/// Hex value to use to specify the first byte of the median offset.
/// e.g. "0a" will start from include all UUIDs upto and including "0a"
#[serde(default)]
median: Option<String>,
}

impl DualClientImpl {
pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
// Not really sure we need the dsn here.
info!("Trying: {:?}", settings.db_settings);
info!("Trying: {:?}", settings.db_settings);
let db_settings: DualDbSettings = from_str(&settings.db_settings).map_err(|e| {
DbError::General(format!("Could not parse DualDBSettings string {:?}", e))
})?;
Expand Down Expand Up @@ -87,6 +104,7 @@ impl DualClientImpl {
secondary: secondary.clone(),
median,
write_to_secondary: db_settings.write_to_secondary,
metrics,
})
}
}
Expand All @@ -110,6 +128,10 @@ impl DualClientImpl {
} else {
(Box::new(&self.primary), true)
};
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("target", &target.0.name())
.send();
debug!("⚖ alloting to {}", target.0.name());
Ok(target)
}
Expand All @@ -119,22 +141,37 @@ impl DualClientImpl {
impl DbClient for DualClientImpl {
async fn add_user(&self, user: &User) -> DbResult<()> {
let (target, is_primary) = self.allot(&user.uaid).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await?;
}
debug!("⚖ adding user to {}...", target.name());
let result = target.add_user(user).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await.map_err(|e| {
error!("⚖ Error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "add_user")
.send();
e
});
}
debug!("⚖ User added...");
Ok(result)
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
// If the UAID is in the allowance, move them to the new data store
let (target, is_primary) = self.allot(&user.uaid).await?;
let result = target.update_user(user).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await?;
let _ = self.secondary.add_user(user).await.map_err(|e| {
error!("⚡ Error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "update_user")
.send();
e
});
}
target.update_user(user).await
Ok(result)
}

async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
Expand Down Expand Up @@ -165,38 +202,81 @@ impl DbClient for DualClientImpl {
if is_primary {
// try removing the user from the old store, just in case.
// leaving them could cause false reporting later.
let _ = self.secondary.remove_user(uaid).await;
let _ = self.secondary.remove_user(uaid).await.map_err(|e| {
debug!("⚖ Secondary remove_user error {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_user")
.send();
e
});
}
Ok(result)
}

async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
debug!("⚖ getting target");
let (target, _) = self.allot(uaid).await?;
let (target, is_primary) = self.allot(uaid).await?;
debug!("⚖ Adding channel to {}", target.name());
target.add_channel(uaid, channel_id).await
let result = target.add_channel(uaid, channel_id).await;
if is_primary && self.write_to_secondary {
let _ = self
.secondary
.add_channel(uaid, channel_id)
.await
.map_err(|e| {
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "add_channel")
.send();
e
});
}
result
}

async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> {
let (target, _) = self.allot(uaid).await?;
target.add_channels(uaid, channels).await
let (target, is_primary) = self.allot(uaid).await?;
let result = target.add_channels(uaid, channels.clone()).await;
if is_primary && self.write_to_secondary {
let _ = self
.secondary
.add_channels(uaid, channels)
.await
.map_err(|e| {
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "add_channels")
.send();
e
});
}
result
}

async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
let (target, is_primary) = self.allot(uaid).await?;
let mut channels = target.get_channels(uaid).await?;
// check to see if we need to copy over channels from the secondary
if channels.is_empty() && is_primary {
channels = self.secondary.get_channels(uaid).await?;
}
Ok(channels)
let (target, _is_primary) = self.allot(uaid).await?;
target.get_channels(uaid).await
}

async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
let (target, is_primary) = self.allot(uaid).await?;
let result = target.remove_channel(uaid, channel_id).await?;
// Always remove the channel
if is_primary {
let _ = self.secondary.remove_channel(uaid, channel_id).await?;
let _ = self
.secondary
.remove_channel(uaid, channel_id)
.await
.map_err(|e| {
debug!("⚖ Secondary remove_channel error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_channel")
.send();

e
});
}
Ok(result)
}
Expand All @@ -212,11 +292,21 @@ impl DbClient for DualClientImpl {
let mut result = target
.remove_node_id(uaid, node_id, connected_at, version)
.await?;
// Always remove the node_id.
if is_primary {
result = self
.secondary
.remove_node_id(uaid, node_id, connected_at, version)
.await?
.await
.unwrap_or_else(|e| {
debug!("⚖ Secondary remove_node_id error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_node_id")
.send();

false
})
|| result;
}
Ok(result)
Expand All @@ -233,8 +323,21 @@ impl DbClient for DualClientImpl {
async fn remove_message(&self, uaid: &Uuid, sort_key: &str) -> DbResult<()> {
let (target, is_primary) = self.allot(uaid).await?;
let result = target.remove_message(uaid, sort_key).await?;
// Always remove the message
if is_primary {
let _ = self.primary.remove_message(uaid, sort_key).await?;
// this will be increasingly chatty as we wind down dynamodb.
let _ = self
.secondary
.remove_message(uaid, sort_key)
.await
.map_err(|e| {
debug!("⚖ Secondary remove_message error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "remove_message")
.send();
e
});
}
Ok(result)
}
Expand Down Expand Up @@ -281,8 +384,19 @@ impl DbClient for DualClientImpl {

async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
let (target, is_primary) = self.allot(uaid).await?;
if is_primary && self.write_to_secondary {
let _ = self.secondary.increment_storage(uaid, timestamp).await?;
if is_primary {
let _ = self
.secondary
.increment_storage(uaid, timestamp)
.await
.map_err(|e| {
debug!("⚖ Secondary increment_storage error: {:?}", e);
self.metrics
.incr_with_tags("database.dual.error")
.with_tag("func", "increment_storage")
.send();
e
});
}
target.increment_storage(uaid, timestamp).await
}
Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/db/dynamodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ impl DbClient for DdbClientImpl {
.map_err(|e| DbError::General(format!("DynamoDB health check failure: {:?}", e)))?;
if let Some(names) = result.table_names {
// We found at least one table that matches the message_table
debug!("dynamodb ok");
return Ok(!names.is_empty());
}
// Huh, we couldn't find a message table? That's a failure.
Expand Down

0 comments on commit 37b377a

Please sign in to comment.