Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter reads by latest cell #599

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 75 additions & 44 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ pub struct BigTableClientImpl {
pool: BigTablePool,
}

fn timestamp_filter() -> Result<data::RowFilter, error::BigTableError> {
/// Return a a RowFilter matching the GC policy of the router Column Family
fn router_gc_policy_filter() -> data::RowFilter {
let mut latest_cell_filter = data::RowFilter::default();
latest_cell_filter.set_cells_per_column_limit_filter(1);
latest_cell_filter
}

/// Return a chain of RowFilters matching the GC policy of the message Column
/// Families
fn message_gc_policy_filter() -> Result<Vec<data::RowFilter>, error::BigTableError> {
let mut timestamp_filter = data::RowFilter::default();
let bt_now: i64 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand All @@ -84,7 +93,7 @@ fn timestamp_filter() -> Result<data::RowFilter, error::BigTableError> {
range_filter.set_start_timestamp_micros(bt_now * 1000);
timestamp_filter.set_timestamp_range_filter(range_filter);

Ok(timestamp_filter)
Ok(vec![router_gc_policy_filter(), timestamp_filter])
}

/// Escape bytes for RE values
Expand All @@ -109,12 +118,9 @@ fn escape_bytes(bytes: &[u8]) -> Vec<u8> {
vec
}

/// Return a RowFilter limiting to a match of the specified `version`'s column
/// value
fn version_filter(version: &Uuid) -> data::RowFilter {
let mut router_filter_chain = RowFilter_Chain::default();
let mut filter_set: RepeatedField<RowFilter> = RepeatedField::default();

/// Return a chain of RowFilters limiting to a match of the specified
/// `version`'s column value
fn version_filter(version: &Uuid) -> Vec<data::RowFilter> {
let mut family_filter = data::RowFilter::default();
family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$"));

Expand All @@ -124,26 +130,28 @@ fn version_filter(version: &Uuid) -> data::RowFilter {
let mut value_filter = data::RowFilter::default();
value_filter.set_value_regex_filter(escape_bytes(version.as_bytes()));

filter_set.push(family_filter);
filter_set.push(cq_filter);
filter_set.push(value_filter);

router_filter_chain.set_filters(filter_set);
let mut router_filter = RowFilter::default();
router_filter.set_chain(router_filter_chain);
router_filter
vec![family_filter, cq_filter, value_filter]
}

/// Return a newly generated `version` column `Cell`
fn new_version_cell(timestamp: SystemTime) -> cell::Cell {
cell::Cell {
qualifier: "version".to_owned(),
value: Uuid::new_v4().into_bytes().to_vec(),
value: Uuid::new_v4().into(),
timestamp,
..Default::default()
}
}

/// Return a RowFilter chain from multiple RowFilters
fn filter_chain(filters: impl Into<RepeatedField<RowFilter>>) -> RowFilter {
let mut chain = RowFilter_Chain::default();
chain.set_filters(filters.into());
let mut filter = RowFilter::default();
filter.set_chain(chain);
filter
}

/// Return a ReadRowsRequest against table for a given row key
fn read_row_request(table_name: &str, row_key: &str) -> bigtable::ReadRowsRequest {
let mut req = bigtable::ReadRowsRequest::default();
Expand Down Expand Up @@ -431,6 +439,7 @@ impl BigTableClientImpl {
}

/// Delete all cell data from the specified columns with the optional time range.
#[allow(unused)]
async fn delete_cells(
&self,
row_key: &str,
Expand Down Expand Up @@ -638,8 +647,9 @@ impl DbClient for BigTableClientImpl {
let row = self.user_to_row(user);

// Only add when the user doesn't already exist
let mut filter = RowFilter::default();
filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
let mut row_key_filter = RowFilter::default();
row_key_filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes());
let filter = filter_chain(vec![router_gc_policy_filter(), row_key_filter]);

if self.check_and_mutate_row(row, filter, false).await? {
return Err(DbError::Conditional);
Expand All @@ -654,8 +664,13 @@ impl DbClient for BigTableClientImpl {
let Some(ref version) = user.version else {
return Err(DbError::General("Expected a user version field".to_owned()));
};

let mut filters = vec![router_gc_policy_filter()];
filters.extend(version_filter(version));
let filter = filter_chain(filters);

Ok(self
.check_and_mutate_row(self.user_to_row(user), version_filter(version), true)
.check_and_mutate_row(self.user_to_row(user), filter, true)
.await?)
}

Expand All @@ -666,19 +681,21 @@ impl DbClient for BigTableClientImpl {
};

trace!("🉑 Found a record for {}", row_key);
let version = row
.take_required_cell("version")?
.value
.try_into()
.map_err(|e| DbError::Serialization(format!("Could not deserialize version: {e:?}")))?;
let mut result = User {
uaid: *uaid,
connected_at: to_u64(
row.take_required_cell("connected_at")?.value,
"connected_at",
)?,
router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
version: Some(Uuid::from_bytes(version)),
version: Some(
row.take_required_cell("version")?
.value
.try_into()
.map_err(|e| {
DbError::Serialization(format!("Could not deserialize version: {e:?}"))
})?,
),
..Default::default()
};

Expand Down Expand Up @@ -761,23 +778,17 @@ impl DbClient for BigTableClientImpl {
let row_key = uaid.simple().to_string();
let mut req = self.read_row_request(&row_key);

let mut filter_set: RepeatedField<RowFilter> = RepeatedField::default();

let mut family_filter = data::RowFilter::default();
family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$"));

let mut cq_filter = data::RowFilter::default();
cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec());

filter_set.push(family_filter);
filter_set.push(cq_filter);

let mut filter_chain = RowFilter_Chain::default();
filter_chain.set_filters(filter_set);

let mut filter = data::RowFilter::default();
filter.set_chain(filter_chain);
req.set_filter(filter);
req.set_filter(filter_chain(vec![
router_gc_policy_filter(),
family_filter,
cq_filter,
]));

let mut rows = self.read_rows(req).await?;
let mut result = HashSet::new();
Expand Down Expand Up @@ -835,7 +846,10 @@ impl DbClient for BigTableClientImpl {
};

let mut req = self.check_and_mutate_row_request(&row_key);
req.set_predicate_filter(version_filter(version));

let mut filters = vec![router_gc_policy_filter()];
filters.extend(version_filter(version));
req.set_predicate_filter(filter_chain(filters));
req.set_true_mutations(self.get_delete_mutations(ROUTER_FAMILY, &["node_id"], None)?);

Ok(self.check_and_mutate(req).await?)
Expand Down Expand Up @@ -993,7 +1007,7 @@ impl DbClient for BigTableClientImpl {
rows.set_row_ranges(row_ranges);
req.set_rows(rows);

req.set_filter(timestamp_filter()?);
req.set_filter(filter_chain(message_gc_policy_filter()?));
if limit > 0 {
trace!("🉑 Setting limit to {limit}");
req.set_rows_limit(limit as i64);
Expand Down Expand Up @@ -1059,7 +1073,7 @@ impl DbClient for BigTableClientImpl {
// therefore run two filters, one to fetch the candidate IDs
// and another to fetch the content of the messages.
*/
req.set_filter(timestamp_filter()?);
req.set_filter(filter_chain(message_gc_policy_filter()?));
if limit > 0 {
req.set_rows_limit(limit as i64);
}
Expand Down Expand Up @@ -1426,12 +1440,10 @@ mod tests {
client.remove_user(&uaid).await
}

/*
// XXX: uncomment after the uaid clashing fix
#[actix_rt::test]
async fn add_user_existing() {
let client = new_client().unwrap();
let uaid = Uuid::parse_str(TEST_USER).unwrap();
let uaid = Uuid::new_v4();
pjenvey marked this conversation as resolved.
Show resolved Hide resolved
let user = User {
uaid,
..Default::default()
Expand All @@ -1442,5 +1454,24 @@ mod tests {
let err = client.add_user(&user).await.unwrap_err();
assert!(matches!(err, DbError::Conditional));
}
*/

#[actix_rt::test]
async fn version_check() {
let client = new_client().unwrap();
let uaid = Uuid::new_v4();
let user = User {
uaid,
..Default::default()
};
client.remove_user(&uaid).await.unwrap();
pjenvey marked this conversation as resolved.
Show resolved Hide resolved

client.add_user(&user).await.unwrap();
let user = client.get_user(&uaid).await.unwrap().unwrap();
assert!(client.update_user(&user).await.unwrap());

let fetched = client.get_user(&uaid).await.unwrap().unwrap();
assert_ne!(user.version, fetched.version);
// should now fail w/ a stale version
assert!(!client.update_user(&user).await.unwrap());
}
}
1 change: 1 addition & 0 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub struct User {
#[serde(skip_serializing_if = "Option::is_none")]
pub current_timestamp: Option<u64>,
/// UUID4 version number for optimistic locking of updates on Bigtable
#[serde(skip_serializing)]
pub version: Option<Uuid>,
}

Expand Down