Skip to content

Commit

Permalink
feat: spawn Tokenserver pool reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan Donowitz committed Apr 26, 2022
1 parent 1a197a6 commit 96859c9
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 39 deletions.
10 changes: 6 additions & 4 deletions src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ impl DbPool for MockDbPool {
Ok(Box::new(MockDb::new()) as Box<dyn Db<'a>>)
}

fn state(&self) -> results::PoolState {
results::PoolState::default()
}

fn validate_batch_id(&self, _: params::ValidateBatchId) -> Result<(), DbError> {
Ok(())
}
Expand All @@ -33,6 +29,12 @@ impl DbPool for MockDbPool {
}
}

impl GetPoolState for MockDbPool {
fn state(&self) -> PoolState {
PoolState::default()
}
}

#[derive(Clone, Debug)]
pub struct MockDb;

Expand Down
27 changes: 21 additions & 6 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds
type DbFuture<'a, T> = LocalBoxFuture<'a, Result<T, ApiError>>;

#[async_trait]
pub trait DbPool: Sync + Send + Debug {
pub trait DbPool: Sync + Send + Debug + GetPoolState {
async fn get(&self) -> ApiResult<Box<dyn Db<'_>>>;

fn state(&self) -> results::PoolState;

fn validate_batch_id(&self, params: params::ValidateBatchId) -> Result<(), DbError>;

fn box_clone(&self) -> Box<dyn DbPool>;
Expand All @@ -76,6 +74,23 @@ impl Clone for Box<dyn DbPool> {
}
}

pub trait GetPoolState {
fn state(&self) -> PoolState;
}

impl GetPoolState for Box<dyn DbPool> {
fn state(&self) -> PoolState {
(**self).state()
}
}

#[derive(Debug, Default)]
/// A mockable r2d2::State
pub struct PoolState {
pub connections: u32,
pub idle_connections: u32,
}

pub trait Db<'a>: Debug + 'a {
fn lock_for_read(&self, params: params::LockCollection) -> DbFuture<'_, ()>;

Expand Down Expand Up @@ -280,18 +295,18 @@ pub async fn pool_from_settings(
}

/// Emit DbPool metrics periodically
pub fn spawn_pool_periodic_reporter(
pub fn spawn_pool_periodic_reporter<T: GetPoolState + 'static>(
interval: Duration,
metrics: StatsdClient,
pool: Box<dyn DbPool>,
pool: T,
) -> Result<(), DbError> {
let hostname = hostname::get()
.expect("Couldn't get hostname")
.into_string()
.expect("Couldn't get hostname");
actix_rt::spawn(async move {
loop {
let results::PoolState {
let PoolState {
connections,
idle_connections,
} = pool.state();
Expand Down
16 changes: 7 additions & 9 deletions src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ use diesel_logger::LoggingConnection;
use super::models::{MysqlDb, Result};
#[cfg(test)]
use super::test::TestTransactionCustomizer;
use crate::db::{
error::DbError,
results::{self, PoolState},
Db, DbPool, STD_COLLS,
};
use crate::db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS};
use crate::error::{ApiError, ApiResult};
use crate::server::metrics::Metrics;
use crate::settings::{Quota, Settings};
Expand Down Expand Up @@ -114,10 +110,6 @@ impl DbPool for MysqlDbPool {
Ok(Box::new(db) as Box<dyn Db<'a>>)
}

fn state(&self) -> results::PoolState {
self.pool.state().into()
}

fn validate_batch_id(&self, id: String) -> Result<()> {
super::batch::validate_batch_id(&id)
}
Expand All @@ -127,6 +119,12 @@ impl DbPool for MysqlDbPool {
}
}

impl GetPoolState for MysqlDbPool {
fn state(&self) -> PoolState {
self.pool.state().into()
}
}

impl fmt::Debug for MysqlDbPool {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("MysqlDbPool")
Expand Down
7 changes: 0 additions & 7 deletions src/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ pub struct PostBsos {
pub failed: HashMap<String, String>,
}

#[derive(Debug, Default)]
/// A mockable r2d2::State
pub struct PoolState {
pub connections: u32,
pub idle_connections: u32,
}

#[derive(Debug, Default)]
pub struct ConnectionInfo {
pub age: i64,
Expand Down
2 changes: 1 addition & 1 deletion src/db/spanner/manager/bb8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use grpcio::{EnvBuilder, Environment};
use crate::{
db::{
error::{DbError, DbErrorKind},
results::PoolState,
PoolState,
},
server::metrics::Metrics,
settings::Settings,
Expand Down
2 changes: 1 addition & 1 deletion src/db/spanner/manager/deadpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use grpcio::{EnvBuilder, Environment};
use crate::{
db::{
error::{DbError, DbErrorKind},
results::PoolState,
PoolState,
},
server::metrics::Metrics,
settings::Settings,
Expand Down
12 changes: 7 additions & 5 deletions src/db/spanner/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bb8::ErrorSink;
use tokio::sync::RwLock;

use crate::{
db::{error::DbError, results, Db, DbPool, STD_COLLS},
db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS},
error::ApiResult,
server::metrics::Metrics,
settings::{Quota, Settings},
Expand Down Expand Up @@ -100,10 +100,6 @@ impl DbPool for SpannerDbPool {
.map_err(Into::into)
}

fn state(&self) -> results::PoolState {
self.pool.status().into()
}

fn validate_batch_id(&self, id: String) -> Result<()> {
super::batch::validate_batch_id(&id)
}
Expand All @@ -113,6 +109,12 @@ impl DbPool for SpannerDbPool {
}
}

impl GetPoolState for SpannerDbPool {
fn state(&self) -> PoolState {
self.pool.status().into()
}
}

impl fmt::Debug for SpannerDbPool {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("SpannerDbPool")
Expand Down
19 changes: 17 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,22 @@ impl Server {
..Default::default()
}));
let tokenserver_state = if settings.tokenserver.enabled {
Some(tokenserver::ServerState::from_settings(
let state = tokenserver::ServerState::from_settings(
&settings.tokenserver,
metrics::metrics_from_opts(
&settings.tokenserver.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?,
)?)
)?;

spawn_pool_periodic_reporter(
Duration::from_secs(10),
*state.metrics.clone(),
state.db_pool.clone(),
)?;

Some(state)
} else {
None
};
Expand Down Expand Up @@ -304,6 +312,13 @@ impl Server {
settings.statsd_port,
)?,
)?;

spawn_pool_periodic_reporter(
Duration::from_secs(10),
*tokenserver_state.metrics.clone(),
tokenserver_state.db_pool.clone(),
)?;

let server = HttpServer::new(move || {
build_app_without_syncstorage!(
Some(tokenserver_state.clone()),
Expand Down
8 changes: 7 additions & 1 deletion src/tokenserver/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::models::{Db, DbFuture};
use super::params;
use super::pool::DbPool;
use super::results;
use crate::db::error::DbError;
use crate::db::{error::DbError, GetPoolState, PoolState};

#[derive(Clone, Debug)]
pub struct MockDbPool;
Expand All @@ -29,6 +29,12 @@ impl DbPool for MockDbPool {
}
}

impl GetPoolState for MockDbPool {
fn state(&self) -> PoolState {
PoolState::default()
}
}

#[derive(Clone, Debug)]
pub struct MockDb;

Expand Down
16 changes: 14 additions & 2 deletions src/tokenserver/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use diesel_logger::LoggingConnection;
use std::time::Duration;

use super::models::{Db, DbResult, TokenserverDb};
use crate::db::{error::DbError, DbErrorKind};
use crate::db::{error::DbError, DbErrorKind, GetPoolState, PoolState};
use crate::diesel::Connection;
use crate::server::metrics::Metrics;
use crate::tokenserver::settings::Settings;
Expand Down Expand Up @@ -112,12 +112,24 @@ impl DbPool for TokenserverPool {
}

#[async_trait]
pub trait DbPool: Sync + Send {
pub trait DbPool: Sync + Send + GetPoolState {
async fn get(&self) -> Result<Box<dyn Db>, DbError>;

fn box_clone(&self) -> Box<dyn DbPool>;
}

impl GetPoolState for TokenserverPool {
fn state(&self) -> PoolState {
self.inner.state().into()
}
}

impl GetPoolState for Box<dyn DbPool> {
fn state(&self) -> PoolState {
(**self).state()
}
}

impl Clone for Box<dyn DbPool> {
fn clone(&self) -> Box<dyn DbPool> {
self.box_clone()
Expand Down
2 changes: 1 addition & 1 deletion src/web/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, Error> {
let deadarc = state.deadman.clone();
let mut deadman = *deadarc.read().await;
let db_state = if cfg!(test) {
use crate::db::results::PoolState;
use crate::db::PoolState;
use actix_web::http::header::HeaderValue;
use std::str::FromStr;

Expand Down

0 comments on commit 96859c9

Please sign in to comment.