feat: Make bigtable calls retryable #591

Merged: 24 commits, Feb 27, 2024

Commits (changes from all commits):
99da39d
feat: Make bigtable calls retryable
jrconlin Jan 31, 2024
3a2e0b2
f fix test, make some calls sync in async wrappers
jrconlin Jan 31, 2024
1f899da
f remove dbg
jrconlin Jan 31, 2024
5baebf9
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Feb 3, 2024
5dd651f
f wrap new functions.
jrconlin Feb 3, 2024
372c200
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 5, 2024
bbe0120
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 5, 2024
97307d7
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Feb 6, 2024
b256133
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 9, 2024
2340b7e
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 12, 2024
354722e
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 13, 2024
6eca60a
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 13, 2024
80af2aa
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Feb 20, 2024
b98621e
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Feb 20, 2024
15d52a1
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 20, 2024
6530ebd
Merge branch 'master' into feat/SYNC-4085_retry
jrconlin Feb 26, 2024
11a17a1
f fine tune retry errors
jrconlin Feb 26, 2024
f596eea
Merge branch 'feat/SYNC-4085_retry' of github.com:mozilla-services/au…
jrconlin Feb 26, 2024
9bb651d
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Feb 27, 2024
c9f4bf6
f merge conflict
jrconlin Feb 27, 2024
1dffa0c
f r's #1
jrconlin Feb 27, 2024
be3610f
Merge branch 'master' of github.com:mozilla-services/autopush-rs into…
jrconlin Feb 27, 2024
a774267
f switch to RETRY_COUNT const
jrconlin Feb 27, 2024
31e8b02
Merge branch 'master' into feat/SYNC-4085_retry
pjenvey Feb 27, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion autoconnect/Cargo.toml
@@ -53,7 +53,7 @@ actix-service = "2.0"
docopt = "1.1"

[features]
default = ["dual"]
default = ["dual", "emulator"]
bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
dynamodb = ["autopush_common/dynamodb", "autoconnect_settings/dynamodb"]
dual = ["bigtable", "dynamodb"]
2 changes: 2 additions & 0 deletions autopush-common/Cargo.toml
@@ -68,6 +68,7 @@ google-cloud-rust-raw = { version = "0.16", default-features = false, features =
"bigtable",
], optional = true }
grpcio = { version = "=0.13.0", features = ["openssl"], optional = true }
grpcio-sys = { version = "=0.13.0", optional = true }
protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+
form_urlencoded = { version = "1.2", optional = true }

@@ -83,6 +84,7 @@ actix-rt = "2.8"
bigtable = [
"dep:google-cloud-rust-raw",
"dep:grpcio",
"dep:grpcio-sys",
"dep:protobuf",
"dep:form_urlencoded",
]
140 changes: 120 additions & 20 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
@@ -5,6 +5,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use again::RetryPolicy;
use async_trait::async_trait;
use cadence::{CountedExt, StatsdClient};
use futures_util::StreamExt;
@@ -14,7 +15,7 @@ use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest;
use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient;
use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain};
use google_cloud_rust_raw::bigtable::v2::{bigtable, data};
use grpcio::{Channel, Metadata};
use grpcio::{Channel, Metadata, RpcStatus, RpcStatusCode};
use protobuf::RepeatedField;
use serde_json::{from_str, json};
use uuid::Uuid;
@@ -51,6 +52,8 @@ const ROUTER_FAMILY: &str = "router";
const MESSAGE_FAMILY: &str = "message"; // The default family for messages
const MESSAGE_TOPIC_FAMILY: &str = "message_topic";

pub(crate) const RETRY_COUNT: usize = 5;

/// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently.
// TODO:Should we create something similar for ChannelID?
struct Uaid(Uuid);
@@ -193,6 +196,67 @@ fn to_string(value: Vec<u8>, name: &str) -> Result<String, DbError> {
})
}

pub fn retry_policy(max: usize) -> RetryPolicy {
RetryPolicy::default()
.with_max_retries(max)
.with_jitter(true)
}

fn retriable_internal_error(status: &RpcStatus) -> bool {
match status.code() {
RpcStatusCode::UNKNOWN => {
"error occurred when fetching oauth2 token" == status.message().to_ascii_lowercase()
}
RpcStatusCode::INTERNAL => [
"rst_stream",
"rst stream",
"received unexpected eos on data frame from server",
]
.contains(&status.message().to_lowercase().as_str()),
RpcStatusCode::UNAVAILABLE | RpcStatusCode::DEADLINE_EXCEEDED => true,
_ => false,
}
}

pub fn metric(metrics: &Arc<StatsdClient>, err_type: &str, code: Option<&str>) {
let mut metric = metrics
.incr_with_tags("database.retry")
.with_tag("error", err_type)
.with_tag("type", "bigtable");
if let Some(code) = code {
metric = metric.with_tag("code", code);
}
metric.send();
}

pub fn retryable_error(metrics: Arc<StatsdClient>) -> impl Fn(&grpcio::Error) -> bool {
move |err| {
debug!("🉑 Checking error...{err}");
match err {
grpcio::Error::RpcFailure(status) => {
info!("GRPC Failure :{:?}", status);
metric(&metrics, "RpcFailure", Some(&status.code().to_string()));
retriable_internal_error(status)
}
grpcio::Error::BindFail(_) => {
metric(&metrics, "BindFail", None);
true
}
// The parameter here is a [grpcio_sys::grpc_call_error] enum
// Not all of these are retriable.
grpcio::Error::CallFailure(grpc_call_status) => {
metric(
&metrics,
"CallFailure",
Some(&format!("{:?}", grpc_call_status)),
);
grpc_call_status == &grpcio_sys::grpc_call_error::GRPC_CALL_ERROR
}
_ => false,
}
}
}

fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
::grpcio::CallOption::default().headers(metadata)
}
@@ -268,10 +332,15 @@ impl BigTableClientImpl {
req: bigtable::MutateRowRequest,
) -> Result<(), error::BigTableError> {
let bigtable = self.pool.get().await?;
bigtable
.conn
.mutate_row_async_opt(&req, call_opts(self.metadata.clone()))
.map_err(error::BigTableError::Write)?
retry_policy(self.settings.retry_count)
[Review thread on the line above (resolved)]

Reviewer (Member): Should make this a method.

    Suggested change:
    -    retry_policy(self.settings.retry_count)
    +    self.retry_policy()

jrconlin (Member, Author): (Remembers why it's not:) I use the same retry_policy for the health_check, which doesn't take a BigTableClientImpl. I may need to restructure things a bit to do that.

Reviewer (Member): I noticed that; I'm fine with a one-off for it. We have an open issue to rework the health check, so that whole method will probably change quite a bit sooner rather than later anyway.

jrconlin (Member, Author): Nope, gonna push back on this one. The function takes one parameter, and we're using it in different paths, so making this dependent on BigTableClientImpl would overcomplicate things right now. We might want to revisit the structure of all of this later, but that should be a different ticket.

jrconlin (Member, Author): Yeah, there's a lot of "Get It Working" in this. There's lots of room for fit and finish later.

.retry_if(
|| async {
bigtable
.conn
.mutate_row_opt(&req, call_opts(self.metadata.clone()))
},
retryable_error(self.metrics.clone()),
)
.await
.map_err(error::BigTableError::Write)?;
Ok(())
@@ -285,9 +354,16 @@
) -> Result<(), error::BigTableError> {
let bigtable = self.pool.get().await?;
// ClientSStreamReceiver will cancel an operation if it's dropped before it's done.
let resp = bigtable
.conn
.mutate_rows_opt(&req, call_opts(self.metadata.clone()))
let resp = retry_policy(self.settings.retry_count)
.retry_if(
|| async {
bigtable
.conn
.mutate_rows_opt(&req, call_opts(self.metadata.clone()))
},
retryable_error(self.metrics.clone()),
)
.await
.map_err(error::BigTableError::Write)?;

// Scan the returned stream looking for errors.
@@ -349,9 +425,16 @@
req: ReadRowsRequest,
) -> Result<BTreeMap<RowKey, row::Row>, error::BigTableError> {
let bigtable = self.pool.get().await?;
let resp = bigtable
.conn
.read_rows_opt(&req, call_opts(self.metadata.clone()))
let resp = retry_policy(self.settings.retry_count)
.retry_if(
|| async {
bigtable
.conn
.read_rows_opt(&req, call_opts(self.metadata.clone()))
},
retryable_error(self.metrics.clone()),
)
.await
.map_err(error::BigTableError::Read)?;
merge::RowMerger::process_chunks(resp).await
}
@@ -428,10 +511,17 @@
req: bigtable::CheckAndMutateRowRequest,
) -> Result<bool, error::BigTableError> {
let bigtable = self.pool.get().await?;
let resp = bigtable
.conn
.check_and_mutate_row_async_opt(&req, call_opts(self.metadata.clone()))
.map_err(error::BigTableError::Write)?
let resp = retry_policy(self.settings.retry_count)
.retry_if(
|| async {
// Note: check_and_mutate_row_async may return before the row
// is written, which can cause race conditions for reads
bigtable
.conn
.check_and_mutate_row_opt(&req, call_opts(self.metadata.clone()))
},
retryable_error(self.metrics.clone()),
)
.await
.map_err(error::BigTableError::Write)?;
debug!("🉑 Predicate Matched: {}", &resp.get_predicate_matched(),);
@@ -659,16 +749,26 @@ impl BigtableDb {
/// Recycle check as well, so it has to be fairly low in the implementation
/// stack.
///
pub async fn health_check(&mut self, table_name: &str) -> DbResult<bool> {
pub async fn health_check(
&mut self,
table_name: &str,
metrics: Arc<StatsdClient>,
) -> DbResult<bool> {
// Create a request that is GRPC valid, but does not point to a valid row.
let mut req = read_row_request(table_name, "NOT FOUND");
let mut filter = data::RowFilter::default();
filter.set_block_all_filter(true);
req.set_filter(filter);

let r = self
.conn
.read_rows_opt(&req, call_opts(self.metadata.clone()))
let r = retry_policy(RETRY_COUNT)
.retry_if(
|| async {
self.conn
.read_rows_opt(&req, call_opts(self.metadata.clone()))
},
retryable_error(metrics.clone()),
)
.await
.map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?;

let (v, _stream) = r.into_future().await;
@@ -1192,7 +1292,7 @@ impl DbClient for BigTableClientImpl {
self.pool
.get()
.await?
.health_check(&self.settings.table_name)
.health_check(&self.settings.table_name, self.metrics.clone())
.await
}

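To summarize the pattern this file introduces (and the review thread above about keeping retry_policy a free function): each gRPC call is wrapped in again::RetryPolicy::retry_if with a predicate that decides whether the error is transient. The following is a minimal, self-contained sketch of that pattern, not autopush code: flaky_op and its error strings are hypothetical stand-ins for a Bigtable call, and a Tokio runtime is assumed.

// Minimal sketch of the retry pattern used above (the `again` crate).
// `flaky_op` and its error strings are hypothetical stand-ins for a gRPC call.
use std::sync::atomic::{AtomicU32, Ordering};

use again::RetryPolicy;

static ATTEMPTS: AtomicU32 = AtomicU32::new(0);

// Fails twice with a "transient" error, then succeeds.
async fn flaky_op() -> Result<&'static str, &'static str> {
    if ATTEMPTS.fetch_add(1, Ordering::SeqCst) < 2 {
        Err("transient")
    } else {
        Ok("done")
    }
}

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    // Mirrors retry_policy(RETRY_COUNT): bounded retries with jitter.
    let result = RetryPolicy::default()
        .with_max_retries(5)
        .with_jitter(true)
        // Re-run the operation only while the predicate says the error is
        // transient, analogous to retryable_error(metrics) in the diff above.
        .retry_if(|| async { flaky_op().await }, |err| *err == "transient")
        .await?;
    println!("{result} after {} attempts", ATTEMPTS.load(Ordering::SeqCst));
    Ok(())
}

with_jitter(true) spreads retries out so concurrent clients do not hammer Bigtable in lockstep, and retry_if stops as soon as the predicate reports a non-retryable error.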
7 changes: 7 additions & 0 deletions autopush-common/src/db/bigtable/mod.rs
@@ -33,6 +33,10 @@ use crate::db::bigtable::bigtable_client::MetadataBuilder;
use crate::db::error::DbError;
use crate::util::deserialize_opt_u32_to_duration;

fn retry_default() -> usize {
bigtable_client::RETRY_COUNT
}

/// The settings for accessing the BigTable contents.
#[derive(Clone, Debug, Deserialize)]
pub struct BigTableDbSettings {
@@ -75,6 +79,9 @@ pub struct BigTableDbSettings {
/// Include route to leader header in metadata
#[serde(default)]
pub route_to_leader: bool,
/// Number of times to retry a GRPC function
#[serde(default = "retry_default")]
pub retry_count: usize,
}

impl BigTableDbSettings {
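For clarity on the retry_count setting above: #[serde(default = "retry_default")] means an absent key falls back to bigtable_client::RETRY_COUNT, while an explicit value overrides it. The snippet below is an illustrative stand-in struct (not the crate's real BigTableDbSettings), assuming serde with the derive feature plus serde_json purely for the demonstration.

// Hypothetical stand-in for BigTableDbSettings, illustrating the serde default.
use serde::Deserialize;

// Mirrors bigtable_client::RETRY_COUNT.
fn retry_default() -> usize {
    5
}

#[derive(Debug, Deserialize)]
struct RetrySettings {
    #[serde(default = "retry_default")]
    retry_count: usize,
}

fn main() {
    // Key omitted: serde calls retry_default() and the field becomes 5.
    let defaulted: RetrySettings = serde_json::from_str("{}").unwrap();
    assert_eq!(defaulted.retry_count, 5);

    // Key present: the configured value wins.
    let configured: RetrySettings = serde_json::from_str(r#"{"retry_count": 2}"#).unwrap();
    assert_eq!(configured.retry_count, 2);
    println!("{defaulted:?} {configured:?}");
}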
13 changes: 10 additions & 3 deletions autopush-common/src/db/bigtable/pool.rs
@@ -87,8 +87,12 @@ impl BigTablePool {
debug!("🉑 connection string {}", &connection);

// Construct a new manager and put them in a pool for handling future requests.
let manager =
BigtableClientManager::new(&bt_settings, settings.dsn.clone(), connection.clone())?;
let manager = BigtableClientManager::new(
&bt_settings,
settings.dsn.clone(),
connection.clone(),
metrics.clone(),
)?;
let mut config = PoolConfig::default();
if let Some(size) = bt_settings.database_pool_max_size {
debug!("🏊 Setting pool max size {}", &size);
@@ -119,18 +123,21 @@ pub struct BigtableClientManager {
settings: BigTableDbSettings,
dsn: Option<String>,
connection: String,
metrics: Arc<StatsdClient>,
}

impl BigtableClientManager {
fn new(
settings: &BigTableDbSettings,
dsn: Option<String>,
connection: String,
metrics: Arc<StatsdClient>,
) -> Result<Self, DbError> {
Ok(Self {
settings: settings.clone(),
dsn,
connection,
metrics,
})
}
}
@@ -182,7 +189,7 @@ impl Manager for BigtableClientManager {
// note, this changes to `blocks_in_conditions` for 1.76+
#[allow(clippy::blocks_in_conditions)]
if !client
.health_check(&self.settings.table_name)
.health_check(&self.settings.table_name, self.metrics.clone())
.await
.map_err(|e| {
debug!("🏊 Recycle requested (health). {:?}", e);
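The metrics handle threaded through BigtableClientManager above exists so the pool's recycle-time health check can reuse the same metric-emitting retry predicate as the client calls. As a rough sketch of that "predicate factory" shape, with simplified stand-in types rather than the real grpcio and cadence APIs:

// Simplified sketch of the retryable_error "predicate factory" pattern:
// the returned closure captures a metrics handle, records the error, and
// reports whether it is worth retrying. FakeRpcError and Metrics are stand-ins.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

enum FakeRpcError {
    Unavailable,
    DeadlineExceeded,
    InvalidArgument,
}

// Stand-in for StatsdClient: just counts retryable errors seen.
struct Metrics {
    retries: AtomicUsize,
}

fn retryable_error(metrics: Arc<Metrics>) -> impl Fn(&FakeRpcError) -> bool {
    move |err| {
        let retryable = matches!(
            err,
            FakeRpcError::Unavailable | FakeRpcError::DeadlineExceeded
        );
        if retryable {
            // The real code emits metrics.incr_with_tags("database.retry") with tags.
            metrics.retries.fetch_add(1, Ordering::Relaxed);
        }
        retryable
    }
}

fn main() {
    let metrics = Arc::new(Metrics { retries: AtomicUsize::new(0) });
    let is_retryable = retryable_error(metrics.clone());
    assert!(is_retryable(&FakeRpcError::Unavailable));
    assert!(!is_retryable(&FakeRpcError::InvalidArgument));
    println!("retryable errors observed: {}", metrics.retries.load(Ordering::Relaxed));
}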