diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index e12015fa6..ffeaa140c 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -24,7 +24,7 @@ use uuid::Uuid; use crate::db::{ client::{DbClient, FetchMessageResponse}, error::{DbError, DbResult}, - DbSettings, Notification, User, + DbSettings, Notification, User, MAX_CHANNEL_TTL, }; use self::row::Row; @@ -137,6 +137,20 @@ impl BigTableClientImpl { }) } + /// Return a ReadRowsRequest for a given row key + fn read_row_request(&self, row_key: &str) -> bigtable::ReadRowsRequest { + let mut req = bigtable::ReadRowsRequest::default(); + req.set_table_name(self.settings.table_name.clone()); + + let mut row_keys = RepeatedField::default(); + row_keys.push(row_key.as_bytes().to_vec()); + let mut row_set = data::RowSet::default(); + row_set.set_row_keys(row_keys); + req.set_rows(row_set); + + req + } + /// Read a given row from the row key. async fn read_row( &self, @@ -144,19 +158,63 @@ impl BigTableClientImpl { timestamp_filter: Option<SystemTime>, ) -> Result<Option<Row>, error::BigTableError> { debug!("🉑 Row key: {}", row_key); + let req = self.read_row_request(row_key); + let mut rows = self.read_rows(req, timestamp_filter, None).await?; + Ok(rows.remove(row_key)) + } - let mut row_keys = RepeatedField::default(); - row_keys.push(row_key.as_bytes().to_vec()); - - let mut row_set = data::RowSet::default(); - row_set.set_row_keys(row_keys); + /// Perform a MutateRowsRequest + async fn mutate_rows( + &self, + req: bigtable::MutateRowsRequest, + ) -> Result<(), error::BigTableError> { + let bigtable = self.pool.get().await?; + // ClientSStreamReceiver will cancel an operation if it's dropped before it's done. 
+ let resp = bigtable + .conn + .mutate_rows(&req) + .map_err(error::BigTableError::Write)?; - let mut req = bigtable::ReadRowsRequest::default(); - req.set_table_name(self.settings.table_name.clone()); - req.set_rows(row_set); + // Scan the returned stream looking for errors. + // As I understand, the returned stream contains chunked MutateRowsResponse structs. Each + // struct contains the result of the row mutation, and contains a `status` (non-zero on error) + // and an optional message string (empty if none). + // The structure also contains an overall `status` but that does not appear to be exposed. + // Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html + let mut stream = Box::pin(resp); + let mut cnt = 0; + loop { + let (result, remainder) = stream.into_future().await; + if let Some(result) = result { + debug!("🎏 Result block: {}", cnt); + match result { + Ok(r) => { + for e in r.get_entries() { + if e.has_status() { + let status = e.get_status(); + // See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html + let code = error::MutateRowStatus::from(status.get_code()); + if !code.is_ok() { + return Err(error::BigTableError::Status( + code, + status.get_message().to_owned(), + )); + } + debug!("🎏 Response: {} OK", e.index); + } + } + } + Err(e) => return Err(error::BigTableError::Write(e)), + }; + cnt += 1; + } else { + debug!("🎏 Done!"); + break; + } + stream = remainder; + } - let mut rows = self.read_rows(req, timestamp_filter, None).await?; - Ok(rows.remove(row_key)) + Ok(()) } /// Take a big table ReadRowsRequest (containing the keys and filters) and return a set of row data indexed by row key. 
@@ -261,7 +319,7 @@ impl BigTableClientImpl { &self, row_key: &str, family: &str, - column_names: &Vec<&str>, + column_names: &[&str], time_range: Option<&data::TimestampRange>, ) -> Result<(), error::BigTableError> { let mut req = bigtable::MutateRowRequest::default(); @@ -622,13 +680,13 @@ impl DbClient for BigTableClientImpl { } async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> { - let key = uaid.as_simple().to_string(); + let row_key = uaid.as_simple().to_string(); let mut result = User { uaid: *uaid, ..Default::default() }; - if let Some(mut record) = self.read_row(&key, None).await? { + if let Some(mut record) = self.read_row(&row_key, None).await? { trace!("🉑 Found a record for that user"); if let Some(mut cells) = record.take_cells("connected_at") { if let Some(cell) = cells.pop() { @@ -699,24 +757,31 @@ impl DbClient for BigTableClientImpl { } async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> { - let key = format!("{}@{}", uaid.simple(), channel_id.as_hyphenated()); - - // We can use the default timestamp here because there shouldn't be a time - // based GC for router records. + let row_key = uaid.simple().to_string(); + // channel_ids are stored as a set within one Bigtable row + // + // Bigtable allows "millions of columns in a table, as long as no row + // exceeds the maximum limit of 256 MB per row" enabling the use of + // column qualifiers as data. + // + // The "set" of channel_ids consists of column qualifiers named + // "chid:<channel_id>" as set member entries (with their cell values + // being a single 0 byte). + // + // Storing the full set in a single row makes batch updates + // (particularly to reset the GC expiry timestamps) potentially more + // easy/efficient let mut row = Row { - row_key: key, + row_key, ..Default::default() }; - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map_err(|e| DbError::General(e.to_string()))? 
- .as_millis(); + let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_CHANNEL_TTL); row.cells.insert( ROUTER_FAMILY.to_owned(), vec![cell::Cell { family: ROUTER_FAMILY.to_owned(), - qualifier: "updated".to_owned(), - value: now.to_be_bytes().to_vec(), + qualifier: format!("chid:{}", channel_id.as_hyphenated()), + timestamp: expiry, ..Default::default() }], ); @@ -726,36 +791,34 @@ impl DbClient for BigTableClientImpl { /// Add channels in bulk (used mostly during migration) /// async fn add_channels(&self, uaid: &Uuid, channels: HashSet<Uuid>) -> DbResult<()> { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map_err(|e| DbError::General(e.to_string()))? + let row_key = uaid.simple().to_string(); + let expiry = (std::time::SystemTime::now() + Duration::from_secs(MAX_CHANNEL_TTL)) + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(error::BigTableError::WriteTime)? .as_millis(); + let mut entries = protobuf::RepeatedField::default(); let mut req = bigtable::MutateRowsRequest::default(); let mut limit: u32 = 0; req.set_table_name(self.settings.table_name.clone()); - // Create entries that define rows that contain mutations to hold the updated value which - // will create/update the channels. - for channel in channels { - let mut entry = bigtable::MutateRowsRequest_Entry::default(); - let key = format!("{}@{}", uaid.simple(), channel.as_hyphenated()); - entry.set_row_key(key.into_bytes()); + let mut entry = bigtable::MutateRowsRequest_Entry::default(); + entry.set_row_key(row_key.into_bytes()); + let mut cell_mutations = protobuf::RepeatedField::default(); - let mut cell_mutations = protobuf::RepeatedField::default(); + // Create entries that define rows that contain mutations to + // create/update the channels. 
+ for channel in channels { let mut mutation = data::Mutation::default(); - let mut set_cell = data::Mutation_SetCell { + let set_cell = data::Mutation_SetCell { family_name: ROUTER_FAMILY.to_owned(), + column_qualifier: format!("chid:{}", channel.as_hyphenated()).into_bytes(), + timestamp_micros: (expiry * 1000) as i64, ..Default::default() }; - set_cell.set_column_qualifier("updated".as_bytes().to_vec()); - set_cell.set_value(now.to_be_bytes().to_vec()); - set_cell.set_timestamp_micros((now * 1000) as i64); mutation.set_set_cell(set_cell); cell_mutations.push(mutation); - entry.set_mutations(cell_mutations); - entries.push(entry); // There is a limit of 100,000 mutations per batch for bigtable. // https://cloud.google.com/bigtable/quotas // If you have 100,000 channels, you have too many. @@ -764,85 +827,48 @@ impl DbClient for BigTableClientImpl { break; } } + entry.set_mutations(cell_mutations); + entries.push(entry); req.set_entries(entries); - let bigtable = self.pool.get().await?; - - // ClientSStreamReceiver will cancel an operation if it's dropped before it's done. - let resp = bigtable - .conn - .mutate_rows(&req) - .map_err(error::BigTableError::Write)?; - - // Scan the returned stream looking for errors. - // As I understand, the returned stream contains chunked MutateRowsResponse structs. Each - // struct contains the result of the row mutation, and contains a `status` (non-zero on error) - // and an optional message string (empty if none). - // The structure also contains an overall `status` but that does not appear to be exposed. 
- // Status codes are defined at https://grpc.github.io/grpc/core/md_doc_statuscodes.html - let mut stream = Box::pin(resp); - let mut cnt = 0; - loop { - let (result, remainder) = stream.into_future().await; - if let Some(result) = result { - debug!("🎏 Result block: {}", cnt); - match result { - Ok(r) => { - for e in r.get_entries() { - if e.has_status() { - let status = e.get_status(); - // See status code definitions: https://grpc.github.io/grpc/core/md_doc_statuscodes.html - let code = error::MutateRowStatus::from(status.get_code()); - if !code.is_ok() { - return Err(error::BigTableError::Status( - code, - status.get_message().to_owned(), - ) - .into()); - } - debug!("🎏 Response: {} OK", e.index); - } - } - } - Err(e) => return Err(error::BigTableError::Write(e).into()), - }; - cnt += 1; - } else { - debug!("🎏 Done!"); - break; - } - stream = remainder; - } + self.mutate_rows(req).await?; Ok(()) } - /// Delete all the rows that start with the given prefix. NOTE: this may be metered and should - /// be used with caution. 
async fn get_channels(&self, uaid: &Uuid) -> DbResult<HashSet<Uuid>> { - let mut result = HashSet::new(); + let row_key = uaid.simple().to_string(); + let mut req = self.read_row_request(&row_key); - let mut req = bigtable::ReadRowsRequest::default(); - req.set_table_name(self.settings.table_name.clone()); + let mut filter_set: RepeatedField<data::RowFilter> = RepeatedField::default(); - let start_key = format!("{}@", uaid.simple()); - // '[' is the char after '@' - let end_key = format!("{}[", uaid.simple()); - let mut rows = data::RowSet::default(); - let mut row_range = data::RowRange::default(); - row_range.set_start_key_open(start_key.into_bytes()); - row_range.set_end_key_open(end_key.into_bytes()); - let mut row_ranges = RepeatedField::default(); - row_ranges.push(row_range); - rows.set_row_ranges(row_ranges); - req.set_rows(rows); + let mut family_filter = data::RowFilter::default(); + family_filter.set_family_name_regex_filter(format!("^{ROUTER_FAMILY}$")); - let mut strip_filter = data::RowFilter::default(); - strip_filter.set_strip_value_transformer(true); - req.set_filter(strip_filter); + let mut cq_filter = data::RowFilter::default(); + cq_filter.set_column_qualifier_regex_filter("^chid:.*$".as_bytes().to_vec()); + + filter_set.push(family_filter); + filter_set.push(cq_filter); + + let mut filter_chain = RowFilter_Chain::default(); + filter_chain.set_filters(filter_set); + + let mut filter = data::RowFilter::default(); + filter.set_chain(filter_chain); + req.set_filter(filter); - let rows = self.read_rows(req, None, None).await?; - for key in rows.keys().map(|v| v.to_owned()).collect::<Vec<String>>() { - if let Some(chid) = key.split('@').last() { + let mut rows = self.read_rows(req, None, None).await?; + let mut result = HashSet::new(); + if let Some(record) = rows.remove(&row_key) { + for mut cells in record.cells.into_values() { + let Some(cell) = cells.pop() else { + continue; + }; + let Some(chid) = cell.qualifier.split("chid:").last() else { + return Err(DbError::Integrity( + 
"get_channels expected: chid:<channel_id>".to_owned(), )); }; result.insert(Uuid::from_str(chid).map_err(|e| DbError::General(e.to_string()))?); } } @@ -850,10 +876,15 @@ impl DbClient for BigTableClientImpl { Ok(result) } - /// Delete the channel and all associated pending messages. + /// Delete the channel. Does not delete its associated pending messages. async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<bool> { - let row_key = format!("{}@{}", uaid.simple(), channel_id.as_hyphenated()); - Ok(self.delete_rows(&row_key).await?) + let row_key = uaid.simple().to_string(); + let column = format!("chid:{}", channel_id.as_hyphenated()); + self.delete_cells(&row_key, ROUTER_FAMILY, &[column.as_ref()], None) + .await?; + // XXX: Can we determine if the cq was actually removed (existed) from + // mutate_row's response? + Ok(true) } /// Remove the node_id. Can't really "surgically strike" this @@ -872,13 +903,8 @@ impl DbClient for BigTableClientImpl { let mut time_range = data::TimestampRange::default(); // convert connected at seconds into microseconds time_range.set_end_timestamp_micros((connected_at * 1000000) as i64); - self.delete_cells( - &row_key, - ROUTER_FAMILY, - &["node_id"].to_vec(), - Some(&time_range), - ) - .await?; + self.delete_cells(&row_key, ROUTER_FAMILY, &["node_id"], Some(&time_range)) .await?; Ok(true) } diff --git a/autopush-common/src/db/error.rs b/autopush-common/src/db/error.rs index 3fc6f4e92..ce186e284 100644 --- a/autopush-common/src/db/error.rs +++ b/autopush-common/src/db/error.rs @@ -61,6 +61,9 @@ pub enum DbError { #[error("Connection failure {0}")] ConnectionError(String), + #[error("Database integrity error: {}", _0)] + Integrity(String), + #[error("Unknown Database Error {0}")] General(String), }