Skip to content

Commit

Permalink
feat: Add metrics
Browse files Browse the repository at this point in the history
* note: this uses a semi-bastardized version of the metrics library from
  syncstorage-rs. KV doesn't quite work right yet due to ownership
  issues, but that's only for logging issues anyway.
* needs some last pass cleanup

Closes #83
  • Loading branch information
jrconlin committed Dec 2, 2020
1 parent 8689a8c commit 9fdb597
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 7 deletions.
22 changes: 16 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ authors = [
edition = "2018"

[dependencies]
cadence = { version = "0.22"}
diesel = { version = "1.4", features = ["mysql", "r2d2"] }
diesel_migrations = { version = "1.4.0", features = ["mysql"] }
failure = "0.1"
Expand All @@ -19,7 +20,7 @@ rocket_contrib = "0.4"
serde = "1.0"
serde_json = "1.0"
sentry = { version = "0.20", features = ["with_curl_transport"] }
slog = { version = "2.5.2", features = ["nested-values"] }
slog = { version = "2.7", features = ["nested-values"] }
slog-async = { version = "2.5", features = ["nested-values"] }
slog_derive = "0.2.0"
slog-mozlog-json = "0.1.0"
Expand Down
26 changes: 26 additions & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::env;
use std::io::Read;
use std::time::Instant;

use diesel::{dsl::sql, sql_types::Integer, OptionalExtension, QueryDsl, RunQueryDsl};
use failure::ResultExt;
Expand Down Expand Up @@ -28,6 +29,8 @@ use crate::db::{
};
use crate::error::{HandlerError, HandlerErrorKind, HandlerResult, Result, VALIDATION_FAILED};
use crate::logging::{self, RequestLogger};
use crate::metrics::Metrics;
use crate::tags::Tags;

lazy_static! {
static ref URLSAFE_B64_RE: Regex = Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap();
Expand Down Expand Up @@ -86,6 +89,8 @@ fn broadcast(
broadcaster_id: String,
bchannel_id: String,
version: HandlerResult<VersionInput>,
metrics: Metrics,
base_tags: Tags,
) -> HandlerResult<status::Custom<JsonValue>> {
let conn = conn?;

Expand All @@ -96,8 +101,20 @@ fn broadcast(
Err(HandlerErrorKind::InvalidBchannelId)?
}

let mut tags = base_tags.clone();
let version = version?.value;

tags.tags
.insert("broadcaster".to_owned(), broadcaster_id.clone());
tags.tags
.insert("channel_id".to_owned(), bchannel_id.clone());
tags.tags.insert("version".to_owned(), version.clone());
metrics.incr_with_tags("broadcast.cmd.update", Some(tags.clone()));


let start = Instant::now();
let created = broadcaster?.broadcast_new_version(&conn, &bchannel_id, &version)?;
metrics.timer_with_tags("broadcast.update", (start-Instant::now()).as_millis() as u64, Some(tags));
let status = if created { Status::Created } else { Status::Ok };
info!(
log,
Expand All @@ -120,9 +137,14 @@ fn broadcast(
fn get_broadcasts(
conn: HandlerResult<db::Conn>,
reader: HandlerResult<Reader>,
metrics: Metrics,
) -> HandlerResult<JsonValue> {
metrics.incr("broadcast.cmd.dump");
let conn = conn?;
let start = Instant::now();
let broadcasts = reader?.read_broadcasts(&conn)?;
metrics.timer_with_tags("broadcast.dump", (start-Instant::now()).as_millis() as u64, None);

Ok(json!({
"code": 200,
"broadcasts": broadcasts
Expand Down Expand Up @@ -191,12 +213,16 @@ fn setup_rocket(rocket: Rocket) -> Result<Rocket> {
let authenticator = auth::BearerTokenAuthenticator::from_config(rocket.config())?;
let environment = rocket.config().environment;
let logger = logging::init_logging(rocket.config())?;
let metrics = Metrics::init(rocket.config())?;
let tags = Tags::init(rocket.config());
db::run_embedded_migrations(rocket.config())?;
Ok(rocket
.manage(pool)
.manage(authenticator)
.manage(environment)
.manage(logger)
.manage(metrics)
.manage(tags)
.mount(
"/",
routes![broadcast, get_broadcasts, version, heartbeat, lbheartbeat],
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod db;
mod error;
mod http;
mod logging;
mod metrics;
mod tags;

fn main() {
http::rocket().expect("rocket failed").launch();
Expand Down
168 changes: 168 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::net::UdpSocket;
use std::time::Instant;

use cadence::{
BufferedUdpMetricSink, Counted, Metric, NopMetricSink, QueuingMetricSink, StatsdClient,
StatsdClientBuilder, Timed,
};
use rocket::{
config::ConfigError,
request::{self, FromRequest},
Config, Outcome, Request, State,
};
use slog::{error, trace, warn, Logger};

use crate::error;
use crate::logging;
use crate::tags::Tags;

#[derive(Debug, Clone)]
pub struct MetricTimer {
pub label: String,
pub start: Instant,
// pub tags: Tags,
}

#[derive(Debug, Clone)]
pub struct Metrics {
client: Option<StatsdClient>,
tags: Option<Tags>,
log: Logger,
timer: Option<MetricTimer>,
}

impl Drop for Metrics {
fn drop(&mut self) {
//let tags = self.tags.clone().unwrap_or_default();
if let Some(client) = self.client.as_ref() {
if let Some(timer) = self.timer.as_ref() {
let lapse = (Instant::now() - timer.start).as_millis() as u64;
warn!(self.log, "⌚ Ending timer at nanos: {:?} : {:?}", &timer.label, lapse);
let tagged = client.time_with_tags(&timer.label, lapse);
// Include any "hard coded" tags.
// tagged = tagged.with_tag("version", env!("CARGO_PKG_VERSION"));
// let tags = timer.tags.tags.clone();
/*
let keys = tags.keys();
for tag in keys {
tagged = tagged.with_tag(tag, &tags.get(tag).unwrap())
}
*/
match tagged.try_send() {
Err(e) => {
// eat the metric, but log the error
warn!(self.log, "⚠️ Metric {} error: {:?} ", &timer.label, e);
}
Ok(v) => {
trace!(self.log, "⌚ {:?}", v.as_metric_str());
}
}
}
}
}
}

impl Metrics {
pub fn sink() -> StatsdClientBuilder {
StatsdClient::builder("", NopMetricSink)
}

pub fn init(config: &Config) -> error::Result<Metrics> {
let logging = logging::init_logging(config)?;
let builder = match config.get_string("statsd_host") {
Ok(statsd_host) => {
let socket = UdpSocket::bind("0.0.0.0:0")?;
socket.set_nonblocking(true)?;

let host = (
statsd_host.as_str(),
config.get_int("statsd_port").unwrap_or(8125) as u16,
);
let udp_sink = BufferedUdpMetricSink::from(host, socket)?;
let sink = QueuingMetricSink::from(udp_sink);
StatsdClient::builder(
&config
.get_string("statsd_label")
.unwrap_or("megaphone".to_string()),
sink,
)
}
Err(ConfigError::Missing(_)) => Self::sink(),
Err(e) => {
error!(logging, "Could not build metric: {:?}", e);
Err(error::HandlerErrorKind::InternalError)?
}
};
Ok(Metrics {
client: Some(
builder
.with_error_handler(|err| println!("Metric send error: {:?}", err))
.build(),
),
log: logging.clone(),
timer: None,
tags: Some(Tags::init(config)?),
})
}

// increment a counter with no tags data.
pub fn incr(&self, label: &str) {
self.incr_with_tags(label, None)
}

pub fn incr_with_tags(&self, label: &str, tags: Option<Tags>) {
if let Some(client) = self.client.as_ref() {
let mut tagged = client.incr_with_tags(label);
let mut mtags = self.tags.clone().unwrap_or_default();
if let Some(tags) = tags {
mtags.extend(tags.tags);
}
for key in mtags.tags.keys().clone() {
if let Some(val) = mtags.tags.get(key) {
tagged = tagged.with_tag(&key, val.as_ref());
}
}
// Include any "hard coded" tags.
// incr = incr.with_tag("version", env!("CARGO_PKG_VERSION"));
match tagged.try_send() {
Err(e) => {
// eat the metric, but log the error
warn!(self.log, "⚠️ Metric {} error: {:?} ", label, e);
}
Ok(v) => trace!(self.log, "☑️ {:?}", v.as_metric_str()),
}
}
}

pub fn timer_with_tags(&self, label:&str, lapse: u64, tags: Option<Tags>) {
if let Some(client) = self.client.as_ref() {
let mut tagged = client.time_with_tags(label, lapse);
let mtags = tags.unwrap_or_default();
for key in mtags.tags.keys().clone() {
if let Some(val) = mtags.tags.get(key) {
tagged = tagged.with_tag(&key, val.as_ref());
}
}
match tagged.try_send() {
Err(e) => {
warn!(self.log, "Metric {} error {:?}", label, e);
},
Ok(v) => {dbg!(v);}
}
}
}
}

impl<'a, 'r> FromRequest<'a, 'r> for Metrics {
type Error = failure::Error;

fn from_request(request: &'a Request<'r>) -> request::Outcome<Self, Self::Error> {
Outcome::Success(
request
.guard::<State<'_, Metrics>>()
.unwrap()
.inner()
.clone(),
)
}
}
Loading

0 comments on commit 9fdb597

Please sign in to comment.