diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs index 03d582b3f..f16b7099d 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs @@ -136,7 +136,7 @@ impl UnidentifiedClient { } user.connected_at = connected_at; user.set_last_connect(); - if !self.app_state.db.update_user(&user).await? { + if !self.app_state.db.update_user(&mut user).await? { let _ = self.app_state.metrics.incr("ua.already_connected"); return Err(SMErrorKind::AlreadyConnected.into()); } diff --git a/autoendpoint/src/routers/adm/router.rs b/autoendpoint/src/routers/adm/router.rs index 6c3fe0691..3c3431bb9 100644 --- a/autoendpoint/src/routers/adm/router.rs +++ b/autoendpoint/src/routers/adm/router.rs @@ -148,7 +148,7 @@ impl Router for AdmRouter { ); user.router_data = Some(router_data); - if !self.db.update_user(&user).await? { + if !self.db.update_user(&mut user).await? { // Unlikely to occur on mobile records return Err(ApiErrorKind::General("Conditional update failed".to_owned()).into()); } diff --git a/autoendpoint/src/routes/registration.rs b/autoendpoint/src/routes/registration.rs index 01232ed04..20a819bf9 100644 --- a/autoendpoint/src/routes/registration.rs +++ b/autoendpoint/src/routes/registration.rs @@ -107,7 +107,7 @@ pub async fn update_token_route( let router_data = router.register(&router_data_input, &path_args.app_id)?; // Update the user in the database - let user = User { + let mut user = User { uaid: path_args.uaid, router_type: path_args.router_type.to_string(), router_data: Some(router_data), @@ -115,7 +115,7 @@ pub async fn update_token_route( }; trace!("🌍 Updating user with UAID {}", user.uaid); trace!("🌍 user = {:?}", user); - if !app_state.db.update_user(&user).await? { + if !app_state.db.update_user(&mut user).await? { // Unlikely to occur on mobile records return Err(ApiErrorKind::General("Conditional update failed".to_owned()).into()); } diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 59cf9945d..a5261464a 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -558,7 +558,12 @@ impl BigTableClientImpl { Ok(notif) } - fn user_to_row(&self, user: &User) -> Row { + /// Return a Row for writing from a [User] and a `version` + /// + /// `version` is specified as an argument (ignoring [User::version]) so + /// that [update_user] may specify a new version to write before modifying + /// the [User] struct + fn user_to_row(&self, user: &User, version: &Uuid) -> Row { let row_key = user.uaid.simple().to_string(); let mut row = Row::new(row_key); let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL); @@ -611,8 +616,12 @@ impl BigTableClientImpl { }); }; - // Always write a newly generated version - cells.push(new_version_cell(expiry)); + cells.push(cell::Cell { + qualifier: "version".to_owned(), + value: (*version).into(), + timestamp: expiry, + ..Default::default() + }); row.add_cells(ROUTER_FAMILY, cells); row @@ -663,7 +672,12 @@ impl DbClient for BigTableClientImpl { /// add user to the database async fn add_user(&self, user: &User) -> DbResult<()> { trace!("🉑 Adding user"); - let row = self.user_to_row(user); + let Some(ref version) = user.version else { + return Err(DbError::General( + "add_user expected a user version field".to_owned(), + )); + }; + let row = self.user_to_row(user, version); // Only add when the user doesn't already exist let mut row_key_filter = RowFilter::default(); @@ -679,18 +693,24 @@ impl DbClient for BigTableClientImpl { /// BigTable doesn't really have the concept of an "update". You simply write the data and /// the individual cells create a new version. Depending on the garbage collection rules for /// the family, these can either persist or be automatically deleted. - async fn update_user(&self, user: &User) -> DbResult { + async fn update_user(&self, user: &mut User) -> DbResult { let Some(ref version) = user.version else { - return Err(DbError::General("Expected a user version field".to_owned())); + return Err(DbError::General( + "update_user expected a user version field".to_owned(), + )); }; let mut filters = vec![router_gc_policy_filter()]; filters.extend(version_filter(version)); let filter = filter_chain(filters); - Ok(self - .check_and_mutate_row(self.user_to_row(user), filter, true) - .await?) + let new_version = Uuid::new_v4(); + // Always write a newly generated version + let row = self.user_to_row(user, &new_version); + + let predicate_matched = self.check_and_mutate_row(row, filter, true).await?; + user.version = Some(new_version); + Ok(predicate_matched) } async fn get_user(&self, uaid: &Uuid) -> DbResult> { @@ -1286,11 +1306,11 @@ mod tests { // now ensure that we can update a user that's after the time we set // prior. first ensure that we can't update a user that's before the // time we set prior to the last write - let updated = User { + let mut updated = User { connected_at, ..test_user.clone() }; - let result = client.update_user(&updated).await; + let result = client.update_user(&mut updated).await; assert!(result.is_ok()); assert!(!result.unwrap()); @@ -1299,11 +1319,11 @@ mod tests { assert_eq!(fetched.connected_at, fetched2.connected_at); // and make sure we can update a record with a later connected_at time. - let updated = User { + let mut updated = User { connected_at: fetched.connected_at + 300, ..fetched2 }; - let result = client.update_user(&updated).await; + let result = client.update_user(&mut updated).await; assert!(result.is_ok()); assert!(result.unwrap()); assert_ne!( @@ -1477,13 +1497,13 @@ mod tests { client.remove_user(&uaid).await.unwrap(); client.add_user(&user).await.unwrap(); - let user = client.get_user(&uaid).await.unwrap().unwrap(); - assert!(client.update_user(&user).await.unwrap()); + let mut user = client.get_user(&uaid).await.unwrap().unwrap(); + assert!(client.update_user(&mut user.clone()).await.unwrap()); let fetched = client.get_user(&uaid).await.unwrap().unwrap(); assert_ne!(user.version, fetched.version); // should now fail w/ a stale version - assert!(!client.update_user(&user).await.unwrap()); + assert!(!client.update_user(&mut user).await.unwrap()); client.remove_user(&uaid).await.unwrap(); } diff --git a/autopush-common/src/db/client.rs b/autopush-common/src/db/client.rs index 183878911..42e5e66f6 100644 --- a/autopush-common/src/db/client.rs +++ b/autopush-common/src/db/client.rs @@ -30,7 +30,7 @@ pub trait DbClient: Send + Sync { /// update will not occur if the user does not already exist, has a /// different router type, or has a newer `connected_at` timestamp. // TODO: make the bool a #[must_use] - async fn update_user(&self, user: &User) -> DbResult; + async fn update_user(&self, user: &mut User) -> DbResult; /// Read a user from the database async fn get_user(&self, uaid: &Uuid) -> DbResult>; diff --git a/autopush-common/src/db/dual/mod.rs b/autopush-common/src/db/dual/mod.rs index 3b2a730c3..a591f8b0a 100644 --- a/autopush-common/src/db/dual/mod.rs +++ b/autopush-common/src/db/dual/mod.rs @@ -157,7 +157,7 @@ impl DbClient for DualClientImpl { Ok(result) } - async fn update_user(&self, user: &User) -> DbResult { + async fn update_user(&self, user: &mut User) -> DbResult { // If the UAID is in the allowance, move them to the new data store let (target, is_primary) = self.allot(&user.uaid).await?; let result = target.update_user(user).await?; @@ -181,9 +181,13 @@ impl DbClient for DualClientImpl { Ok(None) => { if is_primary { // The user wasn't in the current primary, so fetch them from the secondary. - if let Ok(Some(user)) = self.secondary.get_user(uaid).await { + if let Ok(Some(mut user)) = self.secondary.get_user(uaid).await { // copy the user record over to the new data store. debug!("⚖ Found user record in secondary, moving to primary"); + // Users read from DynamoDB lack a version field needed + // for Bigtable + debug_assert!(user.version.is_none()); + user.version = Some(Uuid::new_v4()); self.primary.add_user(&user).await?; let channels = self.secondary.get_channels(uaid).await?; self.primary.add_channels(uaid, channels).await?; diff --git a/autopush-common/src/db/dynamodb/mod.rs b/autopush-common/src/db/dynamodb/mod.rs index 28bf128e2..fa8c18283 100644 --- a/autopush-common/src/db/dynamodb/mod.rs +++ b/autopush-common/src/db/dynamodb/mod.rs @@ -159,7 +159,7 @@ impl DbClient for DdbClientImpl { Ok(()) } - async fn update_user(&self, user: &User) -> DbResult { + async fn update_user(&self, user: &mut User) -> DbResult { let mut user_map = serde_dynamodb::to_hashmap(&user)?; user_map.remove("uaid"); let input = UpdateItemInput { diff --git a/autopush-common/src/db/mock.rs b/autopush-common/src/db/mock.rs index 6b5f5f48c..eedf52905 100644 --- a/autopush-common/src/db/mock.rs +++ b/autopush-common/src/db/mock.rs @@ -20,7 +20,7 @@ impl DbClient for Arc { Arc::as_ref(self).add_user(user).await } - async fn update_user(&self, user: &User) -> DbResult { + async fn update_user(&self, user: &mut User) -> DbResult { Arc::as_ref(self).update_user(user).await }