From bf607061c7545af792d1c1d3f55f3f517a4f5402 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Sat, 3 Feb 2024 20:15:36 -0800 Subject: [PATCH 1/2] feat: filter reads by latest cell and generally add the exact GC policy of the column family to all reads also fix User::version from writing to DynamoDB (it's Bigtable only) Issue: SYNC-4094 --- .../src/db/bigtable/bigtable_client/mod.rs | 119 +++++++++++------- autopush-common/src/db/mod.rs | 1 + 2 files changed, 76 insertions(+), 44 deletions(-) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 380c27340..23d6baf26 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -74,7 +74,16 @@ pub struct BigTableClientImpl { pool: BigTablePool, } -fn timestamp_filter() -> Result { +/// Return a RowFilter matching the GC policy of the router Column Family +fn router_gc_policy_filter() -> data::RowFilter { + let mut latest_cell_filter = data::RowFilter::default(); + latest_cell_filter.set_cells_per_column_limit_filter(1); + latest_cell_filter +} + +/// Return a chain of RowFilters matching the GC policy of the message Column +/// Families +fn message_gc_policy_filter() -> Result, error::BigTableError> { let mut timestamp_filter = data::RowFilter::default(); let bt_now: i64 = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -84,7 +93,7 @@ fn timestamp_filter() -> Result { range_filter.set_start_timestamp_micros(bt_now * 1000); timestamp_filter.set_timestamp_range_filter(range_filter); - Ok(timestamp_filter) + Ok(vec![router_gc_policy_filter(), timestamp_filter]) } /// Escape bytes for RE values @@ -109,12 +118,9 @@ fn escape_bytes(bytes: &[u8]) -> Vec { vec } -/// Return a RowFilter limiting to a match of the specified `version`'s column -/// value -fn version_filter(version: &Uuid) -> data::RowFilter { - let mut router_filter_chain = 
RowFilter_Chain::default(); - let mut filter_set: RepeatedField = RepeatedField::default(); - +/// Return a chain of RowFilters limiting to a match of the specified +/// `version`'s column value +fn version_filter(version: &Uuid) -> Vec { let mut family_filter = data::RowFilter::default(); family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$")); @@ -124,26 +130,28 @@ fn version_filter(version: &Uuid) -> data::RowFilter { let mut value_filter = data::RowFilter::default(); value_filter.set_value_regex_filter(escape_bytes(version.as_bytes())); - filter_set.push(family_filter); - filter_set.push(cq_filter); - filter_set.push(value_filter); - - router_filter_chain.set_filters(filter_set); - let mut router_filter = RowFilter::default(); - router_filter.set_chain(router_filter_chain); - router_filter + vec![family_filter, cq_filter, value_filter] } /// Return a newly generated `version` column `Cell` fn new_version_cell(timestamp: SystemTime) -> cell::Cell { cell::Cell { qualifier: "version".to_owned(), - value: Uuid::new_v4().into_bytes().to_vec(), + value: Uuid::new_v4().into(), timestamp, ..Default::default() } } +/// Return a RowFilter chain from multiple RowFilters +fn filter_chain(filters: impl Into>) -> RowFilter { + let mut chain = RowFilter_Chain::default(); + chain.set_filters(filters.into()); + let mut filter = RowFilter::default(); + filter.set_chain(chain); + filter +} + /// Return a ReadRowsRequest against table for a given row key fn read_row_request(table_name: &str, row_key: &str) -> bigtable::ReadRowsRequest { let mut req = bigtable::ReadRowsRequest::default(); @@ -431,6 +439,7 @@ impl BigTableClientImpl { } /// Delete all cell data from the specified columns with the optional time range. 
+ #[allow(unused)] async fn delete_cells( &self, row_key: &str, @@ -638,8 +647,9 @@ impl DbClient for BigTableClientImpl { let row = self.user_to_row(user); // Only add when the user doesn't already exist - let mut filter = RowFilter::default(); - filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes()); + let mut row_key_filter = RowFilter::default(); + row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes()); + let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]); if self.check_and_mutate_row(row, filter, false).await? { return Err(DbError::Conditional); @@ -654,8 +664,13 @@ impl DbClient for BigTableClientImpl { let Some(ref version) = user.version else { return Err(DbError::General("Expected a user version field".to_owned())); }; + + let mut filters = vec![router_gc_policy_filter()]; + filters.extend(version_filter(version)); + let filter = filter_chain(filters); + Ok(self - .check_and_mutate_row(self.user_to_row(user), version_filter(version), true) + .check_and_mutate_row(self.user_to_row(user), filter, true) .await?) } @@ -666,11 +681,6 @@ impl DbClient for BigTableClientImpl { }; trace!("🉑 Found a record for {}", row_key); - let version = row - .take_required_cell("version")? - .value - .try_into() - .map_err(|e| DbError::Serialization(format!("Could not deserialize version: {e:?}")))?; let mut result = User { uaid: *uaid, connected_at: to_u64( @@ -678,7 +688,14 @@ impl DbClient for BigTableClientImpl { "connected_at", )?, router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?, - version: Some(Uuid::from_bytes(version)), + version: Some( + row.take_required_cell("version")? 
+ .value + .try_into() + .map_err(|e| { + DbError::Serialization(format!("Could not deserialize version: {e:?}")) + })?, + ), ..Default::default() }; @@ -761,23 +778,17 @@ impl DbClient for BigTableClientImpl { let row_key = uaid.simple().to_string(); let mut req = self.read_row_request(&row_key); - let mut filter_set: RepeatedField = RepeatedField::default(); - let mut family_filter = data::RowFilter::default(); family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$")); let mut cq_filter = data::RowFilter::default(); cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec()); - filter_set.push(family_filter); - filter_set.push(cq_filter); - - let mut filter_chain = RowFilter_Chain::default(); - filter_chain.set_filters(filter_set); - - let mut filter = data::RowFilter::default(); - filter.set_chain(filter_chain); - req.set_filter(filter); + req.set_filter(filter_chain(vec![ + router_gc_policy_filter(), + family_filter, + cq_filter, + ])); let mut rows = self.read_rows(req).await?; let mut result = HashSet::new(); @@ -835,7 +846,10 @@ impl DbClient for BigTableClientImpl { }; let mut req = self.check_and_mutate_row_request(&row_key); - req.set_predicate_filter(version_filter(version)); + + let mut filters = vec![router_gc_policy_filter()]; + filters.extend(version_filter(version)); + req.set_predicate_filter(filter_chain(filters)); req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?); Ok(self.check_and_mutate(req).await?) @@ -993,7 +1007,7 @@ impl DbClient for BigTableClientImpl { rows.set_row_ranges(row_ranges); req.set_rows(rows); - req.set_filter(timestamp_filter()?); + req.set_filter(filter_chain(message_gc_policy_filter()?)); if limit > 0 { trace!("🉑 Setting limit to {limit}"); req.set_rows_limit(limit as i64); @@ -1059,7 +1073,7 @@ impl DbClient for BigTableClientImpl { // therefore run two filters, one to fetch the candidate IDs // and another to fetch the content of the messages. 
*/ - req.set_filter(timestamp_filter()?); + req.set_filter(filter_chain(message_gc_policy_filter()?)); if limit > 0 { req.set_rows_limit(limit as i64); } @@ -1426,12 +1440,10 @@ mod tests { client.remove_user(&uaid).await } - /* - // XXX: uncomment after the uaid clashing fix #[actix_rt::test] async fn add_user_existing() { let client = new_client().unwrap(); - let uaid = Uuid::parse_str(TEST_USER).unwrap(); + let uaid = Uuid::new_v4(); let user = User { uaid, ..Default::default() @@ -1442,5 +1454,24 @@ mod tests { let err = client.add_user(&user).await.unwrap_err(); assert!(matches!(err, DbError::Conditional)); } - */ + + #[actix_rt::test] + async fn version_check() { + let client = new_client().unwrap(); + let uaid = Uuid::new_v4(); + let user = User { + uaid, + ..Default::default() + }; + client.remove_user(&uaid).await.unwrap(); + + client.add_user(&user).await.unwrap(); + let user = client.get_user(&uaid).await.unwrap().unwrap(); + assert!(client.update_user(&user).await.unwrap()); + + let fetched = client.get_user(&uaid).await.unwrap().unwrap(); + assert_ne!(user.version, fetched.version); + // should now fail w/ a stale version + assert!(!client.update_user(&user).await.unwrap()); + } } diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 1c8619e37..e5e0e23f7 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -183,6 +183,7 @@ pub struct User { #[serde(skip_serializing_if = "Option::is_none")] pub current_timestamp: Option, /// UUID4 version number for optimistic locking of updates on Bigtable + #[serde(skip_serializing)] pub version: Option, } From 4032cb3c9fd6493b1e9baad18fdfdd6497e5c951 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Mon, 5 Feb 2024 16:33:45 -0800 Subject: [PATCH 2/2] add gen_test_uaid --- .../src/db/bigtable/bigtable_client/mod.rs | 18 ++++++------------ autopush-common/src/lib.rs | 1 + autopush-common/src/test_support.rs | 11 +++++++++++ 3 files changed, 18 insertions(+), 12 
deletions(-) create mode 100644 autopush-common/src/test_support.rs diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 23d6baf26..3d45a2380 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -1142,7 +1142,7 @@ mod tests { use uuid; use super::*; - use crate::{db::DbSettings, util::ms_since_epoch}; + use crate::{db::DbSettings, test_support::gen_test_uaid, util::ms_since_epoch}; const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB"; const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB"; @@ -1407,16 +1407,8 @@ mod tests { #[actix_rt::test] async fn read_cells_family_id() -> DbResult<()> { - // let uaid = Uuid::parse_str(TEST_USER).unwrap(); - // generate a somewhat random test UAID to prevent possible false test fails - // if the account is deleted before this test completes. - let uaid = { - let temp = Uuid::new_v4().to_string(); - let mut parts: Vec<&str> = temp.split('-').collect(); - parts[0] = "DEADBEEF"; - Uuid::parse_str(&parts.join("-")).unwrap() - }; let client = new_client().unwrap(); + let uaid = gen_test_uaid(); client.remove_user(&uaid).await.unwrap(); let qualifier = "foo".to_owned(); @@ -1443,7 +1435,7 @@ mod tests { #[actix_rt::test] async fn add_user_existing() { let client = new_client().unwrap(); - let uaid = Uuid::new_v4(); + let uaid = gen_test_uaid(); let user = User { uaid, ..Default::default() @@ -1458,7 +1450,7 @@ mod tests { #[actix_rt::test] async fn version_check() { let client = new_client().unwrap(); - let uaid = Uuid::new_v4(); + let uaid = gen_test_uaid(); let user = User { uaid, ..Default::default() @@ -1473,5 +1465,7 @@ mod tests { assert_ne!(user.version, fetched.version); // should now fail w/ a stale version assert!(!client.update_user(&user).await.unwrap()); + + client.remove_user(&uaid).await.unwrap(); } } diff --git a/autopush-common/src/lib.rs 
b/autopush-common/src/lib.rs index b8dc4c844..a96df47b5 100644 --- a/autopush-common/src/lib.rs +++ b/autopush-common/src/lib.rs @@ -15,6 +15,7 @@ pub mod middleware; pub mod notification; pub mod sentry; pub mod tags; +pub mod test_support; #[macro_use] pub mod util; diff --git a/autopush-common/src/test_support.rs b/autopush-common/src/test_support.rs new file mode 100644 index 000000000..787efd708 --- /dev/null +++ b/autopush-common/src/test_support.rs @@ -0,0 +1,11 @@ +use uuid::Uuid; + +/// Generate a UAID that is prefixed with the test-identification ID "DEADBEEF". +/// Note: It's absolutely possible that this might cause a conflict with valid UAIDs, but +/// the risk is reasonably small, and we could limit pruning to whenever we had +/// accidentally run the test script against production. +pub fn gen_test_uaid() -> Uuid { + let temp = Uuid::new_v4(); + let (_, d2, d3, d4) = temp.as_fields(); + Uuid::from_fields(0xdeadbeef, d2, d3, d4) +}