diff --git a/Cargo.lock b/Cargo.lock index 52bcf35f0..c98c8290f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -750,6 +750,7 @@ dependencies = [ "log", "mockall", "mockito", + "num_cpus", "openssl", "protobuf", "rand 0.8.5", diff --git a/autoconnect/src/main.rs b/autoconnect/src/main.rs index 845cb6ae1..420112f91 100644 --- a/autoconnect/src/main.rs +++ b/autoconnect/src/main.rs @@ -4,7 +4,7 @@ #[macro_use] extern crate slog_scope; -use std::{env, vec::Vec}; +use std::{env, time::Duration, vec::Vec}; use actix_http::HttpService; use actix_server::Server; @@ -16,6 +16,7 @@ use serde::Deserialize; use autoconnect_settings::{AppState, Settings}; use autoconnect_web::{build_app, config, config_router}; use autopush_common::{ + db::spawn_pool_periodic_reporter, errors::{ApcErrorKind, Result}, logging, }; @@ -79,12 +80,17 @@ async fn main() -> Result<()> { let actix_workers = settings.actix_workers; let app_state = AppState::from_settings(settings)?; app_state.init_and_spawn_megaphone_updater().await?; + spawn_pool_periodic_reporter( + Duration::from_secs(10), + app_state.db.clone(), + app_state.metrics.clone(), + ); info!( - "Starting autoconnect on port: {} router_port: {} (available_parallelism: {:?})", + "Starting autoconnect on port: {} router_port: {} ({})", port, router_port, - std::thread::available_parallelism() + logging::parallelism_banner() ); let router_app_state = app_state.clone(); diff --git a/autoendpoint/src/main.rs b/autoendpoint/src/main.rs index 630239e2d..702ba61d2 100644 --- a/autoendpoint/src/main.rs +++ b/autoendpoint/src/main.rs @@ -59,9 +59,9 @@ async fn main() -> Result<(), Box> { .await .expect("Could not start server"); info!( - "Starting autoendpoint on port: {} (available_parallelism: {:?})", + "Starting autoendpoint on port: {} ({})", host_port, - std::thread::available_parallelism() + logging::parallelism_banner() ); server.await?; diff --git a/autoendpoint/src/server.rs b/autoendpoint/src/server.rs index 1b6c506ee..3be7cedbd 100644 --- a/autoendpoint/src/server.rs +++ b/autoendpoint/src/server.rs @@ -7,19 +7,18 @@ use actix_cors::Cors; use actix_web::{ dev, http::StatusCode, middleware::ErrorHandlers, web, web::Data, App, HttpServer, }; -#[cfg(feature = "bigtable")] -use autopush_common::db::bigtable::BigTableClientImpl; -#[cfg(feature = "dual")] -use autopush_common::db::dual::DualClientImpl; use cadence::StatsdClient; use fernet::MultiFernet; use serde_json::json; +#[cfg(feature = "bigtable")] +use autopush_common::db::bigtable::BigTableClientImpl; +#[cfg(feature = "dual")] +use autopush_common::db::dual::DualClientImpl; #[cfg(feature = "dynamodb")] use autopush_common::db::dynamodb::DdbClientImpl; - use autopush_common::{ - db::{client::DbClient, DbSettings, StorageType}, + db::{client::DbClient, spawn_pool_periodic_reporter, DbSettings, StorageType}, middleware::sentry::SentryWrapper, }; @@ -130,6 +129,12 @@ impl Server { adm_router, }; + spawn_pool_periodic_reporter( + Duration::from_secs(10), + app_state.db.clone(), + app_state.metrics.clone(), + ); + let server = HttpServer::new(move || { // These have a bad habit of being reset. Specify them explicitly. let cors = Cors::default() diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 14e3d6f30..9f065bf23 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -59,6 +59,7 @@ async-trait = "0.1" deadpool = { version = "0.10", features = ["managed", "rt_tokio_1"] } gethostname = "0.4" futures-backoff = "0.1.0" +num_cpus = "1.16" woothee = "0.13" # #[cfg(bigtable)] for this section. diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index a5b30ddd0..ef9350fce 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -1199,6 +1199,10 @@ impl DbClient for BigTableClientImpl { fn name(&self) -> String { "Bigtable".to_owned() } + + fn pool_status(&self) -> Option { + Some(self.pool.pool.status()) + } } #[cfg(all(test, feature = "emulator"))] diff --git a/autopush-common/src/db/client.rs b/autopush-common/src/db/client.rs index 42e5e66f6..477b11401 100644 --- a/autopush-common/src/db/client.rs +++ b/autopush-common/src/db/client.rs @@ -109,12 +109,17 @@ pub trait DbClient: Send + Sync { #[allow(clippy::needless_lifetimes)] fn rotating_message_table<'a>(&'a self) -> Option<&'a str>; - fn box_clone(&self) -> Box; - /// Provide the module name. /// This was added for simple dual mode testing, but may be useful in /// other situations. fn name(&self) -> String; + + /// Return the current deadpool Status (if using deadpool) + fn pool_status(&self) -> Option { + None + } + + fn box_clone(&self) -> Box; } impl Clone for Box { diff --git a/autopush-common/src/db/dual/mod.rs b/autopush-common/src/db/dual/mod.rs index 336d9e0c5..3fd104e99 100644 --- a/autopush-common/src/db/dual/mod.rs +++ b/autopush-common/src/db/dual/mod.rs @@ -459,6 +459,10 @@ impl DbClient for DualClientImpl { fn name(&self) -> String { "Dual".to_owned() } + + fn pool_status(&self) -> Option { + self.primary.pool_status() + } } #[cfg(all(test, feature = "bigtable", feature = "dynamodb"))] diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 6a3fe29bb..3eb7a0064 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -31,12 +31,15 @@ pub mod dual; pub mod dynamodb; pub mod error; pub mod models; +pub mod reporter; pub mod routing; mod util; // used by integration testing pub mod mock; +pub use reporter::spawn_pool_periodic_reporter; + use crate::errors::{ApcErrorKind, Result}; use crate::notification::{Notification, STANDARD_NOTIFICATION_PREFIX, TOPIC_NOTIFICATION_PREFIX}; use crate::util::timing::{ms_since_epoch, sec_since_epoch}; diff --git a/autopush-common/src/db/reporter.rs b/autopush-common/src/db/reporter.rs new file mode 100644 index 000000000..69da365d9 --- /dev/null +++ b/autopush-common/src/db/reporter.rs @@ -0,0 +1,43 @@ +use std::{sync::Arc, time::Duration}; + +use actix_web::rt; +use cadence::{Gauged, StatsdClient}; +use gethostname::gethostname; + +use super::client::DbClient; + +/// Emit db pool (deadpool) metrics periodically +pub fn spawn_pool_periodic_reporter( + interval: Duration, + db: Box, + metrics: Arc, +) { + let hostname = gethostname().to_string_lossy().to_string(); + rt::spawn(async move { + loop { + pool_periodic_reporter(&*db, &metrics, &hostname); + rt::time::sleep(interval).await; + } + }); +} + +fn pool_periodic_reporter(db: &dyn DbClient, metrics: &StatsdClient, hostname: &str) { + let Some(status) = db.pool_status() else { + return; + }; + metrics + .gauge_with_tags( + "database.pool.active", + (status.size - status.available) as u64, + ) + .with_tag("hostname", hostname) + .send(); + metrics + .gauge_with_tags("database.pool.idle", status.available as u64) + .with_tag("hostname", hostname) + .send(); + metrics + .gauge_with_tags("database.pool.waiting", status.waiting as u64) + .with_tag("hostname", hostname) + .send(); +} diff --git a/autopush-common/src/logging.rs b/autopush-common/src/logging.rs index 4e1634bcd..32fefe881 100644 --- a/autopush-common/src/logging.rs +++ b/autopush-common/src/logging.rs @@ -83,3 +83,13 @@ fn get_ec2_instance_id() -> reqwest::Result { .error_for_status()? .text() } + +/// Return parallelism/number of CPU information to log at startup +pub fn parallelism_banner() -> String { + format!( + "available_parallelism: {:?} num_cpus: {} num_cpus (phys): {}", + std::thread::available_parallelism(), + num_cpus::get(), + num_cpus::get_physical() + ) +}