diff --git a/.circleci/config.yml b/.circleci/config.yml
index c8ff7d7cc..d1a5a46ad 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -38,7 +38,7 @@ jobs:
   audit:
     docker:
       # NOTE: update version for all # RUST_VER
-      - image: rust:1.73
+      - image: rust:1.74
         auth:
           username: $DOCKER_USER
           password: $DOCKER_PASS
diff --git a/Dockerfile b/Dockerfile
index 470fe4b2f..63881427c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
 # NOTE: Ensure builder's Rust version matches CI's in .circleci/config.yml
-FROM rust:1.73-buster as builder
+FROM rust:1.74-buster as builder
 
 ARG CRATE
 ADD . /app
diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs
index edce8c2a3..03d582b3f 100644
--- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs
+++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/unidentified.rs
@@ -130,6 +130,10 @@ impl UnidentifiedClient {
         if let Some(mut user) = self.app_state.db.get_user(&uaid).await? {
             if let Some(flags) = process_existing_user(&self.app_state, &user).await? {
                 user.node_id = Some(self.app_state.router_url.to_owned());
+                if user.connected_at > connected_at {
+                    let _ = self.app_state.metrics.incr("ua.already_connected");
+                    return Err(SMErrorKind::AlreadyConnected.into());
+                }
                 user.connected_at = connected_at;
                 user.set_last_connect();
                 if !self.app_state.db.update_user(&user).await? {
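Note on the `unidentified.rs` hunk above: the new guard rejects a handshake whose `connected_at` is older than the one already stored for this UAID, incrementing the `ua.already_connected` metric and returning `AlreadyConnected` instead of clobbering the newer record. A minimal sketch of just that ordering rule (the helper name and the `u64` millisecond timestamps are illustrative, not from the patch):

```rust
/// Illustrative stand-in for the guard in `UnidentifiedClient`:
/// a handshake is stale if the stored record connected more recently.
fn is_stale_handshake(stored_connected_at: u64, handshake_connected_at: u64) -> bool {
    stored_connected_at > handshake_connected_at
}

fn main() {
    // A handshake stamped later than the stored record proceeds...
    assert!(!is_stale_handshake(1_000, 1_001));
    // ...while an older (out-of-order) one is rejected as AlreadyConnected.
    assert!(is_stale_handshake(1_000, 999));
}
```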
diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs
index 2670f5b47..10574f3ee 100644
--- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs
+++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs
@@ -12,6 +12,9 @@ use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRan
 use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
 use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
 use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
+use google_cloud_rust_raw::bigtable::v2::data::{
+    RowFilter, RowFilter_Chain, RowFilter_Condition, ValueRange,
+};
 use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
 use grpcio::Channel;
 use protobuf::RepeatedField;
@@ -186,10 +189,29 @@ impl BigTableClientImpl {
         // It's possible to do a lot here, including altering in process
         // mutations, clearing them, etc. It's all up for grabs until we commit
         // below. For now, let's just presume a write and be done.
-        let mut mutations = protobuf::RepeatedField::default();
         req.set_table_name(self.settings.table_name.clone());
         req.set_row_key(row.row_key.into_bytes());
-        for (_family, cells) in row.cells {
+        let mutations = self.get_mutations(row.cells)?;
+        req.set_mutations(mutations);
+
+        // Do the actual commit.
+        let bigtable = self.pool.get().await?;
+        let _resp = bigtable
+            .conn
+            .mutate_row_async(&req)
+            .map_err(error::BigTableError::Write)?
+            .await
+            .map_err(error::BigTableError::Write)?;
+        Ok(())
+    }
+
+    /// Compile the list of mutations for this row.
+    fn get_mutations(
+        &self,
+        cells: HashMap<String, Vec<cell::Cell>>,
+    ) -> Result<protobuf::RepeatedField<data::Mutation>, error::BigTableError> {
+        let mut mutations = protobuf::RepeatedField::default();
+        for (_family, cells) in cells {
             for cell in cells {
                 let mut mutation = data::Mutation::default();
                 let mut set_cell = data::Mutation_SetCell::default();
@@ -208,18 +230,33 @@ impl BigTableClientImpl {
                 mutations.push(mutation);
             }
         }
-        req.set_mutations(mutations);
+        Ok(mutations)
+    }
+
+    /// Check and write rows that match the associated filter, returning whether
+    /// the filter matched records (and the update was successful).
+    async fn check_and_mutate_row(
+        &self,
+        row: row::Row,
+        filter: RowFilter,
+    ) -> Result<bool, error::BigTableError> {
+        let mut req = bigtable::CheckAndMutateRowRequest::default();
+        req.set_table_name(self.settings.table_name.clone());
+        req.set_row_key(row.row_key.into_bytes());
+        let mutations = self.get_mutations(row.cells)?;
+        req.set_predicate_filter(filter);
+        req.set_true_mutations(mutations);
 
         // Do the actual commit.
-        // fails with `cannot execute `LocalPool` executor from within another executor: EnterError`
         let bigtable = self.pool.get().await?;
-        let _resp = bigtable
+        let resp = bigtable
             .conn
-            .mutate_row_async(&req)
+            .check_and_mutate_row_async(&req)
             .map_err(error::BigTableError::Write)?
             .await
             .map_err(error::BigTableError::Write)?;
-        Ok(())
+        debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
+        Ok(resp.get_predicate_matched())
     }
 
     /// Delete all cell data from the specified columns with the optional time range.
@@ -391,43 +428,8 @@ impl BigTableClientImpl {
             },
         })
     }
-}
-
-#[derive(Clone)]
-pub struct BigtableDb {
-    pub(super) conn: BigtableClient,
-}
-
-impl BigtableDb {
-    pub fn new(channel: Channel) -> Self {
-        Self {
-            conn: BigtableClient::new(channel),
-        }
-    }
-
-    /// Perform a simple connectivity check.
-    pub fn health_check(&mut self, table_name: &str) -> DbResult<bool> {
-        let mut req = bigtable::ReadRowsRequest::default();
-        req.set_table_name(table_name.to_owned());
-        let mut row = data::Row::default();
-        row.set_key("NOT FOUND".to_owned().as_bytes().to_vec());
-        let mut filter = data::RowFilter::default();
-        filter.set_block_all_filter(true);
-        req.set_filter(filter);
-
-        let _ = self
-            .conn
-            .read_rows(&req)
-            .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;
-
-        Ok(true)
-    }
-}
-
-#[async_trait]
-impl DbClient for BigTableClientImpl {
-    /// add user to the database
-    async fn add_user(&self, user: &User) -> DbResult<()> {
+
+    fn user_to_row(&self, user: &User) -> Row {
         let mut row = Row {
             row_key: user.uaid.simple().to_string(),
             ..Default::default()
@@ -497,6 +499,58 @@ impl DbClient for BigTableClientImpl {
             });
         };
         row.add_cells(ROUTER_FAMILY, cells);
+        row
+    }
+}
+
+#[derive(Clone)]
+pub struct BigtableDb {
+    pub(super) conn: BigtableClient,
+}
+
+impl BigtableDb {
+    pub fn new(channel: Channel) -> Self {
+        Self {
+            conn: BigtableClient::new(channel),
+        }
+    }
+
+    /// Perform a simple connectivity check. This should return no actual results
+    /// but should verify that the connection is valid. We use this for the
+    /// Recycle check as well, so it has to be fairly low in the implementation
+    /// stack.
+    ///
+    pub async fn health_check(&mut self, table_name: &str) -> DbResult<bool> {
+        // build the associated request.
+        let mut req = bigtable::ReadRowsRequest::default();
+        req.set_table_name(table_name.to_owned());
+        // Create a request that is GRPC valid, but does not point to a valid row.
+        let mut row_keys = RepeatedField::default();
+        row_keys.push("NOT FOUND".to_owned().as_bytes().to_vec());
+        let mut row_set = data::RowSet::default();
+        row_set.set_row_keys(row_keys);
+        req.set_rows(row_set);
+        let mut filter = data::RowFilter::default();
+        filter.set_block_all_filter(true);
+        req.set_filter(filter);
+
+        let r = self
+            .conn
+            .read_rows(&req)
+            .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;
+
+        let (v, _stream) = r.into_future().await;
+        // Since this should return no rows (with the row key set to a value that shouldn't exist)
+        // the first component of the tuple should be None.
+        Ok(v.is_none())
+    }
+}
+
+#[async_trait]
+impl DbClient for BigTableClientImpl {
+    /// add user to the database
+    async fn add_user(&self, user: &User) -> DbResult<()> {
+        let row = self.user_to_row(user);
         trace!("🉑 Adding user");
         self.write_row(row).await.map_err(|e| e.into())
     }
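The rewritten `health_check` above builds a probe that is valid gRPC but can never return data: it requests a single bogus row key (`NOT FOUND`) and layers a `block_all_filter` on top, so a healthy connection is one whose response stream yields nothing. A sketch of that decision with the transport stubbed out (the `StreamItem` alias is hypothetical; only the `is_none()` test mirrors the patch):

```rust
/// Hypothetical stand-in for the first item of Bigtable's ReadRows stream.
type StreamItem = Option<Result<Vec<u8>, String>>;

/// Healthy means the probe yielded no rows at all: the key is bogus and the
/// block-all filter suppresses everything, so any item indicates trouble.
fn probe_is_healthy(first_item: StreamItem) -> bool {
    first_item.is_none()
}

fn main() {
    assert!(probe_is_healthy(None));
    assert!(!probe_is_healthy(Some(Ok(b"unexpected row".to_vec()))));
}
```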
@@ -505,9 +559,85 @@
     /// the individual cells create a new version. Depending on the garbage collection rules for
     /// the family, these can either persist or be automatically deleted.
     async fn update_user(&self, user: &User) -> DbResult<bool> {
-        self.add_user(user).await?;
-        // TODO: is a conditional check possible?
-        Ok(true)
+        // `update_user` supposes the following logic:
+        // check to see if the user is in the router table AND
+        // (either does not have a router_type or the router type is the same as what they're currently using) AND
+        // (either there's no node_id assigned or the `connected_at` is earlier than the current connected_at)
+        // Bigtable can't really do a few of those (e.g. detect non-existing values) so we have to simplify
+        // things down a bit. We presume the record exists and that the `router_type` was specified when the
+        // record was created.
+        //
+        // Filters are not very sophisticated on BigTable.
+        // You can create RowFilterChains, where each filter acts as an "AND", or
+        // a RowFilterUnion, which acts as an "OR".
+        //
+        // There do not appear to be negative checks (e.g. check if not set).
+        // According to [the docs](https://cloud.google.com/bigtable/docs/using-filters#chain),
+        // ConditionalRowFilter is not atomic and changes can occur between predicate
+        // and execution.
+        //
+        // We then use a ChainFilter (essentially an AND operation) to make sure that the router_type
+        // matches and the new `connected_at` time is later than the existing `connected_at`.
+
+        let row = self.user_to_row(user);
+
+        // === Router Filter Chain.
+        let mut router_filter_chain = RowFilter_Chain::default();
+        let mut filter_set: RepeatedField<RowFilter> = RepeatedField::default();
+        // First check to see if the router type is either empty or matches exactly.
+        // Yes, these are multiple filters; each link in the chain acts as an AND.
+        let mut filter = RowFilter::default();
+        filter.set_family_name_regex_filter(ROUTER_FAMILY.to_owned());
+        filter_set.push(filter);
+
+        let mut filter = RowFilter::default();
+        filter.set_column_qualifier_regex_filter("router_type".to_owned().as_bytes().to_vec());
+        filter_set.push(filter);
+
+        let mut filter = RowFilter::default();
+        filter.set_value_regex_filter(user.router_type.as_bytes().to_vec());
+        filter_set.push(filter);
+
+        router_filter_chain.set_filters(filter_set);
+        let mut router_filter = RowFilter::default();
+        router_filter.set_chain(router_filter_chain);
+
+        // === Connected_At filter chain
+        let mut connected_filter_chain = RowFilter_Chain::default();
+        let mut filter_set: RepeatedField<RowFilter> = RepeatedField::default();
+
+        // Then check to make sure that the last connected_at time is before this one.
+        // Note: `check_and_mutate_row` uses `set_true_mutations`, meaning that only rows
+        // that match the provided filters will be modified.
+        let mut filter = RowFilter::default();
+        filter.set_family_name_regex_filter(ROUTER_FAMILY.to_owned());
+        filter_set.push(filter);
+
+        let mut filter = RowFilter::default();
+        filter.set_column_qualifier_regex_filter("connected_at".to_owned().as_bytes().to_vec());
+        filter_set.push(filter);
+
+        let mut filter = RowFilter::default();
+        let mut val_range = ValueRange::default();
+        val_range.set_start_value_open(0_u64.to_be_bytes().to_vec());
+        val_range.set_end_value_open(user.connected_at.to_be_bytes().to_vec());
+        filter.set_value_range_filter(val_range);
+        filter_set.push(filter);
+
+        connected_filter_chain.set_filters(filter_set);
+        let mut connected_filter = RowFilter::default();
+        connected_filter.set_chain(connected_filter_chain);
+
+        // Gather the collections and try to update the row.
+
+        let mut cond = RowFilter_Condition::default();
+        cond.set_predicate_filter(router_filter);
+        cond.set_true_filter(connected_filter);
+        let mut cond_filter = RowFilter::default();
+        cond_filter.set_condition(cond);
+        // dbg!(&cond_filter);
+
+        Ok(self.check_and_mutate_row(row, cond_filter).await?)
     }
 
     async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
@@ -602,7 +732,6 @@ impl DbClient for BigTableClientImpl {
             }
         }
 
-        //TODO: rename this to `last_notification_timestamp`
         if let Some(mut cells) = record.take_cells("current_timestamp") {
             if let Some(cell) = cells.pop() {
                 let v: [u8; 8] = cell.value.try_into().map_err(|e| {
@@ -677,6 +806,7 @@ impl DbClient for BigTableClientImpl {
         };
         set_cell.set_column_qualifier("updated".to_owned().into_bytes().to_vec());
         set_cell.set_value(now.to_be_bytes().to_vec());
+        set_cell.set_timestamp_micros((now * 1000) as i64);
         mutation.set_set_cell(set_cell);
         cell_mutations.push(mutation);
 
@@ -814,7 +944,6 @@ impl DbClient for BigTableClientImpl {
                 Some(&time_range),
             )
             .await?;
-        // TODO: is a conditional check possible?
         Ok(true)
     }
 
@@ -1125,6 +1254,7 @@ impl DbClient for BigTableClientImpl {
             .get()
             .await?
             .health_check(&self.settings.table_name)
+            .await
     }
 
     /// Returns true, because there's only one table in BigTable. We divide things up
@@ -1209,6 +1339,15 @@ mod tests {
         assert_eq!(k, "deadbeef0000000000000123456789ab#decafbad0000000000000123456789ab#01:decafbad-0000-0000-0000-0123456789ab:Inbox");
     }
 
+    #[actix_rt::test]
+    async fn health_check() {
+        let client = new_client().unwrap();
+
+        let result = client.health_check().await;
+        assert!(result.is_ok());
+        assert!(result.unwrap());
+    }
+
     /// run a gauntlet of testing. These are a bit linear because they need
     /// to run in sequence.
     #[actix_rt::test]
@@ -1224,6 +1363,9 @@
         let chid = Uuid::parse_str(TEST_CHID).unwrap();
         let node_id = "test_node".to_owned();
 
+        // Purge the user record if it exists.
+        let _ = client.remove_user(&uaid).await;
+
         let test_user = User {
             uaid,
             router_type: "webpush".to_owned(),
@@ -1239,7 +1381,8 @@
         assert!(user.is_ok());
         let fetched = client.get_user(&uaid).await.unwrap();
         assert!(fetched.is_some());
-        assert_eq!(fetched.unwrap().router_type, "webpush".to_owned());
+        let fetched = fetched.unwrap();
+        assert_eq!(fetched.router_type, "webpush".to_owned());
 
         // can we add channels?
         client.add_channel(&uaid, &chid).await.unwrap();
@@ -1259,12 +1402,28 @@
         let channels = client.get_channels(&uaid).await.unwrap();
         assert_eq!(channels, new_channels);
 
-        // can we modify the user record?
+        // Now verify conditional updates against the stored `connected_at`.
+        // First, ensure that we can't update a user with an earlier `connected_at`.
+        let updated = User {
+            connected_at: fetched.connected_at - 300,
+            ..test_user.clone()
+        };
+        let result = client.update_user(&updated).await;
+        assert!(result.is_ok());
+        assert!(!result.unwrap());
+
+        // Make sure that the `connected_at` wasn't modified.
+        let fetched2 = client.get_user(&fetched.uaid).await.unwrap().unwrap();
+        assert_eq!(fetched.connected_at, fetched2.connected_at);
+
+        // And make sure we can update a record with a later `connected_at` time.
         let updated = User {
-            connected_at: now() + 3,
+            connected_at: fetched.connected_at + 300,
             ..test_user
         };
-        assert!(client.update_user(&updated).await.is_ok());
+        let result = client.update_user(&updated).await;
+        assert!(result.is_ok());
+        assert!(result.unwrap());
         assert_ne!(
             test_user.connected_at,
             client.get_user(&uaid).await.unwrap().unwrap().connected_at
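A subtlety behind the `connected_at` filter in `update_user` above: Bigtable's `ValueRange` compares raw cell bytes lexicographically, so integer timestamps only order correctly when encoded as fixed-width big-endian, which is why both the stored cells and the filter bounds use `to_be_bytes()`. A self-contained demonstration of that ordering property:

```rust
fn main() {
    // Fixed-width big-endian bytes sort lexicographically in the same order
    // as the integers they encode...
    assert!(1u64.to_be_bytes() < 256u64.to_be_bytes());
    assert!(255u64.to_be_bytes() < 256u64.to_be_bytes());
    // ...while little-endian bytes do not: 256 encodes as [0, 1, 0, ...],
    // which sorts *before* 1's [1, 0, 0, ...].
    assert!(1u64.to_le_bytes() > 256u64.to_le_bytes());
}
```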
diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs
index 5c5deef60..c6f528f79 100644
--- a/autopush-common/src/db/bigtable/pool.rs
+++ b/autopush-common/src/db/bigtable/pool.rs
@@ -179,11 +179,13 @@ impl Manager for BigtableClientManager {
         #[allow(clippy::blocks_in_if_conditions)]
         if !client
             .health_check(&self.settings.table_name)
+            .await
             .map_err(|e| {
                 debug!("🏊 Recycle requested (health). {:?}", e);
                 DbError::BTError(BigTableError::Recycle)
             })?
         {
+            debug!("🏊 Health check failed");
             return Err(DbError::BTError(BigTableError::Recycle).into());
         }
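Taken together, `update_user` is now a conditional write: the `bool` it returns surfaces `CheckAndMutateRow`'s `predicate_matched`, so `false` means the mutation was skipped rather than failed, typically because a newer connection already owns the record. A hedged sketch of how a caller might treat that result (the import paths and the function itself are assumptions for illustration, not code from the patch):

```rust
// Paths are assumed for illustration; the trait and types come from this repo.
use autopush_common::db::client::DbClient;
use autopush_common::db::error::DbResult;
use autopush_common::db::User;

/// Hypothetical caller: a `false` from `update_user` means "the conditional
/// write was skipped" (stale `connected_at` or mismatched router_type), not
/// a database error, so surface it as a decision rather than an Err.
async fn try_claim_user(db: &dyn DbClient, user: &User) -> DbResult<bool> {
    if !db.update_user(user).await? {
        // Predicate didn't match; another, newer connection owns the record.
        return Ok(false);
    }
    Ok(true)
}
```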