From b581438832aa2bb96f78fc357260ba114bd6d255 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Wed, 24 Jan 2024 17:18:05 -0800 Subject: [PATCH] feat: don't store duplicated message columns - channel_id/topic/sortkey_timestamp (in the row key) - expiry (in the cell timestamps) (we can always add these back to messages later if they are worth having) - make add_user conditional, erroring out if the user already exists - utilize row_limit in message reads - enforce non null columns on deserialization Closes: SYNC-4070 Closes: SYNC-4099 Closes: SYNC-4106 --- .../src/db/bigtable/bigtable_client/merge.rs | 6 - .../src/db/bigtable/bigtable_client/mod.rs | 315 +++++++----------- .../src/db/bigtable/bigtable_client/row.rs | 10 +- autopush-common/src/db/error.rs | 3 + autopush-common/src/db/models.rs | 1 + 5 files changed, 140 insertions(+), 195 deletions(-) diff --git a/autopush-common/src/db/bigtable/bigtable_client/merge.rs b/autopush-common/src/db/bigtable/bigtable_client/merge.rs index 2f67e5f4d..af129c56c 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/merge.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/merge.rs @@ -376,7 +376,6 @@ impl RowMerger { /// Iterate through all the returned chunks and compile them into a hash of finished cells indexed by row_key pub async fn process_chunks( mut stream: ClientSStreamReceiver, - limit: Option, ) -> Result, BigTableError> { // Work object let mut merger = Self::default(); @@ -385,11 +384,6 @@ impl RowMerger { let mut rows = BTreeMap::::new(); while let (Some(row_resp_res), s) = stream.into_future().await { - if let Some(limit) = limit { - if limit > 0 && rows.len() > limit { - break; - } - } stream = s; let row = match row_resp_res { Ok(v) => v, diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index a791ed3c1..60db121b0 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -24,7 +24,7 @@ use uuid::Uuid; use crate::db::{ client::{DbClient, FetchMessageResponse}, error::{DbError, DbResult}, - DbSettings, Notification, User, MAX_CHANNEL_TTL, + DbSettings, Notification, NotificationRecord, User, MAX_CHANNEL_TTL, }; use self::row::Row; @@ -159,7 +159,7 @@ impl BigTableClientImpl { async fn read_row(&self, row_key: &str) -> Result, error::BigTableError> { debug!("🉑 Row key: {}", row_key); let req = self.read_row_request(row_key); - let mut rows = self.read_rows(req, None).await?; + let mut rows = self.read_rows(req).await?; Ok(rows.remove(row_key)) } @@ -223,14 +223,13 @@ impl BigTableClientImpl { async fn read_rows( &self, req: ReadRowsRequest, - limit: Option, ) -> Result, error::BigTableError> { let bigtable = self.pool.get().await?; let resp = bigtable .conn .read_rows(&req) .map_err(error::BigTableError::Read)?; - merge::RowMerger::process_chunks(resp, limit).await + merge::RowMerger::process_chunks(resp).await } /// write a given row. @@ -287,19 +286,29 @@ impl BigTableClientImpl { Ok(mutations) } - /// Check and write rows that match the associated filter, returning if the filter - /// matched records (and the update was successful) + /// Write mutations if the row meets a condition specified by the filter. + /// + /// Mutations can be applied either when the filter matches (state `true`) + /// or doesn't match (state `false`). + /// + /// Returns whether the filter matched records (which indicates whether the + /// mutations were applied, depending on the state) async fn check_and_mutate_row( &self, row: row::Row, filter: RowFilter, + state: bool, ) -> Result { let mut req = bigtable::CheckAndMutateRowRequest::default(); req.set_table_name(self.settings.table_name.clone()); req.set_row_key(row.row_key.into_bytes()); let mutations = self.get_mutations(row.cells)?; req.set_predicate_filter(filter); - req.set_true_mutations(mutations); + if state { + req.set_true_mutations(mutations); + } else { + req.set_false_mutations(mutations); + } // Do the actual commit. let bigtable = self.pool.get().await?; @@ -412,75 +421,44 @@ impl BigTableClientImpl { fn rows_to_notifications( &self, rows: BTreeMap, - limit: Option, - ) -> Result { - let mut messages: Vec = Vec::new(); - let mut max_timestamp: u64 = 0; - - for (_key, mut row) in rows { - if let Some(limit) = limit { - if messages.len() >= limit { - break; - } - } - // get the dominant family type for this row. - if let Some(cell) = row.take_cell("channel_id") { - let mut notif = Notification { - channel_id: Uuid::from_str(&to_string(cell.value, "channel_id")?).map_err( - |e| { - DbError::Serialization(format!( - "Could not deserialize chid to uuid: {:?}", - e - )) - }, - )?, - ..Default::default() - }; - if let Some(cell) = row.take_cell("version") { - notif.version = to_string(cell.value, "version")?; - } - if let Some(cell) = row.take_cell("topic") { - notif.topic = Some(to_string(cell.value, "topic")?); - } - - if let Some(cell) = row.take_cell("ttl") { - notif.ttl = to_u64(cell.value, "ttl")?; - } - - if let Some(cell) = row.take_cell("data") { - notif.data = Some(to_string(cell.value, "data")?); - } - if let Some(cell) = row.take_cell("sortkey_timestamp") { - let sk_ts = to_u64(cell.value, "sortkey_timestamp")?; - notif.sortkey_timestamp = Some(sk_ts); - if sk_ts > max_timestamp { - max_timestamp = sk_ts; - } - } - if let Some(cell) = row.take_cell("timestamp") { - notif.timestamp = to_u64(cell.value, "timestamp")?; - } - if let Some(cell) = row.take_cell("headers") { - notif.headers = Some( - serde_json::from_str::>(&to_string( - cell.value, "headers", - )?) - .map_err(|e| DbError::Serialization(e.to_string()))?, - ); - } - trace!("🚣 adding row: {:?}", ¬if); - messages.push(notif); - } + ) -> Result, DbError> { + rows.into_iter() + .map(|(row_key, row)| self.row_to_notification(&row_key, row)) + .collect() + } + + fn row_to_notification(&self, row_key: &str, mut row: Row) -> Result { + let Some((_, chidmessageid)) = row_key.split_once('#') else { + return Err(DbError::Integrity( + "rows_to_notification expected row_key: uaid:chidmessageid ".to_owned(), + )); + }; + let range_key = NotificationRecord::parse_chidmessageid(chidmessageid).map_err(|e| { + DbError::Integrity(format!("rows_to_notification expected chidmessageid: {e}")) + })?; + + let mut notif = Notification { + channel_id: range_key.channel_id, + topic: range_key.topic, + sortkey_timestamp: range_key.sortkey_timestamp, + version: to_string(row.take_required_cell("version")?.value, "version")?, + ttl: to_u64(row.take_required_cell("ttl")?.value, "ttl")?, + timestamp: to_u64(row.take_required_cell("timestamp")?.value, "timestamp")?, + ..Default::default() + }; + + if let Some(cell) = row.take_cell("data") { + notif.data = Some(to_string(cell.value, "data")?); + } + if let Some(cell) = row.take_cell("headers") { + notif.headers = Some( + serde_json::from_str::>(&to_string(cell.value, "headers")?) + .map_err(|e| DbError::Serialization(e.to_string()))?, + ); } - Ok(FetchMessageResponse { - messages, - timestamp: if max_timestamp > 0 { - Some(max_timestamp) - } else { - None - }, - }) + trace!("🚣 Deserialized message row: {:?}", ¬if); + Ok(notif) } fn user_to_row(&self, user: &User) -> Row { @@ -582,9 +560,17 @@ impl BigtableDb { impl DbClient for BigTableClientImpl { /// add user to the database async fn add_user(&self, user: &User) -> DbResult<()> { - let row = self.user_to_row(user); trace!("🉑 Adding user"); - self.write_row(row).await.map_err(|e| e.into()) + let row = self.user_to_row(user); + + // Only add when the user doesn't already exist + let mut filter = RowFilter::default(); + filter.set_row_key_regex_filter(format!("^{}$", row.row_key).into_bytes()); + + if self.check_and_mutate_row(row, filter, false).await? { + return Err(DbError::Conditional); + } + Ok(()) } /// BigTable doesn't really have the concept of an "update". You simply write the data and @@ -669,78 +655,45 @@ impl DbClient for BigTableClientImpl { cond_filter.set_condition(cond); // dbg!(&cond_filter); - Ok(self.check_and_mutate_row(row, cond_filter).await?) + Ok(self.check_and_mutate_row(row, cond_filter, true).await?) } async fn get_user(&self, uaid: &Uuid) -> DbResult> { let row_key = uaid.as_simple().to_string(); + let Some(mut row) = self.read_row(&row_key).await? else { + return Ok(None); + }; + + trace!("🉑 Found a record for {}", row_key); let mut result = User { uaid: *uaid, + connected_at: to_u64( + row.take_required_cell("connected_at")?.value, + "connected_at", + )?, + router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?, ..Default::default() }; - if let Some(mut record) = self.read_row(&row_key).await? { - trace!("🉑 Found a record for that user"); - if let Some(mut cells) = record.take_cells("connected_at") { - if let Some(cell) = cells.pop() { - result.connected_at = to_u64(cell.value, "connected_at")?; - } - } - - if let Some(mut cells) = record.take_cells("router_type") { - if let Some(cell) = cells.pop() { - result.router_type = String::from_utf8(cell.value).map_err(|e| { - DbError::Serialization(format!( - "Could not deserialize router_type: {:?}", - e - )) - })?; - } - } - - if let Some(mut cells) = record.take_cells("router_data") { - if let Some(cell) = cells.pop() { - result.router_data = from_str(&String::from_utf8(cell.value).map_err(|e| { - DbError::Serialization(format!( - "Could not deserialize router_type: {:?}", - e - )) - })?) - .map_err(|e| { - DbError::Serialization(format!( - "Could not deserialize router_type: {:?}", - e - )) - })?; - } - } - - if let Some(mut cells) = record.take_cells("node_id") { - if let Some(cell) = cells.pop() { - result.node_id = Some(String::from_utf8(cell.value).map_err(|e| { - DbError::Serialization(format!( - "Could not deserialize router_type: {:?}", - e - )) - })?); - } - } + if let Some(cell) = row.take_cell("router_data") { + result.router_data = from_str(&to_string(cell.value, "router_type")?).map_err(|e| { + DbError::Serialization(format!("Could not deserialize router_type: {e:?}")) + })?; + } - if let Some(mut cells) = record.take_cells("record_version") { - if let Some(cell) = cells.pop() { - result.record_version = Some(to_u64(cell.value, "record_version")?) - } - } + if let Some(cell) = row.take_cell("node_id") { + result.node_id = Some(to_string(cell.value, "node_id")?); + } - if let Some(mut cells) = record.take_cells("current_timestamp") { - if let Some(cell) = cells.pop() { - result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?) - } - } + if let Some(cell) = row.take_cell("record_version") { + result.record_version = Some(to_u64(cell.value, "record_version")?) + } - return Ok(Some(result)); + if let Some(cell) = row.take_cell("current_timestamp") { + result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?) } - Ok(None) + + Ok(Some(result)) } async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> { @@ -849,14 +802,14 @@ impl DbClient for BigTableClientImpl { filter.set_chain(filter_chain); req.set_filter(filter); - let mut rows = self.read_rows(req, None).await?; + let mut rows = self.read_rows(req).await?; let mut result = HashSet::new(); if let Some(record) = rows.remove(&row_key) { for mut cells in record.cells.into_values() { let Some(cell) = cells.pop() else { continue; }; - let Some(chid) = cell.qualifier.split("chid:").last() else { + let Some((_, chid)) = cell.qualifier.split_once("chid:") else { return Err(DbError::Integrity( "get_channels expected: chid:".to_owned(), )); @@ -925,23 +878,11 @@ impl DbClient for BigTableClientImpl { let mut cells: Vec = Vec::new(); - let family = if let Some(topic) = message.topic { - // Set the correct flag so we know how to read this row later. - cells.push(cell::Cell { - family: MESSAGE_TOPIC_FAMILY.to_owned(), - qualifier: "topic".to_owned(), - value: topic.into_bytes(), - timestamp: ttl, - ..Default::default() - }); + let family = if message.topic.is_some() { MESSAGE_TOPIC_FAMILY } else { MESSAGE_FAMILY }; - let expiry: u128 = ttl - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_millis(); cells.extend(vec![ cell::Cell { qualifier: "ttl".to_owned(), @@ -949,12 +890,6 @@ impl DbClient for BigTableClientImpl { timestamp: ttl, ..Default::default() }, - cell::Cell { - qualifier: "channel_id".to_owned(), - value: message.channel_id.as_hyphenated().to_string().into_bytes(), - timestamp: ttl, - ..Default::default() - }, cell::Cell { qualifier: "timestamp".to_owned(), value: message.timestamp.to_be_bytes().to_vec(), @@ -967,12 +902,6 @@ impl DbClient for BigTableClientImpl { timestamp: ttl, ..Default::default() }, - cell::Cell { - qualifier: "expiry".to_owned(), - value: expiry.to_be_bytes().to_vec(), - timestamp: ttl, - ..Default::default() - }, ]); if let Some(headers) = message.headers { if !headers.is_empty() { @@ -992,14 +921,6 @@ impl DbClient for BigTableClientImpl { ..Default::default() }); } - if let Some(sortkey_timestamp) = message.sortkey_timestamp { - cells.push(cell::Cell { - qualifier: "sortkey_timestamp".to_owned(), - value: sortkey_timestamp.to_be_bytes().to_vec(), - timestamp: ttl, - ..Default::default() - }); - } row.add_cells(family, cells); trace!("🉑 Adding row"); self.write_row(row).await.map_err(|e| e.into()) @@ -1086,22 +1007,26 @@ impl DbClient for BigTableClientImpl { req.set_rows(rows); req.set_filter(timestamp_filter()?); - // Note set_rows_limit(v) limits the returned results - // If you're doing additional filtering later, this is not what - // you want. - /* if limit > 0 { trace!("🉑 Setting limit to {limit}"); req.set_rows_limit(limit as i64); } - // */ - let rows = self.read_rows(req, Some(limit)).await?; + let rows = self.read_rows(req).await?; debug!( "🉑 Fetch Topic Messages. Found {} row(s) of {}", rows.len(), limit ); - self.rows_to_notifications(rows, if limit > 0 { Some(limit) } else { None }) + + let messages = self.rows_to_notifications(rows)?; + // Note: Bigtable always returns a timestamp of None here whereas + // DynamoDB returns the `current_timestamp` read from its meta + // record. Under Bigtable `current_timestamp` is instead initially read + // from [get_user] + Ok(FetchMessageResponse { + messages, + timestamp: None, + }) } /// Return `limit` messages pending for a UAID that have a sortkey_timestamp after @@ -1148,22 +1073,24 @@ impl DbClient for BigTableClientImpl { // and another to fetch the content of the messages. */ req.set_filter(timestamp_filter()?); - // Note set_rows_limit(v) limits the returned results from Bigtable. - // If you're doing additional filtering later, this may not be what - // you want and may artificially truncate possible return sets. - /* if limit > 0 { req.set_rows_limit(limit as i64); } - // */ - let rows = self.read_rows(req, Some(limit)).await?; + let rows = self.read_rows(req).await?; debug!( "🉑 Fetch Timestamp Messages ({:?}) Found {} row(s) of {}", timestamp, rows.len(), limit, ); - self.rows_to_notifications(rows, if limit > 0 { Some(limit) } else { None }) + + let messages = self.rows_to_notifications(rows)?; + // The timestamp of the last message read + let timestamp = messages.last().and_then(|m| m.sortkey_timestamp); + Ok(FetchMessageResponse { + messages, + timestamp, + }) } async fn health_check(&self) -> DbResult { @@ -1496,9 +1423,21 @@ mod tests { client.remove_user(&uaid).await } - // #[actix_rt::test] - // async fn sometest() {} + /* + // XXX: uncomment after the uaid clashing fix + #[actix_rt::test] + async fn add_user_existing() { + let client = new_client().unwrap(); + let uaid = Uuid::parse_str(TEST_USER).unwrap(); + let user = User { + uaid, + ..Default::default() + }; + client.remove_user(&uaid).await.unwrap(); - // #[test] - // fn sometest () {} + client.add_user(&user).await.unwrap(); + let err = client.add_user(&user).await.unwrap_err(); + assert!(matches!(err, DbError::Conditional)); + } + */ } diff --git a/autopush-common/src/db/bigtable/bigtable_client/row.rs b/autopush-common/src/db/bigtable/bigtable_client/row.rs index 3a0ac8315..b8781aa1c 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/row.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/row.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use crate::db::error::{DbError, DbResult}; + use super::{cell::Cell, RowKey}; /// A Bigtable storage row. Bigtable stores by Family ID which isn't @@ -28,12 +30,18 @@ impl Row { /// get only the "top" cell value. Ignore other values. pub fn take_cell(&mut self, column: &str) -> Option { - if let Some(mut cells) = self.cells.remove(column) { + if let Some(mut cells) = self.take_cells(column) { return cells.pop(); } None } + /// Like [take_cell] but returns an Error when no cell is present + pub fn take_required_cell(&mut self, column: &str) -> DbResult { + self.take_cell(column) + .ok_or_else(|| DbError::Integrity(format!("Expected column: {column}"))) + } + /// Add cells to a given column pub fn add_cells(&mut self, column: &str, cells: Vec) -> Option> { self.cells.insert(column.to_owned(), cells) diff --git a/autopush-common/src/db/error.rs b/autopush-common/src/db/error.rs index ce186e284..3dc5a6c4a 100644 --- a/autopush-common/src/db/error.rs +++ b/autopush-common/src/db/error.rs @@ -61,6 +61,9 @@ pub enum DbError { #[error("Connection failure {0}")] ConnectionError(String), + #[error("The conditional request failed")] + Conditional, + #[error("Database integrity error: {}", _0)] Integrity(String), diff --git a/autopush-common/src/db/models.rs b/autopush-common/src/db/models.rs index 3eada4979..0ea633572 100644 --- a/autopush-common/src/db/models.rs +++ b/autopush-common/src/db/models.rs @@ -45,6 +45,7 @@ impl From> for NotificationHeaders { } /// Contains some meta info regarding the message we're handling. +#[derive(Debug)] pub(crate) struct RangeKey { /// The channel_identifier pub(crate) channel_id: Uuid,