From e043fb02dc4ccd13f39c5083c94b7311887f3fef Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Thu, 15 Feb 2024 11:36:37 -0800 Subject: [PATCH] feat: add metadata headers for bigtable (#586) Closes: [SYNC-3967](https://mozilla-hub.atlassian.net/browse/SYNC-3967) [SYNC-3967]: https://mozilla-hub.atlassian.net/browse/SYNC-3967?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ --------- Co-authored-by: Philip Jenvey --- .circleci/config.yml | 4 +- Cargo.lock | 33 +++-- Dockerfile | 3 +- autoconnect/autoconnect-web/src/dockerflow.rs | 2 +- autopush-common/Cargo.toml | 8 +- .../src/db/bigtable/bigtable_client/error.rs | 5 + .../db/bigtable/bigtable_client/metadata.rs | 138 ++++++++++++++++++ .../src/db/bigtable/bigtable_client/mod.rs | 34 ++++- autopush-common/src/db/bigtable/mod.rs | 31 ++++ autopush-common/src/db/bigtable/pool.rs | 4 +- 10 files changed, 231 insertions(+), 31 deletions(-) create mode 100644 autopush-common/src/db/bigtable/bigtable_client/metadata.rs diff --git a/.circleci/config.yml b/.circleci/config.yml index 4ffe4a339..d98cd1b58 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -38,7 +38,7 @@ jobs: audit: docker: # NOTE: update version for all # RUST_VER - - image: rust:1.74 + - image: rust:1.76 auth: username: $DOCKER_USER password: $DOCKER_PASS @@ -133,7 +133,7 @@ jobs: apt install build-essential curl libstdc++6 libstdc++-10-dev libssl-dev pkg-config -y apt install cmake -y # RUST_VER - curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.73 -y + curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain 1.76 -y export PATH=$PATH:$HOME/.cargo/bin echo 'export PATH=$PATH:$HOME/.cargo/bin' >> $BASH_ENV rustc --version diff --git a/Cargo.lock b/Cargo.lock index 56ab294a0..1c4a32b11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -736,6 +736,7 @@ dependencies = [ "config", "deadpool", "fernet", + "form_urlencoded", "futures 0.3.30", "futures-backoff", "futures-util", @@ -2217,9 +2218,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -4759,9 +4760,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen-macro", @@ -4769,9 +4770,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", @@ -4784,9 +4785,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -4796,9 +4797,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4806,9 +4807,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", @@ -4819,9 +4820,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "wasm-timer" @@ -4840,9 +4841,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" +checksum = "96565907687f7aceb35bc5fc03770a8a0471d82e479f25832f54a0e3f4b28446" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Dockerfile b/Dockerfile index 63881427c..a458bc565 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,6 @@ # NOTE: Ensure builder's Rust version matches CI's in .circleci/config.yml -FROM rust:1.74-buster as builder +# RUST_VER +FROM rust:1.76-buster as builder ARG CRATE ADD . /app diff --git a/autoconnect/autoconnect-web/src/dockerflow.rs b/autoconnect/autoconnect-web/src/dockerflow.rs index 50ec48f8e..b886a9fb7 100644 --- a/autoconnect/autoconnect-web/src/dockerflow.rs +++ b/autoconnect/autoconnect-web/src/dockerflow.rs @@ -27,7 +27,7 @@ pub fn config(config: &mut web::ServiceConfig) { /// Handle the `/health` and `/__heartbeat__` routes // note, this changes to `blocks_in_conditions` for 1.76+ -#[allow(clippy::blocks_in_if_conditions)] +#[allow(clippy::blocks_in_conditions)] pub async fn health_route(state: Data) -> Json { let status = if state .db diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 891f39f17..b6e2f3a4d 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -68,6 +68,7 @@ google-cloud-rust-raw = { version = "0.16", default-features = false, features = ], optional = true } grpcio = { version = "=0.13.0", features = ["openssl"], optional = true } protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+ +form_urlencoded = { version = "1.2", optional = true } [dev-dependencies] mockito = "0.31" @@ -78,7 +79,12 @@ actix-rt = "2.8" [features] # NOTE: Do not set a `default` here, rather specify them in the calling library. # This is to reduce complexity around feature specification. -bigtable = ["dep:google-cloud-rust-raw", "dep:grpcio", "dep:protobuf"] +bigtable = [ + "dep:google-cloud-rust-raw", + "dep:grpcio", + "dep:protobuf", + "dep:form_urlencoded", +] dynamodb = ["dep:rusoto_core", "dep:rusoto_credential", "dep:rusoto_dynamodb"] dual = ["dynamodb", "bigtable"] aws = [] diff --git a/autopush-common/src/db/bigtable/bigtable_client/error.rs b/autopush-common/src/db/bigtable/bigtable_client/error.rs index 19427b96c..d7cb847ef 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/error.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/error.rs @@ -98,6 +98,9 @@ pub enum BigTableError { #[error("Bigtable write error: {0}")] Write(#[source] grpcio::Error), + #[error("GRPC Error: {0}")] + GRPC(#[source] grpcio::Error), + /// Return a GRPC status code and any message. /// See https://grpc.github.io/grpc/core/md_doc_statuscodes.html #[error("Bigtable status response: {0:?}")] @@ -140,6 +143,7 @@ impl ReportableError for BigTableError { BigTableError::Admin(_, _) => "storage.bigtable.error.admin", BigTableError::Recycle => "storage.bigtable.error.recycle", BigTableError::Pool(_) => "storage.bigtable.error.pool", + BigTableError::GRPC(_) => "storage.bigtable.error.grpc", }; Some(err) } @@ -148,6 +152,7 @@ impl ReportableError for BigTableError { match &self { BigTableError::InvalidRowResponse(s) => vec![("error", s.to_string())], BigTableError::InvalidChunk(s) => vec![("error", s.to_string())], + BigTableError::GRPC(s) => vec![("error", s.to_string())], BigTableError::Read(s) => vec![("error", s.to_string())], BigTableError::Write(s) => vec![("error", s.to_string())], BigTableError::Status(code, s) => { diff --git a/autopush-common/src/db/bigtable/bigtable_client/metadata.rs b/autopush-common/src/db/bigtable/bigtable_client/metadata.rs new file mode 100644 index 000000000..8a891c545 --- /dev/null +++ b/autopush-common/src/db/bigtable/bigtable_client/metadata.rs @@ -0,0 +1,138 @@ +/// gRPC metadata Resource prefix header +/// +/// Generic across Google APIs. This "improves routing by the backend" as +/// described by other clients +const PREFIX_KEY: &str = "google-cloud-resource-prefix"; + +/// gRPC metadata Client information header +/// +/// A `User-Agent` like header, likely its main use is for GCP's metrics +const METRICS_KEY: &str = "x-goog-api-client"; + +/// gRPC metadata Dynamic Routing header: +/// https://google.aip.dev/client-libraries/4222 +/// +/// See the googleapis protobuf for which routing header params are used for +/// each Spanner operation (under the `google.api.http` option). +/// +/// https://github.com/googleapis/googleapis/blob/master/google/spanner/v1/spanner.proto +const ROUTING_KEY: &str = "x-goog-request-params"; + +/// gRPC metadata Leader Aware Routing header +/// +/// Not well documented. Added to clients in early 2023 defaulting to disabled. +/// Clients have began defaulting it to enabled in late 2023. +/// +/// "Enabling leader aware routing would route all requests in RW/PDML +/// transactions to the leader region." as described by other Spanner clients +const LEADER_AWARE_KEY: &str = "x-goog-spanner-route-to-leader"; + +/// The USER_AGENT string is a static value specified by Google. +/// Its meaning is not to be known to the uninitiated. +const USER_AGENT: &str = "gl-external/1.0 gccl/1.0"; + +/// Builds the [grpcio::Metadata] for all db operations +#[derive(Default)] +pub struct MetadataBuilder<'a> { + prefix: &'a str, + routing_params: Vec<(&'a str, &'a str)>, + route_to_leader: bool, +} + +impl<'a> MetadataBuilder<'a> { + /// Initialize a new builder with a [PREFIX_KEY] header for the given + /// resource + pub fn with_prefix(prefix: &'a str) -> Self { + Self { + prefix, + ..Default::default() + } + } + + /// Add a [ROUTING_KEY] header + /// This normally specifies the session name, but unlike spanner, bigtable does not appear to have one of those? + pub fn routing_param(mut self, key: &'a str, value: &'a str) -> Self { + self.routing_params.push((key, value)); + self + } + + /// Toggle the [LEADER_AWARE_KEY] header + pub fn route_to_leader(mut self, route_to_leader: bool) -> Self { + self.route_to_leader = route_to_leader; + self + } + + /// Build the [grpcio::Metadata] + pub fn build(self) -> Result { + let mut meta = grpcio::MetadataBuilder::new(); + + meta.add_str(PREFIX_KEY, self.prefix)?; + meta.add_str(METRICS_KEY, USER_AGENT)?; + if self.route_to_leader { + meta.add_str(LEADER_AWARE_KEY, "true")?; + } + if !self.routing_params.is_empty() { + meta.add_str(ROUTING_KEY, &self.routing_header())?; + } + Ok(meta.build()) + } + + fn routing_header(self) -> String { + let mut ser = form_urlencoded::Serializer::new(String::new()); + for (key, val) in self.routing_params { + ser.append_pair(key, val); + } + // python-spanner (python-api-core) doesn't encode '/': + // https://github.com/googleapis/python-api-core/blob/6251eab/google/api_core/gapic_v1/routing_header.py#L85 + ser.finish().replace("%2F", "/") + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, str}; + + use super::{ + MetadataBuilder, LEADER_AWARE_KEY, METRICS_KEY, PREFIX_KEY, ROUTING_KEY, USER_AGENT, + }; + + // Resource paths should not start with a "/" + pub const DB: &str = "projects/foo/instances/bar/databases/gorp"; + pub const SESSION: &str = "projects/foo/instances/bar/databases/gorp/sessions/f00B4r_quuX"; + + #[test] + fn metadata_basic() { + let meta = MetadataBuilder::with_prefix(DB) + .routing_param("session", SESSION) + .routing_param("foo", "bar baz") + .build() + .unwrap(); + let meta: HashMap<_, _> = meta.into_iter().collect(); + + assert_eq!(meta.len(), 3); + assert_eq!(str::from_utf8(meta.get(PREFIX_KEY).unwrap()).unwrap(), DB); + assert_eq!( + str::from_utf8(meta.get(METRICS_KEY).unwrap()).unwrap(), + USER_AGENT + ); + assert_eq!( + str::from_utf8(meta.get(ROUTING_KEY).unwrap()).unwrap(), + format!("session={SESSION}&foo=bar+baz") + ); + } + + #[test] + fn leader_aware() { + let meta = MetadataBuilder::with_prefix(DB) + .route_to_leader(true) + .build() + .unwrap(); + let meta: HashMap<_, _> = meta.into_iter().collect(); + + assert_eq!(meta.len(), 3); + assert_eq!( + str::from_utf8(meta.get(LEADER_AWARE_KEY).unwrap()).unwrap(), + "true" + ); + } +} diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index ae40dd4b7..59cf9945d 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -14,7 +14,7 @@ use google_cloud_rust_raw::bigtable::v2::bigtable::ReadRowsRequest; use google_cloud_rust_raw::bigtable::v2::bigtable_grpc::BigtableClient; use google_cloud_rust_raw::bigtable::v2::data::{RowFilter, RowFilter_Chain}; use google_cloud_rust_raw::bigtable::v2::{bigtable, data}; -use grpcio::Channel; +use grpcio::{Channel, Metadata}; use protobuf::RepeatedField; use serde_json::{from_str, json}; use uuid::Uuid; @@ -25,6 +25,7 @@ use crate::db::{ DbSettings, Notification, NotificationRecord, User, MAX_CHANNEL_TTL, MAX_ROUTER_TTL, }; +pub use self::metadata::MetadataBuilder; use self::row::Row; use super::pool::BigTablePool; use super::BigTableDbSettings; @@ -32,6 +33,7 @@ use super::BigTableDbSettings; pub mod cell; pub mod error; pub(crate) mod merge; +pub mod metadata; pub mod row; // these are normally Vec @@ -72,6 +74,8 @@ pub struct BigTableClientImpl { _metrics: Arc, /// Connection Channel (used for alternate calls) pool: BigTablePool, + metadata: Metadata, + admin_metadata: Metadata, } /// Return a a RowFilter matching the GC policy of the router Column Family @@ -180,6 +184,10 @@ fn to_string(value: Vec, name: &str) -> Result { }) } +fn call_opts(metadata: Metadata) -> ::grpcio::CallOption { + ::grpcio::CallOption::default().headers(metadata) +} + /// Connect to a BigTable storage model. /// /// BigTable is available via the Google Console, and is a schema less storage system. @@ -211,9 +219,15 @@ impl BigTableClientImpl { let db_settings = BigTableDbSettings::try_from(settings.db_settings.as_ref())?; info!("🉑 {:#?}", db_settings); let pool = BigTablePool::new(settings, &metrics)?; + + // create the metadata header blocks required by Google for accessing GRPC resources. + let metadata = db_settings.metadata()?; + let admin_metadata = db_settings.admin_metadata()?; Ok(Self { settings: db_settings, _metrics: metrics, + metadata, + admin_metadata, pool, }) } @@ -247,7 +261,7 @@ impl BigTableClientImpl { let bigtable = self.pool.get().await?; bigtable .conn - .mutate_row_async(&req) + .mutate_row_async_opt(&req, call_opts(self.metadata.clone())) .map_err(error::BigTableError::Write)? .await .map_err(error::BigTableError::Write)?; @@ -264,7 +278,7 @@ impl BigTableClientImpl { // ClientSStreamReceiver will cancel an operation if it's dropped before it's done. let resp = bigtable .conn - .mutate_rows(&req) + .mutate_rows_opt(&req, call_opts(self.metadata.clone())) .map_err(error::BigTableError::Write)?; // Scan the returned stream looking for errors. @@ -327,7 +341,7 @@ impl BigTableClientImpl { let bigtable = self.pool.get().await?; let resp = bigtable .conn - .read_rows(&req) + .read_rows_opt(&req, call_opts(self.metadata.clone())) .map_err(error::BigTableError::Read)?; merge::RowMerger::process_chunks(resp).await } @@ -406,7 +420,7 @@ impl BigTableClientImpl { let bigtable = self.pool.get().await?; let resp = bigtable .conn - .check_and_mutate_row_async(&req) + .check_and_mutate_row_async_opt(&req, call_opts(self.metadata.clone())) .map_err(error::BigTableError::Write)? .await .map_err(error::BigTableError::Write)?; @@ -438,6 +452,7 @@ impl BigTableClientImpl { Ok(mutations) } + #[allow(unused)] /// Delete all cell data from the specified columns with the optional time range. #[allow(unused)] async fn delete_cells( @@ -472,8 +487,9 @@ impl BigTableClientImpl { let mut req = DropRowRangeRequest::new(); req.set_name(self.settings.table_name.clone()); req.set_row_key_prefix(row_key.as_bytes().to_vec()); + admin - .drop_row_range_async(&req) + .drop_row_range_async_opt(&req, call_opts(self.admin_metadata.clone())) .map_err(|e| { error!("{:?}", e); error::BigTableError::Admin( @@ -606,12 +622,14 @@ impl BigTableClientImpl { #[derive(Clone)] pub struct BigtableDb { pub(super) conn: BigtableClient, + pub(super) metadata: Metadata, } impl BigtableDb { - pub fn new(channel: Channel) -> Self { + pub fn new(channel: Channel, metadata: &Metadata) -> Self { Self { conn: BigtableClient::new(channel), + metadata: metadata.clone(), } } @@ -629,7 +647,7 @@ impl BigtableDb { let r = self .conn - .read_rows(&req) + .read_rows_opt(&req, call_opts(self.metadata.clone())) .map_err(|e| DbError::General(format!("BigTable connectivity error: {:?}", e)))?; let (v, _stream) = r.into_future().await; diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index b3352996f..2dd2eda24 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -25,9 +25,11 @@ mod pool; pub use bigtable_client::error::BigTableError; pub use bigtable_client::BigTableClientImpl; +use grpcio::Metadata; use serde::Deserialize; use std::time::Duration; +use crate::db::bigtable::bigtable_client::MetadataBuilder; use crate::db::error::DbError; use crate::util::deserialize_u32_to_duration; @@ -62,6 +64,35 @@ pub struct BigTableDbSettings { #[serde(default)] #[serde(deserialize_with = "deserialize_u32_to_duration")] pub database_pool_max_idle: Duration, + /// Include route to leader header in metadata + #[serde(default)] + pub route_to_leader: bool, +} + +impl BigTableDbSettings { + pub fn metadata(&self) -> Result { + MetadataBuilder::with_prefix(&self.table_name) + .routing_param("table_name", &self.table_name) + .route_to_leader(self.route_to_leader) + .build() + .map_err(BigTableError::GRPC) + } + + pub fn admin_metadata(&self) -> Result { + // Admin calls use a slightly different routing param and a truncated prefix + // See https://github.com/googleapis/google-cloud-cpp/issues/190#issuecomment-370520185 + let Some(admin_prefix) = self.table_name.split_once("/tables/").map(|v| v.0) else { + return Err(BigTableError::Admin( + "Invalid table name specified".to_owned(), + None, + )); + }; + MetadataBuilder::with_prefix(admin_prefix) + .routing_param("name", &self.table_name) + .route_to_leader(self.route_to_leader) + .build() + .map_err(BigTableError::GRPC) + } } impl TryFrom<&str> for BigTableDbSettings { diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs index 380dadc8b..17b166423 100644 --- a/autopush-common/src/db/bigtable/pool.rs +++ b/autopush-common/src/db/bigtable/pool.rs @@ -154,7 +154,7 @@ impl Manager for BigtableClientManager { /// `BigtableClient` is the most atomic we can go. async fn create(&self) -> Result { debug!("🏊 Create a new pool entry."); - let entry = BigtableDb::new(self.get_channel()?); + let entry = BigtableDb::new(self.get_channel()?, &self.settings.metadata()?); debug!("🏊 Bigtable connection acquired"); Ok(entry) } @@ -182,7 +182,7 @@ impl Manager for BigtableClientManager { // Clippy 0.1.73 complains about the `.map_err` being hard to read. // note, this changes to `blocks_in_conditions` for 1.76+ - #[allow(clippy::blocks_in_if_conditions)] + #[allow(clippy::blocks_in_conditions)] if !client .health_check(&self.settings.table_name) .await