diff --git a/Cargo.lock b/Cargo.lock
index c98c8290f..c14705efb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -743,6 +743,7 @@ dependencies = [
  "gethostname",
  "google-cloud-rust-raw",
  "grpcio",
+ "grpcio-sys",
  "hex",
  "httparse",
  "hyper",
diff --git a/autoconnect/Cargo.toml b/autoconnect/Cargo.toml
index ad1cf200e..4abe06919 100644
--- a/autoconnect/Cargo.toml
+++ b/autoconnect/Cargo.toml
@@ -53,7 +53,7 @@ actix-service = "2.0"
 docopt = "1.1"
 
 [features]
-default = ["dual"]
+default = ["dual", "emulator"]
 bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
 dynamodb = ["autopush_common/dynamodb", "autoconnect_settings/dynamodb"]
 dual = ["bigtable", "dynamodb"]
diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml
index 9f065bf23..a716e0f54 100644
--- a/autopush-common/Cargo.toml
+++ b/autopush-common/Cargo.toml
@@ -68,6 +68,7 @@
 google-cloud-rust-raw = { version = "0.16", default-features = false, features = [
     "bigtable",
 ], optional = true }
 grpcio = { version = "=0.13.0", features = ["openssl"], optional = true }
+grpcio-sys = { version = "=0.13.0", optional = true }
 protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+
 form_urlencoded = { version = "1.2", optional = true }
@@ -83,6 +84,7 @@ actix-rt = "2.8"
 bigtable = [
     "dep:google-cloud-rust-raw",
     "dep:grpcio",
+    "dep:grpcio-sys",
     "dep:protobuf",
     "dep:form_urlencoded",
 ]
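The new direct `grpcio-sys` dependency exists so that the retry classifier below can match on the raw `grpc_call_error` C enum that grpcio's `Error::CallFailure` wraps. A minimal sketch of that check in isolation, assuming grpcio-sys 0.13's bindgen-generated enum (`is_generic_call_error` is a hypothetical name, not part of this patch):

```rust
use grpcio_sys::grpc_call_error;

/// Hypothetical helper mirroring the check in `retryable_error` below:
/// GRPC_CALL_ERROR is the generic "call failed" variant and is worth
/// retrying, while the more specific variants (wrong flags, too many
/// operations, etc.) indicate caller bugs that a retry cannot fix.
fn is_generic_call_error(status: &grpc_call_error) -> bool {
    matches!(status, grpc_call_error::GRPC_CALL_ERROR)
}
```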
diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs
index 0d93bc212..7a8be803d 100644
--- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs
+++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs
@@ -5,6 +5,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
+use again::RetryPolicy;
 use async_trait::async_trait;
 use cadence::{CountedExt, StatsdClient};
 use futures_util::StreamExt;
@@ -14,7 +15,7 @@ use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
 use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
 use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
 use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
-use grpcio::{Channel, Metadata};
+use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
 use protobuf::RepeatedField;
 use serde_json::{from_str, json};
 use uuid::Uuid;
@@ -51,6 +52,8 @@ const ROUTER_FAMILY: &str = "router";
 const MESSAGE_FAMILY: &str = "message"; // The default family for messages
 const MESSAGE_TOPIC_FAMILY: &str = "message_topic";
 
+pub(crate) const RETRY_COUNT: usize = 5;
+
 /// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently.
 // TODO: Should we create something similar for ChannelID?
 struct Uaid(Uuid);
@@ -193,6 +196,67 @@ fn to_string(value: Vec<u8>, name: &str) -> Result<String, error::BigTableError> {
     })
 }
 
+pub fn retry_policy(max: usize) -> RetryPolicy {
+    RetryPolicy::default()
+        .with_max_retries(max)
+        .with_jitter(true)
+}
+
+fn retriable_internal_error(status: &RpcStatus) -> bool {
+    match status.code() {
+        RpcStatusCode::UNKNOWN => {
+            "error occurred when fetching oauth2 token" == status.message().to_ascii_lowercase()
+        }
+        RpcStatusCode::INTERNAL => [
+            "rst_stream",
+            "rst stream",
+            "received unexpected eos on data frame from server",
+        ]
+        .contains(&status.message().to_lowercase().as_str()),
+        RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
+        _ => false,
+    }
+}
+
+pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
+    let mut metric = metrics
+        .incr_with_tags("database.retry")
+        .with_tag("error", err_type)
+        .with_tag("type", "bigtable");
+    if let Some(code) = code {
+        metric = metric.with_tag("code", code);
+    }
+    metric.send();
+}
+
+pub fn retryable_error(metrics: Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool {
+    move |err| {
+        debug!("🉑 Checking error...{err}");
+        match err {
+            grpcio::Error::RpcFailure(status) => {
+                info!("GRPC Failure :{:?}", status);
+                metric(&metrics, "RpcFailure", Some(&status.code().to_string()));
+                retriable_internal_error(status)
+            }
+            grpcio::Error::BindFail(_) => {
+                metric(&metrics, "BindFail", None);
+                true
+            }
+            // The parameter here is a [grpcio_sys::grpc_call_error] enum.
+            // Not all of these are retriable.
+            grpcio::Error::CallFailure(grpc_call_status) => {
+                metric(
+                    &metrics,
+                    "CallFailure",
+                    Some(&format!("{:?}", grpc_call_status)),
+                );
+                grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR
+            }
+            _ => false,
+        }
+    }
+}
+
 fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
     ::grpcio::CallOption::default().headers(metadata)
 }
@@ -268,10 +332,15 @@ impl BigTableClientImpl {
         req: bigtable::MutateRowRequest,
     ) -> Result<(), error::BigTableError> {
         let bigtable = self.pool.get().await?;
-        bigtable
-            .conn
-            .mutate_row_async_opt(&req, call_opts(self.metadata.clone()))
-            .map_err(error::BigTableError::Write)?
+        retry_policy(self.settings.retry_count)
+            .retry_if(
+                || async {
+                    bigtable
+                        .conn
+                        .mutate_row_opt(&req, call_opts(self.metadata.clone()))
+                },
+                retryable_error(self.metrics.clone()),
+            )
             .await
             .map_err(error::BigTableError::Write)?;
         Ok(())
     }
@@ -285,9 +354,16 @@ impl BigTableClientImpl {
     ) -> Result<(), error::BigTableError> {
         let bigtable = self.pool.get().await?;
         // ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
-        let resp = bigtable
-            .conn
-            .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
+        let resp = retry_policy(self.settings.retry_count)
+            .retry_if(
+                || async {
+                    bigtable
+                        .conn
+                        .mutate_rows_opt(&req, call_opts(self.metadata.clone()))
+                },
+                retryable_error(self.metrics.clone()),
+            )
+            .await
             .map_err(error::BigTableError::Write)?;
 
         // Scan the returned stream looking for errors.
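The wrappers above (and those that follow) all use the same `again` crate pattern: `retry_if` re-invokes a future-producing closure for as long as the predicate approves the error. A standalone sketch of the mechanics, with a stand-in fallible operation instead of a grpcio call (the counter, error type, predicate, and tokio harness here are illustrative only):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use again::RetryPolicy;

#[tokio::main]
async fn main() {
    let attempts = AtomicUsize::new(0);
    // Mirrors retry_policy(): capped attempts plus jitter, so a pool of
    // clients does not hammer Bigtable in lockstep after a shared outage.
    let result: Result<&str, &str> = RetryPolicy::default()
        .with_max_retries(5)
        .with_jitter(true)
        .retry_if(
            || async {
                // Fail twice with a "retryable" error, then succeed.
                if attempts.fetch_add(1, Ordering::SeqCst) < 2 {
                    Err("unavailable")
                } else {
                    Ok("row written")
                }
            },
            // Stand-in for retryable_error(): retry only transient failures.
            |err: &&str| *err == "unavailable",
        )
        .await;
    assert_eq!(result, Ok("row written"));
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
```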
@@ -349,9 +425,16 @@ impl BigTableClientImpl {
         req: ReadRowsRequest,
     ) -> Result<BTreeMap<String, row::Row>, error::BigTableError> {
         let bigtable = self.pool.get().await?;
-        let resp = bigtable
-            .conn
-            .read_rows_opt(&req, call_opts(self.metadata.clone()))
+        let resp = retry_policy(self.settings.retry_count)
+            .retry_if(
+                || async {
+                    bigtable
+                        .conn
+                        .read_rows_opt(&req, call_opts(self.metadata.clone()))
+                },
+                retryable_error(self.metrics.clone()),
+            )
+            .await
             .map_err(error::BigTableError::Read)?;
         merge::RowMerger::process_chunks(resp).await
     }
@@ -428,10 +511,17 @@ impl BigTableClientImpl {
         req: bigtable::CheckAndMutateRowRequest,
     ) -> Result<bool, error::BigTableError> {
         let bigtable = self.pool.get().await?;
-        let resp = bigtable
-            .conn
-            .check_and_mutate_row_async_opt(&req, call_opts(self.metadata.clone()))
-            .map_err(error::BigTableError::Write)?
+        let resp = retry_policy(self.settings.retry_count)
+            .retry_if(
+                || async {
+                    // Note: check_and_mutate_row_async may return before the row
+                    // is written, which can cause race conditions for reads
+                    bigtable
+                        .conn
+                        .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
+                },
+                retryable_error(self.metrics.clone()),
+            )
             .await
             .map_err(error::BigTableError::Write)?;
         debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
@@ -659,16 +749,26 @@ impl BigtableDb {
     /// Recycle check as well, so it has to be fairly low in the implementation
     /// stack.
     ///
-    pub async fn health_check(&mut self, table_name: &str) -> DbResult<bool> {
+    pub async fn health_check(
+        &mut self,
+        table_name: &str,
+        metrics: Arc<StatsdClient>,
+    ) -> DbResult<bool> {
         // Create a request that is GRPC valid, but does not point to a valid row.
         let mut req = read_row_request(table_name, "NOT FOUND");
         let mut filter = data::RowFilter::default();
         filter.set_block_all_filter(true);
         req.set_filter(filter);
 
-        let r = self
-            .conn
-            .read_rows_opt(&req, call_opts(self.metadata.clone()))
+        let r = retry_policy(RETRY_COUNT)
+            .retry_if(
+                || async {
+                    self.conn
+                        .read_rows_opt(&req, call_opts(self.metadata.clone()))
+                },
+                retryable_error(metrics.clone()),
+            )
+            .await
             .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;
 
         let (v, _stream) = r.into_future().await;
@@ -1192,7 +1292,7 @@ impl DbClient for BigTableClientImpl {
         self.pool
             .get()
             .await?
-            .health_check(&self.settings.table_name)
+            .health_check(&self.settings.table_name, self.metrics.clone())
             .await
     }
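Because `retriable_internal_error` is a pure function, its classification table is easy to pin down in a unit test. A sketch of what that could look like (a hypothetical test, not part of this patch; it assumes grpcio 0.13's `RpcStatus::new` and `RpcStatus::with_message` constructors):

```rust
#[cfg(test)]
mod classifier_tests {
    use grpcio::{RpcStatus, RpcStatusCode};

    use super::retriable_internal_error;

    #[test]
    fn classifies_grpc_statuses() {
        // Always transient: the service explicitly said "try again".
        assert!(retriable_internal_error(&RpcStatus::new(
            RpcStatusCode::UNAVAILABLE
        )));
        // INTERNAL retries only the known stream-reset messages; the
        // comparison is case-insensitive.
        assert!(retriable_internal_error(&RpcStatus::with_message(
            RpcStatusCode::INTERNAL,
            "RST_STREAM".to_owned(),
        )));
        assert!(!retriable_internal_error(&RpcStatus::with_message(
            RpcStatusCode::INTERNAL,
            "some other internal error".to_owned(),
        )));
        // Everything else fails fast.
        assert!(!retriable_internal_error(&RpcStatus::new(
            RpcStatusCode::NOT_FOUND
        )));
    }
}
```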
diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs
index 05c79ec9d..7a306d0a7 100644
--- a/autopush-common/src/db/bigtable/mod.rs
+++ b/autopush-common/src/db/bigtable/mod.rs
@@ -33,6 +33,10 @@ use crate::db::bigtable::bigtable_client::MetadataBuilder;
 use crate::db::error::DbError;
 use crate::util::deserialize_opt_u32_to_duration;
 
+fn retry_default() -> usize {
+    bigtable_client::RETRY_COUNT
+}
+
 /// The settings for accessing the BigTable contents.
 #[derive(Clone, Debug, Deserialize)]
 pub struct BigTableDbSettings {
@@ -75,6 +79,9 @@ pub struct BigTableDbSettings {
     /// Include route to leader header in metadata
     #[serde(default)]
     pub route_to_leader: bool,
+    /// Number of times to retry a GRPC function
+    #[serde(default = "retry_default")]
+    pub retry_count: usize,
 }
 
 impl BigTableDbSettings {
diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs
index d3cbefd72..ebae53ee8 100644
--- a/autopush-common/src/db/bigtable/pool.rs
+++ b/autopush-common/src/db/bigtable/pool.rs
@@ -87,8 +87,12 @@ impl BigTablePool {
         debug!("🉑 connection string {}", &connection);
 
         // Construct a new manager and put them in a pool for handling future requests.
-        let manager =
-            BigtableClientManager::new(&bt_settings, settings.dsn.clone(), connection.clone())?;
+        let manager = BigtableClientManager::new(
+            &bt_settings,
+            settings.dsn.clone(),
+            connection.clone(),
+            metrics.clone(),
+        )?;
         let mut config = PoolConfig::default();
         if let Some(size) = bt_settings.database_pool_max_size {
             debug!("🏊 Setting pool max size {}", &size);
@@ -119,6 +123,7 @@ pub struct BigtableClientManager {
     settings: BigTableDbSettings,
     dsn: Option<String>,
     connection: String,
+    metrics: Arc<StatsdClient>,
 }
 
 impl BigtableClientManager {
@@ -126,11 +131,13 @@
         settings: &BigTableDbSettings,
         dsn: Option<String>,
         connection: String,
+        metrics: Arc<StatsdClient>,
     ) -> Result<Self, DbError> {
         Ok(Self {
             settings: settings.clone(),
             dsn,
             connection,
+            metrics,
         })
     }
 }
@@ -182,7 +189,7 @@ impl Manager for BigtableClientManager {
         // note, this changes to `blocks_in_conditions` for 1.76+
         #[allow(clippy::blocks_in_conditions)]
         if !client
-            .health_check(&self.settings.table_name, self.metrics.clone())
             .await
             .map_err(|e| {
                 debug!("🏊 Recycle requested (health). {:?}", e);
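With the serde default in place, existing deployments pick up `retry_count = 5` without any settings change, and operators can raise or lower it per environment. A hypothetical round-trip check (not part of this patch; the field set shown is illustrative, and the real `BigTableDbSettings` blob carries more fields, most of which are assumed here to have serde defaults):

```rust
use autopush_common::db::bigtable::BigTableDbSettings;

fn main() -> Result<(), serde_json::Error> {
    // Omitted -> falls back to bigtable_client::RETRY_COUNT (5).
    let defaulted: BigTableDbSettings = serde_json::from_str(
        r#"{"table_name": "projects/p/instances/i/tables/autopush"}"#,
    )?;
    assert_eq!(defaulted.retry_count, 5);

    // An explicit override wins.
    let tuned: BigTableDbSettings = serde_json::from_str(
        r#"{"table_name": "projects/p/instances/i/tables/autopush", "retry_count": 8}"#,
    )?;
    assert_eq!(tuned.retry_count, 8);
    Ok(())
}
```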