Skip to content

Commit

Permalink
bug: filter by timestamp (#548)
Browse files Browse the repository at this point in the history
This includes a filter by cell timestamp. This also includes a lot of
inline comments pointing out potential hazards and pitfalls that I
experienced working on this.

NOTE: There is a (hopefully remote) possibility that a record could be
returned with a specific cell missing if that cell's timestamp has
expired. Timestamps are per cell, not per row.

Closes: SYNC-4060
  • Loading branch information
jrconlin committed Jan 29, 2024
1 parent 30b498f commit 1a64b93
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 46 deletions.
2 changes: 1 addition & 1 deletion autoendpoint/src/routes/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn register_uaid_route(
trace!("🌍 Creating secret for UAID {}", user.uaid);
let auth_keys = app_state.settings.auth_keys();
let auth_key = auth_keys
.get(0)
.first()
.expect("At least one auth key must be provided in the settings");
let secret = AuthorizationCheck::generate_token(auth_key, &user.uaid)
.map_err(ApiErrorKind::RegistrationSecretHash)?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Various things we learned while dealing with Bigtable

This document contains tricks and traps that we've discovered while working with Bigtable.

## Indices

* Bigtable uses a single key per row, and uses that key to store data close to rows with similar keys (e.g.
"Foo-123" would be stored close to "Foo-456"). The suggested mechanism is to collapse the significant values into a single key, joined by well-known separator values, and to use regular-expression syntax against the key to limit searches.

* Regular expressions must match the entire key. A partial match is not considered for inclusion in the result set. (e.g. `Foo` will not match `Foo-123`, however `Foo.*` will)

## Cells

* Cell values are stored as byte arrays. Library functions use field hints in order to store and retrieve values, but be mindful of storage differences: the byte width must match exactly. (e.g. a u64 value or search is 8 bytes and will not match a 16-byte u128 byte array.)

## Filters

* Cell filters are exclusive by default. This means that when you filter on a given cell value or qualifier, the result set will contain only the cells that match the filter.
This is a bit hard to explain without a proper example, so consider the following filter:

```rust
/// Build a chained filter that keeps only unexpired "expiry" cells.
///
/// The chain first selects cells whose column qualifier matches `expiry`,
/// then keeps those whose value lies after the current time (milliseconds
/// since the UNIX epoch, encoded as big-endian `u128` bytes).
///
/// # Errors
/// Returns `BigTableError::WriteTime` if the system clock reads earlier than
/// the UNIX epoch.
fn expiry_filter() -> Result<data::RowFilter, error::BigTableError> {
    // "Now" in milliseconds; kept as u128 so the byte width matches the
    // comparison value built below.
    let now_ms: u128 = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_err(error::BigTableError::WriteTime)?
        .as_millis();

    // First link: restrict to the "expiry" column qualifier.
    let mut qualifier_link = data::RowFilter::default();
    qualifier_link.set_column_qualifier_regex_filter("expiry".as_bytes().to_vec());

    // Second link: valid timestamps have not yet expired, so the range
    // starts (exclusively) at the current time and has no upper bound.
    let mut unexpired = data::ValueRange::default();
    unexpired.set_start_value_open(now_ms.to_be_bytes().to_vec());
    let mut value_link = data::RowFilter::default();
    value_link.set_value_range_filter(unexpired);

    // Order matters in a chain: qualifier first, then value.
    let mut links = RepeatedField::default();
    links.push(qualifier_link);
    links.push(value_link);

    let mut chain = data::RowFilter_Chain::default();
    chain.set_filters(links);

    let mut filter = data::RowFilter::default();
    filter.set_chain(chain);
    Ok(filter)
}

```

Adding this filter to a query will return a result that includes only the "expiry" cells whose value is greater than the current time. No other cells or values will be included in the result set; however, each "row" will still include the row meta information, including the key.
117 changes: 74 additions & 43 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ pub struct BigTableClientImpl {
pool: BigTablePool,
}

fn timestamp_filter() -> Result<data::RowFilter, error::BigTableError> {
let mut timestamp_filter = data::RowFilter::default();
let bt_now: i64 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(error::BigTableError::WriteTime)?
.as_millis() as i64;
let mut range_filter = data::TimestampRange::default();
range_filter.set_start_timestamp_micros(bt_now * 1000);
timestamp_filter.set_timestamp_range_filter(range_filter);

Ok(timestamp_filter)
}

fn to_u64(value: Vec<u8>, name: &str) -> Result<u64, DbError> {
let v: [u8; 8] = value
.try_into()
Expand Down Expand Up @@ -168,15 +181,15 @@ impl BigTableClientImpl {
/// Read the rows selected by `req` and merge the response into complete rows.
///
/// Takes a connection from the pool, issues the read request on it, and
/// hands the response to `merge::RowMerger::process_chunks` together with
/// the optional `u64` filter value and the optional `limit`.
///
/// Returns the merged rows as a `BTreeMap` keyed by `RowKey`.
///
/// # Errors
/// Propagates a failure to obtain a pooled connection, maps a failed read
/// request to `BigTableError::Read`, and returns whatever error
/// `process_chunks` produces.
async fn read_rows(
&self,
req: ReadRowsRequest,
timestamp_filter: Option<u64>,
sortkey_filter: Option<u64>,
limit: Option<usize>,
) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
let bigtable = self.pool.get().await?;
let resp = bigtable
.conn
.read_rows(&req)
.map_err(error::BigTableError::Read)?;
merge::RowMerger::process_chunks(resp, timestamp_filter, limit).await
merge::RowMerger::process_chunks(resp, sortkey_filter, limit).await
}

/// write a given row.
Expand Down Expand Up @@ -651,13 +664,7 @@ impl DbClient for BigTableClientImpl {
trace!("🉑 Found a record for that user");
if let Some(mut cells) = record.take_cells("connected_at") {
if let Some(cell) = cells.pop() {
let v: [u8; 8] = cell.value.try_into().map_err(|e| {
DbError::Serialization(format!(
"Could not deserialize connected_at: {:?}",
e
))
})?;
result.connected_at = u64::from_be_bytes(v);
result.connected_at = to_u64(cell.value, "connected_at")?;
}
}

Expand Down Expand Up @@ -691,13 +698,7 @@ impl DbClient for BigTableClientImpl {

if let Some(mut cells) = record.take_cells("last_connect") {
if let Some(cell) = cells.pop() {
let v: [u8; 8] = cell.value.try_into().map_err(|e| {
DbError::Serialization(format!(
"Could not deserialize last_connect: {:?}",
e
))
})?;
result.last_connect = Some(u64::from_be_bytes(v));
result.last_connect = Some(to_u64(cell.value, "last_connect")?)
}
}

Expand All @@ -713,11 +714,8 @@ impl DbClient for BigTableClientImpl {
}

if let Some(mut cells) = record.take_cells("record_version") {
if let Some(mut cell) = cells.pop() {
// there's only one byte, so pop it off and use it.
if let Some(b) = cell.value.pop() {
result.record_version = Some(b)
}
if let Some(cell) = cells.pop() {
result.record_version = Some(to_u64(cell.value, "record_version")?)
}
}

Expand All @@ -734,13 +732,7 @@ impl DbClient for BigTableClientImpl {

if let Some(mut cells) = record.take_cells("current_timestamp") {
if let Some(cell) = cells.pop() {
let v: [u8; 8] = cell.value.try_into().map_err(|e| {
DbError::Serialization(format!(
"Could not deserialize current_timestamp: {:?}",
e
))
})?;
result.current_timestamp = Some(u64::from_be_bytes(v));
result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
}
}

Expand Down Expand Up @@ -996,6 +988,10 @@ impl DbClient for BigTableClientImpl {
} else {
MESSAGE_FAMILY
};
let expiry: u128 = ttl
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
cells.extend(vec![
cell::Cell {
family: family.to_owned(),
Expand Down Expand Up @@ -1028,12 +1024,7 @@ impl DbClient for BigTableClientImpl {
cell::Cell {
family: family.to_owned(),
qualifier: "expiry".to_owned(),
value: ttl
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
.to_be_bytes()
.to_vec(),
value: expiry.to_be_bytes().to_vec(),
timestamp: ttl,
..Default::default()
},
Expand Down Expand Up @@ -1166,15 +1157,29 @@ impl DbClient for BigTableClientImpl {
let mut req = ReadRowsRequest::default();
req.set_table_name(self.settings.table_name.clone());
req.set_filter({
let mut regex_filter = data::RowFilter::default();
let mut filter = data::RowFilter::default();

let mut chain = data::RowFilter_Chain::default();
let mut filter_chain = RepeatedField::default();

let mut row_filter = data::RowFilter::default();
// channels for a given UAID all begin with `{uaid}#`
// this will fetch all messages for all channels and all sort_keys
regex_filter.set_row_key_regex_filter(
row_filter.set_row_key_regex_filter(
format!("^{}#[^#]+#01:.+", uaid.simple())
.as_bytes()
.to_vec(),
);
regex_filter

let time_filter = timestamp_filter()?;
// Filter by the keyed value first, then by the time.
filter_chain.push(row_filter);
filter_chain.push(time_filter);

chain.set_filters(filter_chain);
filter.set_chain(chain);

filter
});
// Note set_rows_limit(v) limits the returned results
// If you're doing additional filtering later, this is not what
Expand Down Expand Up @@ -1210,6 +1215,12 @@ impl DbClient for BigTableClientImpl {
//
//
let filter = {
let mut filter = data::RowFilter::default();

let mut chain = data::RowFilter_Chain::default();
let mut filter_chain = RepeatedField::default();

let mut row_filter = data::RowFilter::default();
// Only look for channelids for the given UAID.
// start by looking for rows that roughly match what we want
// Note: BigTable provides a good deal of specialized filtering, but
Expand All @@ -1218,15 +1229,33 @@ impl DbClient for BigTableClientImpl {
// cells. There does not appear to be a way to chain this so that it only
// looks for rows with ranged values within a given family or qualifier types
// That must be done externally.)
let mut filter = data::RowFilter::default();

// look for anything belonging to this UAID that is also a Standard Notification
let pattern = format!(
"^{}#[^#]+#{}:.*",
uaid.simple(),
STANDARD_NOTIFICATION_PREFIX,
);
trace!("🉑 regex filter {:?}", pattern);
filter.set_row_key_regex_filter(pattern.as_bytes().to_vec());
row_filter.set_row_key_regex_filter(pattern.as_bytes().to_vec());
filter_chain.push(row_filter);

// Filter on our TTL.
let time_filter = timestamp_filter()?;
filter_chain.push(time_filter);

/*
//NOTE: if you filter on a given field, BigTable will only
// return that specific field. Adding filters for the rest of
// the known elements may NOT return those elements or may
// cause the message to not be returned because any of
// those elements are not present. It may be preferable to
// therefore run two filters, one to fetch the candidate IDs
// and another to fetch the content of the messages.
*/

chain.set_filters(filter_chain);
filter.set_chain(chain);
filter
};
req.set_filter(filter);
Expand Down Expand Up @@ -1375,6 +1404,10 @@ mod tests {
..Default::default()
};

// purge the old user (if present)
// in case a prior test failed for whatever reason.
let _ = client.remove_user(&uaid).await;

// can we add the user?
let user = client.add_user(&test_user).await;
assert!(user.is_ok());
Expand Down Expand Up @@ -1441,10 +1474,8 @@ mod tests {
sortkey_timestamp: Some(sort_key),
..Default::default()
};
assert!(client
.save_message(&uaid, test_notification.clone())
.await
.is_ok());
let res = client.save_message(&uaid, test_notification.clone()).await;
assert!(res.is_ok());

let mut fetched = client
.fetch_timestamp_messages(&uaid, None, 999)
Expand Down
4 changes: 2 additions & 2 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::util::timing::{ms_since_epoch, sec_since_epoch};
use models::{NotificationHeaders, RangeKey};

const MAX_EXPIRY: u64 = 2_592_000;
pub const USER_RECORD_VERSION: u8 = 1;
pub const USER_RECORD_VERSION: u64 = 1;
/// The maximum TTL for channels, 30 days
pub const MAX_CHANNEL_TTL: u64 = 30 * 24 * 60 * 60;

Expand Down Expand Up @@ -166,7 +166,7 @@ pub struct User {
pub node_id: Option<String>,
/// Record version
#[serde(skip_serializing_if = "Option::is_none")]
pub record_version: Option<u8>,
pub record_version: Option<u64>,
/// LEGACY: Current month table in the database the user is on
#[serde(skip_serializing_if = "Option::is_none")]
pub current_month: Option<String>,
Expand Down

0 comments on commit 1a64b93

Please sign in to comment.