Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix add/update_user version field handling on migrations #619

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl UnidentifiedClient {
}
user.connected_at = connected_at;
user.set_last_connect();
if !self.app_state.db.update_user(&user).await? {
if !self.app_state.db.update_user(&mut user).await? {
let _ = self.app_state.metrics.incr("ua.already_connected");
return Err(SMErrorKind::AlreadyConnected.into());
}
Expand Down
2 changes: 1 addition & 1 deletion autoendpoint/src/routers/adm/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Router for AdmRouter {
);
user.router_data = Some(router_data);

if !self.db.update_user(&user).await? {
if !self.db.update_user(&mut user).await? {
// Unlikely to occur on mobile records
return Err(ApiErrorKind::General("Conditional update failed".to_owned()).into());
}
Expand Down
4 changes: 2 additions & 2 deletions autoendpoint/src/routes/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ pub async fn update_token_route(
let router_data = router.register(&router_data_input, &path_args.app_id)?;

// Update the user in the database
let user = User {
let mut user = User {
uaid: path_args.uaid,
router_type: path_args.router_type.to_string(),
router_data: Some(router_data),
..Default::default()
};
trace!("🌍 Updating user with UAID {}", user.uaid);
trace!("🌍 user = {:?}", user);
if !app_state.db.update_user(&user).await? {
if !app_state.db.update_user(&mut user).await? {
// Unlikely to occur on mobile records
return Err(ApiErrorKind::General("Conditional update failed".to_owned()).into());
}
Expand Down
52 changes: 36 additions & 16 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,12 @@ impl BigTableClientImpl {
Ok(notif)
}

fn user_to_row(&self, user: &User) -> Row {
/// Return a Row for writing from a [User] and a `version`
///
/// `version` is specified as an argument (ignoring [User::version]) so
/// that [update_user] may specify a new version to write before modifying
/// the [User] struct
fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
let row_key = user.uaid.simple().to_string();
let mut row = Row::new(row_key);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL);
Expand Down Expand Up @@ -611,8 +616,12 @@ impl BigTableClientImpl {
});
};

// Always write a newly generated version
cells.push(new_version_cell(expiry));
cells.push(cell::Cell {
qualifier: "version".to_owned(),
value: (*version).into(),
timestamp: expiry,
..Default::default()
});

row.add_cells(ROUTER_FAMILY, cells);
row
Expand Down Expand Up @@ -663,7 +672,12 @@ impl DbClient for BigTableClientImpl {
/// add user to the database
async fn add_user(&self, user: &User) -> DbResult<()> {
trace!("🉑 Adding user");
let row = self.user_to_row(user);
let Some(ref version) = user.version else {
return Err(DbError::General(
"add_user expected a user version field".to_owned(),
));
};
let row = self.user_to_row(user, version);

// Only add when the user doesn't already exist
let mut row_key_filter = RowFilter::default();
Expand All @@ -679,18 +693,24 @@ impl DbClient for BigTableClientImpl {
/// BigTable doesn't really have the concept of an "update". You simply write the data and
/// the individual cells create a new version. Depending on the garbage collection rules for
/// the family, these can either persist or be automatically deleted.
async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
let Some(ref version) = user.version else {
return Err(DbError::General("Expected a user version field".to_owned()));
return Err(DbError::General(
"update_user expected a user version field".to_owned(),
));
};

let mut filters = vec![router_gc_policy_filter()];
filters.extend(version_filter(version));
let filter = filter_chain(filters);

Ok(self
.check_and_mutate_row(self.user_to_row(user), filter, true)
.await?)
let new_version = Uuid::new_v4();
// Always write a newly generated version
let row = self.user_to_row(user, &new_version);

let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
user.version = Some(new_version);
Ok(predicate_matched)
}

async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
Expand Down Expand Up @@ -1286,11 +1306,11 @@ mod tests {
// now ensure that we can update a user that's after the time we set
// prior. first ensure that we can't update a user that's before the
// time we set prior to the last write
let updated = User {
let mut updated = User {
connected_at,
..test_user.clone()
};
let result = client.update_user(&updated).await;
let result = client.update_user(&mut updated).await;
assert!(result.is_ok());
assert!(!result.unwrap());

Expand All @@ -1299,11 +1319,11 @@ mod tests {
assert_eq!(fetched.connected_at, fetched2.connected_at);

// and make sure we can update a record with a later connected_at time.
let updated = User {
let mut updated = User {
connected_at: fetched.connected_at + 300,
..fetched2
};
let result = client.update_user(&updated).await;
let result = client.update_user(&mut updated).await;
assert!(result.is_ok());
assert!(result.unwrap());
assert_ne!(
Expand Down Expand Up @@ -1477,13 +1497,13 @@ mod tests {
client.remove_user(&uaid).await.unwrap();

client.add_user(&user).await.unwrap();
let user = client.get_user(&uaid).await.unwrap().unwrap();
assert!(client.update_user(&user).await.unwrap());
let mut user = client.get_user(&uaid).await.unwrap().unwrap();
assert!(client.update_user(&mut user.clone()).await.unwrap());

let fetched = client.get_user(&uaid).await.unwrap().unwrap();
assert_ne!(user.version, fetched.version);
// should now fail w/ a stale version
assert!(!client.update_user(&user).await.unwrap());
assert!(!client.update_user(&mut user).await.unwrap());

client.remove_user(&uaid).await.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion autopush-common/src/db/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait DbClient: Send + Sync {
/// update will not occur if the user does not already exist, has a
/// different router type, or has a newer `connected_at` timestamp.
// TODO: make the bool a #[must_use]
async fn update_user(&self, user: &User) -> DbResult<bool>;
async fn update_user(&self, user: &mut User) -> DbResult<bool>;

/// Read a user from the database
async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>>;
Expand Down
8 changes: 6 additions & 2 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl DbClient for DualClientImpl {
Ok(result)
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
// If the UAID is in the allowance, move them to the new data store
let (target, is_primary) = self.allot(&user.uaid).await?;
let result = target.update_user(user).await?;
Expand All @@ -181,9 +181,13 @@ impl DbClient for DualClientImpl {
Ok(None) => {
if is_primary {
// The user wasn't in the current primary, so fetch them from the secondary.
if let Ok(Some(user)) = self.secondary.get_user(uaid).await {
if let Ok(Some(mut user)) = self.secondary.get_user(uaid).await {
// copy the user record over to the new data store.
debug!("⚖ Found user record in secondary, moving to primary");
// Users read from DynamoDB lack a version field needed
// for Bigtable
debug_assert!(user.version.is_none());
user.version = Some(Uuid::new_v4());
self.primary.add_user(&user).await?;
let channels = self.secondary.get_channels(uaid).await?;
self.primary.add_channels(uaid, channels).await?;
Expand Down
2 changes: 1 addition & 1 deletion autopush-common/src/db/dynamodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl DbClient for DdbClientImpl {
Ok(())
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
let mut user_map = serde_dynamodb::to_hashmap(&user)?;
user_map.remove("uaid");
let input = UpdateItemInput {
Expand Down
2 changes: 1 addition & 1 deletion autopush-common/src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl DbClient for Arc<MockDbClient> {
Arc::as_ref(self).add_user(user).await
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
Arc::as_ref(self).update_user(user).await
}

Expand Down