diff --git a/Cargo.lock b/Cargo.lock index a3e8abe..a6bc4a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,14 @@ name = "bytes" version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "cadence" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "cc" version = "1.0.61" @@ -1159,6 +1167,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "megaphone" version = "0.1.6" dependencies = [ + "cadence 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)", "diesel 1.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1170,7 +1179,7 @@ dependencies = [ "sentry 0.20.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.117 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.59 (registry+https://github.com/rust-lang/crates.io-index)", - "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 2.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-async 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-mozlog-json 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2192,7 +2201,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "slog" -version = "2.5.2" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "erased-serde 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2204,7 +2213,7 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", - "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 2.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2217,7 +2226,7 @@ dependencies = [ "chrono 0.4.19 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.117 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.59 (registry+https://github.com/rust-lang/crates.io-index)", - "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 2.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2227,7 +2236,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.19 (registry+https://github.com/rust-lang/crates.io-index)", - "slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 2.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "term 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2958,6 +2967,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" "checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" "checksum bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +"checksum cadence 0.22.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6281d1200ac3293fd08be899c9a0c17b83cda0672221fcbe1fefc886a555e35e" "checksum cc 1.0.61 (registry+https://github.com/rust-lang/crates.io-index)" = "ed67cbde08356238e75fc4656be4749481eeffb09e19f320a25237d5221c985d" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" "checksum cfg-if 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" @@ -3166,7 +3176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum sha2 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69" "checksum sized-chunks 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1ec31ceca5644fa6d444cc77548b88b67f46db6f7c71683b0f9336e671830d2f" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" -"checksum slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1cc9c640a4adbfbcc11ffb95efe5aa7af7309e002adab54b185507dbf2377b99" +"checksum slog 2.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06" "checksum slog-async 2.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51b3336ce47ce2f96673499fc07eb85e3472727b9a7a2959964b002c2ce8fbbb" "checksum slog-mozlog-json 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f400f1c5db96f1f52065e8931ca0c524cceb029f7537c9e6d5424488ca137ca0" "checksum slog-term 2.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bab1d807cf71129b05ce36914e1dbb6fbfbdecaf686301cb457f4fa967f9f5b6" diff --git a/Cargo.toml b/Cargo.toml index e5302f1..75c2446 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ authors = [ edition = "2018" [dependencies] +cadence = { version = "0.22"} diesel = { version = "1.4", features = ["mysql", "r2d2"] } diesel_migrations = { version = "1.4.0", features = ["mysql"] } failure = "0.1" @@ -19,7 +20,7 @@ rocket_contrib = "0.4" serde = "1.0" serde_json = "1.0" sentry = { version = "0.20", features = ["with_curl_transport"] } -slog = { version = "2.5.2", features = ["nested-values"] } +slog = { version = "2.7", features = ["nested-values"] } slog-async = { version = "2.5", features = ["nested-values"] } slog_derive = "0.2.0" slog-mozlog-json = "0.1.0" diff --git a/src/http.rs b/src/http.rs index b3cd279..d4c4202 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,5 +1,6 @@ use std::env; use std::io::Read; +use std::time::Instant; use diesel::{dsl::sql, sql_types::Integer, OptionalExtension, QueryDsl, RunQueryDsl}; use failure::ResultExt; @@ -28,6 +29,8 @@ use crate::db::{ }; use crate::error::{HandlerError, HandlerErrorKind, HandlerResult, Result, VALIDATION_FAILED}; use crate::logging::{self, RequestLogger}; +use crate::metrics::Metrics; +use crate::tags::Tags; lazy_static! { static ref URLSAFE_B64_RE: Regex = Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap(); @@ -86,6 +89,8 @@ fn broadcast( broadcaster_id: String, bchannel_id: String, version: HandlerResult, + metrics: Metrics, + base_tags: Tags, ) -> HandlerResult> { let conn = conn?; @@ -96,8 +101,20 @@ fn broadcast( Err(HandlerErrorKind::InvalidBchannelId)? } + let mut tags = base_tags.clone(); let version = version?.value; + + tags.tags + .insert("broadcaster".to_owned(), broadcaster_id.clone()); + tags.tags + .insert("channel_id".to_owned(), bchannel_id.clone()); + tags.tags.insert("version".to_owned(), version.clone()); + metrics.incr_with_tags("broadcast.cmd.update", Some(tags.clone())); + + + let start = Instant::now(); let created = broadcaster?.broadcast_new_version(&conn, &bchannel_id, &version)?; + metrics.timer_with_tags("broadcast.update", (start-Instant::now()).as_millis() as u64, Some(tags)); let status = if created { Status::Created } else { Status::Ok }; info!( log, @@ -120,9 +137,14 @@ fn broadcast( fn get_broadcasts( conn: HandlerResult, reader: HandlerResult, + metrics: Metrics, ) -> HandlerResult { + metrics.incr("broadcast.cmd.dump"); let conn = conn?; + let start = Instant::now(); let broadcasts = reader?.read_broadcasts(&conn)?; + metrics.timer_with_tags("broadcast.dump", (start-Instant::now()).as_millis() as u64, None); + Ok(json!({ "code": 200, "broadcasts": broadcasts @@ -191,12 +213,16 @@ fn setup_rocket(rocket: Rocket) -> Result { let authenticator = auth::BearerTokenAuthenticator::from_config(rocket.config())?; let environment = rocket.config().environment; let logger = logging::init_logging(rocket.config())?; + let metrics = Metrics::init(rocket.config())?; + let tags = Tags::init(rocket.config()); db::run_embedded_migrations(rocket.config())?; Ok(rocket .manage(pool) .manage(authenticator) .manage(environment) .manage(logger) + .manage(metrics) + .manage(tags) .mount( "/", routes![broadcast, get_broadcasts, version, heartbeat, lbheartbeat], diff --git a/src/main.rs b/src/main.rs index bb53565..1001bf3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,8 @@ mod db; mod error; mod http; mod logging; +mod metrics; +mod tags; fn main() { http::rocket().expect("rocket failed").launch(); diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..c88b1ac --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,168 @@ +use std::net::UdpSocket; +use std::time::Instant; + +use cadence::{ + BufferedUdpMetricSink, Counted, Metric, NopMetricSink, QueuingMetricSink, StatsdClient, + StatsdClientBuilder, Timed, +}; +use rocket::{ + config::ConfigError, + request::{self, FromRequest}, + Config, Outcome, Request, State, +}; +use slog::{error, trace, warn, Logger}; + +use crate::error; +use crate::logging; +use crate::tags::Tags; + +#[derive(Debug, Clone)] +pub struct MetricTimer { + pub label: String, + pub start: Instant, + // pub tags: Tags, +} + +#[derive(Debug, Clone)] +pub struct Metrics { + client: Option, + tags: Option, + log: Logger, + timer: Option, +} + +impl Drop for Metrics { + fn drop(&mut self) { + //let tags = self.tags.clone().unwrap_or_default(); + if let Some(client) = self.client.as_ref() { + if let Some(timer) = self.timer.as_ref() { + let lapse = (Instant::now() - timer.start).as_millis() as u64; + warn!(self.log, "⌚ Ending timer at nanos: {:?} : {:?}", &timer.label, lapse); + let tagged = client.time_with_tags(&timer.label, lapse); + // Include any "hard coded" tags. + // tagged = tagged.with_tag("version", env!("CARGO_PKG_VERSION")); + // let tags = timer.tags.tags.clone(); + /* + let keys = tags.keys(); + for tag in keys { + tagged = tagged.with_tag(tag, &tags.get(tag).unwrap()) + } + */ + match tagged.try_send() { + Err(e) => { + // eat the metric, but log the error + warn!(self.log, "⚠️ Metric {} error: {:?} ", &timer.label, e); + } + Ok(v) => { + trace!(self.log, "⌚ {:?}", v.as_metric_str()); + } + } + } + } + } +} + +impl Metrics { + pub fn sink() -> StatsdClientBuilder { + StatsdClient::builder("", NopMetricSink) + } + + pub fn init(config: &Config) -> error::Result { + let logging = logging::init_logging(config)?; + let builder = match config.get_string("statsd_host") { + Ok(statsd_host) => { + let socket = UdpSocket::bind("0.0.0.0:0")?; + socket.set_nonblocking(true)?; + + let host = ( + statsd_host.as_str(), + config.get_int("statsd_port").unwrap_or(8125) as u16, + ); + let udp_sink = BufferedUdpMetricSink::from(host, socket)?; + let sink = QueuingMetricSink::from(udp_sink); + StatsdClient::builder( + &config + .get_string("statsd_label") + .unwrap_or("megaphone".to_string()), + sink, + ) + } + Err(ConfigError::Missing(_)) => Self::sink(), + Err(e) => { + error!(logging, "Could not build metric: {:?}", e); + Err(error::HandlerErrorKind::InternalError)? + } + }; + Ok(Metrics { + client: Some( + builder + .with_error_handler(|err| println!("Metric send error: {:?}", err)) + .build(), + ), + log: logging.clone(), + timer: None, + tags: Some(Tags::init(config)?), + }) + } + + // increment a counter with no tags data. + pub fn incr(&self, label: &str) { + self.incr_with_tags(label, None) + } + + pub fn incr_with_tags(&self, label: &str, tags: Option) { + if let Some(client) = self.client.as_ref() { + let mut tagged = client.incr_with_tags(label); + let mut mtags = self.tags.clone().unwrap_or_default(); + if let Some(tags) = tags { + mtags.extend(tags.tags); + } + for key in mtags.tags.keys().clone() { + if let Some(val) = mtags.tags.get(key) { + tagged = tagged.with_tag(&key, val.as_ref()); + } + } + // Include any "hard coded" tags. + // incr = incr.with_tag("version", env!("CARGO_PKG_VERSION")); + match tagged.try_send() { + Err(e) => { + // eat the metric, but log the error + warn!(self.log, "⚠️ Metric {} error: {:?} ", label, e); + } + Ok(v) => trace!(self.log, "☑️ {:?}", v.as_metric_str()), + } + } + } + + pub fn timer_with_tags(&self, label:&str, lapse: u64, tags: Option) { + if let Some(client) = self.client.as_ref() { + let mut tagged = client.time_with_tags(label, lapse); + let mtags = tags.unwrap_or_default(); + for key in mtags.tags.keys().clone() { + if let Some(val) = mtags.tags.get(key) { + tagged = tagged.with_tag(&key, val.as_ref()); + } + } + match tagged.try_send() { + Err(e) => { + warn!(self.log, "Metric {} error {:?}", label, e); + }, + Ok(v) => {dbg!(v);} + } + } + } +} + +impl<'a, 'r> FromRequest<'a, 'r> for Metrics { + type Error = failure::Error; + + fn from_request(request: &'a Request<'r>) -> request::Outcome { + Outcome::Success( + request + .guard::>() + .unwrap() + .inner() + .clone(), + ) + } +} diff --git a/src/tags.rs b/src/tags.rs new file mode 100644 index 0000000..168e9aa --- /dev/null +++ b/src/tags.rs @@ -0,0 +1,150 @@ +use rocket::{ + request::{self, FromRequest}, + Config, Outcome, Request, State, +}; +use serde::{ + ser::{SerializeMap, Serializer}, + Serialize, +}; +use std::collections::{BTreeMap, HashMap}; + +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct Tags { + pub tags: HashMap, + pub extra: HashMap, +} + +impl Default for Tags { + fn default() -> Tags { + Tags { + tags: HashMap::new(), + extra: HashMap::new(), + } + } +} + +impl Serialize for Tags { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + let mut seq = serializer.serialize_map(Some(self.tags.len()))?; + for tag in self.tags.clone() { + if !tag.1.is_empty() { + seq.serialize_entry(&tag.0, &tag.1)?; + } + } + seq.end() + } +} + +// Tags are extra data to be recorded in metric and logging calls. +// If additional tags are required or desired, you will need to add them to the +// mutable extensions, e.g. +// ``` +// let mut tags = request.extensions_mut().get::(); +// tags.insert("SomeLabel".to_owned(), "whatever".to_owned()); +// ``` +// how you get the request (or the response, and it's set of `extensions`) to whatever +// function requires it, is left as an exercise for the reader. +impl Tags { + /* + pub fn with_tags(tags: HashMap) -> Tags { + if tags.is_empty() { + return Tags::default(); + } + Tags { + tags, + extra: HashMap::new(), + } + } + + pub fn get(&self, label: &str) -> String { + let none = "None".to_owned(); + self.tags.get(label).map(String::from).unwrap_or(none) + } + */ + + pub fn extend(&mut self, tags: HashMap) { + self.tags.extend(tags); + } + + /* // reserved for KV impl + pub fn tag_tree(self) -> BTreeMap { + let mut result = BTreeMap::new(); + + for (k, v) in self.tags { + result.insert(k.clone(), v.clone()); + } + result + } + + pub fn extra_tree(self) -> BTreeMap { + let mut result = BTreeMap::new(); + + for (k, v) in self.extra { + result.insert(k.clone(), Value::from(v)); + } + result + } + */ +} + +impl<'a, 'r> FromRequest<'a, 'r> for Tags { + type Error = failure::Error; + + fn from_request(req: &'a Request<'r>) -> request::Outcome { + Outcome::Success(req.guard::>().unwrap().inner().clone()) + } +} + +impl Tags { + pub fn init(_config: &Config) -> Result { + let tags = HashMap::new(); + let extra = HashMap::new(); + /* parse the header? + if let Some(ua) = req.headers().get("User-Agent") { + if let Ok(uas) = ua.to_str() { + let (ua_result, metrics_os, metrics_browser) = parse_user_agent(uas); + insert_if_not_empty("ua.os.family", metrics_os, &mut tags); + insert_if_not_empty("ua.browser.family", metrics_browser, &mut tags); + insert_if_not_empty("ua.name", ua_result.name, &mut tags); + insert_if_not_empty("ua.os.ver", &ua_result.os_version.to_owned(), &mut tags); + insert_if_not_empty("ua.browser.ver", ua_result.version, &mut tags); + extra.insert("ua".to_owned(), uas.to_string()); + } + } + tags.insert("uri.method".to_owned(), req_head.method.to_string()); + // `uri.path` causes too much cardinality for influx but keep it in + // extra for sentry + extra.insert("uri.path".to_owned(), req_head.uri.to_string()); + */ + Ok(Tags { tags, extra }) + } +} + +impl Into> for Tags { + fn into(self) -> BTreeMap { + let mut result = BTreeMap::new(); + + for (k, v) in self.tags { + result.insert(k.clone(), v.clone()); + } + + result + } +} + +/* +impl KV for Tags { + fn serialize(&self, _rec: &Record<'_>, serializer: &mut dyn slog::Serializer) -> slog::Result { + for (key, val) in &self.tags { + let k = Key::from(key.clone()); // Key::from wants a static. + serializer.emit_str(k.as_ref(), &val)?; + } + Ok(()) + } +} +*/