diff --git a/autopush-common/src/db/bigtable/bigtable_client/error.rs b/autopush-common/src/db/bigtable/bigtable_client/error.rs index d7cb847ef..48f0689b6 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/error.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/error.rs @@ -115,6 +115,9 @@ pub enum BigTableError { /// General Pool builder errors. #[error("Pool Error: {0}")] Pool(String), + + #[error("BigTable config error: {0}")] + Config(String), } impl ReportableError for BigTableError { @@ -144,6 +147,7 @@ impl ReportableError for BigTableError { BigTableError::Recycle => "storage.bigtable.error.recycle", BigTableError::Pool(_) => "storage.bigtable.error.pool", BigTableError::GRPC(_) => "storage.bigtable.error.grpc", + BigTableError::Config(_) => "storage.bigtable.error.config", }; Some(err) } diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index ae5b33295..580ef8d9e 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -11,7 +11,7 @@ use cadence::{CountedExt, StatsdClient}; use futures_util::StreamExt; use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest; use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient; -use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest; +use google_cloud_rust_raw::bigtable::v2::bigtable::{PingAndWarmRequest, ReadRowsRequest}; use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient; use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain}; use google_cloud_rust_raw::bigtable::v2::{bigtable, data}; @@ -761,49 +761,47 @@ impl BigTableClientImpl { #[derive(Clone)] pub struct BigtableDb { pub(super) conn: BigtableClient, - pub(super) metadata: Metadata, + pub(super) health_metadata: Metadata, + instance_name: String, } impl BigtableDb { - pub fn new(channel: Channel, metadata: &Metadata) -> Self { + pub fn new(channel: Channel, health_metadata: &Metadata, instance_name: &str) -> Self { Self { conn: BigtableClient::new(channel), - metadata: metadata.clone(), + health_metadata: health_metadata.clone(), + instance_name: instance_name.to_owned(), } } - /// Perform a simple connectivity check. This should return no actual results /// but should verify that the connection is valid. We use this for the /// Recycle check as well, so it has to be fairly low in the implementation /// stack. /// - pub async fn health_check( - &mut self, - table_name: &str, - metrics: Arc, - ) -> DbResult { - // Create a request that is GRPC valid, but does not point to a valid row. - let mut req = read_row_request(table_name, "NOT FOUND"); - let mut filter = data::RowFilter::default(); - filter.set_block_all_filter(true); - req.set_filter(filter); - - let r = retry_policy(RETRY_COUNT) + /// "instance_name" is the "projects/{project}/instances/{instance}" portion of + /// the tablename. + /// + pub async fn health_check(&mut self, metrics: Arc) -> DbResult { + let req = PingAndWarmRequest { + name: self.instance_name.clone(), + ..Default::default() + }; + // PingAndWarmResponse does not implement a stream since it does not return data. + let _r = retry_policy(RETRY_COUNT) .retry_if( || async { self.conn - .read_rows_opt(&req, call_opts(self.metadata.clone())) + .ping_and_warm_opt(&req, call_opts(self.health_metadata.clone())) }, retryable_error(metrics.clone()), ) .await .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?; - let (v, _stream) = r.into_future().await; // Since this should return no rows (with the row key set to a value that shouldn't exist) // the first component of the tuple should be None. debug!("🉑 health check"); - Ok(v.is_none()) + Ok(true) } } @@ -1321,7 +1319,7 @@ impl DbClient for BigTableClientImpl { self.pool .get() .await? - .health_check(&self.settings.table_name, self.metrics.clone()) + .health_check(self.metrics.clone()) .await } diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index 7a306d0a7..6754bd6c7 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -84,6 +84,28 @@ pub struct BigTableDbSettings { pub retry_count: usize, } +// Used by test, but we don't want available for release. +#[allow(clippy::derivable_impls)] +#[cfg(test)] +impl Default for BigTableDbSettings { + fn default() -> Self { + Self { + table_name: Default::default(), + router_family: Default::default(), + message_family: Default::default(), + message_topic_family: Default::default(), + database_pool_max_size: Default::default(), + database_pool_create_timeout: Default::default(), + database_pool_wait_timeout: Default::default(), + database_pool_recycle_timeout: Default::default(), + database_pool_connection_ttl: Default::default(), + database_pool_max_idle: Default::default(), + route_to_leader: Default::default(), + retry_count: Default::default(), + } + } +} + impl BigTableDbSettings { pub fn metadata(&self) -> Result { MetadataBuilder::with_prefix(&self.table_name) @@ -93,13 +115,20 @@ impl BigTableDbSettings { .map_err(BigTableError::GRPC) } + pub fn health_metadata(&self) -> Result { + MetadataBuilder::with_prefix(&self.table_name) + .routing_param("name", &self.get_instance_name()?) + .route_to_leader(self.route_to_leader) + .build() + .map_err(BigTableError::GRPC) + } + pub fn admin_metadata(&self) -> Result { // Admin calls use a slightly different routing param and a truncated prefix // See https://github.com/googleapis/google-cloud-cpp/issues/190#issuecomment-370520185 let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else { - return Err(BigTableError::Admin( + return Err(BigTableError::Config( "Invalid table name specified".to_owned(), - None, )); }; MetadataBuilder::with_prefix(admin_prefix) @@ -108,6 +137,16 @@ impl BigTableDbSettings { .build() .map_err(BigTableError::GRPC) } + + pub fn get_instance_name(&self) -> Result { + let parts: Vec<&str> = self.table_name.split('/').collect(); + if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" { + return Err(BigTableError::Config( + "Invalid table name specified. Cannot parse instance".to_owned(), + )); + } + Ok(parts[0..4].join("/")) + } } impl TryFrom<&str> for BigTableDbSettings { @@ -138,4 +177,33 @@ mod tests { ); Ok(()) } + #[test] + fn test_get_instance() -> Result<(), super::BigTableError> { + let settings = super::BigTableDbSettings { + table_name: "projects/foo/instances/bar/tables/gorp".to_owned(), + ..Default::default() + }; + let res = settings.get_instance_name()?; + assert_eq!(res.as_str(), "projects/foo/instances/bar"); + + let settings = super::BigTableDbSettings { + table_name: "projects/foo/".to_owned(), + ..Default::default() + }; + assert!(settings.get_instance_name().is_err()); + + let settings = super::BigTableDbSettings { + table_name: "protect/foo/instances/bar/tables/gorp".to_owned(), + ..Default::default() + }; + assert!(settings.get_instance_name().is_err()); + + let settings = super::BigTableDbSettings { + table_name: "project/foo/instance/bar/tables/gorp".to_owned(), + ..Default::default() + }; + assert!(settings.get_instance_name().is_err()); + + Ok(()) + } } diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs index ac916e754..18b63c72d 100644 --- a/autopush-common/src/db/bigtable/pool.rs +++ b/autopush-common/src/db/bigtable/pool.rs @@ -186,7 +186,11 @@ impl Manager for BigtableClientManager { /// `BigtableClient` is the most atomic we can go. async fn create(&self) -> Result { debug!("🏊 Create a new pool entry."); - let entry = BigtableDb::new(self.get_channel()?, &self.settings.metadata()?); + let entry = BigtableDb::new( + self.get_channel()?, + &self.settings.health_metadata()?, + &self.settings.get_instance_name()?, + ); debug!("🏊 Bigtable connection acquired"); Ok(entry) } @@ -216,7 +220,7 @@ impl Manager for BigtableClientManager { // note, this changes to `blocks_in_conditions` for 1.76+ #[allow(clippy::blocks_in_conditions)] if !client - .health_check(&self.settings.table_name, self.metrics.clone()) + .health_check(self.metrics.clone()) .await .map_err(|e| { debug!("🏊 Recycle requested (health). {:?}", e);