Skip to content

Commit

Permalink
feat: Swtich to lighter weight health check
Browse files Browse the repository at this point in the history
Closes SYNC-4197
  • Loading branch information
jrconlin committed Mar 15, 2024
1 parent 56658bf commit bed0f7f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 21 deletions.
36 changes: 17 additions & 19 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use cadence::{CountedExt, StatsdClient};
use futures_util::StreamExt;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
use google_cloud_rust_raw::bigtable::v2::bigtable::{PingAndWarmRequest, ReadRowsRequest};
use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
Expand Down Expand Up @@ -762,48 +762,46 @@ impl BigTableClientImpl {
pub struct BigtableDb {
pub(super) conn: BigtableClient,
pub(super) metadata: Metadata,
instance_name: String,
}

impl BigtableDb {
pub fn new(channel: Channel, metadata: &Metadata) -> Self {
pub fn new(channel: Channel, metadata: &Metadata, instance_name: &str) -> Self {
Self {
conn: BigtableClient::new(channel),
metadata: metadata.clone(),
instance_name: instance_name.to_owned(),
}
}

/// Perform a simple connectivity check. This should return no actual results
/// but should verify that the connection is valid. We use this for the
/// Recycle check as well, so it has to be fairly low in the implementation
/// stack.
///
pub async fn health_check(
&mut self,
table_name: &str,
metrics: Arc<StatsdClient>,
) -> DbResult<bool> {
// Create a request that is GRPC valid, but does not point to a valid row.
let mut req = read_row_request(table_name, "NOT FOUND");
let mut filter = data::RowFilter::default();
filter.set_block_all_filter(true);
req.set_filter(filter);

let r = retry_policy(RETRY_COUNT)
/// "instance_name" is the "projects/{project}/instances/{instance}" portion of
/// the tablename.
///
pub async fn health_check(&mut self, metrics: Arc<StatsdClient>) -> DbResult<bool> {
let req = PingAndWarmRequest {
name: self.instance_name.clone(),
..Default::default()
};
// PingAndWarmResponse does not implement a stream since it does not return data.
let _r = retry_policy(RETRY_COUNT)
.retry_if(
|| async {
self.conn
.read_rows_opt(&req, call_opts(self.metadata.clone()))
.ping_and_warm_opt(&req, call_opts(self.metadata.clone()))
},
retryable_error(metrics.clone()),
)
.await
.map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;

let (v, _stream) = r.into_future().await;
// Since this should return no rows (with the row key set to a value that shouldn't exist)
// the first component of the tuple should be None.
debug!("🉑 health check");
Ok(v.is_none())
Ok(true)
}
}

Expand Down Expand Up @@ -1321,7 +1319,7 @@ impl DbClient for BigTableClientImpl {
self.pool
.get()
.await?
.health_check(&self.settings.table_name, self.metrics.clone())
.health_check(self.metrics.clone())
.await
}

Expand Down
30 changes: 28 additions & 2 deletions autopush-common/src/db/bigtable/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ impl BigtableClientManager {
}
}

fn get_instance_name(table_name: &str) -> Result<String, DbError> {
let parts: Vec<&str> = table_name.split('/').collect();
if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
return Err(DbError::General(
"Invalid table name specified. Cannot parse instance".to_owned(),
));
}
return Ok(parts[0..4].join("/"));
}

impl fmt::Debug for BigtableClientManager {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("deadpool::BtClientManager")
Expand All @@ -186,7 +196,11 @@ impl Manager for BigtableClientManager {
/// `BigtableClient` is the most atomic we can go.
async fn create(&self) -> Result<BigtableDb, DbError> {
debug!("🏊 Create a new pool entry.");
let entry = BigtableDb::new(self.get_channel()?, &self.settings.metadata()?);
let entry = BigtableDb::new(
self.get_channel()?,
&self.settings.metadata()?,
&get_instance_name(&self.settings.table_name)?,
);
debug!("🏊 Bigtable connection acquired");
Ok(entry)
}
Expand Down Expand Up @@ -216,7 +230,7 @@ impl Manager for BigtableClientManager {
// note, this changes to `blocks_in_conditions` for 1.76+
#[allow(clippy::blocks_in_conditions)]
if !client
.health_check(&self.settings.table_name, self.metrics.clone())
.health_check(self.metrics.clone())
.await
.map_err(|e| {
debug!("🏊 Recycle requested (health). {:?}", e);
Expand Down Expand Up @@ -265,3 +279,15 @@ impl BigtableClientManager {
Ok(chan)
}
}

#[test]
fn test_get_instance() -> Result<(), DbError> {
let res = get_instance_name("projects/foo/instances/bar/tables/gorp")?;
assert_eq!(res.as_str(), "projects/foo/instances/bar");

assert!(get_instance_name("projects/foo/").is_err());
assert!(get_instance_name("protect/foo/instances/bar/tables/gorp").is_err());
assert!(get_instance_name("project/foo/instance/bar/tables/gorp").is_err());

Ok(())
}

0 comments on commit bed0f7f

Please sign in to comment.