Skip to content

Commit

Permalink
feat: add Tokenserver metrics (#1200)
Browse files Browse the repository at this point in the history
Closes #1108
  • Loading branch information
ethowitz authored Jan 25, 2022
1 parent a7c5f80 commit aa93312
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 35 deletions.
35 changes: 23 additions & 12 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::net::UdpSocket;
use std::time::Instant;

use actix_web::{error::ErrorInternalServerError, web::Data, Error, HttpRequest};
use actix_web::{
dev::Payload, error::ErrorInternalServerError, web::Data, Error, FromRequest, HttpRequest,
};
use cadence::{
BufferedUdpMetricSink, Counted, Metric, NopMetricSink, QueuingMetricSink, StatsdClient, Timed,
};
use futures::future;
use futures::future::Ready;

use crate::error::ApiError;
use crate::server::ServerState;
use crate::settings::Settings;
use crate::web::tags::Tags;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -54,12 +57,17 @@ impl Drop for Metrics {
}
}

impl From<&HttpRequest> for Metrics {
fn from(req: &HttpRequest) -> Self {
impl FromRequest for Metrics {
type Config = ();
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;

fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let exts = req.extensions();
let def_tags = Tags::from(req.head());
let tags = exts.get::<Tags>().unwrap_or(&def_tags);
Metrics {

future::ok(Metrics {
client: match req.app_data::<Data<ServerState>>() {
Some(v) => Some(*v.metrics.clone()),
None => {
Expand All @@ -69,7 +77,7 @@ impl From<&HttpRequest> for Metrics {
},
tags: Some(tags.clone()),
timer: None,
}
})
}
}

Expand Down Expand Up @@ -167,18 +175,21 @@ pub fn metrics_from_req(req: &HttpRequest) -> Result<Box<StatsdClient>, Error> {
.clone())
}

/// Create a cadence StatsdClient from the given options
pub fn metrics_from_opts(opts: &Settings) -> Result<StatsdClient, ApiError> {
let builder = if let Some(statsd_host) = opts.statsd_host.as_ref() {
pub fn metrics_from_opts(
label: &str,
host: Option<&str>,
port: u16,
) -> Result<StatsdClient, ApiError> {
let builder = if let Some(statsd_host) = host {
let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?;

let host = (statsd_host.as_str(), opts.statsd_port);
let host = (statsd_host, port);
let udp_sink = BufferedUdpMetricSink::from(host, socket)?;
let sink = QueuingMetricSink::from(udp_sink);
StatsdClient::builder(opts.statsd_label.as_ref(), sink)
StatsdClient::builder(label, sink)
} else {
StatsdClient::builder(opts.statsd_label.as_ref(), NopMetricSink)
StatsdClient::builder(label, NopMetricSink)
};
Ok(builder
.with_error_handler(|err| {
Expand Down
24 changes: 21 additions & 3 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ macro_rules! build_app {
.wrap(ErrorHandlers::new().handler(StatusCode::NOT_FOUND, ApiError::render_404))
// These are our wrappers
.wrap(middleware::weave::WeaveTimestamp::new())
.wrap(tokenserver::logging::LoggingWrapper::new())
.wrap(middleware::sentry::SentryWrapper::default())
.wrap(middleware::rejectua::RejectUA::default())
.wrap($cors)
Expand Down Expand Up @@ -176,6 +177,7 @@ macro_rules! build_app_without_syncstorage {
.wrap(ErrorHandlers::new().handler(StatusCode::NOT_FOUND, ApiError::render_404))
// These are our wrappers
.wrap(middleware::sentry::SentryWrapper::default())
.wrap(tokenserver::logging::LoggingWrapper::new())
.wrap(middleware::rejectua::RejectUA::default())
// Followed by the "official middleware" so they run first.
// actix is getting increasingly tighter about CORS headers. Our server is
Expand Down Expand Up @@ -222,7 +224,11 @@ macro_rules! build_app_without_syncstorage {
impl Server {
pub async fn with_settings(settings: Settings) -> Result<dev::Server, ApiError> {
let settings_copy = settings.clone();
let metrics = metrics::metrics_from_opts(&settings)?;
let metrics = metrics::metrics_from_opts(
&settings.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?;
let host = settings.host.clone();
let port = settings.port;
let db_pool = pool_from_settings(&settings, &Metrics::from(&metrics)).await?;
Expand All @@ -239,6 +245,11 @@ impl Server {
let tokenserver_state = if settings.tokenserver.enabled {
Some(tokenserver::ServerState::from_settings(
&settings.tokenserver,
metrics::metrics_from_opts(
&settings.tokenserver.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?,
)?)
} else {
None
Expand Down Expand Up @@ -283,8 +294,15 @@ impl Server {
let settings_copy = settings.clone();
let host = settings.host.clone();
let port = settings.port;
let secrets = Arc::new(settings.master_secret);
let tokenserver_state = tokenserver::ServerState::from_settings(&settings.tokenserver)?;
let secrets = Arc::new(settings.master_secret.clone());
let tokenserver_state = tokenserver::ServerState::from_settings(
&settings.tokenserver,
metrics::metrics_from_opts(
&settings.tokenserver.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?,
)?;
let server = HttpServer::new(move || {
build_app_without_syncstorage!(
Some(tokenserver_state.clone()),
Expand Down
1 change: 1 addition & 0 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl Settings {
s.set_default("tokenserver.fxa_metrics_hash_secret", "secret")?;
s.set_default("tokenserver.test_mode_enabled", false)?;
s.set_default("tokenserver.node_type", "spanner")?;
s.set_default("tokenserver.statsd_label", "syncstorage.tokenserver")?;

// Set Cors defaults
s.set_default(
Expand Down
22 changes: 20 additions & 2 deletions src/tokenserver/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
use super::{params, results};
use crate::db::error::{DbError, DbErrorKind};
use crate::error::ApiError;
use crate::server::metrics::Metrics;
use crate::sync_db_method;

/// The maximum possible generation number. Used as a tombstone to mark users that have been
Expand All @@ -39,6 +40,7 @@ pub struct TokenserverDb {
/// conn. structs are !Sync (Arc requires both for Send). See the Send impl
/// below.
pub(super) inner: Arc<DbInner>,
metrics: Metrics,
}

/// Despite the db conn structs being !Sync (see Arc<MysqlDbInner> above) we
Expand All @@ -61,7 +63,7 @@ impl TokenserverDb {
// get IDs from records created during other requests.
const LAST_INSERT_ID_QUERY: &'static str = "SELECT LAST_INSERT_ID() AS id";

pub fn new(conn: Conn) -> Self {
pub fn new(conn: Conn, metrics: &Metrics) -> Self {
let inner = DbInner {
#[cfg(not(test))]
conn,
Expand All @@ -71,6 +73,7 @@ impl TokenserverDb {

Self {
inner: Arc::new(inner),
metrics: metrics.clone(),
}
}

Expand Down Expand Up @@ -211,6 +214,9 @@ impl TokenserverDb {
"#;
const DEFAULT_CAPACITY_RELEASE_RATE: f32 = 0.1;

let mut metrics = self.metrics.clone();
metrics.start_timer("tokenserver.storage.get_best_node", None);

// We may have to retry the query if we need to release more capacity. This loop allows
// a maximum of five retries before bailing out.
for _ in 0..5 {
Expand Down Expand Up @@ -310,6 +316,7 @@ impl TokenserverDb {
keys_changed_at: params.keys_changed_at,
created_at: allocate_user_result.created_at,
replaced_at: None,
first_seen_at: allocate_user_result.created_at,
old_client_states: vec![],
})
} else {
Expand Down Expand Up @@ -342,6 +349,8 @@ impl TokenserverDb {
}
}

let first_seen_at = raw_users[raw_users.len() - 1].created_at;

match (raw_user.replaced_at, raw_user.node) {
// If the most up-to-date user is marked as replaced or does not have a node
// assignment, allocate a new user. Note that, if the current user is marked
Expand Down Expand Up @@ -371,6 +380,7 @@ impl TokenserverDb {
keys_changed_at: raw_user.keys_changed_at,
created_at: allocate_user_result.created_at,
replaced_at: None,
first_seen_at,
old_client_states,
})
}
Expand All @@ -385,6 +395,7 @@ impl TokenserverDb {
keys_changed_at: raw_user.keys_changed_at,
created_at: raw_user.created_at,
replaced_at: None,
first_seen_at,
old_client_states,
}),
// The most up-to-date user doesn't have a node and is retired.
Expand All @@ -395,6 +406,9 @@ impl TokenserverDb {

/// Creates a new user and assigns them to a node.
fn allocate_user_sync(&self, params: params::AllocateUser) -> DbResult<results::AllocateUser> {
let mut metrics = self.metrics.clone();
metrics.start_timer("tokenserver.storage.allocate_user", None);

// Get the least-loaded node
let node = self.get_best_node_sync(params::GetBestNode {
service_id: params.service_id,
Expand Down Expand Up @@ -1914,6 +1928,10 @@ mod tests {
let tokenserver_settings = test_settings().tokenserver;
let use_test_transactions = true;

TokenserverPool::new(&tokenserver_settings, use_test_transactions)
TokenserverPool::new(
&tokenserver_settings,
&Metrics::noop(),
use_test_transactions,
)
}
}
18 changes: 14 additions & 4 deletions src/tokenserver/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::time::Duration;

use super::models::{Db, DbResult, TokenserverDb};
use crate::db::{error::DbError, DbErrorKind};
use crate::server::metrics::Metrics;
use crate::tokenserver::settings::Settings;

#[cfg(test)]
Expand All @@ -37,10 +38,15 @@ pub fn run_embedded_migrations(database_url: &str) -> DbResult<()> {
pub struct TokenserverPool {
/// Pool of db connections
inner: Pool<ConnectionManager<MysqlConnection>>,
metrics: Metrics,
}

impl TokenserverPool {
pub fn new(settings: &Settings, _use_test_transactions: bool) -> DbResult<Self> {
pub fn new(
settings: &Settings,
metrics: &Metrics,
_use_test_transactions: bool,
) -> DbResult<Self> {
run_embedded_migrations(&settings.database_url)?;

let manager = ConnectionManager::<MysqlConnection>::new(settings.database_url.clone());
Expand All @@ -60,21 +66,22 @@ impl TokenserverPool {

Ok(Self {
inner: builder.build(manager)?,
metrics: metrics.clone(),
})
}

pub fn get_sync(&self) -> Result<TokenserverDb, DbError> {
let conn = self.inner.get().map_err(DbError::from)?;

Ok(TokenserverDb::new(conn))
Ok(TokenserverDb::new(conn, &self.metrics))
}

#[cfg(test)]
pub async fn get_tokenserver_db(&self) -> Result<TokenserverDb, DbError> {
let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(TokenserverDb::new(conn))
Ok(TokenserverDb::new(conn, &self.metrics))
}
}

Expand All @@ -92,10 +99,13 @@ impl From<actix_web::error::BlockingError<DbError>> for DbError {
#[async_trait]
impl DbPool for TokenserverPool {
async fn get(&self) -> Result<Box<dyn Db>, DbError> {
let mut metrics = self.metrics.clone();
metrics.start_timer("tokenserver.storage.get_pool", None);

let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(Box::new(TokenserverDb::new(conn)) as Box<dyn Db>)
Ok(Box::new(TokenserverDb::new(conn, &self.metrics)) as Box<dyn Db>)
}

fn box_clone(&self) -> Box<dyn DbPool> {
Expand Down
1 change: 1 addition & 0 deletions src/tokenserver/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct GetOrCreateUser {
pub keys_changed_at: Option<i64>,
pub created_at: i64,
pub replaced_at: Option<i64>,
pub first_seen_at: i64,
pub old_client_states: Vec<String>,
}

Expand Down
Loading

0 comments on commit aa93312

Please sign in to comment.