Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: spawn Tokenserver pool reporter #1283

Merged
merged 1 commit into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ impl DbPool for MockDbPool {
Ok(Box::new(MockDb::new()) as Box<dyn Db<'a>>)
}

fn state(&self) -> results::PoolState {
results::PoolState::default()
}

fn validate_batch_id(&self, _: params::ValidateBatchId) -> Result<(), DbError> {
Ok(())
}
Expand All @@ -33,6 +29,12 @@ impl DbPool for MockDbPool {
}
}

impl GetPoolState for MockDbPool {
fn state(&self) -> PoolState {
PoolState::default()
}
}

#[derive(Clone, Debug)]
pub struct MockDb;

Expand Down
27 changes: 21 additions & 6 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds
type DbFuture<'a, T> = LocalBoxFuture<'a, Result<T, ApiError>>;

#[async_trait]
pub trait DbPool: Sync + Send + Debug {
pub trait DbPool: Sync + Send + Debug + GetPoolState {
async fn get(&self) -> ApiResult<Box<dyn Db<'_>>>;

fn state(&self) -> results::PoolState;

fn validate_batch_id(&self, params: params::ValidateBatchId) -> Result<(), DbError>;

fn box_clone(&self) -> Box<dyn DbPool>;
Expand All @@ -76,6 +74,23 @@ impl Clone for Box<dyn DbPool> {
}
}

pub trait GetPoolState {
fn state(&self) -> PoolState;
}

impl GetPoolState for Box<dyn DbPool> {
fn state(&self) -> PoolState {
(**self).state()
}
}

#[derive(Debug, Default)]
/// A mockable r2d2::State
pub struct PoolState {
pub connections: u32,
pub idle_connections: u32,
}

pub trait Db<'a>: Debug + 'a {
fn lock_for_read(&self, params: params::LockCollection) -> DbFuture<'_, ()>;

Expand Down Expand Up @@ -280,18 +295,18 @@ pub async fn pool_from_settings(
}

/// Emit DbPool metrics periodically
pub fn spawn_pool_periodic_reporter(
pub fn spawn_pool_periodic_reporter<T: GetPoolState + 'static>(
interval: Duration,
metrics: StatsdClient,
pool: Box<dyn DbPool>,
pool: T,
) -> Result<(), DbError> {
let hostname = hostname::get()
.expect("Couldn't get hostname")
.into_string()
.expect("Couldn't get hostname");
actix_rt::spawn(async move {
loop {
let results::PoolState {
let PoolState {
connections,
idle_connections,
} = pool.state();
Expand Down
16 changes: 7 additions & 9 deletions src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ use diesel_logger::LoggingConnection;
use super::models::{MysqlDb, Result};
#[cfg(test)]
use super::test::TestTransactionCustomizer;
use crate::db::{
error::DbError,
results::{self, PoolState},
Db, DbPool, STD_COLLS,
};
use crate::db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS};
use crate::error::{ApiError, ApiResult};
use crate::server::metrics::Metrics;
use crate::settings::{Quota, Settings};
Expand Down Expand Up @@ -114,10 +110,6 @@ impl DbPool for MysqlDbPool {
Ok(Box::new(db) as Box<dyn Db<'a>>)
}

fn state(&self) -> results::PoolState {
self.pool.state().into()
}

fn validate_batch_id(&self, id: String) -> Result<()> {
super::batch::validate_batch_id(&id)
}
Expand All @@ -127,6 +119,12 @@ impl DbPool for MysqlDbPool {
}
}

impl GetPoolState for MysqlDbPool {
fn state(&self) -> PoolState {
self.pool.state().into()
}
}

impl fmt::Debug for MysqlDbPool {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("MysqlDbPool")
Expand Down
7 changes: 0 additions & 7 deletions src/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ pub struct PostBsos {
pub failed: HashMap<String, String>,
}

#[derive(Debug, Default)]
/// A mockable r2d2::State
pub struct PoolState {
pub connections: u32,
pub idle_connections: u32,
}

#[derive(Debug, Default)]
pub struct ConnectionInfo {
pub age: i64,
Expand Down
2 changes: 1 addition & 1 deletion src/db/spanner/manager/bb8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use grpcio::{EnvBuilder, Environment};
use crate::{
db::{
error::{DbError, DbErrorKind},
results::PoolState,
PoolState,
},
server::metrics::Metrics,
settings::Settings,
Expand Down
2 changes: 1 addition & 1 deletion src/db/spanner/manager/deadpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use grpcio::{EnvBuilder, Environment};
use crate::{
db::{
error::{DbError, DbErrorKind},
results::PoolState,
PoolState,
},
server::metrics::Metrics,
settings::Settings,
Expand Down
12 changes: 7 additions & 5 deletions src/db/spanner/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bb8::ErrorSink;
use tokio::sync::RwLock;

use crate::{
db::{error::DbError, results, Db, DbPool, STD_COLLS},
db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS},
error::ApiResult,
server::metrics::Metrics,
settings::{Quota, Settings},
Expand Down Expand Up @@ -100,10 +100,6 @@ impl DbPool for SpannerDbPool {
.map_err(Into::into)
}

fn state(&self) -> results::PoolState {
self.pool.status().into()
}

fn validate_batch_id(&self, id: String) -> Result<()> {
super::batch::validate_batch_id(&id)
}
Expand All @@ -113,6 +109,12 @@ impl DbPool for SpannerDbPool {
}
}

impl GetPoolState for SpannerDbPool {
fn state(&self) -> PoolState {
self.pool.status().into()
}
}

impl fmt::Debug for SpannerDbPool {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("SpannerDbPool")
Expand Down
19 changes: 17 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,22 @@ impl Server {
..Default::default()
}));
let tokenserver_state = if settings.tokenserver.enabled {
Some(tokenserver::ServerState::from_settings(
let state = tokenserver::ServerState::from_settings(
&settings.tokenserver,
metrics::metrics_from_opts(
&settings.tokenserver.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?,
)?)
)?;

spawn_pool_periodic_reporter(
Duration::from_secs(10),
*state.metrics.clone(),
state.db_pool.clone(),
)?;

Some(state)
} else {
None
};
Expand Down Expand Up @@ -304,6 +312,13 @@ impl Server {
settings.statsd_port,
)?,
)?;

spawn_pool_periodic_reporter(
Duration::from_secs(10),
*tokenserver_state.metrics.clone(),
tokenserver_state.db_pool.clone(),
)?;

let server = HttpServer::new(move || {
build_app_without_syncstorage!(
Some(tokenserver_state.clone()),
Expand Down
8 changes: 7 additions & 1 deletion src/tokenserver/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::models::{Db, DbFuture};
use super::params;
use super::pool::DbPool;
use super::results;
use crate::db::error::DbError;
use crate::db::{error::DbError, GetPoolState, PoolState};

#[derive(Clone, Debug)]
pub struct MockDbPool;
Expand All @@ -29,6 +29,12 @@ impl DbPool for MockDbPool {
}
}

impl GetPoolState for MockDbPool {
fn state(&self) -> PoolState {
PoolState::default()
}
}

#[derive(Clone, Debug)]
pub struct MockDb;

Expand Down
16 changes: 14 additions & 2 deletions src/tokenserver/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use diesel_logger::LoggingConnection;
use std::time::Duration;

use super::models::{Db, DbResult, TokenserverDb};
use crate::db::{error::DbError, DbErrorKind};
use crate::db::{error::DbError, DbErrorKind, GetPoolState, PoolState};
use crate::diesel::Connection;
use crate::server::metrics::Metrics;
use crate::tokenserver::settings::Settings;
Expand Down Expand Up @@ -112,12 +112,24 @@ impl DbPool for TokenserverPool {
}

#[async_trait]
pub trait DbPool: Sync + Send {
pub trait DbPool: Sync + Send + GetPoolState {
async fn get(&self) -> Result<Box<dyn Db>, DbError>;

fn box_clone(&self) -> Box<dyn DbPool>;
}

impl GetPoolState for TokenserverPool {
fn state(&self) -> PoolState {
self.inner.state().into()
}
}

impl GetPoolState for Box<dyn DbPool> {
fn state(&self) -> PoolState {
(**self).state()
}
}

impl Clone for Box<dyn DbPool> {
fn clone(&self) -> Box<dyn DbPool> {
self.box_clone()
Expand Down
2 changes: 1 addition & 1 deletion src/web/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, Error> {
let deadarc = state.deadman.clone();
let mut deadman = *deadarc.read().await;
let db_state = if cfg!(test) {
use crate::db::results::PoolState;
use crate::db::PoolState;
use actix_web::http::header::HeaderValue;
use std::str::FromStr;

Expand Down