From 92a8a664b3278a101c30d2ed689f832ef7b7b733 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Mon, 26 Feb 2024 11:59:47 -0800 Subject: [PATCH 1/2] feat: report db pool metrics and support configuring all of deadpool's Timeouts Closes: SYNC-4158 --- Cargo.lock | 1 + autoconnect/src/main.rs | 12 ++++-- autoendpoint/src/main.rs | 4 +- autoendpoint/src/server.rs | 17 +++++--- autopush-common/Cargo.toml | 1 + .../src/db/bigtable/bigtable_client/mod.rs | 4 ++ autopush-common/src/db/bigtable/mod.rs | 24 +++++++++-- autopush-common/src/db/bigtable/pool.rs | 16 +++---- autopush-common/src/db/client.rs | 9 +++- autopush-common/src/db/dual/mod.rs | 4 ++ autopush-common/src/db/mod.rs | 3 ++ autopush-common/src/db/reporter.rs | 43 +++++++++++++++++++ autopush-common/src/logging.rs | 10 +++++ 13 files changed, 122 insertions(+), 26 deletions(-) create mode 100644 autopush-common/src/db/reporter.rs diff --git a/Cargo.lock b/Cargo.lock index 30bbdb946..8fdbbd6d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -750,6 +750,7 @@ dependencies = [ "log", "mockall", "mockito", + "num_cpus", "openssl", "protobuf", "rand 0.8.5", diff --git a/autoconnect/src/main.rs b/autoconnect/src/main.rs index 845cb6ae1..420112f91 100644 --- a/autoconnect/src/main.rs +++ b/autoconnect/src/main.rs @@ -4,7 +4,7 @@ #[macro_use] extern crate slog_scope; -use std::{env, vec::Vec}; +use std::{env, time::Duration, vec::Vec}; use actix_http::HttpService; use actix_server::Server; @@ -16,6 +16,7 @@ use serde::Deserialize; use autoconnect_settings::{AppState, Settings}; use autoconnect_web::{build_app, config, config_router}; use autopush_common::{ + db::spawn_pool_periodic_reporter, errors::{ApcErrorKind, Result}, logging, }; @@ -79,12 +80,17 @@ async fn main() -> Result<()> { let actix_workers = settings.actix_workers; let app_state = AppState::from_settings(settings)?; app_state.init_and_spawn_megaphone_updater().await?; + spawn_pool_periodic_reporter( + Duration::from_secs(10), + app_state.db.clone(), + app_state.metrics.clone(), + ); info!( - "Starting autoconnect on port: {} router_port: {} (available_parallelism: {:?})", + "Starting autoconnect on port: {} router_port: {} ({})", port, router_port, - std::thread::available_parallelism() + logging::parallelism_banner() ); let router_app_state = app_state.clone(); diff --git a/autoendpoint/src/main.rs b/autoendpoint/src/main.rs index 630239e2d..702ba61d2 100644 --- a/autoendpoint/src/main.rs +++ b/autoendpoint/src/main.rs @@ -59,9 +59,9 @@ async fn main() -> Result<(), Box> { .await .expect("Could not start server"); info!( - "Starting autoendpoint on port: {} (available_parallelism: {:?})", + "Starting autoendpoint on port: {} ({})", host_port, - std::thread::available_parallelism() + logging::parallelism_banner() ); server.await?; diff --git a/autoendpoint/src/server.rs b/autoendpoint/src/server.rs index 1b6c506ee..3be7cedbd 100644 --- a/autoendpoint/src/server.rs +++ b/autoendpoint/src/server.rs @@ -7,19 +7,18 @@ use actix_cors::Cors; use actix_web::{ dev, http::StatusCode, middleware::ErrorHandlers, web, web::Data, App, HttpServer, }; -#[cfg(feature = "bigtable")] -use autopush_common::db::bigtable::BigTableClientImpl; -#[cfg(feature = "dual")] -use autopush_common::db::dual::DualClientImpl; use cadence::StatsdClient; use fernet::MultiFernet; use serde_json::json; +#[cfg(feature = "bigtable")] +use autopush_common::db::bigtable::BigTableClientImpl; +#[cfg(feature = "dual")] +use autopush_common::db::dual::DualClientImpl; #[cfg(feature = "dynamodb")] use autopush_common::db::dynamodb::DdbClientImpl; - use autopush_common::{ - db::{client::DbClient, DbSettings, StorageType}, + db::{client::DbClient, spawn_pool_periodic_reporter, DbSettings, StorageType}, middleware::sentry::SentryWrapper, }; @@ -130,6 +129,12 @@ impl Server { adm_router, }; + spawn_pool_periodic_reporter( + Duration::from_secs(10), + app_state.db.clone(), + app_state.metrics.clone(), + ); + let server = HttpServer::new(move || { // These have a bad habit of being reset. Specify them explicitly. let cors = Cors::default() diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index b6e2f3a4d..78dde9e47 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -59,6 +59,7 @@ async-trait = "0.1" deadpool = "0.10" gethostname = "0.4" futures-backoff = "0.1.0" +num_cpus = "1.16" woothee = "0.13" # #[cfg(bigtable)] for this section. diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index a5b30ddd0..ef9350fce 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -1199,6 +1199,10 @@ impl DbClient for BigTableClientImpl { fn name(&self) -> String { "Bigtable".to_owned() } + + fn pool_status(&self) -> Option { + Some(self.pool.pool.status()) + } } #[cfg(all(test, feature = "emulator"))] diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index 2dd2eda24..418eebb01 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -26,7 +26,7 @@ pub use bigtable_client::error::BigTableError; pub use bigtable_client::BigTableClientImpl; use grpcio::Metadata; -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; use std::time::Duration; use crate::db::bigtable::bigtable_client::MetadataBuilder; @@ -54,8 +54,16 @@ pub struct BigTableDbSettings { pub database_pool_max_size: Option, /// Max time (in seconds) to wait for a database connection #[serde(default)] - #[serde(deserialize_with = "deserialize_u32_to_duration")] - pub database_pool_connection_timeout: Duration, + #[serde(deserialize_with = "deserialize_u32_to_duration_opt")] + pub database_pool_connection_wait_timeout: Option, + /// Max time (in seconds) to wait when creating a database connection + #[serde(default)] + #[serde(deserialize_with = "deserialize_u32_to_duration_opt")] + pub database_pool_connection_create_timeout: Option, + /// Max time (in seconds) to wait when recycling a database connection + #[serde(default)] + #[serde(deserialize_with = "deserialize_u32_to_duration_opt")] + pub database_pool_connection_recycle_timeout: Option, /// Max time (in seconds) a connection should live #[serde(default)] #[serde(deserialize_with = "deserialize_u32_to_duration")] @@ -110,3 +118,13 @@ impl TryFrom<&str> for BigTableDbSettings { Ok(me) } } + +pub fn deserialize_u32_to_duration_opt<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let maybe_seconds: Option = Option::deserialize(deserializer)?; + Ok(maybe_seconds.map(|s| Duration::from_secs(s.into()))) +} diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs index 17b166423..b76db6d5f 100644 --- a/autopush-common/src/db/bigtable/pool.rs +++ b/autopush-common/src/db/bigtable/pool.rs @@ -94,16 +94,12 @@ impl BigTablePool { debug!("🏊 Setting pool max size {}", &size); config.max_size = size as usize; }; - if !bt_settings.database_pool_connection_timeout.is_zero() { - debug!( - "🏊 Setting connection timeout to {} seconds", - &bt_settings.database_pool_connection_timeout.as_secs() - ); - config.timeouts = Timeouts { - create: Some(bt_settings.database_pool_connection_timeout), - ..Default::default() - }; - } + config.timeouts = Timeouts { + wait: bt_settings.database_pool_connection_wait_timeout, + create: bt_settings.database_pool_connection_create_timeout, + recycle: bt_settings.database_pool_connection_recycle_timeout, + }; + debug!("🏊 Setting pool timeouts to {:?}", &config.timeouts); let pool = deadpool::managed::Pool::builder(manager) .config(config) .build() diff --git a/autopush-common/src/db/client.rs b/autopush-common/src/db/client.rs index 42e5e66f6..477b11401 100644 --- a/autopush-common/src/db/client.rs +++ b/autopush-common/src/db/client.rs @@ -109,12 +109,17 @@ pub trait DbClient: Send + Sync { #[allow(clippy::needless_lifetimes)] fn rotating_message_table<'a>(&'a self) -> Option<&'a str>; - fn box_clone(&self) -> Box; - /// Provide the module name. /// This was added for simple dual mode testing, but may be useful in /// other situations. fn name(&self) -> String; + + /// Return the current deadpool Status (if using deadpool) + fn pool_status(&self) -> Option { + None + } + + fn box_clone(&self) -> Box; } impl Clone for Box { diff --git a/autopush-common/src/db/dual/mod.rs b/autopush-common/src/db/dual/mod.rs index 336d9e0c5..3fd104e99 100644 --- a/autopush-common/src/db/dual/mod.rs +++ b/autopush-common/src/db/dual/mod.rs @@ -459,6 +459,10 @@ impl DbClient for DualClientImpl { fn name(&self) -> String { "Dual".to_owned() } + + fn pool_status(&self) -> Option { + self.primary.pool_status() + } } #[cfg(all(test, feature = "bigtable", feature = "dynamodb"))] diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 6a3fe29bb..3eb7a0064 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -31,12 +31,15 @@ pub mod dual; pub mod dynamodb; pub mod error; pub mod models; +pub mod reporter; pub mod routing; mod util; // used by integration testing pub mod mock; +pub use reporter::spawn_pool_periodic_reporter; + use crate::errors::{ApcErrorKind, Result}; use crate::notification::{Notification, STANDARD_NOTIFICATION_PREFIX, TOPIC_NOTIFICATION_PREFIX}; use crate::util::timing::{ms_since_epoch, sec_since_epoch}; diff --git a/autopush-common/src/db/reporter.rs b/autopush-common/src/db/reporter.rs new file mode 100644 index 000000000..69da365d9 --- /dev/null +++ b/autopush-common/src/db/reporter.rs @@ -0,0 +1,43 @@ +use std::{sync::Arc, time::Duration}; + +use actix_web::rt; +use cadence::{Gauged, StatsdClient}; +use gethostname::gethostname; + +use super::client::DbClient; + +/// Emit db pool (deadpool) metrics periodically +pub fn spawn_pool_periodic_reporter( + interval: Duration, + db: Box, + metrics: Arc, +) { + let hostname = gethostname().to_string_lossy().to_string(); + rt::spawn(async move { + loop { + pool_periodic_reporter(&*db, &metrics, &hostname); + rt::time::sleep(interval).await; + } + }); +} + +fn pool_periodic_reporter(db: &dyn DbClient, metrics: &StatsdClient, hostname: &str) { + let Some(status) = db.pool_status() else { + return; + }; + metrics + .gauge_with_tags( + "database.pool.active", + (status.size - status.available) as u64, + ) + .with_tag("hostname", hostname) + .send(); + metrics + .gauge_with_tags("database.pool.idle", status.available as u64) + .with_tag("hostname", hostname) + .send(); + metrics + .gauge_with_tags("database.pool.waiting", status.waiting as u64) + .with_tag("hostname", hostname) + .send(); +} diff --git a/autopush-common/src/logging.rs b/autopush-common/src/logging.rs index 4e1634bcd..32fefe881 100644 --- a/autopush-common/src/logging.rs +++ b/autopush-common/src/logging.rs @@ -83,3 +83,13 @@ fn get_ec2_instance_id() -> reqwest::Result { .error_for_status()? .text() } + +/// Return parallelism/number of CPU information to log at startup +pub fn parallelism_banner() -> String { + format!( + "available_parallelism: {:?} num_cpus: {} num_cpus (phys): {}", + std::thread::available_parallelism(), + num_cpus::get(), + num_cpus::get_physical() + ) +} From e86cd084522ff56bb9183509c617881b25d24061 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Mon, 26 Feb 2024 16:14:57 -0800 Subject: [PATCH 2/2] clippy --- autopush-common/src/db/bigtable/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autopush-common/src/db/bigtable/mod.rs b/autopush-common/src/db/bigtable/mod.rs index d5afc5cef..05c79ec9d 100644 --- a/autopush-common/src/db/bigtable/mod.rs +++ b/autopush-common/src/db/bigtable/mod.rs @@ -26,7 +26,7 @@ pub use bigtable_client::error::BigTableError; pub use bigtable_client::BigTableClientImpl; use grpcio::Metadata; -use serde::{Deserialize, Deserializer}; +use serde::Deserialize; use std::time::Duration; use crate::db::bigtable::bigtable_client::MetadataBuilder;