From 99da39d5b487a6f4b8c5afaa361aa45183c4dcff Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 30 Jan 2024 16:43:04 -0800 Subject: [PATCH 1/8] feat: Make bigtable calls retryable Closes SYNC-4085 --- .../src/db/bigtable/bigtable_client/mod.rs | 95 +++++++++++++++---- autopush-common/src/db/bigtable/mod.rs | 6 ++ 2 files changed, 83 insertions(+), 18 deletions(-) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 301768cfe..5b1931ac4 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -5,8 +5,9 @@ use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use again::RetryPolicy; use async_trait::async_trait; -use cadence::StatsdClient; +use cadence::{CountedExt, StatsdClient}; use futures_util::StreamExt; use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin::DropRowRangeRequest; use google_cloud_rust_raw::bigtable::admin::v2::bigtable_table_admin_grpc::BigtableTableAdminClient; @@ -71,7 +72,7 @@ impl From for String { pub struct BigTableClientImpl { pub(crate) settings: BigTableDbSettings, /// Metrics client - _metrics: Arc, + metrics: Arc, /// Connection Channel (used for alternate calls) pool: BigTablePool, } @@ -117,6 +118,55 @@ fn to_string(value: Vec, name: &str) -> Result { }) } +pub fn retry_policy(max: usize) -> RetryPolicy { + RetryPolicy::default() + .with_max_retries(max) + .with_jitter(true) +} + +pub fn retryable_describe_table_error( + metrics: Arc, +) -> impl Fn(&grpcio::Error) -> bool { + move |err| { + debug!("🉑 Checking error...{err}"); + match err { + grpcio::Error::RpcFailure(_) => { + metrics + .incr_with_tags("database.retry") + .with_tag("error", "RpcFailure") + .with_tag("type", "bigtable") + .send(); + true + } + grpcio::Error::QueueShutdown => { + metrics + .incr_with_tags("database.retry") + .with_tag("error", "QueueShutdown") + .with_tag("type", "bigtable") + .send(); + true + } + grpcio::Error::BindFail(_) => { + metrics + .incr_with_tags("database.retry") + .with_tag("error", "BindFail") + .with_tag("type", "bigtable") + .send(); + true + } + grpcio::Error::CallFailure(_) => { + metrics + .incr_with_tags("database.retry") + .with_tag("error", "CallFailure") + .with_tag("type", "bigtable") + .send(); + true + } + _ => false, + } + } +} + /// Connect to a BigTable storage model. /// /// BigTable is available via the Google Console, and is a schema less storage system. @@ -150,7 +200,7 @@ impl BigTableClientImpl { let pool = BigTablePool::new(settings, &metrics)?; Ok(Self { settings: db_settings, - _metrics: metrics, + metrics, pool, }) } @@ -231,10 +281,13 @@ impl BigTableClientImpl { req: ReadRowsRequest, ) -> Result, error::BigTableError> { let bigtable = self.pool.get().await?; - let resp = bigtable - .conn - .read_rows(&req) - .map_err(error::BigTableError::Read)?; + let resp = retry_policy(self.settings.retry_count) + .retry_if( + || async { bigtable.conn.read_rows(&req) }, + retryable_describe_table_error(self.metrics.clone()), + ) + .await + .map_err(error::BigTableError::Write)?; merge::RowMerger::process_chunks(resp).await } @@ -256,10 +309,11 @@ impl BigTableClientImpl { // Do the actual commit. let bigtable = self.pool.get().await?; debug!("🉑 writing row..."); - let _resp = bigtable - .conn - .mutate_row_async(&req) - .map_err(error::BigTableError::Write)? 
+ let _resp = retry_policy(self.settings.retry_count) + .retry_if( + || async { bigtable.conn.mutate_row_async(&req) }, + retryable_describe_table_error(self.metrics.clone()), + ) .await .map_err(error::BigTableError::Write)?; Ok(()) @@ -319,9 +373,12 @@ impl BigTableClientImpl { // Do the actual commit. let bigtable = self.pool.get().await?; - let resp = bigtable - .conn - .check_and_mutate_row_async(&req) + let resp = retry_policy(self.settings.retry_count) + .retry_if( + || async { bigtable.conn.check_and_mutate_row_async(&req) }, + retryable_describe_table_error(self.metrics.clone()), + ) + .await .map_err(error::BigTableError::Write)? .await .map_err(error::BigTableError::Write)?; @@ -380,10 +437,11 @@ impl BigTableClientImpl { req.set_mutations(mutations); let bigtable = self.pool.get().await?; - let _resp = bigtable - .conn - .mutate_row_async(&req) - .map_err(error::BigTableError::Write)? + let _resp = retry_policy(self.settings.retry_count) + .retry_if( + || async { bigtable.conn.mutate_row_async(&req) }, + retryable_describe_table_error(self.metrics.clone()), + ) .await .map_err(error::BigTableError::Write)?; Ok(()) @@ -1368,6 +1426,7 @@ mod tests { Uuid::parse_str(&parts.join("-")).unwrap() }; let client = new_client().unwrap(); + debug!("UAID: {uaid}"); client.remove_user(&uaid).await.unwrap(); let qualifier = "foo".to_owned(); diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index b3352996f..1ae3abb44 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -31,6 +31,10 @@ use std::time::Duration; use crate::db::error::DbError; use crate::util::deserialize_u32_to_duration; +fn retry_default() -> usize { + 10 +} + /// The settings for accessing the BigTable contents. #[derive(Clone, Debug, Deserialize)] pub struct BigTableDbSettings { @@ -62,6 +66,8 @@ pub struct BigTableDbSettings { #[serde(default)] #[serde(deserialize_with = "deserialize_u32_to_duration")] pub database_pool_max_idle: Duration, + #[serde(default = "retry_default")] + pub retry_count: usize, } impl TryFrom<&str> for BigTableDbSettings { From 3a2e0b2b93213b815c2a0fd79c0eea27d8aed30f Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 31 Jan 2024 10:47:34 -0800 Subject: [PATCH 2/8] f fix test, make some calls sync in async wrappers --- autoconnect/Cargo.toml | 2 +- .../src/db/bigtable/bigtable_client/mod.rs | 53 +++++++++++++------ autopush-common/src/db/bigtable/pool.rs | 13 +++-- 3 files changed, 49 insertions(+), 19 deletions(-) diff --git a/autoconnect/Cargo.toml b/autoconnect/Cargo.toml index ad1cf200e..4abe06919 100644 --- a/autoconnect/Cargo.toml +++ b/autoconnect/Cargo.toml @@ -53,7 +53,7 @@ actix-service = "2.0" docopt = "1.1" [features] -default = ["dual"] +default = ["dual", "emulator"] bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"] dynamodb = ["autopush_common/dynamodb", "autoconnect_settings/dynamodb"] dual = ["bigtable", "dynamodb"] diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 5b1931ac4..48e35ed07 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -212,7 +212,7 @@ impl BigTableClientImpl { /// Read a given row from the row key. 
async fn read_row(&self, row_key: &str) -> Result, error::BigTableError> { - debug!("🉑 Row key: {}", row_key); + dbg!("🉑 Row key: {}", row_key); let req = self.read_row_request(row_key); let mut rows = self.read_rows(req).await?; Ok(rows.remove(row_key)) @@ -311,7 +311,11 @@ impl BigTableClientImpl { debug!("🉑 writing row..."); let _resp = retry_policy(self.settings.retry_count) .retry_if( - || async { bigtable.conn.mutate_row_async(&req) }, + || async { + // Note: `mutate_row_async` will return before the row is written, which may + // cause race conditions for reads. (see test::read_cells_family_id() ) + bigtable.conn.mutate_row(&req) + }, retryable_describe_table_error(self.metrics.clone()), ) .await @@ -375,12 +379,14 @@ impl BigTableClientImpl { let bigtable = self.pool.get().await?; let resp = retry_policy(self.settings.retry_count) .retry_if( - || async { bigtable.conn.check_and_mutate_row_async(&req) }, + || async { + // Note: check_and_mutate_row_async may return before the row + // is written, which can cause race conditions for reads + bigtable.conn.check_and_mutate_row(&req) + }, retryable_describe_table_error(self.metrics.clone()), ) .await - .map_err(error::BigTableError::Write)? - .await .map_err(error::BigTableError::Write)?; debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),); Ok(resp.get_predicate_matched()) @@ -416,10 +422,15 @@ impl BigTableClientImpl { req.set_mutations(mutations); let bigtable = self.pool.get().await?; - let _resp = bigtable - .conn - .mutate_row_async(&req) - .map_err(error::BigTableError::Write)? + let _resp = retry_policy(self.settings.retry_count) + .retry_if( + || async { + // NOTE: `mutate_row_async` will return before the row is written, which may + // cause race conditions for reads. + bigtable.conn.mutate_row(&req) + }, + retryable_describe_table_error(self.metrics.clone()), + ) .await .map_err(error::BigTableError::Write)?; Ok(()) @@ -439,7 +450,11 @@ impl BigTableClientImpl { let bigtable = self.pool.get().await?; let _resp = retry_policy(self.settings.retry_count) .retry_if( - || async { bigtable.conn.mutate_row_async(&req) }, + || async { + // NOTE: `mutate_row_async` will return before the row is written, which may + // cause race conditions for reads. + bigtable.conn.mutate_row(&req) + }, retryable_describe_table_error(self.metrics.clone()), ) .await @@ -600,16 +615,23 @@ impl BigtableDb { /// Recycle check as well, so it has to be fairly low in the implementation /// stack. /// - pub async fn health_check(&mut self, table_name: &str) -> DbResult { + pub async fn health_check( + &mut self, + table_name: &str, + metrics: Arc, + ) -> DbResult { // Create a request that is GRPC valid, but does not point to a valid row. let mut req = read_row_request(table_name, "NOT FOUND"); let mut filter = data::RowFilter::default(); filter.set_block_all_filter(true); req.set_filter(filter); - let r = self - .conn - .read_rows(&req) + let r = retry_policy(10) + .retry_if( + || async { self.conn.read_rows(&req) }, + retryable_describe_table_error(metrics.clone()), + ) + .await .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?; let (v, _stream) = r.into_future().await; @@ -1123,7 +1145,7 @@ impl DbClient for BigTableClientImpl { self.pool .get() .await? 
- .health_check(&self.settings.table_name) + .health_check(&self.settings.table_name, self.metrics.clone()) .await } @@ -1446,6 +1468,7 @@ mod tests { panic!("Expected row"); }; assert_eq!(row.cells.len(), 1); + dbg!(&row.cells.keys()); assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str()); client.remove_user(&uaid).await } diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs index 778ec949d..8f12cc413 100644 --- a/autopush-common/src/db/bigtable/pool.rs +++ b/autopush-common/src/db/bigtable/pool.rs @@ -87,8 +87,12 @@ impl BigTablePool { debug!("🉑 connection string {}", &connection); // Construct a new manager and put them in a pool for handling future requests. - let manager = - BigtableClientManager::new(&bt_settings, settings.dsn.clone(), connection.clone())?; + let manager = BigtableClientManager::new( + &bt_settings, + settings.dsn.clone(), + connection.clone(), + metrics.clone(), + )?; let mut config = PoolConfig::default(); if let Some(size) = bt_settings.database_pool_max_size { debug!("🏊 Setting pool max size {}", &size); @@ -121,6 +125,7 @@ pub struct BigtableClientManager { settings: BigTableDbSettings, dsn: Option, connection: String, + metrics: Arc, } impl BigtableClientManager { @@ -128,11 +133,13 @@ impl BigtableClientManager { settings: &BigTableDbSettings, dsn: Option, connection: String, + metrics: Arc, ) -> Result { Ok(Self { settings: settings.clone(), dsn, connection, + metrics, }) } } @@ -183,7 +190,7 @@ impl Manager for BigtableClientManager { // Clippy 0.1.73 complains about the `.map_err` being hard to read. #[allow(clippy::blocks_in_if_conditions)] if !client - .health_check(&self.settings.table_name) + .health_check(&self.settings.table_name, self.metrics.clone()) .await .map_err(|e| { debug!("🏊 Recycle requested (health). {:?}", e); From 1f899dab8f438ba78345781c17fe94050e230168 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 31 Jan 2024 11:22:30 -0800 Subject: [PATCH 3/8] f remove dbg --- autopush-common/src/db/bigtable/bigtable_client/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 48e35ed07..57f0e2726 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -212,7 +212,6 @@ impl BigTableClientImpl { /// Read a given row from the row key. async fn read_row(&self, row_key: &str) -> Result, error::BigTableError> { - dbg!("🉑 Row key: {}", row_key); let req = self.read_row_request(row_key); let mut rows = self.read_rows(req).await?; Ok(rows.remove(row_key)) @@ -1468,7 +1467,6 @@ mod tests { panic!("Expected row"); }; assert_eq!(row.cells.len(), 1); - dbg!(&row.cells.keys()); assert_eq!(row.cells.keys().next().unwrap(), qualifier.as_str()); client.remove_user(&uaid).await } From 5dd651f4699cd94a5b1cf53cd3406d09c77e0fe9 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 2 Feb 2024 16:39:02 -0800 Subject: [PATCH 4/8] f wrap new functions. 
--- .../src/db/bigtable/bigtable_client/mod.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index b7c2a9cfc..69b0c2c51 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -287,10 +287,11 @@ impl BigTableClientImpl { req: bigtable::MutateRowRequest, ) -> Result<(), error::BigTableError> { let bigtable = self.pool.get().await?; - bigtable - .conn - .mutate_row_async(&req) - .map_err(error::BigTableError::Write)? + retry_policy(self.settings.retry_count) + .retry_if( + || async { bigtable.conn.mutate_row(&req) }, + retryable_describe_table_error(self.metrics.clone()), + ) .await .map_err(error::BigTableError::Write)?; Ok(()) @@ -304,9 +305,12 @@ impl BigTableClientImpl { ) -> Result<(), error::BigTableError> { let bigtable = self.pool.get().await?; // ClientSStreamReceiver will cancel an operation if it's dropped before it's done. - let resp = bigtable - .conn - .mutate_rows(&req) + let resp = retry_policy(self.settings.retry_count) + .retry_if( + || async { bigtable.conn.mutate_rows(&req) }, + retryable_describe_table_error(self.metrics.clone()), + ) + .await .map_err(error::BigTableError::Write)?; // Scan the returned stream looking for errors. From 11a17a1079a56373c73285f8d883d8f78ea24ab3 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 26 Feb 2024 15:57:17 -0800 Subject: [PATCH 5/8] f fine tune retry errors --- Cargo.lock | 1 + autopush-common/Cargo.toml | 2 + .../src/db/bigtable/bigtable_client/mod.rs | 38 +++++++++++++++++-- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c4a32b11..01a693325 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -743,6 +743,7 @@ dependencies = [ "gethostname", "google-cloud-rust-raw", "grpcio", + "grpcio-sys", "hex", "httparse", "hyper", diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index b6e2f3a4d..27a73b7d1 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -67,6 +67,7 @@ google-cloud-rust-raw = { version = "0.16", default-features = false, features = "bigtable", ], optional = true } grpcio = { version = "=0.13.0", features = ["openssl"], optional = true } +grpcio-sys = { version = "=0.13.0", optional = true } protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+ form_urlencoded = { version = "1.2", optional = true } @@ -82,6 +83,7 @@ actix-rt = "2.8" bigtable = [ "dep:google-cloud-rust-raw", "dep:grpcio", + "dep:grpcio-sys", "dep:protobuf", "dep:form_urlencoded", ] diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index a92a7780b..68740afc5 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -15,7 +15,7 @@ use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest; use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient; use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain}; use google_cloud_rust_raw::bigtable::v2::{bigtable, data}; -use grpcio::{Channel, Metadata}; +use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode}; use protobuf::RepeatedField; use serde_json::{from_str, json}; use uuid::Uuid; @@ -200,19 +200,38 @@ pub fn retry_policy(max: usize) -> RetryPolicy { .with_jitter(true) } +fn 
retriable_internal_error(status: &RpcStatus) -> bool { + match status.code() { + RpcStatusCode::UNKNOWN => { + "error occurred when fetching oauth2 token" == status.message().to_ascii_lowercase() + } + RpcStatusCode::INTERNAL => [ + "rst_stream", + "rst stream", + "received unexpected eos on data from from server", + ] + .contains(&status.message().to_lowercase().as_str()), + RpcStatusCode::UNAVAILABLE => true, + RpcStatusCode::DEADLINE_EXCEEDED => true, + _ => false, + } +} + pub fn retryable_describe_table_error( metrics: Arc, ) -> impl Fn(&grpcio::Error) -> bool { move |err| { debug!("🉑 Checking error...{err}"); match err { - grpcio::Error::RpcFailure(_) => { + grpcio::Error::RpcFailure(status) => { + info!("GRPC Failure :{:?}", status); metrics .incr_with_tags("database.retry") .with_tag("error", "RpcFailure") .with_tag("type", "bigtable") + .with_tag("code", &status.code().to_string()) .send(); - true + retriable_internal_error(status) } grpcio::Error::QueueShutdown => { metrics @@ -230,11 +249,22 @@ pub fn retryable_describe_table_error( .send(); true } - grpcio::Error::CallFailure(_) => { + // The parameter here is a [grpcio_sys::grpc_call_error] enum + // Not all of these are retriable. + grpcio::Error::CallFailure(grpc_call_status) => { metrics .incr_with_tags("database.retry") .with_tag("error", "CallFailure") .with_tag("type", "bigtable") + .with_tag("code", &format!("{:?}", grpc_call_status)) + .send(); + grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR + } + grpcio::Error::ShutdownFailed => { + metrics + .incr_with_tags("database.retry") + .with_tag("error", "ShutdownFailed") + .with_tag("type", "bigtable") .send(); true } From c9f4bf6ea0c8efc2ee167f3aa9c576ee4e49c822 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 26 Feb 2024 17:00:42 -0800 Subject: [PATCH 6/8] f merge conflict --- autopush-common/src/db/bigtable/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index b9be66edb..67108deca 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -73,6 +73,7 @@ pub struct BigTableDbSettings { #[serde(deserialize_with = "deserialize_opt_u32_to_duration")] pub database_pool_connection_ttl: Option, /// Max idle time(in seconds) for a connection + #[serde(default)] #[serde(deserialize_with = "deserialize_opt_u32_to_duration")] pub database_pool_max_idle: Option, /// Include route to leader header in metadata From 1dffa0c1e665c30d71191c5172192a9ef4812f88 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 27 Feb 2024 12:09:06 -0800 Subject: [PATCH 7/8] f r's #1 --- .../src/db/bigtable/bigtable_client/mod.rs | 68 +++++++------------ autopush-common/src/db/bigtable/mod.rs | 2 +- 2 files changed, 26 insertions(+), 44 deletions(-) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 9933c926c..b154f6ac6 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -211,63 +211,45 @@ fn retriable_internal_error(status: &RpcStatus) -> bool { "received unexpected eos on data from from server", ] .contains(&status.message().to_lowercase().as_str()), - RpcStatusCode::UNAVAILABLE => true, - RpcStatusCode::DEADLINE_EXCEEDED => true, + RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true, _ => false, } } -pub fn retryable_describe_table_error( - metrics: Arc, -) -> impl 
Fn(&grpcio::Error) -> bool { +pub fn metric(metrics: &Arc, err_type: &str, code: Option<&str>) { + let mut metric = metrics + .incr_with_tags("database.retry") + .with_tag("error", err_type) + .with_tag("type", "bigtable"); + if let Some(code) = code { + metric = metric.with_tag("code", code); + } + metric.send(); +} + +pub fn retryable_error(metrics: Arc) -> impl Fn(&grpcio::Error) -> bool { move |err| { debug!("🉑 Checking error...{err}"); match err { grpcio::Error::RpcFailure(status) => { info!("GRPC Failure :{:?}", status); - metrics - .incr_with_tags("database.retry") - .with_tag("error", "RpcFailure") - .with_tag("type", "bigtable") - .with_tag("code", &status.code().to_string()) - .send(); + metric(&metrics, "RpcFailure", Some(&status.code().to_string())); retriable_internal_error(status) } - grpcio::Error::QueueShutdown => { - metrics - .incr_with_tags("database.retry") - .with_tag("error", "QueueShutdown") - .with_tag("type", "bigtable") - .send(); - true - } grpcio::Error::BindFail(_) => { - metrics - .incr_with_tags("database.retry") - .with_tag("error", "BindFail") - .with_tag("type", "bigtable") - .send(); + metric(&metrics, "BindFail", None); true } // The parameter here is a [grpcio_sys::grpc_call_error] enum // Not all of these are retriable. grpcio::Error::CallFailure(grpc_call_status) => { - metrics - .incr_with_tags("database.retry") - .with_tag("error", "CallFailure") - .with_tag("type", "bigtable") - .with_tag("code", &format!("{:?}", grpc_call_status)) - .send(); + metric( + &metrics, + "CallFailure", + Some(&format!("{:?}", grpc_call_status)), + ); grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR } - grpcio::Error::ShutdownFailed => { - metrics - .incr_with_tags("database.retry") - .with_tag("error", "ShutdownFailed") - .with_tag("type", "bigtable") - .send(); - true - } _ => false, } } @@ -355,7 +337,7 @@ impl BigTableClientImpl { .conn .mutate_row_opt(&req, call_opts(self.metadata.clone())) }, - retryable_describe_table_error(self.metrics.clone()), + retryable_error(self.metrics.clone()), ) .await .map_err(error::BigTableError::Write)?; @@ -377,7 +359,7 @@ impl BigTableClientImpl { .conn .mutate_rows_opt(&req, call_opts(self.metadata.clone())) }, - retryable_describe_table_error(self.metrics.clone()), + retryable_error(self.metrics.clone()), ) .await .map_err(error::BigTableError::Write)?; @@ -448,7 +430,7 @@ impl BigTableClientImpl { .conn .read_rows_opt(&req, call_opts(self.metadata.clone())) }, - retryable_describe_table_error(self.metrics.clone()), + retryable_error(self.metrics.clone()), ) .await .map_err(error::BigTableError::Read)?; @@ -536,7 +518,7 @@ impl BigTableClientImpl { .conn .check_and_mutate_row_opt(&req, call_opts(self.metadata.clone())) }, - retryable_describe_table_error(self.metrics.clone()), + retryable_error(self.metrics.clone()), ) .await .map_err(error::BigTableError::Write)?; @@ -782,7 +764,7 @@ impl BigtableDb { self.conn .read_rows_opt(&req, call_opts(self.metadata.clone())) }, - retryable_describe_table_error(metrics.clone()), + retryable_error(metrics.clone()), ) .await .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?; diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index 67108deca..a7fe336b7 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -34,7 +34,7 @@ use crate::db::error::DbError; use crate::util::deserialize_opt_u32_to_duration; fn retry_default() -> usize { - 10 + 5 } /// The settings 
for accessing the BigTable contents. From a774267bf4538442bcc7ef564f95bd2c760eddb6 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Tue, 27 Feb 2024 13:29:18 -0800 Subject: [PATCH 8/8] f switch to RETRY_COUNT const --- autopush-common/src/db/bigtable/bigtable_client/mod.rs | 4 +++- autopush-common/src/db/bigtable/mod.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 86b3ee7a5..89fc6b44c 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -52,6 +52,8 @@ const ROUTER_FAMILY: &str = "router"; const MESSAGE_FAMILY: &str = "message"; // The default family for messages const MESSAGE_TOPIC_FAMILY: &str = "message_topic"; +pub(crate) const RETRY_COUNT: usize = 5; + /// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently. // TODO:Should we create something similar for ChannelID? struct Uaid(Uuid); @@ -758,7 +760,7 @@ impl BigtableDb { filter.set_block_all_filter(true); req.set_filter(filter); - let r = retry_policy(10) + let r = retry_policy(RETRY_COUNT) .retry_if( || async { self.conn diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index a7fe336b7..7a306d0a7 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -34,7 +34,7 @@ use crate::db::error::DbError; use crate::util::deserialize_opt_u32_to_duration; fn retry_default() -> usize { - 5 + bigtable_client::RETRY_COUNT } /// The settings for accessing the BigTable contents.
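
Note on the retry pattern this series converges on: each Bigtable call is wrapped in `again::RetryPolicy::retry_if(...)`, which re-runs the supplied closure only while a predicate (`retryable_error`, driven by the gRPC status) classifies the failure as transient, and which records a `database.retry` metric for each failure it inspects. The self-contained sketch below shows the same shape outside the autopush codebase; the `FakeDbError` type, the `flaky_call` helper, and the tokio runtime (plus `again` and `tokio` as assumed dependencies) are illustrative stand-ins, not part of these patches.

// Minimal sketch of the retry pattern, assuming `again` and
// `tokio = { features = ["macros", "rt-multi-thread"] }` are available.
use std::sync::atomic::{AtomicUsize, Ordering};

use again::RetryPolicy;

/// Stand-in for the gRPC errors classified by `retryable_error()`.
#[derive(Debug)]
#[allow(dead_code)]
enum FakeDbError {
    /// Transient failure worth retrying (think UNAVAILABLE or DEADLINE_EXCEEDED).
    Transient(String),
    /// Permanent failure that should surface immediately.
    Permanent(String),
}

/// Same shape as the predicate in the patches: retry only errors judged transient.
/// (The real predicate also bumps a `database.retry` metric at this point.)
fn is_retryable(err: &FakeDbError) -> bool {
    matches!(err, FakeDbError::Transient(_))
}

/// Hypothetical flaky call: fails twice with a transient error, then succeeds.
async fn flaky_call(attempts: &AtomicUsize) -> Result<String, FakeDbError> {
    let n = attempts.fetch_add(1, Ordering::SeqCst);
    if n < 2 {
        Err(FakeDbError::Transient(format!("attempt {n} failed")))
    } else {
        Ok("row data".to_owned())
    }
}

#[tokio::main]
async fn main() -> Result<(), FakeDbError> {
    let attempts = AtomicUsize::new(0);

    // Mirrors `retry_policy(RETRY_COUNT)`: bounded retries with jitter,
    // gated by the predicate above.
    let policy = RetryPolicy::default().with_max_retries(5).with_jitter(true);

    let value = policy
        .retry_if(|| flaky_call(&attempts), is_retryable)
        .await?;

    println!(
        "succeeded after {} attempts: {value}",
        attempts.load(Ordering::SeqCst)
    );
    Ok(())
}

Bounding the attempts (the `RETRY_COUNT` constant settles on 5) and enabling jitter keeps a struggling Bigtable cluster from being hit by synchronized re-attempts, while the predicate lets permanent errors (bad requests, auth failures) surface immediately instead of consuming the retry budget.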