diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs
index ae5b33295..79932b01b 100644
--- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs
+++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs
@@ -11,7 +11,7 @@ use cadence::{CountedExt, StatsdClient};
 use futures_util::StreamExt;
 use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
 use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
-use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
+use google_cloud_rust_raw::bigtable::v2::bigtable::{PingAndWarmRequest, ReadRowsRequest};
 use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
 use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
 use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
@@ -762,48 +762,46 @@ impl BigTableClientImpl {
 pub struct BigtableDb {
     pub(super) conn: BigtableClient,
     pub(super) metadata: Metadata,
+    instance_name: String,
 }
 
 impl BigtableDb {
-    pub fn new(channel: Channel, metadata: &Metadata) -> Self {
+    pub fn new(channel: Channel, metadata: &Metadata, instance_name: &str) -> Self {
         Self {
             conn: BigtableClient::new(channel),
             metadata: metadata.clone(),
+            instance_name: instance_name.to_owned(),
         }
     }
-
     /// Perform a simple connectivity check. This should return no actual results
     /// but should verify that the connection is valid. We use this for the
     /// Recycle check as well, so it has to be fairly low in the implementation
     /// stack.
     ///
-    pub async fn health_check(
-        &mut self,
-        table_name: &str,
-        metrics: Arc<StatsdClient>,
-    ) -> DbResult<bool> {
-        // Create a request that is GRPC valid, but does not point to a valid row.
-        let mut req = read_row_request(table_name, "NOT FOUND");
-        let mut filter = data::RowFilter::default();
-        filter.set_block_all_filter(true);
-        req.set_filter(filter);
-
-        let r = retry_policy(RETRY_COUNT)
+    /// "instance_name" is the "projects/{project}/instances/{instance}" portion of
+    /// the tablename.
+    ///
+    pub async fn health_check(&mut self, metrics: Arc<StatsdClient>) -> DbResult<bool> {
+        let req = PingAndWarmRequest {
+            name: self.instance_name.clone(),
+            ..Default::default()
+        };
+        // PingAndWarmResponse does not implement a stream since it does not return data.
+        let _r = retry_policy(RETRY_COUNT)
             .retry_if(
                 || async {
                     self.conn
-                        .read_rows_opt(&req, call_opts(self.metadata.clone()))
+                        .ping_and_warm_opt(&req, call_opts(self.metadata.clone()))
                 },
                 retryable_error(metrics.clone()),
             )
             .await
             .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;
-        let (v, _stream) = r.into_future().await;
         // Since this should return no rows (with the row key set to a value that shouldn't exist)
         // the first component of the tuple should be None.
         debug!("🉑 health check");
-        Ok(v.is_none())
+        Ok(true)
     }
 }
 
@@ -1321,7 +1319,7 @@ impl DbClient for BigTableClientImpl {
         self.pool
             .get()
             .await?
-            .health_check(&self.settings.table_name, self.metrics.clone())
+            .health_check(self.metrics.clone())
             .await
     }
 
diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs
index ac916e754..0fcaacf60 100644
--- a/autopush-common/src/db/bigtable/pool.rs
+++ b/autopush-common/src/db/bigtable/pool.rs
@@ -169,6 +169,16 @@ impl BigtableClientManager {
     }
 }
 
+fn get_instance_name(table_name: &str) -> Result<String, DbError> {
+    let parts: Vec<&str> = table_name.split('/').collect();
+    if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
+        return Err(DbError::General(
+            "Invalid table name specified. Cannot parse instance".to_owned(),
+        ));
+    }
+    return Ok(parts[0..4].join("/"));
+}
+
 impl fmt::Debug for BigtableClientManager {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("deadpool::BtClientManager")
@@ -186,7 +196,11 @@ impl Manager for BigtableClientManager {
     /// `BigtableClient` is the most atomic we can go.
     async fn create(&self) -> Result<BigtableDb, DbError> {
         debug!("🏊 Create a new pool entry.");
-        let entry = BigtableDb::new(self.get_channel()?, &self.settings.metadata()?);
+        let entry = BigtableDb::new(
+            self.get_channel()?,
+            &self.settings.metadata()?,
+            &get_instance_name(&self.settings.table_name)?,
+        );
         debug!("🏊 Bigtable connection acquired");
         Ok(entry)
     }
@@ -216,7 +230,7 @@
         // note, this changes to `blocks_in_conditions` for 1.76+
         #[allow(clippy::blocks_in_conditions)]
         if !client
-            .health_check(&self.settings.table_name, self.metrics.clone())
+            .health_check(self.metrics.clone())
             .await
             .map_err(|e| {
                 debug!("🏊 Recycle requested (health). {:?}", e);
@@ -265,3 +279,15 @@
         Ok(chan)
     }
 }
+
+#[test]
+fn test_get_instance() -> Result<(), DbError> {
+    let res = get_instance_name("projects/foo/instances/bar/tables/gorp")?;
+    assert_eq!(res.as_str(), "projects/foo/instances/bar");
+
+    assert!(get_instance_name("projects/foo/").is_err());
+    assert!(get_instance_name("protect/foo/instances/bar/tables/gorp").is_err());
+    assert!(get_instance_name("project/foo/instance/bar/tables/gorp").is_err());
+
+    Ok(())
+}