diff --git a/autoendpoint/src/routes/registration.rs b/autoendpoint/src/routes/registration.rs index 3fdcd5b1c..01232ed04 100644 --- a/autoendpoint/src/routes/registration.rs +++ b/autoendpoint/src/routes/registration.rs @@ -64,7 +64,7 @@ pub async fn register_uaid_route( trace!("🌍 Creating secret for UAID {}", user.uaid); let auth_keys = app_state.settings.auth_keys(); let auth_key = auth_keys - .get(0) + .first() .expect("At least one auth key must be provided in the settings"); let secret = AuthorizationCheck::generate_token(auth_key, &user.uaid) .map_err(ApiErrorKind::RegistrationSecretHash)?; diff --git a/autopush-common/src/db/bigtable/bigtable_client/Bigtable_Learnings.md b/autopush-common/src/db/bigtable/bigtable_client/Bigtable_Learnings.md new file mode 100644 index 000000000..973508146 --- /dev/null +++ b/autopush-common/src/db/bigtable/bigtable_client/Bigtable_Learnings.md @@ -0,0 +1,48 @@ +# Various things we learned while dealing with Bigtable + +This document contains tricks and traps that we've discovered while working with Bigtable. + +## Indices + +* Bigtable uses a single key, which it uses to locate data as proximate to other values (e.g. +"Foo-123" would be stored proximate to "Foo-456"). The suggested mechanism is to collapse significant values into a single key using well-known separator values and use regular-expression syntax for the key index to limit searches. + +* Regular expressions must match the entire key. A partial match is not considered for inclusion in the result set. (e.g. `Foo` will not match `Foo-123`, however `Foo.*` will) + +## Cells + +* Cell values are stored as Byte arrays, and library functions use field hints in order to store and retrieve values, but be mindful of storage differences. (e.g. a u64 value or search will not match a u128 byte array.) + +## Filters + +* Cell Filters are exclusive by default. 
This means that when you do a filter for a given cell value or qualifier, the result set will only contain the filtered results. +This is a bit hard to explain without a proper example, but presume the following filter: + +```rust +fn expiry_filter() -> Result { + let mut expiry_filter = data::RowFilter::default(); + let mut chain = data::RowFilter_Chain::default(); + let mut filter_chain = RepeatedField::default(); + + let mut key_filter = data::RowFilter::default(); + key_filter.set_column_qualifier_regex_filter("expiry".as_bytes().to_vec()); + let bt_now: u128 = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(error::BigTableError::WriteTime)? + .as_millis(); + let mut value_filter = data::RowFilter::default(); + let mut range = data::ValueRange::default(); + + // Valid timestamps have not yet expired. + range.set_start_value_open(bt_now.to_be_bytes().to_vec()); + value_filter.set_value_range_filter(range); + filter_chain.push(key_filter); + filter_chain.push(value_filter); + chain.set_filters(filter_chain); + expiry_filter.set_chain(chain); + Ok(expiry_filter) +} + +``` + +Adding this filter to a query will return a result that only includes the "expiry" cells which have a value that is greater than the current time. No other cells or values will be included in the return set, however each "row" will include the row meta information, including the key. diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index c8f75535c..e544cd316 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -73,6 +73,19 @@ pub struct BigTableClientImpl { pool: BigTablePool, } +fn timestamp_filter() -> Result { + let mut timestamp_filter = data::RowFilter::default(); + let bt_now: i64 = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(error::BigTableError::WriteTime)? 
+ .as_millis() as i64; + let mut range_filter = data::TimestampRange::default(); + range_filter.set_start_timestamp_micros(bt_now * 1000); + timestamp_filter.set_timestamp_range_filter(range_filter); + + Ok(timestamp_filter) +} + fn to_u64(value: Vec, name: &str) -> Result { let v: [u8; 8] = value .try_into() @@ -168,7 +181,7 @@ impl BigTableClientImpl { async fn read_rows( &self, req: ReadRowsRequest, - timestamp_filter: Option, + sortkey_filter: Option, limit: Option, ) -> Result, error::BigTableError> { let bigtable = self.pool.get().await?; @@ -176,7 +189,7 @@ impl BigTableClientImpl { .conn .read_rows(&req) .map_err(error::BigTableError::Read)?; - merge::RowMerger::process_chunks(resp, timestamp_filter, limit).await + merge::RowMerger::process_chunks(resp, sortkey_filter, limit).await } /// write a given row. @@ -651,13 +664,7 @@ impl DbClient for BigTableClientImpl { trace!("🉑 Found a record for that user"); if let Some(mut cells) = record.take_cells("connected_at") { if let Some(cell) = cells.pop() { - let v: [u8; 8] = cell.value.try_into().map_err(|e| { - DbError::Serialization(format!( - "Could not deserialize connected_at: {:?}", - e - )) - })?; - result.connected_at = u64::from_be_bytes(v); + result.connected_at = to_u64(cell.value, "connected_at")?; } } @@ -691,13 +698,7 @@ impl DbClient for BigTableClientImpl { if let Some(mut cells) = record.take_cells("last_connect") { if let Some(cell) = cells.pop() { - let v: [u8; 8] = cell.value.try_into().map_err(|e| { - DbError::Serialization(format!( - "Could not deserialize last_connect: {:?}", - e - )) - })?; - result.last_connect = Some(u64::from_be_bytes(v)); + result.last_connect = Some(to_u64(cell.value, "last_connect")?) } } @@ -713,11 +714,8 @@ impl DbClient for BigTableClientImpl { } if let Some(mut cells) = record.take_cells("record_version") { - if let Some(mut cell) = cells.pop() { - // there's only one byte, so pop it off and use it. 
- if let Some(b) = cell.value.pop() { - result.record_version = Some(b) - } + if let Some(cell) = cells.pop() { + result.record_version = Some(to_u64(cell.value, "record_version")?) } } @@ -734,13 +732,7 @@ impl DbClient for BigTableClientImpl { if let Some(mut cells) = record.take_cells("current_timestamp") { if let Some(cell) = cells.pop() { - let v: [u8; 8] = cell.value.try_into().map_err(|e| { - DbError::Serialization(format!( - "Could not deserialize current_timestamp: {:?}", - e - )) - })?; - result.current_timestamp = Some(u64::from_be_bytes(v)); + result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?) } } @@ -996,6 +988,10 @@ impl DbClient for BigTableClientImpl { } else { MESSAGE_FAMILY }; + let expiry: u128 = ttl + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); cells.extend(vec![ cell::Cell { family: family.to_owned(), @@ -1028,12 +1024,7 @@ impl DbClient for BigTableClientImpl { cell::Cell { family: family.to_owned(), qualifier: "expiry".to_owned(), - value: ttl - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() - .to_be_bytes() - .to_vec(), + value: expiry.to_be_bytes().to_vec(), timestamp: ttl, ..Default::default() }, @@ -1166,15 +1157,29 @@ impl DbClient for BigTableClientImpl { let mut req = ReadRowsRequest::default(); req.set_table_name(self.settings.table_name.clone()); req.set_filter({ - let mut regex_filter = data::RowFilter::default(); + let mut filter = data::RowFilter::default(); + + let mut chain = data::RowFilter_Chain::default(); + let mut filter_chain = RepeatedField::default(); + + let mut row_filter = data::RowFilter::default(); // channels for a given UAID all begin with `{uaid}#` // this will fetch all messages for all channels and all sort_keys - regex_filter.set_row_key_regex_filter( + row_filter.set_row_key_regex_filter( format!("^{}#[^#]+#01:.+", uaid.simple()) .as_bytes() .to_vec(), ); - regex_filter + + let time_filter = timestamp_filter()?; + // 
Filter by the keyed value first, then by the time. + filter_chain.push(row_filter); + filter_chain.push(time_filter); + + chain.set_filters(filter_chain); + filter.set_chain(chain); + + filter }); // Note set_rows_limit(v) limits the returned results // If you're doing additional filtering later, this is not what @@ -1210,6 +1215,12 @@ impl DbClient for BigTableClientImpl { // // let filter = { + let mut filter = data::RowFilter::default(); + + let mut chain = data::RowFilter_Chain::default(); + let mut filter_chain = RepeatedField::default(); + + let mut row_filter = data::RowFilter::default(); // Only look for channelids for the given UAID. // start by looking for rows that roughly match what we want // Note: BigTable provides a good deal of specialized filtering, but @@ -1218,7 +1229,7 @@ impl DbClient for BigTableClientImpl { // cells. There does not appear to be a way to chain this so that it only // looks for rows with ranged values within a given family or qualifier types // That must be done externally.) - let mut filter = data::RowFilter::default(); + // look for anything belonging to this UAID that is also a Standard Notification let pattern = format!( "^{}#[^#]+#{}:.*", @@ -1226,7 +1237,25 @@ impl DbClient for BigTableClientImpl { STANDARD_NOTIFICATION_PREFIX, ); trace!("🉑 regex filter {:?}", pattern); - filter.set_row_key_regex_filter(pattern.as_bytes().to_vec()); + row_filter.set_row_key_regex_filter(pattern.as_bytes().to_vec()); + filter_chain.push(row_filter); + + // Filter on our TTL. + let time_filter = timestamp_filter()?; + filter_chain.push(time_filter); + + /* + //NOTE: if you filter on a given field, BigTable will only + // return that specific field. Adding filters for the rest of + // the known elements may NOT return those elements or may + // cause the message to not be returned because any of + // those elements are not present. 
It may be preferable to + // therefore run two filters, one to fetch the candidate IDs + // and another to fetch the content of the messages. + */ + + chain.set_filters(filter_chain); + filter.set_chain(chain); filter }; req.set_filter(filter); @@ -1375,6 +1404,10 @@ mod tests { ..Default::default() }; + // purge the old user (if present) + // in case a prior test failed for whatever reason. + let _ = client.remove_user(&uaid).await; + // can we add the user? let user = client.add_user(&test_user).await; assert!(user.is_ok()); @@ -1441,10 +1474,8 @@ mod tests { sortkey_timestamp: Some(sort_key), ..Default::default() }; - assert!(client - .save_message(&uaid, test_notification.clone()) - .await - .is_ok()); + let res = client.save_message(&uaid, test_notification.clone()).await; + assert!(res.is_ok()); let mut fetched = client .fetch_timestamp_messages(&uaid, None, 999) diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 4e892e220..43691f11e 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -41,7 +41,7 @@ use crate::util::timing::{ms_since_epoch, sec_since_epoch}; use models::{NotificationHeaders, RangeKey}; const MAX_EXPIRY: u64 = 2_592_000; -pub const USER_RECORD_VERSION: u8 = 1; +pub const USER_RECORD_VERSION: u64 = 1; /// The maximum TTL for channels, 30 days pub const MAX_CHANNEL_TTL: u64 = 30 * 24 * 60 * 60; @@ -166,7 +166,7 @@ pub struct User { pub node_id: Option, /// Record version #[serde(skip_serializing_if = "Option::is_none")] - pub record_version: Option, + pub record_version: Option, /// LEGACY: Current month table in the database the user is on #[serde(skip_serializing_if = "Option::is_none")] pub current_month: Option,