Skip to content

Commit

Permalink
feat: read via row ranges instead of regex (#564)
Browse files Browse the repository at this point in the history
probably more importantly this removes the chid the message row_keys so
we always read messages with a concrete prefix

also switches the chid "metarecords" row key to uaid@chid to distinguish
from message's uaid#chidmessageid

update shlex per the latest RUSTSEC (and also ignore it for the sake of
leagcy autopush's usage)

SYNC-4068
  • Loading branch information
pjenvey authored Jan 23, 2024
1 parent 60c8b33 commit 0a96e1b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 146 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

223 changes: 81 additions & 142 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use uuid::Uuid;
use crate::db::{
client::{DbClient, FetchMessageResponse},
error::{DbError, DbResult},
DbSettings, Notification, NotificationRecord, User,
DbSettings, Notification, User,
};
use crate::notification::STANDARD_NOTIFICATION_PREFIX;

use self::row::Row;
use super::pool::BigTablePool;
Expand Down Expand Up @@ -100,21 +99,6 @@ fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
})
}

/// Create a normalized index key.
fn as_key(uaid: &Uuid, channel_id: Option<&Uuid>, chidmessageid: Option<&str>) -> String {
let mut parts: Vec<String> = Vec::new();
parts.push(uaid.as_simple().to_string());
if let Some(channel_id) = channel_id {
parts.push(channel_id.as_hyphenated().to_string());
} else if chidmessageid.is_some() {
parts.push("".to_string())
}
if let Some(chidmessageid) = chidmessageid {
parts.push(chidmessageid.to_owned());
}
parts.join("#")
}

/// Connect to a BigTable storage model.
///
/// BigTable is available via the Google Console, and is a schema less storage system.
Expand Down Expand Up @@ -709,12 +693,13 @@ impl DbClient for BigTableClientImpl {
}

async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> {
self.delete_rows(&as_key(uaid, None, None)).await?;
let row_key = uaid.simple().to_string();
self.delete_rows(&row_key).await?;
Ok(())
}

async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
let key = as_key(uaid, Some(channel_id), None);
let key = format!("{}@{}", uaid.simple(), channel_id.as_hyphenated());

// We can use the default timestamp here because there shouldn't be a time
// based GC for router records.
Expand Down Expand Up @@ -754,7 +739,7 @@ impl DbClient for BigTableClientImpl {
// will create/update the channels.
for channel in channels {
let mut entry = bigtable::MutateRowsRequest_Entry::default();
let key = as_key(uaid, Some(&channel), None);
let key = format!("{}@{}", uaid.simple(), channel.as_hyphenated());
entry.set_row_key(key.into_bytes());

let mut cell_mutations = protobuf::RepeatedField::default();
Expand Down Expand Up @@ -836,37 +821,28 @@ impl DbClient for BigTableClientImpl {
async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> {
let mut result = HashSet::new();

let req = {
let filter = {
let mut strip_filter = data::RowFilter::default();
strip_filter.set_strip_value_transformer(true);
let mut regex_filter = data::RowFilter::default();
// Your regex expression must match the WHOLE string. No partial matches.
// For this, we only want to grab the channel meta records (which do not
// have a sort key suffix)
let key = format!("^{}#[^#]+", uaid.simple());
regex_filter.set_row_key_regex_filter(key.as_bytes().to_vec());
let mut chain = data::RowFilter_Chain::default();
let mut repeat_field = RepeatedField::default();
repeat_field.push(strip_filter);
repeat_field.push(regex_filter);
chain.set_filters(repeat_field);

let mut filter = data::RowFilter::default();
filter.set_chain(chain);
filter
};

let mut req = bigtable::ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());
req.set_filter(filter);
let mut req = bigtable::ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());

req
};
let start_key = format!("{}@", uaid.simple());
// '[' is the char after '@'
let end_key = format!("{}[", uaid.simple());
let mut rows = data::RowSet::default();
let mut row_range = data::RowRange::default();
row_range.set_start_key_open(start_key.into_bytes());
row_range.set_end_key_open(end_key.into_bytes());
let mut row_ranges = RepeatedField::default();
row_ranges.push(row_range);
rows.set_row_ranges(row_ranges);
req.set_rows(rows);

let mut strip_filter = data::RowFilter::default();
strip_filter.set_strip_value_transformer(true);
req.set_filter(strip_filter);

let rows = self.read_rows(req, None, None).await?;
for key in rows.keys().map(|v| v.to_owned()).collect::<Vec<String>>() {
if let Some(chid) = key.split('#').last() {
if let Some(chid) = key.split('@').last() {
result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?);
}
}
Expand All @@ -876,7 +852,7 @@ impl DbClient for BigTableClientImpl {

/// Delete the channel and all associated pending messages.
async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> {
let row_key = as_key(uaid, Some(channel_id), None);
let row_key = format!("{}@{}", uaid.simple(), channel_id.as_hyphenated());
Ok(self.delete_rows(&row_key).await?)
}

Expand All @@ -892,7 +868,7 @@ impl DbClient for BigTableClientImpl {
&uaid.simple().to_string(),
UNIX_EPOCH + Duration::from_secs(connected_at)
);
let row_key = as_key(uaid, None, None);
let row_key = uaid.simple().to_string();
let mut time_range = data::TimestampRange::default();
// convert connected at seconds into microseconds
time_range.set_end_timestamp_micros((connected_at * 1000000) as i64);
Expand All @@ -908,11 +884,7 @@ impl DbClient for BigTableClientImpl {

/// Write the notification to storage.
async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> {
let row_key = as_key(
uaid,
Some(&message.channel_id),
Some(&message.chidmessageid()),
);
let row_key = format!("{}#{}", uaid.simple(), message.chidmessageid());
debug!("🗄️ Saving message {} :: {:?}", &row_key, &message);
trace!(
"🉑 timestamp: {:?}",
Expand Down Expand Up @@ -1049,16 +1021,16 @@ impl DbClient for BigTableClientImpl {
/// records with timestamps later than `current_timestamp`.
///
async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> {
let mut row = Row {
row_key: as_key(uaid, None, None),
..Default::default()
};

let row_key = uaid.simple().to_string();
debug!(
"🉑 Updating {} current_timestamp: {:?}",
as_key(uaid, None, None),
&row_key,
timestamp.to_be_bytes().to_vec()
);
let mut row = Row {
row_key,
..Default::default()
};

row.cells.insert(
MESSAGE_FAMILY.to_owned(),
Expand All @@ -1079,9 +1051,7 @@ impl DbClient for BigTableClientImpl {
uaid.to_string(),
chidmessageid
);
let range_key = NotificationRecord::parse_chidmessageid(chidmessageid)
.map_err(|_| DbError::General(format!("Invalid ChidMessageId {}", chidmessageid)))?;
let row_key = as_key(uaid, Some(&range_key.channel_id), Some(chidmessageid));
let row_key = format!("{}#{}", uaid.simple(), chidmessageid);
debug!("🉑🔥 Deleting message {}", &row_key);
self.delete_row(&row_key).await.map_err(|e| e.into())
}
Expand All @@ -1094,31 +1064,19 @@ impl DbClient for BigTableClientImpl {
) -> DbResult<FetchMessageResponse> {
let mut req = ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());
req.set_filter({
let mut filter = data::RowFilter::default();

let mut chain = data::RowFilter_Chain::default();
let mut filter_chain = RepeatedField::default();

let mut row_filter = data::RowFilter::default();
// channels for a given UAID all begin with `{uaid}#`
// this will fetch all messages for all channels and all sort_keys
row_filter.set_row_key_regex_filter(
format!("^{}#[^#]+#01:.+", uaid.simple())
.as_bytes()
.to_vec(),
);

let time_filter = timestamp_filter()?;
// Filter by the keyed value first, then by the time.
filter_chain.push(row_filter);
filter_chain.push(time_filter);

chain.set_filters(filter_chain);
filter.set_chain(chain);

filter
});

let start_key = format!("{}#01:", uaid.simple());
let end_key = format!("{}#02:", uaid.simple());
let mut rows = data::RowSet::default();
let mut row_range = data::RowRange::default();
row_range.set_start_key_open(start_key.into_bytes());
row_range.set_end_key_open(end_key.into_bytes());
let mut row_ranges = RepeatedField::default();
row_ranges.push(row_range);
rows.set_row_ranges(row_ranges);
req.set_rows(rows);

req.set_filter(timestamp_filter()?);
// Note set_rows_limit(v) limits the returned results
// If you're doing additional filtering later, this is not what
// you want.
Expand Down Expand Up @@ -1148,55 +1106,37 @@ impl DbClient for BigTableClientImpl {
let mut req = ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());

let mut rows = data::RowSet::default();
let mut row_range = data::RowRange::default();

let start_key = if let Some(ts) = timestamp {
format!("{}#02:{}", uaid.simple(), ts)
} else {
format!("{}#02:", uaid.simple())
};
let end_key = format!("{}#03:", uaid.simple());
row_range.set_start_key_open(start_key.into_bytes());
row_range.set_end_key_open(end_key.into_bytes());

let mut row_ranges = RepeatedField::default();
row_ranges.push(row_range);
rows.set_row_ranges(row_ranges);
req.set_rows(rows);

// We can fetch data and do [some remote filtering](https://cloud.google.com/bigtable/docs/filters),
// unfortunately I don't think the filtering we need will be super helpful.
//
//
let filter = {
let mut filter = data::RowFilter::default();

let mut chain = data::RowFilter_Chain::default();
let mut filter_chain = RepeatedField::default();

let mut row_filter = data::RowFilter::default();
// Only look for channelids for the given UAID.
// start by looking for rows that roughly match what we want
// Note: BigTable provides a good deal of specialized filtering, but
// it tends to be overly specialized. (For instance, a value range retuns
// cells which has values within a specific range. Not rows, not families,
// cells. There does not appear to be a way to chain this so that it only
// looks for rows with ranged values within a given family or qualifier types
// That must be done externally.)

// look for anything belonging to this UAID that is also a Standard Notification
let pattern = format!(
"^{}#[^#]+#{}:.*",
uaid.simple(),
STANDARD_NOTIFICATION_PREFIX,
);
trace!("🉑 regex filter {:?}", pattern);
row_filter.set_row_key_regex_filter(pattern.as_bytes().to_vec());
filter_chain.push(row_filter);

// Filter on our TTL.
let time_filter = timestamp_filter()?;
filter_chain.push(time_filter);

/*
//NOTE: if you filter on a given field, BigTable will only
// return that specific field. Adding filters for the rest of
// the known elements may NOT return those elements or may
// cause the message to not be returned because any of
// those elements are not present. It may be preferable to
// therefore run two filters, one to fetch the candidate IDs
// and another to fetch the content of the messages.
*/

chain.set_filters(filter_chain);
filter.set_chain(chain);
filter
};
req.set_filter(filter);
/*
//NOTE: if you filter on a given field, BigTable will only
// return that specific field. Adding filters for the rest of
// the known elements may NOT return those elements or may
// cause the message to not be returned because any of
// those elements are not present. It may be preferable to
// therefore run two filters, one to fetch the candidate IDs
// and another to fetch the content of the messages.
*/
req.set_filter(timestamp_filter()?);
// Note set_rows_limit(v) limits the returned results from Bigtable.
// If you're doing additional filtering later, this may not be what
// you want and may artificially truncate possible return sets.
Expand Down Expand Up @@ -1297,15 +1237,6 @@ mod tests {
BigTableClientImpl::new(metrics, &settings)
}

#[test]
fn row_key() {
let uaid = Uuid::parse_str(TEST_USER).unwrap();
let chid = Uuid::parse_str(TEST_CHID).unwrap();
let chidmessageid = "01:decafbad-0000-0000-0000-0123456789ab:Inbox";
let k = as_key(&uaid, Some(&chid), Some(chidmessageid));
assert_eq!(k, "deadbeef0000000000000123456789ab#decafbad-0000-0000-0000-0123456789ab#01:decafbad-0000-0000-0000-0123456789ab:Inbox");
}

#[actix_rt::test]
async fn health_check() {
let client = new_client().unwrap();
Expand Down Expand Up @@ -1367,10 +1298,18 @@ mod tests {
for _ in 1..10 {
new_channels.insert(uuid::Uuid::new_v4());
}
let chid_to_remove = uuid::Uuid::new_v4();
new_channels.insert(chid_to_remove);
client.add_channels(&uaid, new_channels.clone()).await?;
let channels = client.get_channels(&uaid).await?;
assert_eq!(channels, new_channels);

// can we remove a channel?
client.remove_channel(&uaid, &chid_to_remove).await?;
new_channels.remove(&chid_to_remove);
let channels = client.get_channels(&uaid).await?;
assert_eq!(channels, new_channels);

// now ensure that we can update a user that's after the time we set prior.
// first ensure that we can't update a user that's before the time we set prior.
let updated = User {
Expand Down

0 comments on commit 0a96e1b

Please sign in to comment.