From 96859c9ed9e8eb81690748c3e0866872f1b6d7e3 Mon Sep 17 00:00:00 2001 From: Ethan Donowitz Date: Tue, 19 Apr 2022 16:09:17 -0400 Subject: [PATCH] feat: spawn Tokenserver pool reporter --- src/db/mock.rs | 10 ++++++---- src/db/mod.rs | 27 +++++++++++++++++++++------ src/db/mysql/pool.rs | 16 +++++++--------- src/db/results.rs | 7 ------- src/db/spanner/manager/bb8.rs | 2 +- src/db/spanner/manager/deadpool.rs | 2 +- src/db/spanner/pool.rs | 12 +++++++----- src/server/mod.rs | 19 +++++++++++++++++-- src/tokenserver/db/mock.rs | 8 +++++++- src/tokenserver/db/pool.rs | 16 ++++++++++++++-- src/web/handlers.rs | 2 +- 11 files changed, 82 insertions(+), 39 deletions(-) diff --git a/src/db/mock.rs b/src/db/mock.rs index 4fa666a288..08691106bc 100644 --- a/src/db/mock.rs +++ b/src/db/mock.rs @@ -20,10 +20,6 @@ impl DbPool for MockDbPool { Ok(Box::new(MockDb::new()) as Box>) } - fn state(&self) -> results::PoolState { - results::PoolState::default() - } - fn validate_batch_id(&self, _: params::ValidateBatchId) -> Result<(), DbError> { Ok(()) } @@ -33,6 +29,12 @@ impl DbPool for MockDbPool { } } +impl GetPoolState for MockDbPool { + fn state(&self) -> PoolState { + PoolState::default() + } +} + #[derive(Clone, Debug)] pub struct MockDb; diff --git a/src/db/mod.rs b/src/db/mod.rs index e2eb8fe857..f8b240f91a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -60,11 +60,9 @@ pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds type DbFuture<'a, T> = LocalBoxFuture<'a, Result>; #[async_trait] -pub trait DbPool: Sync + Send + Debug { +pub trait DbPool: Sync + Send + Debug + GetPoolState { async fn get(&self) -> ApiResult>>; - fn state(&self) -> results::PoolState; - fn validate_batch_id(&self, params: params::ValidateBatchId) -> Result<(), DbError>; fn box_clone(&self) -> Box; @@ -76,6 +74,23 @@ impl Clone for Box { } } +pub trait GetPoolState { + fn state(&self) -> PoolState; +} + +impl GetPoolState for Box { + fn state(&self) -> PoolState { + (**self).state() + } +} + +#[derive(Debug, Default)] +/// A mockable r2d2::State +pub struct PoolState { + pub connections: u32, + pub idle_connections: u32, +} + pub trait Db<'a>: Debug + 'a { fn lock_for_read(&self, params: params::LockCollection) -> DbFuture<'_, ()>; @@ -280,10 +295,10 @@ pub async fn pool_from_settings( } /// Emit DbPool metrics periodically -pub fn spawn_pool_periodic_reporter( +pub fn spawn_pool_periodic_reporter( interval: Duration, metrics: StatsdClient, - pool: Box, + pool: T, ) -> Result<(), DbError> { let hostname = hostname::get() .expect("Couldn't get hostname") @@ -291,7 +306,7 @@ pub fn spawn_pool_periodic_reporter( .expect("Couldn't get hostname"); actix_rt::spawn(async move { loop { - let results::PoolState { + let PoolState { connections, idle_connections, } = pool.state(); diff --git a/src/db/mysql/pool.rs b/src/db/mysql/pool.rs index b69a1fdc3b..ed17ae5dde 100644 --- a/src/db/mysql/pool.rs +++ b/src/db/mysql/pool.rs @@ -20,11 +20,7 @@ use diesel_logger::LoggingConnection; use super::models::{MysqlDb, Result}; #[cfg(test)] use super::test::TestTransactionCustomizer; -use crate::db::{ - error::DbError, - results::{self, PoolState}, - Db, DbPool, STD_COLLS, -}; +use crate::db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS}; use crate::error::{ApiError, ApiResult}; use crate::server::metrics::Metrics; use crate::settings::{Quota, Settings}; @@ -114,10 +110,6 @@ impl DbPool for MysqlDbPool { Ok(Box::new(db) as Box>) } - fn state(&self) -> results::PoolState { - self.pool.state().into() - } - fn validate_batch_id(&self, id: String) -> Result<()> { super::batch::validate_batch_id(&id) } @@ -127,6 +119,12 @@ impl DbPool for MysqlDbPool { } } +impl GetPoolState for MysqlDbPool { + fn state(&self) -> PoolState { + self.pool.state().into() + } +} + impl fmt::Debug for MysqlDbPool { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("MysqlDbPool") diff --git a/src/db/results.rs b/src/db/results.rs index 3879e7c931..9bcefb0994 100644 --- a/src/db/results.rs +++ b/src/db/results.rs @@ -80,13 +80,6 @@ pub struct PostBsos { pub failed: HashMap, } -#[derive(Debug, Default)] -/// A mockable r2d2::State -pub struct PoolState { - pub connections: u32, - pub idle_connections: u32, -} - #[derive(Debug, Default)] pub struct ConnectionInfo { pub age: i64, diff --git a/src/db/spanner/manager/bb8.rs b/src/db/spanner/manager/bb8.rs index 701df73ad3..7ad6d7f8b5 100644 --- a/src/db/spanner/manager/bb8.rs +++ b/src/db/spanner/manager/bb8.rs @@ -8,7 +8,7 @@ use grpcio::{EnvBuilder, Environment}; use crate::{ db::{ error::{DbError, DbErrorKind}, - results::PoolState, + PoolState, }, server::metrics::Metrics, settings::Settings, diff --git a/src/db/spanner/manager/deadpool.rs b/src/db/spanner/manager/deadpool.rs index 8dae9b9201..74eef40c9d 100644 --- a/src/db/spanner/manager/deadpool.rs +++ b/src/db/spanner/manager/deadpool.rs @@ -7,7 +7,7 @@ use grpcio::{EnvBuilder, Environment}; use crate::{ db::{ error::{DbError, DbErrorKind}, - results::PoolState, + PoolState, }, server::metrics::Metrics, settings::Settings, diff --git a/src/db/spanner/pool.rs b/src/db/spanner/pool.rs index f1eaf28621..77b6149cfd 100644 --- a/src/db/spanner/pool.rs +++ b/src/db/spanner/pool.rs @@ -5,7 +5,7 @@ use bb8::ErrorSink; use tokio::sync::RwLock; use crate::{ - db::{error::DbError, results, Db, DbPool, STD_COLLS}, + db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS}, error::ApiResult, server::metrics::Metrics, settings::{Quota, Settings}, @@ -100,10 +100,6 @@ impl DbPool for SpannerDbPool { .map_err(Into::into) } - fn state(&self) -> results::PoolState { - self.pool.status().into() - } - fn validate_batch_id(&self, id: String) -> Result<()> { super::batch::validate_batch_id(&id) } @@ -113,6 +109,12 @@ impl DbPool for SpannerDbPool { } } +impl GetPoolState for SpannerDbPool { + fn state(&self) -> PoolState { + self.pool.status().into() + } +} + impl fmt::Debug for SpannerDbPool { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("SpannerDbPool") diff --git a/src/server/mod.rs b/src/server/mod.rs index 553e04d263..cfd50b035b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -244,14 +244,22 @@ impl Server { ..Default::default() })); let tokenserver_state = if settings.tokenserver.enabled { - Some(tokenserver::ServerState::from_settings( + let state = tokenserver::ServerState::from_settings( &settings.tokenserver, metrics::metrics_from_opts( &settings.tokenserver.statsd_label, settings.statsd_host.as_deref(), settings.statsd_port, )?, - )?) + )?; + + spawn_pool_periodic_reporter( + Duration::from_secs(10), + *state.metrics.clone(), + state.db_pool.clone(), + )?; + + Some(state) } else { None }; @@ -304,6 +312,13 @@ impl Server { settings.statsd_port, )?, )?; + + spawn_pool_periodic_reporter( + Duration::from_secs(10), + *tokenserver_state.metrics.clone(), + tokenserver_state.db_pool.clone(), + )?; + let server = HttpServer::new(move || { build_app_without_syncstorage!( Some(tokenserver_state.clone()), diff --git a/src/tokenserver/db/mock.rs b/src/tokenserver/db/mock.rs index 4975b993ec..d702b05ecb 100644 --- a/src/tokenserver/db/mock.rs +++ b/src/tokenserver/db/mock.rs @@ -7,7 +7,7 @@ use super::models::{Db, DbFuture}; use super::params; use super::pool::DbPool; use super::results; -use crate::db::error::DbError; +use crate::db::{error::DbError, GetPoolState, PoolState}; #[derive(Clone, Debug)] pub struct MockDbPool; @@ -29,6 +29,12 @@ impl DbPool for MockDbPool { } } +impl GetPoolState for MockDbPool { + fn state(&self) -> PoolState { + PoolState::default() + } +} + #[derive(Clone, Debug)] pub struct MockDb; diff --git a/src/tokenserver/db/pool.rs b/src/tokenserver/db/pool.rs index e9d0b6d081..e2088af295 100644 --- a/src/tokenserver/db/pool.rs +++ b/src/tokenserver/db/pool.rs @@ -8,7 +8,7 @@ use diesel_logger::LoggingConnection; use std::time::Duration; use super::models::{Db, DbResult, TokenserverDb}; -use crate::db::{error::DbError, DbErrorKind}; +use crate::db::{error::DbError, DbErrorKind, GetPoolState, PoolState}; use crate::diesel::Connection; use crate::server::metrics::Metrics; use crate::tokenserver::settings::Settings; @@ -112,12 +112,24 @@ impl DbPool for TokenserverPool { } #[async_trait] -pub trait DbPool: Sync + Send { +pub trait DbPool: Sync + Send + GetPoolState { async fn get(&self) -> Result, DbError>; fn box_clone(&self) -> Box; } +impl GetPoolState for TokenserverPool { + fn state(&self) -> PoolState { + self.inner.state().into() + } +} + +impl GetPoolState for Box { + fn state(&self) -> PoolState { + (**self).state() + } +} + impl Clone for Box { fn clone(&self) -> Box { self.box_clone() diff --git a/src/web/handlers.rs b/src/web/handlers.rs index 21bdadb756..7a972eec97 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -639,7 +639,7 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result { let deadarc = state.deadman.clone(); let mut deadman = *deadarc.read().await; let db_state = if cfg!(test) { - use crate::db::results::PoolState; + use crate::db::PoolState; use actix_web::http::header::HeaderValue; use std::str::FromStr;