diff --git a/Cargo.lock b/Cargo.lock index fb25566e0..6546613fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -787,6 +787,7 @@ dependencies = [ "chrono", "config", "deadpool", + "derive_builder", "fernet", "form_urlencoded", "futures 0.3.30", @@ -1315,6 +1316,37 @@ dependencies = [ "serde", ] +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.71", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core", + "syn 2.0.71", +] + [[package]] name = "derive_more" version = "0.99.18" @@ -2534,9 +2566,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.64" +version = "0.10.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" dependencies = [ "bitflags 2.6.0", "cfg-if", @@ -2566,9 +2598,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.102" +version = "0.9.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" dependencies = [ "cc", "libc", diff --git a/autoconnect/autoconnect-common/src/test_support.rs b/autoconnect/autoconnect-common/src/test_support.rs index d75a69575..760542de0 100644 --- a/autoconnect/autoconnect-common/src/test_support.rs +++ b/autoconnect/autoconnect-common/src/test_support.rs @@ -30,12 +30,12 @@ pub fn hello_db() -> MockDbClient { pub fn hello_again_db(uaid: Uuid) -> MockDbClient { let mut db = MockDbClient::new(); db.expect_get_user().times(1).return_once(move |_| { - Ok(Some(User { - uaid, - // Last connected 10 minutes ago - connected_at: ms_since_epoch() - (10 * 60 * 1000), - ..Default::default() - })) + let user = User::builder() + .uaid(uaid) + .connected_at(ms_since_epoch() - (10 * 60 * 1000)) + .build() + .unwrap(); + Ok(Some(user)) }); db.expect_update_user().times(1).return_once(|_| Ok(true)); diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs index 9fcc71a24..dfef1b2b2 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs @@ -154,11 +154,11 @@ impl UnidentifiedClient { // change from the previous state machine impl) } - let user = User { - node_id: Some(self.app_state.router_url.to_owned()), - connected_at, - ..Default::default() - }; + let user = User::builder() + .node_id(self.app_state.router_url.to_owned()) + .connected_at(connected_at) + .build() + .map_err(|e| SMErrorKind::Internal(format!("User::builder error: {e}")))?; Ok(GetOrCreateUser { user, existing_user: false, diff --git a/autoendpoint/src/routers/common.rs b/autoendpoint/src/routers/common.rs index 7c5a035c2..92eeebfba 100644 --- a/autoendpoint/src/routers/common.rs +++ b/autoendpoint/src/routers/common.rs @@ -228,14 +228,15 @@ pub mod tests { data: Option, router_type: RouterType, ) -> Notification { + let user = User::builder() + .router_data(router_data) + .router_type(router_type.to_string()) + .build() + .unwrap(); Notification { message_id: "test-message-id".to_string(), subscription: Subscription { - user: User { - router_data: Some(router_data), - router_type: router_type.to_string(), - ..Default::default() - }, + user, channel_id: channel_id(), vapid: None, }, diff --git a/autoendpoint/src/routes/registration.rs b/autoendpoint/src/routes/registration.rs index 905a82221..0fd97b770 100644 --- a/autoendpoint/src/routes/registration.rs +++ b/autoendpoint/src/routes/registration.rs @@ -35,11 +35,11 @@ pub async fn register_uaid_route( incr_metric("ua.command.register", &app_state.metrics, &request); // Register user and channel in database - let user = User { - router_type: path_args.router_type.to_string(), - router_data: Some(router_data), - ..Default::default() - }; + let user = User::builder() + .router_type(path_args.router_type.to_string()) + .router_data(router_data) + .build() + .map_err(|e| ApiErrorKind::General(format!("User::builder error: {e}")))?; let channel_id = router_data_input.channel_id.unwrap_or_else(Uuid::new_v4); trace!("🌍 Creating user with UAID {}", user.uaid); trace!("🌍 user = {:?}", user); diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 8a0770cff..2164633cd 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -45,6 +45,7 @@ url.workspace = true again = "0.1" async-trait = "0.1" +derive_builder = "0.20" gethostname = "0.4" num_cpus = "1.16" woothee = "0.13" diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 33f7ecb2c..a19fad064 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::fmt::Display; @@ -28,7 +29,7 @@ use crate::db::{ }; pub use self::metadata::MetadataBuilder; -use self::row::Row; +use self::row::{Row, RowCells}; use super::pool::BigTablePool; use super::BigTableDbSettings; @@ -201,6 +202,46 @@ fn to_string(value: Vec, name: &str) -> Result { }) } +/// Parse the "set" (see [DbClient::add_channels]) of channel ids in a bigtable Row. +/// +/// Cells should solely contain the set of channels otherwise an Error is returned. +fn channels_from_cells(cells: &RowCells) -> DbResult> { + let mut result = HashSet::new(); + for cells in cells.values() { + let Some(cell) = cells.last() else { + continue; + }; + let Some((_, chid)) = cell.qualifier.split_once("chid:") else { + return Err(DbError::Integrity( + "get_channels expected: chid:".to_owned(), + None, + )); + }; + result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?); + } + Ok(result) +} + +/// Convert the [HashSet] of channel ids to cell entries for a bigtable Row +fn channels_to_cells(channels: Cow>, expiry: SystemTime) -> Vec { + let channels = channels.into_owned(); + let mut cells = Vec::with_capacity(channels.len().min(100_000)); + for (i, channel_id) in channels.into_iter().enumerate() { + // There is a limit of 100,000 mutations per batch for bigtable. + // https://cloud.google.com/bigtable/quotas + // If you have 100,000 channels, you have too many. + if i >= 100_000 { + break; + } + cells.push(cell::Cell { + qualifier: format!("chid:{}", channel_id.as_hyphenated()), + timestamp: expiry, + ..Default::default() + }); + } + cells +} + pub fn retry_policy(max: usize) -> RetryPolicy { RetryPolicy::default() .with_max_retries(max) @@ -281,7 +322,7 @@ pub fn retryable_error(metrics: Arc) -> impl Fn(&grpcio::Error) -> /// 2) When router TTLs are eventually enabled: `add_channel` and /// `increment_storage` can write cells with later expiry times than the other /// router cells -fn is_incomplete_router_record(cells: &HashMap>) -> bool { +fn is_incomplete_router_record(cells: &RowCells) -> bool { cells .keys() .all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:")) @@ -770,6 +811,11 @@ impl BigTableClientImpl { }); }; + cells.extend(channels_to_cells( + Cow::Borrowed(&user.priv_channels), + expiry, + )); + row.add_cells(ROUTER_FAMILY, cells); row } @@ -942,6 +988,9 @@ impl DbClient for BigTableClientImpl { result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?) } + // Read the channels last, after removal of all non channel cells + result.priv_channels = channels_from_cells(&row.cells)?; + Ok(Some(result)) } @@ -976,24 +1025,13 @@ impl DbClient for BigTableClientImpl { let mut row = Row::new(row_key); let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_CHANNEL_TTL); - let mut cells = Vec::with_capacity(channels.len().min(100_000)); - for (i, channel_id) in channels.into_iter().enumerate() { - // There is a limit of 100,000 mutations per batch for bigtable. - // https://cloud.google.com/bigtable/quotas - // If you have 100,000 channels, you have too many. - if i >= 100_000 { - break; - } - cells.push(cell::Cell { - qualifier: format!("chid:{}", channel_id.as_hyphenated()), - timestamp: expiry, - ..Default::default() - }); - } // Note: updating the version column isn't necessary here because this // write only adds a new (or updates an existing) column with a 0 byte // value - row.add_cells(ROUTER_FAMILY, cells); + row.add_cells( + ROUTER_FAMILY, + channels_to_cells(Cow::Owned(channels), expiry), + ); self.write_row(row).await?; Ok(()) @@ -1011,23 +1049,10 @@ impl DbClient for BigTableClientImpl { cq_filter, ])); - let mut result = HashSet::new(); - if let Some(record) = self.read_row(req).await? { - for mut cells in record.cells.into_values() { - let Some(cell) = cells.pop() else { - continue; - }; - let Some((_, chid)) = cell.qualifier.split_once("chid:") else { - return Err(DbError::Integrity( - "get_channels expected: chid:".to_owned(), - None, - )); - }; - result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?); - } - } - - Ok(result) + let Some(row) = self.read_row(req).await? else { + return Ok(Default::default()); + }; + channels_from_cells(&row.cells) } /// Delete the channel. Does not delete its associated pending messages. @@ -1769,4 +1794,79 @@ mod tests { client.remove_user(&uaid).await.unwrap(); } + + #[actix_rt::test] + async fn channel_and_current_timestamp_ttl_updates() { + let client = new_client().unwrap(); + let uaid = gen_test_uaid(); + let chid = Uuid::parse_str(TEST_CHID).unwrap(); + client.remove_user(&uaid).await.unwrap(); + + // Setup a user with some channels and a current_timestamp + let user = User { + uaid, + ..Default::default() + }; + client.add_user(&user).await.unwrap(); + + client.add_channel(&uaid, &chid).await.unwrap(); + client + .add_channel(&uaid, &uuid::Uuid::new_v4()) + .await + .unwrap(); + + client + .increment_storage( + &uaid, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + ) + .await + .unwrap(); + + let req = client.read_row_request(&uaid.as_simple().to_string()); + let Some(mut row) = client.read_row(req).await.unwrap() else { + panic!("Expected row"); + }; + + // Ensure the initial expiry (timestamp) of all the cells in the row + let expiry = row.take_required_cell("connected_at").unwrap().timestamp; + for mut cells in row.cells.into_values() { + let Some(cell) = cells.pop() else { + continue; + }; + assert!( + cell.timestamp >= expiry, + "{} cell timestamp should >= connected_at's", + cell.qualifier + ); + } + + let mut user = client.get_user(&uaid).await.unwrap().unwrap(); + client.update_user(&mut user).await.unwrap(); + + // Ensure update_user updated the expiry (timestamp) of every cell in the row + let req = client.read_row_request(&uaid.as_simple().to_string()); + let Some(mut row) = client.read_row(req).await.unwrap() else { + panic!("Expected row"); + }; + + let expiry2 = row.take_required_cell("connected_at").unwrap().timestamp; + assert!(expiry2 > expiry); + + for mut cells in row.cells.into_values() { + let Some(cell) = cells.pop() else { + continue; + }; + assert_eq!( + cell.timestamp, expiry2, + "{} cell timestamp should match connected_at's", + cell.qualifier + ); + } + + client.remove_user(&uaid).await.unwrap(); + } } diff --git a/autopush-common/src/db/bigtable/bigtable_client/row.rs b/autopush-common/src/db/bigtable/bigtable_client/row.rs index 2ace5d8c8..2f5bbd417 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/row.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/row.rs @@ -4,6 +4,8 @@ use crate::db::error::{DbError, DbResult}; use super::{cell::Cell, RowKey}; +pub type RowCells = HashMap>; + /// A Bigtable storage row. Bigtable stores by Family ID which isn't /// very useful for us later, so we overload this structure a bit. /// When we read data back out of Bigtable, we index cells by @@ -19,7 +21,7 @@ pub struct Row { pub row_key: RowKey, /// The row's collection of cells, indexed by either the /// FamilyID (for write) or Qualifier (for read). - pub cells: HashMap>, + pub cells: RowCells, } impl Row { diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 4624885aa..4ed9e5a64 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -12,6 +12,7 @@ use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::result::Result as StdResult; +use derive_builder::Builder; use lazy_static::lazy_static; use regex::RegexSet; use serde::Serializer; @@ -135,7 +136,8 @@ pub struct CheckStorageResponse { } /// A user data record. -#[derive(Deserialize, PartialEq, Debug, Clone, Serialize)] +#[derive(Deserialize, PartialEq, Debug, Clone, Serialize, Builder)] +#[builder(default, setter(strip_option))] pub struct User { /// The UAID. This is generally a UUID4. It needs to be globally /// unique. @@ -161,6 +163,19 @@ pub struct User { /// UUID4 version number for optimistic locking of updates on Bigtable #[serde(skip_serializing)] pub version: Option, + /// Set of user's channel ids. These are stored in router (user) record's + /// row in Bigtable. They are read along with the rest of the user record + /// so that them, along with every other field in the router record, will + /// automatically have their TTL (cell timestamp) reset during + /// [DbClient::update_user]. + /// + /// This is solely used for the sake of that update thus private. + /// [DbClient::get_channels] is preferred for reading the latest version of + /// the channel ids (partly due to historical purposes but also is a more + /// flexible API that might benefit different, non Bigtable [DbClient] + /// backends that don't necessarily store the channel ids in the router + /// record). + priv_channels: HashSet, } impl Default for User { @@ -176,10 +191,18 @@ impl Default for User { record_version: Some(USER_RECORD_VERSION), current_timestamp: None, version: Some(Uuid::new_v4()), + priv_channels: HashSet::new(), } } } +impl User { + /// Return a new [UserBuilder] (generated from [derive_builder::Builder]) + pub fn builder() -> UserBuilder { + UserBuilder::default() + } +} + /// A stored Notification record. This is a notification that is to be stored /// until the User Agent reconnects. These are then converted to publishable /// [crate::db::Notification] records. @@ -328,3 +351,16 @@ impl NotificationRecord { } } } + +#[cfg(test)] +mod tests { + use super::{User, USER_RECORD_VERSION}; + + #[test] + fn user_defaults() { + let user = User::builder().current_timestamp(22).build().unwrap(); + assert_eq!(user.current_timestamp, Some(22)); + assert_eq!(user.router_type, "webpush".to_owned()); + assert_eq!(user.record_version, Some(USER_RECORD_VERSION)); + } +}