Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Tokenserver metrics #1200

Merged
merged 5 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::net::UdpSocket;
use std::time::Instant;

use actix_web::{error::ErrorInternalServerError, web::Data, Error, HttpRequest};
use actix_web::{
dev::Payload, error::ErrorInternalServerError, web::Data, Error, FromRequest, HttpRequest,
};
use cadence::{
BufferedUdpMetricSink, Counted, Metric, NopMetricSink, QueuingMetricSink, StatsdClient, Timed,
};
use futures::future;
use futures::future::Ready;

use crate::error::ApiError;
use crate::server::ServerState;
use crate::settings::Settings;
use crate::web::tags::Tags;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -54,12 +57,17 @@ impl Drop for Metrics {
}
}

impl From<&HttpRequest> for Metrics {
fn from(req: &HttpRequest) -> Self {
impl FromRequest for Metrics {
type Config = ();
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;

fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let exts = req.extensions();
let def_tags = Tags::from(req.head());
let tags = exts.get::<Tags>().unwrap_or(&def_tags);
Metrics {

future::ok(Metrics {
client: match req.app_data::<Data<ServerState>>() {
Some(v) => Some(*v.metrics.clone()),
None => {
Expand All @@ -69,7 +77,7 @@ impl From<&HttpRequest> for Metrics {
},
tags: Some(tags.clone()),
timer: None,
}
})
}
}

Expand Down Expand Up @@ -167,18 +175,21 @@ pub fn metrics_from_req(req: &HttpRequest) -> Result<Box<StatsdClient>, Error> {
.clone())
}

/// Create a cadence StatsdClient from the given options
pub fn metrics_from_opts(opts: &Settings) -> Result<StatsdClient, ApiError> {
let builder = if let Some(statsd_host) = opts.statsd_host.as_ref() {
pub fn metrics_from_opts(
label: &str,
host: Option<&str>,
port: u16,
) -> Result<StatsdClient, ApiError> {
let builder = if let Some(statsd_host) = host {
let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?;

let host = (statsd_host.as_str(), opts.statsd_port);
let host = (statsd_host, port);
let udp_sink = BufferedUdpMetricSink::from(host, socket)?;
let sink = QueuingMetricSink::from(udp_sink);
StatsdClient::builder(opts.statsd_label.as_ref(), sink)
StatsdClient::builder(label, sink)
} else {
StatsdClient::builder(opts.statsd_label.as_ref(), NopMetricSink)
StatsdClient::builder(label, NopMetricSink)
};
Ok(builder
.with_error_handler(|err| {
Expand Down
24 changes: 21 additions & 3 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ macro_rules! build_app {
.wrap(ErrorHandlers::new().handler(StatusCode::NOT_FOUND, ApiError::render_404))
// These are our wrappers
.wrap(middleware::weave::WeaveTimestamp::new())
.wrap(tokenserver::logging::LoggingWrapper::new())
.wrap(middleware::sentry::SentryWrapper::default())
.wrap(middleware::rejectua::RejectUA::default())
.wrap($cors)
Expand Down Expand Up @@ -176,6 +177,7 @@ macro_rules! build_app_without_syncstorage {
.wrap(ErrorHandlers::new().handler(StatusCode::NOT_FOUND, ApiError::render_404))
// These are our wrappers
.wrap(middleware::sentry::SentryWrapper::default())
.wrap(tokenserver::logging::LoggingWrapper::new())
.wrap(middleware::rejectua::RejectUA::default())
// Followed by the "official middleware" so they run first.
// actix is getting increasingly tighter about CORS headers. Our server is
Expand Down Expand Up @@ -222,7 +224,11 @@ macro_rules! build_app_without_syncstorage {
impl Server {
pub async fn with_settings(settings: Settings) -> Result<dev::Server, ApiError> {
let settings_copy = settings.clone();
let metrics = metrics::metrics_from_opts(&settings)?;
let metrics = metrics::metrics_from_opts(
&settings.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?;
let host = settings.host.clone();
let port = settings.port;
let db_pool = pool_from_settings(&settings, &Metrics::from(&metrics)).await?;
Expand All @@ -239,6 +245,11 @@ impl Server {
let tokenserver_state = if settings.tokenserver.enabled {
Some(tokenserver::ServerState::from_settings(
&settings.tokenserver,
metrics::metrics_from_opts(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tokenserver will have its own separate metrics client -- is that okay? It seems like it shouldn't be a problem to me, but I wanted to double-check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's not a big deal for now.

&settings.tokenserver.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?,
)?)
} else {
None
Expand Down Expand Up @@ -283,8 +294,15 @@ impl Server {
let settings_copy = settings.clone();
let host = settings.host.clone();
let port = settings.port;
let secrets = Arc::new(settings.master_secret);
let tokenserver_state = tokenserver::ServerState::from_settings(&settings.tokenserver)?;
let secrets = Arc::new(settings.master_secret.clone());
let tokenserver_state = tokenserver::ServerState::from_settings(
&settings.tokenserver,
metrics::metrics_from_opts(
&settings.tokenserver.statsd_label,
settings.statsd_host.as_deref(),
settings.statsd_port,
)?,
)?;
let server = HttpServer::new(move || {
build_app_without_syncstorage!(
Some(tokenserver_state.clone()),
Expand Down
1 change: 1 addition & 0 deletions src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl Settings {
s.set_default("tokenserver.fxa_metrics_hash_secret", "secret")?;
s.set_default("tokenserver.test_mode_enabled", false)?;
s.set_default("tokenserver.node_type", "spanner")?;
s.set_default("tokenserver.statsd_label", "syncstorage.tokenserver")?;

// Set Cors defaults
s.set_default(
Expand Down
22 changes: 20 additions & 2 deletions src/tokenserver/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
use super::{params, results};
use crate::db::error::{DbError, DbErrorKind};
use crate::error::ApiError;
use crate::server::metrics::Metrics;
use crate::sync_db_method;

/// The maximum possible generation number. Used as a tombstone to mark users that have been
Expand All @@ -39,6 +40,7 @@ pub struct TokenserverDb {
/// conn. structs are !Sync (Arc requires both for Send). See the Send impl
/// below.
pub(super) inner: Arc<DbInner>,
metrics: Metrics,
}

/// Despite the db conn structs being !Sync (see Arc<MysqlDbInner> above) we
Expand All @@ -61,7 +63,7 @@ impl TokenserverDb {
// get IDs from records created during other requests.
const LAST_INSERT_ID_QUERY: &'static str = "SELECT LAST_INSERT_ID() AS id";

pub fn new(conn: Conn) -> Self {
pub fn new(conn: Conn, metrics: &Metrics) -> Self {
let inner = DbInner {
#[cfg(not(test))]
conn,
Expand All @@ -71,6 +73,7 @@ impl TokenserverDb {

Self {
inner: Arc::new(inner),
metrics: metrics.clone(),
}
}

Expand Down Expand Up @@ -211,6 +214,9 @@ impl TokenserverDb {
"#;
const DEFAULT_CAPACITY_RELEASE_RATE: f32 = 0.1;

let mut metrics = self.metrics.clone();
metrics.start_timer("tokenserver.storage.get_best_node", None);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may have intended this, but I just wanted to note that Cadence hardcodes a prefix, which we set to the value of Settings::statsd_label, so this emits "syncstorage.tokenserver.storage.get_best_node".

We could override tokenserver's statsd_label to use "tokenserver" instead, or look to the future with "syncstorage.tokenserver", or maybe it makes more sense to keep it as you have it 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooo, I didn't notice this. I think it makes sense to override statsd_label to "syncstorage.tokenserver" for Tokenserver. I'll make the change.


// We may have to retry the query if we need to release more capacity. This loop allows
// a maximum of five retries before bailing out.
for _ in 0..5 {
Expand Down Expand Up @@ -310,6 +316,7 @@ impl TokenserverDb {
keys_changed_at: params.keys_changed_at,
created_at: allocate_user_result.created_at,
replaced_at: None,
first_seen_at: allocate_user_result.created_at,
old_client_states: vec![],
})
} else {
Expand Down Expand Up @@ -342,6 +349,8 @@ impl TokenserverDb {
}
}

let first_seen_at = raw_users[raw_users.len() - 1].created_at;

match (raw_user.replaced_at, raw_user.node) {
// If the most up-to-date user is marked as replaced or does not have a node
// assignment, allocate a new user. Note that, if the current user is marked
Expand Down Expand Up @@ -371,6 +380,7 @@ impl TokenserverDb {
keys_changed_at: raw_user.keys_changed_at,
created_at: allocate_user_result.created_at,
replaced_at: None,
first_seen_at,
old_client_states,
})
}
Expand All @@ -385,6 +395,7 @@ impl TokenserverDb {
keys_changed_at: raw_user.keys_changed_at,
created_at: raw_user.created_at,
replaced_at: None,
first_seen_at,
old_client_states,
}),
// The most up-to-date user doesn't have a node and is retired.
Expand All @@ -395,6 +406,9 @@ impl TokenserverDb {

/// Creates a new user and assigns them to a node.
fn allocate_user_sync(&self, params: params::AllocateUser) -> DbResult<results::AllocateUser> {
let mut metrics = self.metrics.clone();
metrics.start_timer("tokenserver.storage.allocate_user", None);

// Get the least-loaded node
let node = self.get_best_node_sync(params::GetBestNode {
service_id: params.service_id,
Expand Down Expand Up @@ -1914,6 +1928,10 @@ mod tests {
let tokenserver_settings = test_settings().tokenserver;
let use_test_transactions = true;

TokenserverPool::new(&tokenserver_settings, use_test_transactions)
TokenserverPool::new(
&tokenserver_settings,
&Metrics::noop(),
use_test_transactions,
)
}
}
18 changes: 14 additions & 4 deletions src/tokenserver/db/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::time::Duration;

use super::models::{Db, DbResult, TokenserverDb};
use crate::db::{error::DbError, DbErrorKind};
use crate::server::metrics::Metrics;
use crate::tokenserver::settings::Settings;

#[cfg(test)]
Expand All @@ -37,10 +38,15 @@ pub fn run_embedded_migrations(database_url: &str) -> DbResult<()> {
/// Tokenserver database connection pool, bundled with the metrics handle
/// that is passed along to every `TokenserverDb` created from it.
pub struct TokenserverPool {
/// Pool of db connections
inner: Pool<ConnectionManager<MysqlConnection>>,
/// Metrics handle cloned from the `metrics` argument of `new`; a reference
/// to it is handed to each `TokenserverDb::new` call, and `get` clones it
/// to start the "tokenserver.storage.get_pool" timer.
metrics: Metrics,
}

impl TokenserverPool {
pub fn new(settings: &Settings, _use_test_transactions: bool) -> DbResult<Self> {
pub fn new(
settings: &Settings,
metrics: &Metrics,
_use_test_transactions: bool,
) -> DbResult<Self> {
run_embedded_migrations(&settings.database_url)?;

let manager = ConnectionManager::<MysqlConnection>::new(settings.database_url.clone());
Expand All @@ -60,21 +66,22 @@ impl TokenserverPool {

Ok(Self {
inner: builder.build(manager)?,
metrics: metrics.clone(),
})
}

pub fn get_sync(&self) -> Result<TokenserverDb, DbError> {
let conn = self.inner.get().map_err(DbError::from)?;

Ok(TokenserverDb::new(conn))
Ok(TokenserverDb::new(conn, &self.metrics))
}

#[cfg(test)]
pub async fn get_tokenserver_db(&self) -> Result<TokenserverDb, DbError> {
let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(TokenserverDb::new(conn))
Ok(TokenserverDb::new(conn, &self.metrics))
}
}

Expand All @@ -92,10 +99,13 @@ impl From<actix_web::error::BlockingError<DbError>> for DbError {
#[async_trait]
impl DbPool for TokenserverPool {
async fn get(&self) -> Result<Box<dyn Db>, DbError> {
let mut metrics = self.metrics.clone();
metrics.start_timer("tokenserver.storage.get_pool", None);

let pool = self.clone();
let conn = block(move || pool.inner.get().map_err(DbError::from)).await?;

Ok(Box::new(TokenserverDb::new(conn)) as Box<dyn Db>)
Ok(Box::new(TokenserverDb::new(conn, &self.metrics)) as Box<dyn Db>)
}

fn box_clone(&self) -> Box<dyn DbPool> {
Expand Down
1 change: 1 addition & 0 deletions src/tokenserver/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct GetOrCreateUser {
pub keys_changed_at: Option<i64>,
pub created_at: i64,
pub replaced_at: Option<i64>,
pub first_seen_at: i64,
pub old_client_states: Vec<String>,
}

Expand Down
Loading