Skip to content

Commit

Permalink
Merge branch 'master' into release/1.71
Browse files Browse the repository at this point in the history
  • Loading branch information
pjenvey committed Mar 19, 2024
2 parents edc4764 + 38d6a6d commit d0081d5
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 110 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:

python-checks:
docker:
- image: python:3.10-slim-bullseye
- image: python:3.12-slim-bullseye
auth:
username: $DOCKER_USER
password: $DOCKER_PASS
Expand All @@ -79,7 +79,7 @@ jobs:

test:
docker:
- image: python:3.10-slim-bullseye
- image: python:3.12-slim-bullseye
auth:
username: $DOCKER_USER
password: $DOCKER_PASS
Expand Down
6 changes: 5 additions & 1 deletion autoconnect/autoconnect-settings/src/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ impl AppState {
Box::new(client)
}
#[cfg(all(feature = "bigtable", feature = "dynamodb"))]
StorageType::Dual => Box::new(DualClientImpl::new(metrics.clone(), &db_settings)?),
StorageType::Dual => {
let client = DualClientImpl::new(metrics.clone(), &db_settings)?;
client.spawn_sweeper(Duration::from_secs(30));
Box::new(client)
}
_ => panic!(
"Invalid Storage type {:?}. Check {}__DB_DSN.",
storage_type,
Expand Down
6 changes: 5 additions & 1 deletion autoendpoint/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ impl Server {
Box::new(client)
}
#[cfg(all(feature = "bigtable", feature = "dual"))]
StorageType::Dual => Box::new(DualClientImpl::new(metrics.clone(), &db_settings)?),
StorageType::Dual => {
let client = DualClientImpl::new(metrics.clone(), &db_settings)?;
client.spawn_sweeper(Duration::from_secs(30));
Box::new(client)
}
_ => {
debug!("No idea what {:?} is", &db_settings.dsn);
return Err(ApiErrorKind::General(
Expand Down
4 changes: 4 additions & 0 deletions autopush-common/src/db/bigtable/bigtable_client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ pub enum BigTableError {
/// General Pool builder errors.
#[error("Pool Error: {0}")]
Pool(String),

#[error("BigTable config error: {0}")]
Config(String),
}

impl ReportableError for BigTableError {
Expand Down Expand Up @@ -144,6 +147,7 @@ impl ReportableError for BigTableError {
BigTableError::Recycle => "storage.bigtable.error.recycle",
BigTableError::Pool(_) => "storage.bigtable.error.pool",
BigTableError::GRPC(_) => "storage.bigtable.error.grpc",
BigTableError::Config(_) => "storage.bigtable.error.config",
};
Some(err)
}
Expand Down
40 changes: 19 additions & 21 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use cadence::{CountedExt, StatsdClient};
use futures_util::StreamExt;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest;
use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient;
use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
use google_cloud_rust_raw::bigtable::v2::bigtable::{PingAndWarmRequest, ReadRowsRequest};
use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
Expand Down Expand Up @@ -761,49 +761,47 @@ impl BigTableClientImpl {
#[derive(Clone)]
pub struct BigtableDb {
pub(super) conn: BigtableClient,
pub(super) metadata: Metadata,
pub(super) health_metadata: Metadata,
instance_name: String,
}

impl BigtableDb {
pub fn new(channel: Channel, metadata: &Metadata) -> Self {
pub fn new(channel: Channel, health_metadata: &Metadata, instance_name: &str) -> Self {
Self {
conn: BigtableClient::new(channel),
metadata: metadata.clone(),
health_metadata: health_metadata.clone(),
instance_name: instance_name.to_owned(),
}
}

/// Perform a simple connectivity check. This should return no actual results
/// but should verify that the connection is valid. We use this for the
/// Recycle check as well, so it has to be fairly low in the implementation
/// stack.
///
pub async fn health_check(
&mut self,
table_name: &str,
metrics: Arc<StatsdClient>,
) -> DbResult<bool> {
// Create a request that is GRPC valid, but does not point to a valid row.
let mut req = read_row_request(table_name, "NOT FOUND");
let mut filter = data::RowFilter::default();
filter.set_block_all_filter(true);
req.set_filter(filter);

let r = retry_policy(RETRY_COUNT)
/// "instance_name" is the "projects/{project}/instances/{instance}" portion of
/// the table name.
///
pub async fn health_check(&mut self, metrics: Arc<StatsdClient>) -> DbResult<bool> {
let req = PingAndWarmRequest {
name: self.instance_name.clone(),
..Default::default()
};
// PingAndWarmResponse does not implement a stream since it does not return data.
let _r = retry_policy(RETRY_COUNT)
.retry_if(
|| async {
self.conn
.read_rows_opt(&req, call_opts(self.metadata.clone()))
.ping_and_warm_opt(&req, call_opts(self.health_metadata.clone()))
},
retryable_error(metrics.clone()),
)
.await
.map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;

let (v, _stream) = r.into_future().await;
// Since this should return no rows (with the row key set to a value that shouldn't exist)
// the first component of the tuple should be None.
debug!("🉑 health check");
Ok(v.is_none())
Ok(true)
}
}

Expand Down Expand Up @@ -1321,7 +1319,7 @@ impl DbClient for BigTableClientImpl {
self.pool
.get()
.await?
.health_check(&self.settings.table_name, self.metrics.clone())
.health_check(self.metrics.clone())
.await
}

Expand Down
72 changes: 70 additions & 2 deletions autopush-common/src/db/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ pub struct BigTableDbSettings {
pub retry_count: usize,
}

// Used by tests, but we don't want it available in release builds.
#[allow(clippy::derivable_impls)]
#[cfg(test)]
impl Default for BigTableDbSettings {
fn default() -> Self {
Self {
table_name: Default::default(),
router_family: Default::default(),
message_family: Default::default(),
message_topic_family: Default::default(),
database_pool_max_size: Default::default(),
database_pool_create_timeout: Default::default(),
database_pool_wait_timeout: Default::default(),
database_pool_recycle_timeout: Default::default(),
database_pool_connection_ttl: Default::default(),
database_pool_max_idle: Default::default(),
route_to_leader: Default::default(),
retry_count: Default::default(),
}
}
}

impl BigTableDbSettings {
pub fn metadata(&self) -> Result<Metadata, BigTableError> {
MetadataBuilder::with_prefix(&self.table_name)
Expand All @@ -93,13 +115,20 @@ impl BigTableDbSettings {
.map_err(BigTableError::GRPC)
}

pub fn health_metadata(&self) -> Result<Metadata, BigTableError> {
MetadataBuilder::with_prefix(&self.table_name)
.routing_param("name", &self.get_instance_name()?)
.route_to_leader(self.route_to_leader)
.build()
.map_err(BigTableError::GRPC)
}

pub fn admin_metadata(&self) -> Result<Metadata, BigTableError> {
// Admin calls use a slightly different routing param and a truncated prefix
// See https://github.com/googleapis/google-cloud-cpp/issues/190#issuecomment-370520185
let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else {
return Err(BigTableError::Admin(
return Err(BigTableError::Config(
"Invalid table name specified".to_owned(),
None,
));
};
MetadataBuilder::with_prefix(admin_prefix)
Expand All @@ -108,6 +137,16 @@ impl BigTableDbSettings {
.build()
.map_err(BigTableError::GRPC)
}

pub fn get_instance_name(&self) -> Result<String, BigTableError> {
let parts: Vec<&str> = self.table_name.split('/').collect();
if parts.len() < 4 || parts[0] != "projects" || parts[2] != "instances" {
return Err(BigTableError::Config(
"Invalid table name specified. Cannot parse instance".to_owned(),
));
}
Ok(parts[0..4].join("/"))
}
}

impl TryFrom<&str> for BigTableDbSettings {
Expand Down Expand Up @@ -138,4 +177,33 @@ mod tests {
);
Ok(())
}
#[test]
fn test_get_instance() -> Result<(), super::BigTableError> {
let settings = super::BigTableDbSettings {
table_name: "projects/foo/instances/bar/tables/gorp".to_owned(),
..Default::default()
};
let res = settings.get_instance_name()?;
assert_eq!(res.as_str(), "projects/foo/instances/bar");

let settings = super::BigTableDbSettings {
table_name: "projects/foo/".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());

let settings = super::BigTableDbSettings {
table_name: "protect/foo/instances/bar/tables/gorp".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());

let settings = super::BigTableDbSettings {
table_name: "project/foo/instance/bar/tables/gorp".to_owned(),
..Default::default()
};
assert!(settings.get_instance_name().is_err());

Ok(())
}
}
8 changes: 6 additions & 2 deletions autopush-common/src/db/bigtable/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,11 @@ impl Manager for BigtableClientManager {
/// `BigtableClient` is the most atomic level we can go down to.
async fn create(&self) -> Result<BigtableDb, DbError> {
debug!("🏊 Create a new pool entry.");
let entry = BigtableDb::new(self.get_channel()?, &self.settings.metadata()?);
let entry = BigtableDb::new(
self.get_channel()?,
&self.settings.health_metadata()?,
&self.settings.get_instance_name()?,
);
debug!("🏊 Bigtable connection acquired");
Ok(entry)
}
Expand Down Expand Up @@ -216,7 +220,7 @@ impl Manager for BigtableClientManager {
// note, this changes to `blocks_in_conditions` for 1.76+
#[allow(clippy::blocks_in_conditions)]
if !client
.health_check(&self.settings.table_name, self.metrics.clone())
.health_check(self.metrics.clone())
.await
.map_err(|e| {
debug!("🏊 Recycle requested (health). {:?}", e);
Expand Down
Loading

0 comments on commit d0081d5

Please sign in to comment.