From a2b60da6c599d9aa53cd85ab0d240b8eb795010e Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 12 Dec 2023 16:32:09 +0100 Subject: [PATCH] Add experimental metrics implementation (#618) Co-authored-by: Jan Michael Auer --- CHANGELOG.md | 8 + Cargo.lock | 46 +- sentry-core/Cargo.toml | 3 + sentry-core/src/cadence.rs | 162 ++++ sentry-core/src/client.rs | 32 + sentry-core/src/lib.rs | 6 + sentry-core/src/metrics.rs | 1183 +++++++++++++++++++++++++ sentry-core/src/units.rs | 260 ++++++ sentry-types/Cargo.toml | 1 + sentry-types/src/protocol/envelope.rs | 20 +- sentry/Cargo.toml | 1 + sentry/src/transports/ratelimit.rs | 12 +- 12 files changed, 1717 insertions(+), 17 deletions(-) create mode 100644 sentry-core/src/cadence.rs create mode 100644 sentry-core/src/metrics.rs create mode 100644 sentry-core/src/units.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index ede7372f..919668d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## Unreleased + +**Features**: + +- Add experimental implementations for Sentry metrics and a cadence sink. These + require to use the `UNSTABLE_metrics` and `UNSTABLE_cadence` feature flags. + Note that these APIs are still under development and subject to change. + ## 0.32.0 **Features**: diff --git a/Cargo.lock b/Cargo.lock index b4b089b8..730b164b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -767,6 +767,15 @@ dependencies = [ "bytes 1.5.0", ] +[[package]] +name = "cadence" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f39286bc075b023101dccdb79456a1334221c768b8faede0c2aff7ed29a9482d" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "cast" version = "0.3.0" @@ -986,6 +995,16 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -2920,7 +2939,7 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "sentry" -version = "0.31.8" +version = "0.32.0" dependencies = [ "actix-web", "anyhow", @@ -2956,7 +2975,7 @@ dependencies = [ [[package]] name = "sentry-actix" -version = "0.31.8" +version = "0.32.0" dependencies = [ "actix-web", "futures", @@ -2968,7 +2987,7 @@ dependencies = [ [[package]] name = "sentry-anyhow" -version = "0.31.8" +version = "0.32.0" dependencies = [ "anyhow", "sentry", @@ -2978,7 +2997,7 @@ dependencies = [ [[package]] name = "sentry-backtrace" -version = "0.31.8" +version = "0.32.0" dependencies = [ "backtrace", "once_cell", @@ -2988,7 +3007,7 @@ dependencies = [ [[package]] name = "sentry-contexts" -version = "0.31.8" +version = "0.32.0" dependencies = [ "hostname", "libc", @@ -3001,9 +3020,10 @@ dependencies = [ [[package]] name = "sentry-core" -version = "0.31.8" +version = "0.32.0" dependencies = [ "anyhow", + "cadence", "criterion", "futures", "log", @@ -3021,7 +3041,7 @@ dependencies = [ [[package]] name = "sentry-debug-images" -version = "0.31.8" +version = "0.32.0" dependencies = [ "findshlibs", "once_cell", @@ -3030,7 +3050,7 @@ dependencies = [ [[package]] name = "sentry-log" -version = "0.31.8" +version = "0.32.0" dependencies = [ "log", "pretty_env_logger", @@ -3040,7 +3060,7 @@ dependencies = [ [[package]] name = "sentry-panic" -version = "0.31.8" +version = "0.32.0" dependencies = [ "sentry", "sentry-backtrace", @@ -3049,7 +3069,7 @@ dependencies = [ [[package]] name = "sentry-slog" -version = "0.31.8" +version = "0.32.0" dependencies = [ "erased-serde", "sentry", @@ -3061,7 +3081,7 @@ dependencies = [ [[package]] name = "sentry-tower" -version = "0.31.8" +version = "0.32.0" dependencies = [ "anyhow", "axum 0.7.1", @@ -3081,7 +3101,7 @@ dependencies = [ [[package]] name = "sentry-tracing" -version = "0.31.8" +version = "0.32.0" dependencies = [ "log", "sentry", @@ -3095,7 +3115,7 @@ dependencies = [ [[package]] name = "sentry-types" -version = "0.31.8" +version = "0.32.0" dependencies = [ "debugid", "hex", diff --git a/sentry-core/Cargo.toml b/sentry-core/Cargo.toml index 5242acaf..8095cc46 100644 --- a/sentry-core/Cargo.toml +++ b/sentry-core/Cargo.toml @@ -26,8 +26,11 @@ client = ["rand"] # and macros actually expand features (and extern crate) where they are used! debug-logs = ["dep:log"] test = ["client"] +UNSTABLE_metrics = ["sentry-types/UNSTABLE_metrics"] +UNSTABLE_cadence = ["dep:cadence", "UNSTABLE_metrics"] [dependencies] +cadence = { version = "0.29.0", optional = true } log = { version = "0.4.8", optional = true, features = ["std"] } once_cell = "1" rand = { version = "0.8.1", optional = true } diff --git a/sentry-core/src/cadence.rs b/sentry-core/src/cadence.rs new file mode 100644 index 00000000..8cd7c3d0 --- /dev/null +++ b/sentry-core/src/cadence.rs @@ -0,0 +1,162 @@ +//! [`cadence`] integration for Sentry. +//! +//! [`cadence`] is a popular Statsd client for Rust. The [`SentryMetricSink`] provides a drop-in +//! integration to send metrics captured via `cadence` to Sentry. For direct usage of Sentry +//! metrics, see the [`metrics`](crate::metrics) module. +//! +//! # Usage +//! +//! To use the `cadence` integration, enable the `UNSTABLE_cadence` feature in your `Cargo.toml`. +//! Then, create a [`SentryMetricSink`] and pass it to your `cadence` client: +//! +//! ``` +//! use cadence::StatsdClient; +//! use sentry::cadence::SentryMetricSink; +//! +//! let client = StatsdClient::from_sink("sentry.test", SentryMetricSink::new()); +//! ``` +//! +//! # Side-by-side Usage +//! +//! If you want to send metrics to Sentry and another backend at the same time, you can use +//! [`SentryMetricSink::wrap`] to wrap another [`MetricSink`]: +//! +//! ``` +//! use cadence::{StatsdClient, NopMetricSink}; +//! use sentry::cadence::SentryMetricSink; +//! +//! let sink = SentryMetricSink::wrap(NopMetricSink); +//! let client = StatsdClient::from_sink("sentry.test", sink); +//! ``` + +use std::sync::Arc; + +use cadence::{MetricSink, NopMetricSink}; + +use crate::metrics::Metric; +use crate::{Client, Hub}; + +/// A [`MetricSink`] that sends metrics to Sentry. +/// +/// This metric sends all metrics to Sentry. The Sentry client is internally buffered, so submission +/// will be delayed. +/// +/// Optionally, this sink can also forward metrics to another [`MetricSink`]. This is useful if you +/// want to send metrics to Sentry and another backend at the same time. Use +/// [`SentryMetricSink::wrap`] to construct such a sink. +#[derive(Debug)] +pub struct SentryMetricSink { + client: Option>, + sink: S, +} + +impl SentryMetricSink +where + S: MetricSink, +{ + /// Creates a new [`SentryMetricSink`], wrapping the given [`MetricSink`]. + pub fn wrap(sink: S) -> Self { + Self { client: None, sink } + } + + /// Creates a new [`SentryMetricSink`] sending data to the given [`Client`]. + pub fn with_client(mut self, client: Arc) -> Self { + self.client = Some(client); + self + } +} + +impl SentryMetricSink { + /// Creates a new [`SentryMetricSink`]. + /// + /// It is not required that a client is available when this sink is created. The sink sends + /// metrics to the client of the Sentry hub that is registered when the metrics are emitted. + pub fn new() -> Self { + Self { + client: None, + sink: NopMetricSink, + } + } +} + +impl Default for SentryMetricSink { + fn default() -> Self { + Self::new() + } +} + +impl MetricSink for SentryMetricSink { + fn emit(&self, string: &str) -> std::io::Result { + if let Ok(metric) = Metric::parse_statsd(string) { + if let Some(ref client) = self.client { + client.add_metric(metric); + } else if let Some(client) = Hub::current().client() { + client.add_metric(metric); + } + } + + // NopMetricSink returns `0`, which is correct as Sentry is buffering the metrics. + self.sink.emit(string) + } + + fn flush(&self) -> std::io::Result<()> { + let flushed = if let Some(ref client) = self.client { + client.flush(None) + } else if let Some(client) = Hub::current().client() { + client.flush(None) + } else { + true + }; + + let sink_result = self.sink.flush(); + + if !flushed { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to flush metrics to Sentry", + )) + } else { + sink_result + } + } +} + +#[cfg(test)] +mod tests { + use cadence::{Counted, Distributed}; + use sentry_types::protocol::latest::EnvelopeItem; + + use crate::test::with_captured_envelopes; + + use super::*; + + #[test] + fn test_basic_metrics() { + let envelopes = with_captured_envelopes(|| { + let client = cadence::StatsdClient::from_sink("sentry.test", SentryMetricSink::new()); + client.count("some.count", 1).unwrap(); + client.count("some.count", 10).unwrap(); + client + .count_with_tags("count.with.tags", 1) + .with_tag("foo", "bar") + .send(); + client.distribution("some.distr", 1).unwrap(); + client.distribution("some.distr", 2).unwrap(); + client.distribution("some.distr", 3).unwrap(); + }); + assert_eq!(envelopes.len(), 1); + + let mut items = envelopes[0].items(); + let Some(EnvelopeItem::Statsd(metrics)) = items.next() else { + panic!("expected metrics"); + }; + let metrics = std::str::from_utf8(metrics).unwrap(); + + println!("{metrics}"); + + assert!(metrics.contains("sentry.test.count.with.tags:1|c|#foo:bar|T")); + assert!(metrics.contains("sentry.test.some.count:11|c|T")); + assert!(metrics.contains("sentry.test.some.distr:1:2:3|d|T")); + assert_eq!(items.next(), None); + } +} diff --git a/sentry-core/src/client.rs b/sentry-core/src/client.rs index 0dd7c9d1..a2681140 100644 --- a/sentry-core/src/client.rs +++ b/sentry-core/src/client.rs @@ -11,6 +11,8 @@ use sentry_types::protocol::v7::SessionUpdate; use sentry_types::random_uuid; use crate::constants::SDK_INFO; +#[cfg(feature = "UNSTABLE_metrics")] +use crate::metrics::{self, MetricAggregator}; use crate::protocol::{ClientSdkInfo, Event}; use crate::session::SessionFlusher; use crate::types::{Dsn, Uuid}; @@ -45,6 +47,8 @@ pub struct Client { options: ClientOptions, transport: TransportArc, session_flusher: RwLock>, + #[cfg(feature = "UNSTABLE_metrics")] + metric_aggregator: RwLock>, integrations: Vec<(TypeId, Arc)>, pub(crate) sdk_info: ClientSdkInfo, } @@ -65,10 +69,17 @@ impl Clone for Client { transport.clone(), self.options.session_mode, ))); + #[cfg(feature = "UNSTABLE_metrics")] + let metric_aggregator = RwLock::new(Some(MetricAggregator::new( + transport.clone(), + &self.options, + ))); Client { options: self.options.clone(), transport, session_flusher, + #[cfg(feature = "UNSTABLE_metrics")] + metric_aggregator, integrations: self.integrations.clone(), sdk_info: self.sdk_info.clone(), } @@ -136,10 +147,17 @@ impl Client { transport.clone(), options.session_mode, ))); + + #[cfg(feature = "UNSTABLE_metrics")] + let metric_aggregator = + RwLock::new(Some(MetricAggregator::new(transport.clone(), &options))); + Client { options, transport, session_flusher, + #[cfg(feature = "UNSTABLE_metrics")] + metric_aggregator, integrations, sdk_info, } @@ -308,11 +326,23 @@ impl Client { } } + /// Captures a metric and sends it to Sentry on the next flush. + #[cfg(feature = "UNSTABLE_metrics")] + pub fn add_metric(&self, metric: metrics::Metric) { + if let Some(ref aggregator) = *self.metric_aggregator.read().unwrap() { + aggregator.add(metric) + } + } + /// Drains all pending events without shutting down. pub fn flush(&self, timeout: Option) -> bool { if let Some(ref flusher) = *self.session_flusher.read().unwrap() { flusher.flush(); } + #[cfg(feature = "UNSTABLE_metrics")] + if let Some(ref aggregator) = *self.metric_aggregator.read().unwrap() { + aggregator.flush(); + } if let Some(ref transport) = *self.transport.read().unwrap() { transport.flush(timeout.unwrap_or(self.options.shutdown_timeout)) } else { @@ -329,6 +359,8 @@ impl Client { /// `shutdown_timeout` in the client options. pub fn close(&self, timeout: Option) -> bool { drop(self.session_flusher.write().unwrap().take()); + #[cfg(feature = "UNSTABLE_metrics")] + drop(self.metric_aggregator.write().unwrap().take()); let transport_opt = self.transport.write().unwrap().take(); if let Some(transport) = transport_opt { sentry_debug!("client close; request transport to shut down"); diff --git a/sentry-core/src/lib.rs b/sentry-core/src/lib.rs index aa513aa7..f29454e6 100644 --- a/sentry-core/src/lib.rs +++ b/sentry-core/src/lib.rs @@ -136,13 +136,19 @@ pub use crate::performance::*; pub use crate::scope::{Scope, ScopeGuard}; pub use crate::transport::{Transport, TransportFactory}; +#[cfg(all(feature = "client", feature = "UNSTABLE_cadence"))] +pub mod cadence; // client feature #[cfg(feature = "client")] mod client; #[cfg(feature = "client")] mod hub_impl; +#[cfg(all(feature = "client", feature = "UNSTABLE_metrics"))] +pub mod metrics; #[cfg(feature = "client")] mod session; +#[cfg(all(feature = "client", feature = "UNSTABLE_metrics"))] +mod units; #[cfg(feature = "client")] pub use crate::client::Client; diff --git a/sentry-core/src/metrics.rs b/sentry-core/src/metrics.rs new file mode 100644 index 00000000..ffb94a07 --- /dev/null +++ b/sentry-core/src/metrics.rs @@ -0,0 +1,1183 @@ +//! Utilities to track metrics in Sentry. +//! +//! Metrics are numerical values that can track anything about your environment over time, from +//! latency to error rates to user signups. +//! +//! Metrics at Sentry come in different flavors, in order to help you track your data in the most +//! efficient and cost-effective way. The types of metrics we currently support are: +//! +//! - **Counters** track a value that can only be incremented. +//! - **Distributions** track a list of values over time in on which you can perform aggregations +//! like max, min, avg. +//! - **Gauges** track a value that can go up and down. +//! - **Sets** track a set of values on which you can perform aggregations such as count_unique. +//! +//! For more information on metrics in Sentry, see [our docs]. +//! +//! # Usage +//! +//! To collect a metric, use the [`Metric`] struct to capture all relevant properties of your +//! metric. Then, use [`send`](Metric::send) to send the metric to Sentry: +//! +//! ``` +//! use std::time::Duration; +//! use sentry::metrics::Metric; +//! +//! Metric::count("requests") +//! .with_tag("method", "GET") +//! .send(); +//! +//! Metric::timing("request.duration", Duration::from_millis(17)) +//! .with_tag("status_code", "200") +//! // unit is added automatically by timing +//! .send(); +//! +//! Metric::set("site.visitors", "user1") +//! .with_unit("user") +//! .send(); +//! ``` +//! +//! # Usage with Cadence +//! +//! [`cadence`] is a popular Statsd client for Rust and can be used to send metrics to Sentry. To +//! use Sentry directly with `cadence`, see the [`sentry-cadence`](crate::cadence) documentation. +//! +//! [our docs]: https://develop.sentry.dev/delightful-developer-metrics/ + +use std::borrow::Cow; +use std::collections::hash_map::{DefaultHasher, Entry}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::fmt::{self, Write}; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use sentry_types::protocol::latest::{Envelope, EnvelopeItem}; + +use crate::client::TransportArc; +use crate::{ClientOptions, Hub}; + +pub use crate::units::*; + +const BUCKET_INTERVAL: Duration = Duration::from_secs(10); +const FLUSH_INTERVAL: Duration = Duration::from_secs(5); +const MAX_WEIGHT: usize = 100_000; + +/// Type alias for strings used in [`Metric`] for names and tags. +pub type MetricStr = Cow<'static, str>; + +/// Type used for [`MetricValue::Counter`]. +pub type CounterValue = f64; + +/// Type used for [`MetricValue::Distribution`]. +pub type DistributionValue = f64; + +/// Type used for [`MetricValue::Set`]. +pub type SetValue = u32; + +/// Type used for [`MetricValue::Gauge`]. +pub type GaugeValue = f64; + +/// The value of a [`Metric`], indicating its type. +#[derive(Debug)] +pub enum MetricValue { + /// Counts instances of an event. + /// + /// Counters can be incremented and decremented. The default operation is to increment a counter + /// by `1`, although increments by larger values and even floating point values are possible. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric, MetricValue}; + /// + /// Metric::build("my.counter", MetricValue::Counter(1.0)).send(); + /// ``` + Counter(CounterValue), + + /// Builds a statistical distribution over values reported. + /// + /// Based on individual reported values, distributions allow to query the maximum, minimum, or + /// average of the reported values, as well as statistical quantiles. With an increasing number + /// of values in the distribution, its accuracy becomes approximate. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric, MetricValue}; + /// + /// Metric::build("my.distribution", MetricValue::Distribution(42.0)).send(); + /// ``` + Distribution(DistributionValue), + + /// Counts the number of unique reported values. + /// + /// Sets allow sending arbitrary discrete values, including strings, and store the deduplicated + /// count. With an increasing number of unique values in the set, its accuracy becomes + /// approximate. It is not possible to query individual values from a set. + /// + /// # Example + /// + /// To create a set value, use [`MetricValue::set_from_str`] or + /// [`MetricValue::set_from_display`]. These functions convert the provided argument into a + /// unique hash value, which is then used as the set value. + /// + /// ``` + /// use sentry::metrics::{Metric, MetricValue}; + /// + /// Metric::build("my.set", MetricValue::set_from_str("foo")).send(); + /// ``` + Set(SetValue), + + /// Stores absolute snapshots of values. + /// + /// In addition to plain [counters](Self::Counter), gauges store a snapshot of the maximum, + /// minimum and sum of all values, as well as the last reported value. Note that the "last" + /// component of this aggregation is not commutative. Which value is preserved as last value is + /// implementation-defined. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric, MetricValue}; + /// + /// Metric::build("my.gauge", MetricValue::Gauge(42.0)).send(); + /// ``` + Gauge(GaugeValue), +} + +impl MetricValue { + /// Returns a set value representing the given string. + pub fn set_from_str(string: &str) -> Self { + Self::Set(hash_set_value(string)) + } + + /// Returns a set value representing the given argument. + pub fn set_from_display(display: impl fmt::Display) -> Self { + Self::Set(hash_set_value(&display.to_string())) + } + + /// Returns the type of the metric value. + fn ty(&self) -> MetricType { + match self { + Self::Counter(_) => MetricType::Counter, + Self::Distribution(_) => MetricType::Distribution, + Self::Gauge(_) => MetricType::Gauge, + Self::Set(_) => MetricType::Set, + } + } +} + +/// Hashes the given set value. +/// +/// Sets only guarantee 32-bit accuracy, but arbitrary strings are allowed on the protocol. Upon +/// parsing, they are hashed and only used as hashes subsequently. +fn hash_set_value(string: &str) -> u32 { + use std::hash::Hasher; + let mut hasher = DefaultHasher::default(); + hasher.write(string.as_bytes()); + hasher.finish() as u32 +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +enum MetricType { + Counter, + Distribution, + Set, + Gauge, +} + +impl MetricType { + /// Return the shortcode for this metric type. + pub fn as_str(&self) -> &'static str { + match self { + MetricType::Counter => "c", + MetricType::Distribution => "d", + MetricType::Set => "s", + MetricType::Gauge => "g", + } + } +} + +impl fmt::Display for MetricType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + +impl std::str::FromStr for MetricType { + type Err = (); + + fn from_str(s: &str) -> Result { + Ok(match s { + "c" | "m" => Self::Counter, + "h" | "d" | "ms" => Self::Distribution, + "s" => Self::Set, + "g" => Self::Gauge, + _ => return Err(()), + }) + } +} + +/// A snapshot of values. +#[derive(Clone, Copy, Debug, PartialEq)] +struct GaugeSummary { + /// The last value reported in the bucket. + /// + /// This aggregation is not commutative. + pub last: GaugeValue, + /// The minimum value reported in the bucket. + pub min: GaugeValue, + /// The maximum value reported in the bucket. + pub max: GaugeValue, + /// The sum of all values reported in the bucket. + pub sum: GaugeValue, + /// The number of times this bucket was updated with a new value. + pub count: u64, +} + +impl GaugeSummary { + /// Creates a gauge snapshot from a single value. + pub fn single(value: GaugeValue) -> Self { + Self { + last: value, + min: value, + max: value, + sum: value, + count: 1, + } + } + + /// Inserts a new value into the gauge. + pub fn insert(&mut self, value: GaugeValue) { + self.last = value; + self.min = self.min.min(value); + self.max = self.max.max(value); + self.sum += value; + self.count += 1; + } +} + +/// The aggregated value of a [`Metric`] bucket. +#[derive(Debug)] +enum BucketValue { + Counter(CounterValue), + Distribution(Vec), + Set(BTreeSet), + Gauge(GaugeSummary), +} + +impl BucketValue { + /// Inserts a new value into the bucket and returns the added weight. + pub fn insert(&mut self, value: MetricValue) -> usize { + match (self, value) { + (Self::Counter(c1), MetricValue::Counter(c2)) => { + *c1 += c2; + 0 + } + (Self::Distribution(d1), MetricValue::Distribution(d2)) => { + d1.push(d2); + 1 + } + (Self::Set(s1), MetricValue::Set(s2)) => { + if s1.insert(s2) { + 1 + } else { + 0 + } + } + (Self::Gauge(g1), MetricValue::Gauge(g2)) => { + g1.insert(g2); + 0 + } + _ => panic!("invalid metric type"), + } + } + + /// Returns the number of values stored in this bucket. + pub fn weight(&self) -> usize { + match self { + BucketValue::Counter(_) => 1, + BucketValue::Distribution(v) => v.len(), + BucketValue::Set(v) => v.len(), + BucketValue::Gauge(_) => 5, + } + } +} + +impl From for BucketValue { + fn from(value: MetricValue) -> Self { + match value { + MetricValue::Counter(v) => Self::Counter(v), + MetricValue::Distribution(v) => Self::Distribution(vec![v]), + MetricValue::Gauge(v) => Self::Gauge(GaugeSummary::single(v)), + MetricValue::Set(v) => Self::Set(BTreeSet::from([v])), + } + } +} + +/// A metric value that contains a numeric value and metadata to be sent to Sentry. +/// +/// # Units +/// +/// To make the most out of metrics in Sentry, consider assigning a unit during construction. This +/// can be achieved using the [`with_unit`](MetricBuilder::with_unit) builder method. See the +/// documentation for more examples on units. +/// +/// ``` +/// use sentry::metrics::{Metric, InformationUnit}; +/// +/// Metric::distribution("request.size", 47.2) +/// .with_unit(InformationUnit::Byte) +/// .send(); +/// ``` +/// +/// # Sending Metrics +/// +/// Metrics can be sent to Sentry directly using the [`send`](MetricBuilder::send) method on the +/// constructor. This will send the metric to the [`Client`](crate::Client) on the current [`Hub`]. +/// If there is no client on the current hub, the metric is dropped. +/// +/// ``` +/// use sentry::metrics::Metric; +/// +/// Metric::count("requests") +/// .with_tag("method", "GET") +/// .send(); +/// ``` +/// +/// # Sending to a Custom Client +/// +/// Metrics can also be sent to a custom client. This is useful if you want to send metrics to a +/// different Sentry project or with different configuration. To do so, finish building the metric +/// and then call [`add_metric`](crate::Client::add_metric) to the client: +/// +/// ``` +/// use sentry::Hub; +/// use sentry::metrics::Metric; +/// +/// let metric = Metric::count("requests") +/// .with_tag("method", "GET") +/// .finish(); +/// +/// // Obtain a client from somewhere +/// if let Some(client) = Hub::current().client() { +/// client.add_metric(metric); +/// } +/// ``` +#[derive(Debug)] +pub struct Metric { + /// The name of the metric, identifying it in Sentry. + /// + /// The name should consist of + name: MetricStr, + unit: MetricUnit, + value: MetricValue, + tags: BTreeMap, + time: Option, +} + +impl Metric { + /// Creates a new metric with the stated name and value. + /// + /// The provided name identifies the metric in Sentry. It should consist of alphanumeric + /// characters and `_`, `-`, and `.`. While a single forward slash (`/`) is also allowed in + /// metric names, it has a special meaning and should not be used in regular metric names. All + /// characters that do not match this criteria are sanitized. + /// + /// The value of the metric determines its type. See the [struct-level](self) docs and + /// constructor methods for examples on how to build metrics. + pub fn build(name: impl Into, value: MetricValue) -> MetricBuilder { + let metric = Metric { + name: name.into(), + unit: MetricUnit::None, + value, + tags: BTreeMap::new(), + time: None, + }; + + MetricBuilder { metric } + } + + /// Parses a metric from a StatsD string. + /// + /// This supports regular StatsD payloads with an extension for tags. In the below example, tags + /// are optional: + /// + /// ```plain + /// :||#:,: + /// ``` + /// + /// Units are encoded into the metric name, separated by an `@`: + /// + /// ```plain + /// @:||#:,: + /// ``` + pub fn parse_statsd(string: &str) -> Result { + parse_metric_opt(string).ok_or(ParseMetricError(())) + } + + /// Builds a metric that increments a [counter](MetricValue::Counter) by the given value. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric}; + /// + /// Metric::incr("operation.total_values", 7.0).send(); + /// ``` + pub fn incr(name: impl Into, value: f64) -> MetricBuilder { + Self::build(name, MetricValue::Counter(value)) + } + + /// Builds a metric that [counts](MetricValue::Counter) the single occurrence of an event. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric}; + /// + /// Metric::count("requests").send(); + /// ``` + pub fn count(name: impl Into) -> MetricBuilder { + Self::build(name, MetricValue::Counter(1.0)) + } + + /// Builds a metric that tracks the duration of an operation. + /// + /// This is a [distribution](MetricValue::Distribution) metric that is tracked in seconds. + /// + /// # Example + /// + /// ``` + /// use std::time::Duration; + /// use sentry::metrics::{Metric}; + /// + /// Metric::timing("operation", Duration::from_secs(1)).send(); + /// ``` + pub fn timing(name: impl Into, timing: Duration) -> MetricBuilder { + Self::build(name, MetricValue::Distribution(timing.as_secs_f64())) + .with_unit(DurationUnit::Second) + } + + /// Builds a metric that tracks the [distribution](MetricValue::Distribution) of values. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric}; + /// + /// Metric::distribution("operation.batch_size", 42.0).send(); + /// ``` + pub fn distribution(name: impl Into, value: f64) -> MetricBuilder { + Self::build(name, MetricValue::Distribution(value)) + } + + /// Builds a metric that tracks the [unique number](MetricValue::Set) of values provided. + /// + /// See [`MetricValue`] for more ways to construct sets. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric}; + /// + /// Metric::set("users", "user1").send(); + /// ``` + pub fn set(name: impl Into, string: &str) -> MetricBuilder { + Self::build(name, MetricValue::set_from_str(string)) + } + + /// Builds a metric that tracks the [snapshot](MetricValue::Gauge) of provided values. + /// + /// # Example + /// + /// ``` + /// use sentry::metrics::{Metric}; + /// + /// Metric::gauge("cache.size", 42.0).send(); + /// ``` + pub fn gauge(name: impl Into, value: f64) -> MetricBuilder { + Self::build(name, MetricValue::Gauge(value)) + } + + /// Sends the metric to the current client. + /// + /// When building a metric, you can use [`MetricBuilder::send`] to send the metric directly. If + /// there is no client on the current [`Hub`], the metric is dropped. + pub fn send(self) { + if let Some(client) = Hub::current().client() { + client.add_metric(self); + } + } +} + +/// A builder for metrics. +/// +/// Use one of the [`Metric`] constructors to create a new builder. See the struct-level docs for +/// examples of how to build metrics. +#[must_use] +#[derive(Debug)] +pub struct MetricBuilder { + metric: Metric, +} + +impl MetricBuilder { + /// Sets the unit for the metric. + /// + /// The unit augments the metric value by giving it a magnitude and semantics. Some units have + /// special support when rendering metrics or their values in Sentry, such as for timings. See + /// [`MetricUnit`] for more information on the supported units. The unit can be set to + /// [`MetricUnit::None`] to indicate that the metric has no unit, or to [`MetricUnit::Custom`] + /// to indicate a user-defined unit. + /// + /// By default, the unit is set to [`MetricUnit::None`]. + pub fn with_unit(mut self, unit: impl Into) -> Self { + self.metric.unit = unit.into(); + self + } + + /// Adds a tag to the metric. + /// + /// Tags allow you to add dimensions to metrics. They are key-value pairs that can be filtered + /// or grouped by in Sentry. + /// + /// When sent to Sentry via [`MetricBuilder::send`] or when added to a + /// [`Client`](crate::Client), the client may add default tags to the metrics, such as the + /// `release` or the `environment` from the Scope. + pub fn with_tag(mut self, name: impl Into, value: impl Into) -> Self { + self.metric.tags.insert(name.into(), value.into()); + self + } + + /// Sets the timestamp for the metric. + /// + /// By default, the timestamp is set to the current time when the metric is built or sent. + pub fn with_time(mut self, time: SystemTime) -> Self { + self.metric.time = Some(time); + self + } + + /// Builds the metric. + pub fn finish(self) -> Metric { + self.metric + } + + /// Sends the metric to the current client. + /// + /// This is a shorthand for `.finish().send()`. If there is no client on the current [`Hub`], + /// the metric is dropped. + pub fn send(self) { + self.finish().send() + } +} + +/// Error emitted from [`Metric::parse_statsd`] for invalid metric strings. +#[derive(Debug)] +pub struct ParseMetricError(()); + +impl std::error::Error for ParseMetricError {} + +impl fmt::Display for ParseMetricError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("invalid metric string") + } +} + +fn parse_metric_opt(string: &str) -> Option { + let mut components = string.split('|'); + + let (mri_str, value_str) = components.next()?.split_once(':')?; + let (name, unit) = match mri_str.split_once('@') { + Some((name, unit_str)) => (name, unit_str.parse().ok()?), + None => (mri_str, MetricUnit::None), + }; + + let ty = components.next().and_then(|s| s.parse().ok())?; + let value = match ty { + MetricType::Counter => MetricValue::Counter(value_str.parse().ok()?), + MetricType::Distribution => MetricValue::Distribution(value_str.parse().ok()?), + MetricType::Set => MetricValue::Set(value_str.parse().ok()?), + MetricType::Gauge => MetricValue::Gauge(value_str.parse().ok()?), + }; + + let mut builder = Metric::build(name.to_owned(), value).with_unit(unit); + + for component in components { + if let Some('#') = component.chars().next() { + for pair in component.get(1..)?.split(',') { + let mut key_value = pair.splitn(2, ':'); + + let key = key_value.next()?.to_owned(); + let value = key_value.next().unwrap_or_default().to_owned(); + + builder = builder.with_tag(key, value); + } + } + } + + Some(builder.finish()) +} + +/// Composite bucket key for [`BucketMap`]. +#[derive(Debug, PartialEq, Eq, Hash)] +struct BucketKey { + ty: MetricType, + name: MetricStr, + unit: MetricUnit, + tags: BTreeMap, +} + +/// UNIX timestamp used for buckets. +type Timestamp = u64; + +/// A nested map storing metric buckets. +/// +/// This map consists of two levels: +/// 1. The rounded UNIX timestamp of buckets. +/// 2. The metric buckets themselves with a corresponding timestamp. +/// +/// This structure allows for efficient dequeueing of buckets that are older than a certain +/// threshold. The buckets are dequeued in order of their timestamp, so the oldest buckets are +/// dequeued first. +type BucketMap = BTreeMap>; + +#[derive(Debug)] +struct SharedAggregatorState { + buckets: BucketMap, + weight: usize, + running: bool, + force_flush: bool, +} + +impl SharedAggregatorState { + pub fn new() -> Self { + Self { + buckets: BTreeMap::new(), + weight: 0, + running: true, + force_flush: false, + } + } + + /// Adds a new bucket to the aggregator. + /// + /// The bucket timestamp is rounded to the nearest bucket interval. Note that this does NOT + /// automatically flush the aggregator if the weight exceeds the weight threshold. + pub fn add(&mut self, mut timestamp: Timestamp, key: BucketKey, value: MetricValue) { + // Floor timestamp to bucket interval + timestamp /= BUCKET_INTERVAL.as_secs(); + timestamp *= BUCKET_INTERVAL.as_secs(); + + match self.buckets.entry(timestamp).or_default().entry(key) { + Entry::Occupied(mut e) => self.weight += e.get_mut().insert(value), + Entry::Vacant(e) => self.weight += e.insert(value.into()).weight(), + } + } + + /// Removes and returns all buckets that are ready to flush. + /// + /// Buckets are ready to flush as soon as their time window has closed. For example, a bucket + /// from timestamps `[4600, 4610)` is ready to flush immediately at `4610`. + pub fn take_buckets(&mut self) -> BucketMap { + if self.force_flush || !self.running { + self.weight = 0; + self.force_flush = false; + std::mem::take(&mut self.buckets) + } else { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .saturating_sub(BUCKET_INTERVAL) + .as_secs(); + + // Split all buckets after the cutoff time. `split` contains newer buckets, which should + // remain, so swap them. After the swap, `split` contains all older buckets. + let mut split = self.buckets.split_off(×tamp); + std::mem::swap(&mut split, &mut self.buckets); + + self.weight -= split + .values() + .flat_map(|map| map.values()) + .map(|bucket| bucket.weight()) + .sum::(); + + split + } + } + + pub fn weight(&self) -> usize { + self.weight + } +} + +type TagMap = BTreeMap; + +fn get_default_tags(options: &ClientOptions) -> TagMap { + let mut tags = TagMap::new(); + if let Some(ref release) = options.release { + tags.insert("release".into(), release.clone()); + } + if let Some(ref environment) = options.environment { + tags.insert("environment".into(), environment.clone()); + } + tags +} + +#[derive(Clone)] +struct Worker { + shared: Arc>, + default_tags: TagMap, + transport: TransportArc, +} + +impl Worker { + pub fn run(self) { + loop { + // Park instead of sleep so we can wake the thread up. Do not account for delays during + // flushing, since we benefit from some drift to spread out metric submissions. + thread::park_timeout(FLUSH_INTERVAL); + + let buckets = { + let mut guard = self.shared.lock().unwrap(); + if !guard.running { + break; + } + guard.take_buckets() + }; + + self.flush_buckets(buckets); + } + } + + pub fn flush_buckets(&self, buckets: BucketMap) { + if buckets.is_empty() { + return; + } + + // The transport is usually available when flush is called. Prefer a short lock and worst + // case throw away the result rather than blocking the transport for too long. + if let Ok(output) = self.format_payload(buckets) { + let mut envelope = Envelope::new(); + envelope.add_item(EnvelopeItem::Statsd(output)); + + if let Some(ref transport) = *self.transport.read().unwrap() { + transport.send_envelope(envelope); + } + } + } + + fn format_payload(&self, buckets: BucketMap) -> std::io::Result> { + use std::io::Write; + let mut out = vec![]; + + for (timestamp, buckets) in buckets { + for (key, value) in buckets { + write!(&mut out, "{}", SafeKey(key.name.as_ref()))?; + if key.unit != MetricUnit::None { + write!(&mut out, "@{}", key.unit)?; + } + + match value { + BucketValue::Counter(c) => { + write!(&mut out, ":{}", c)?; + } + BucketValue::Distribution(d) => { + for v in d { + write!(&mut out, ":{}", v)?; + } + } + BucketValue::Set(s) => { + for v in s { + write!(&mut out, ":{}", v)?; + } + } + BucketValue::Gauge(g) => { + write!( + &mut out, + ":{}:{}:{}:{}:{}", + g.last, g.min, g.max, g.sum, g.count + )?; + } + } + + write!(&mut out, "|{}", key.ty.as_str())?; + + for (i, (k, v)) in key.tags.iter().chain(&self.default_tags).enumerate() { + match i { + 0 => write!(&mut out, "|#")?, + _ => write!(&mut out, ",")?, + } + + write!(&mut out, "{}:{}", SafeKey(k.as_ref()), SafeVal(v.as_ref()))?; + } + + writeln!(&mut out, "|T{}", timestamp)?; + } + } + + Ok(out) + } +} + +impl fmt::Debug for Worker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Worker") + .field("transport", &format_args!("ArcTransport")) + .field("default_tags", &self.default_tags) + .finish() + } +} + +#[derive(Debug)] +pub(crate) struct MetricAggregator { + local_worker: Worker, + handle: Option>, +} + +impl MetricAggregator { + pub fn new(transport: TransportArc, options: &ClientOptions) -> Self { + let worker = Worker { + shared: Arc::new(Mutex::new(SharedAggregatorState::new())), + default_tags: get_default_tags(options), + transport, + }; + + let local_worker = worker.clone(); + + let handle = thread::Builder::new() + .name("sentry-metrics".into()) + .spawn(move || worker.run()) + .expect("failed to spawn thread"); + + Self { + local_worker, + handle: Some(handle), + } + } + + pub fn add(&self, metric: Metric) { + let Metric { + name, + unit, + value, + tags, + time, + } = metric; + + let timestamp = time + .unwrap_or_else(SystemTime::now) + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let key = BucketKey { + ty: value.ty(), + name, + unit, + tags, + }; + + let mut guard = self.local_worker.shared.lock().unwrap(); + guard.add(timestamp, key, value); + + if guard.weight() > MAX_WEIGHT { + if let Some(ref handle) = self.handle { + guard.force_flush = true; + handle.thread().unpark(); + } + } + } + + pub fn flush(&self) { + let buckets = { + let mut guard = self.local_worker.shared.lock().unwrap(); + guard.force_flush = true; + guard.take_buckets() + }; + + self.local_worker.flush_buckets(buckets); + } +} + +impl Drop for MetricAggregator { + fn drop(&mut self) { + let buckets = { + let mut guard = self.local_worker.shared.lock().unwrap(); + guard.running = false; + guard.take_buckets() + }; + + self.local_worker.flush_buckets(buckets); + + if let Some(handle) = self.handle.take() { + handle.thread().unpark(); + handle.join().unwrap(); + } + } +} + +fn safe_fmt(f: &mut fmt::Formatter<'_>, string: &str, mut check: F) -> fmt::Result +where + F: FnMut(char) -> bool, +{ + let mut valid = true; + + for c in string.chars() { + if check(c) { + valid = true; + f.write_char(c)?; + } else if valid { + valid = false; + f.write_char('_')?; + } + } + + Ok(()) +} + +// Helper that serializes a string into a safe format for metric names or tag keys. +struct SafeKey<'s>(&'s str); + +impl<'s> fmt::Display for SafeKey<'s> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + safe_fmt(f, self.0, |c| { + c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.' | '/') + }) + } +} + +// Helper that serializes a string into a safe format for tag values. +struct SafeVal<'s>(&'s str); + +impl<'s> fmt::Display for SafeVal<'s> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + safe_fmt(f, self.0, |c| { + c.is_alphanumeric() + || matches!( + c, + '_' | ':' | '/' | '@' | '.' | '{' | '}' | '[' | ']' | '$' | '-' + ) + }) + } +} + +#[cfg(test)] +mod tests { + use crate::test::{with_captured_envelopes, with_captured_envelopes_options}; + use crate::ClientOptions; + + use super::*; + + /// Returns the current system time and rounded bucket timestamp. + fn current_time() -> (SystemTime, u64) { + let now = SystemTime::now(); + let timestamp = now.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); + let timestamp = timestamp / 10 * 10; + + (now, timestamp) + } + + fn get_single_metrics(envelopes: &[Envelope]) -> &str { + assert_eq!(envelopes.len(), 1, "expected exactly one envelope"); + + let mut items = envelopes[0].items(); + let Some(EnvelopeItem::Statsd(payload)) = items.next() else { + panic!("expected metrics item"); + }; + + std::str::from_utf8(payload).unwrap().trim() + } + + #[test] + fn test_tags() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::count("my.metric") + .with_tag("foo", "bar") + .with_tag("and", "more") + .with_time(time) + .send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my.metric:1|c|#and:more,foo:bar|T{ts}")); + } + + #[test] + fn test_unit() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::count("my.metric") + .with_time(time) + .with_unit("custom") + .send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my.metric@custom:1|c|T{ts}")); + } + + #[test] + fn test_metric_sanitation() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::count("my$$$metric").with_time(time).send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my_metric:1|c|T{ts}")); + } + + #[test] + fn test_tag_sanitation() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::count("my.metric") + .with_tag("foo-bar$$$blub", "%$föö{}") + .with_time(time) + .send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!( + metrics, + format!("my.metric:1|c|#foo-bar_blub:_$föö{{}}|T{ts}") + ); + } + + #[test] + fn test_own_namespace() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::count("ns/my.metric").with_time(time).send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("ns/my.metric:1|c|T{ts}")); + } + + #[test] + fn test_default_tags() { + let (time, ts) = current_time(); + + let options = ClientOptions { + release: Some("myapp@1.0.0".into()), + environment: Some("production".into()), + ..Default::default() + }; + + let envelopes = with_captured_envelopes_options( + || { + Metric::count("requests") + .with_tag("foo", "bar") + .with_time(time) + .send(); + }, + options, + ); + + let metrics = get_single_metrics(&envelopes); + assert_eq!( + metrics, + format!("requests:1|c|#foo:bar,environment:production,release:myapp@1.0.0|T{ts}") + ); + } + + #[test] + fn test_counter() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::count("my.metric").with_time(time).send(); + Metric::incr("my.metric", 2.0).with_time(time).send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my.metric:3|c|T{ts}")); + } + + #[test] + fn test_timing() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::timing("my.metric", Duration::from_millis(200)) + .with_time(time) + .send(); + Metric::timing("my.metric", Duration::from_millis(100)) + .with_time(time) + .send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my.metric@second:0.2:0.1|d|T{ts}")); + } + + #[test] + fn test_distribution() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::distribution("my.metric", 2.0) + .with_time(time) + .send(); + Metric::distribution("my.metric", 1.0) + .with_time(time) + .send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my.metric:2:1|d|T{ts}")); + } + + #[test] + fn test_set() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::set("my.metric", "hello").with_time(time).send(); + // Duplicate that should not be reflected twice + Metric::set("my.metric", "hello").with_time(time).send(); + Metric::set("my.metric", "world").with_time(time).send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my.metric:3410894750:3817476724|s|T{ts}")); + } + + #[test] + fn test_gauge() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::gauge("my.metric", 2.0).with_time(time).send(); + Metric::gauge("my.metric", 1.0).with_time(time).send(); + Metric::gauge("my.metric", 1.5).with_time(time).send(); + }); + + let metrics = get_single_metrics(&envelopes); + assert_eq!(metrics, format!("my.metric:1.5:1:2:4.5:3|g|T{ts}")); + } + + #[test] + fn test_multiple() { + let (time, ts) = current_time(); + + let envelopes = with_captured_envelopes(|| { + Metric::count("my.metric").with_time(time).send(); + Metric::distribution("my.dist", 2.0).with_time(time).send(); + }); + + let metrics = get_single_metrics(&envelopes); + println!("{metrics}"); + + assert!(metrics.contains(&format!("my.metric:1|c|T{ts}"))); + assert!(metrics.contains(&format!("my.dist:2|d|T{ts}"))); + } +} diff --git a/sentry-core/src/units.rs b/sentry-core/src/units.rs new file mode 100644 index 00000000..bc47af65 --- /dev/null +++ b/sentry-core/src/units.rs @@ -0,0 +1,260 @@ +//! Type definitions for Sentry metrics. + +use std::borrow::Cow; +use std::fmt; + +/// The unit of measurement of a metric value. +/// +/// Units augment metric values by giving them a magnitude and semantics. There are certain types of +/// units that are subdivided in their precision: +/// - [`DurationUnit`]: time durations +/// - [`InformationUnit`]: sizes of information +/// - [`FractionUnit`]: fractions +/// +/// You are not restricted to these units, but can use any `&'static str` or `String` as a unit. +#[derive(Clone, Debug, Eq, PartialEq, Hash, Default)] +pub enum MetricUnit { + /// A time duration, defaulting to `"millisecond"`. + Duration(DurationUnit), + /// Size of information derived from bytes, defaulting to `"byte"`. + Information(InformationUnit), + /// Fractions such as percentages, defaulting to `"ratio"`. + Fraction(FractionUnit), + /// user-defined units without builtin conversion or default. + Custom(Cow<'static, str>), + /// Untyped value without a unit (`""`). + #[default] + None, +} + +impl MetricUnit { + /// Returns `true` if the metric_unit is [`None`]. + pub fn is_none(&self) -> bool { + matches!(self, Self::None) + } +} + +impl fmt::Display for MetricUnit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MetricUnit::Duration(u) => u.fmt(f), + MetricUnit::Information(u) => u.fmt(f), + MetricUnit::Fraction(u) => u.fmt(f), + MetricUnit::Custom(u) => u.fmt(f), + MetricUnit::None => f.write_str("none"), + } + } +} + +impl std::str::FromStr for MetricUnit { + type Err = ParseMetricUnitError; + + fn from_str(s: &str) -> Result { + Ok(match s { + "nanosecond" | "ns" => Self::Duration(DurationUnit::NanoSecond), + "microsecond" => Self::Duration(DurationUnit::MicroSecond), + "millisecond" | "ms" => Self::Duration(DurationUnit::MilliSecond), + "second" | "s" => Self::Duration(DurationUnit::Second), + "minute" => Self::Duration(DurationUnit::Minute), + "hour" => Self::Duration(DurationUnit::Hour), + "day" => Self::Duration(DurationUnit::Day), + "week" => Self::Duration(DurationUnit::Week), + + "bit" => Self::Information(InformationUnit::Bit), + "byte" => Self::Information(InformationUnit::Byte), + "kilobyte" => Self::Information(InformationUnit::KiloByte), + "kibibyte" => Self::Information(InformationUnit::KibiByte), + "megabyte" => Self::Information(InformationUnit::MegaByte), + "mebibyte" => Self::Information(InformationUnit::MebiByte), + "gigabyte" => Self::Information(InformationUnit::GigaByte), + "gibibyte" => Self::Information(InformationUnit::GibiByte), + "terabyte" => Self::Information(InformationUnit::TeraByte), + "tebibyte" => Self::Information(InformationUnit::TebiByte), + "petabyte" => Self::Information(InformationUnit::PetaByte), + "pebibyte" => Self::Information(InformationUnit::PebiByte), + "exabyte" => Self::Information(InformationUnit::ExaByte), + "exbibyte" => Self::Information(InformationUnit::ExbiByte), + + "ratio" => Self::Fraction(FractionUnit::Ratio), + "percent" => Self::Fraction(FractionUnit::Percent), + + "" | "none" => Self::None, + _ => Self::Custom(s.to_owned().into()), + }) + } +} + +impl From for MetricUnit { + fn from(unit: DurationUnit) -> Self { + Self::Duration(unit) + } +} + +impl From for MetricUnit { + fn from(unit: InformationUnit) -> Self { + Self::Information(unit) + } +} + +impl From for MetricUnit { + fn from(unit: FractionUnit) -> Self { + Self::Fraction(unit) + } +} + +impl From<&'static str> for MetricUnit { + fn from(unit: &'static str) -> Self { + Self::Custom(unit.into()) + } +} + +impl From for MetricUnit { + fn from(unit: String) -> Self { + Self::Custom(unit.into()) + } +} + +impl From> for MetricUnit { + fn from(unit: Cow<'static, str>) -> Self { + Self::Custom(unit) + } +} + +/// Time duration units used in [`MetricUnit::Duration`]. +/// +/// Defaults to `millisecond`. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub enum DurationUnit { + /// Nanosecond (`"nanosecond"`), 10^-9 seconds. + NanoSecond, + /// Microsecond (`"microsecond"`), 10^-6 seconds. + MicroSecond, + /// Millisecond (`"millisecond"`), 10^-3 seconds. + MilliSecond, + /// Full second (`"second"`). + Second, + /// Minute (`"minute"`), 60 seconds. + Minute, + /// Hour (`"hour"`), 3600 seconds. + Hour, + /// Day (`"day"`), 86,400 seconds. + Day, + /// Week (`"week"`), 604,800 seconds. + Week, +} + +impl Default for DurationUnit { + fn default() -> Self { + Self::MilliSecond + } +} + +impl fmt::Display for DurationUnit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NanoSecond => f.write_str("nanosecond"), + Self::MicroSecond => f.write_str("microsecond"), + Self::MilliSecond => f.write_str("millisecond"), + Self::Second => f.write_str("second"), + Self::Minute => f.write_str("minute"), + Self::Hour => f.write_str("hour"), + Self::Day => f.write_str("day"), + Self::Week => f.write_str("week"), + } + } +} + +/// An error parsing a [`MetricUnit`] or one of its variants. +#[derive(Clone, Copy, Debug)] +pub struct ParseMetricUnitError(()); + +/// Size of information derived from bytes, used in [`MetricUnit::Information`]. +/// +/// Defaults to `byte`. See also [Units of +/// information](https://en.wikipedia.org/wiki/Units_of_information). +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub enum InformationUnit { + /// Bit (`"bit"`), corresponding to 1/8 of a byte. + /// + /// Note that there are computer systems with a different number of bits per byte. + Bit, + /// Byte (`"byte"`). + Byte, + /// Kilobyte (`"kilobyte"`), 10^3 bytes. + KiloByte, + /// Kibibyte (`"kibibyte"`), 2^10 bytes. + KibiByte, + /// Megabyte (`"megabyte"`), 10^6 bytes. + MegaByte, + /// Mebibyte (`"mebibyte"`), 2^20 bytes. + MebiByte, + /// Gigabyte (`"gigabyte"`), 10^9 bytes. + GigaByte, + /// Gibibyte (`"gibibyte"`), 2^30 bytes. + GibiByte, + /// Terabyte (`"terabyte"`), 10^12 bytes. + TeraByte, + /// Tebibyte (`"tebibyte"`), 2^40 bytes. + TebiByte, + /// Petabyte (`"petabyte"`), 10^15 bytes. + PetaByte, + /// Pebibyte (`"pebibyte"`), 2^50 bytes. + PebiByte, + /// Exabyte (`"exabyte"`), 10^18 bytes. + ExaByte, + /// Exbibyte (`"exbibyte"`), 2^60 bytes. + ExbiByte, +} + +impl Default for InformationUnit { + fn default() -> Self { + Self::Byte + } +} + +impl fmt::Display for InformationUnit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Bit => f.write_str("bit"), + Self::Byte => f.write_str("byte"), + Self::KiloByte => f.write_str("kilobyte"), + Self::KibiByte => f.write_str("kibibyte"), + Self::MegaByte => f.write_str("megabyte"), + Self::MebiByte => f.write_str("mebibyte"), + Self::GigaByte => f.write_str("gigabyte"), + Self::GibiByte => f.write_str("gibibyte"), + Self::TeraByte => f.write_str("terabyte"), + Self::TebiByte => f.write_str("tebibyte"), + Self::PetaByte => f.write_str("petabyte"), + Self::PebiByte => f.write_str("pebibyte"), + Self::ExaByte => f.write_str("exabyte"), + Self::ExbiByte => f.write_str("exbibyte"), + } + } +} + +/// Units of fraction used in [`MetricUnit::Fraction`]. +/// +/// Defaults to `ratio`. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub enum FractionUnit { + /// Floating point fraction of `1`. + Ratio, + /// Ratio expressed as a fraction of `100`. `100%` equals a ratio of `1.0`. + Percent, +} + +impl Default for FractionUnit { + fn default() -> Self { + Self::Ratio + } +} + +impl fmt::Display for FractionUnit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Ratio => f.write_str("ratio"), + Self::Percent => f.write_str("percent"), + } + } +} diff --git a/sentry-types/Cargo.toml b/sentry-types/Cargo.toml index 4ef095cc..baeb8989 100644 --- a/sentry-types/Cargo.toml +++ b/sentry-types/Cargo.toml @@ -19,6 +19,7 @@ all-features = true [features] default = ["protocol"] protocol = [] +UNSTABLE_metrics = [] [dependencies] debugid = { version = "0.8.0", features = ["serde"] } diff --git a/sentry-types/src/protocol/envelope.rs b/sentry-types/src/protocol/envelope.rs index ba95c7a6..02f77803 100644 --- a/sentry-types/src/protocol/envelope.rs +++ b/sentry-types/src/protocol/envelope.rs @@ -4,7 +4,9 @@ use serde::Deserialize; use thiserror::Error; use uuid::Uuid; -use super::v7::{ +use super::v7 as protocol; + +use protocol::{ Attachment, AttachmentType, Event, MonitorCheckIn, SessionAggregates, SessionUpdate, Transaction, }; @@ -42,6 +44,7 @@ struct EnvelopeHeader { /// An Envelope Item Type. #[derive(Clone, Debug, Eq, PartialEq, Deserialize)] +#[non_exhaustive] enum EnvelopeItemType { /// An Event Item type. #[serde(rename = "event")] @@ -61,6 +64,10 @@ enum EnvelopeItemType { /// A Monitor Check In Item Type #[serde(rename = "check_in")] MonitorCheckIn, + /// A Metrics Item type. + #[cfg(feature = "UNSTABLE_metrics")] + #[serde(rename = "statsd")] + Metrics, } /// An Envelope Item Header. @@ -109,6 +116,9 @@ pub enum EnvelopeItem { Attachment(Attachment), /// A MonitorCheckIn item. MonitorCheckIn(MonitorCheckIn), + /// A Metrics Item. + #[cfg(feature = "UNSTABLE_metrics")] + Statsd(Vec), /// This is a sentinel item used to `filter` raw envelopes. Raw, // TODO: @@ -349,6 +359,8 @@ impl Envelope { EnvelopeItem::MonitorCheckIn(check_in) => { serde_json::to_writer(&mut item_buf, check_in)? } + #[cfg(feature = "UNSTABLE_metrics")] + EnvelopeItem::Statsd(statsd) => item_buf.extend_from_slice(statsd), EnvelopeItem::Raw => { continue; } @@ -358,8 +370,10 @@ impl Envelope { EnvelopeItem::SessionUpdate(_) => "session", EnvelopeItem::SessionAggregates(_) => "sessions", EnvelopeItem::Transaction(_) => "transaction", - EnvelopeItem::Attachment(_) | EnvelopeItem::Raw => unreachable!(), EnvelopeItem::MonitorCheckIn(_) => "check_in", + #[cfg(feature = "UNSTABLE_metrics")] + EnvelopeItem::Statsd(_) => "statsd", + EnvelopeItem::Attachment(_) | EnvelopeItem::Raw => unreachable!(), }; writeln!( writer, @@ -503,6 +517,8 @@ impl Envelope { EnvelopeItemType::MonitorCheckIn => { serde_json::from_slice(payload).map(EnvelopeItem::MonitorCheckIn) } + #[cfg(feature = "UNSTABLE_metrics")] + EnvelopeItemType::Metrics => Ok(EnvelopeItem::Statsd(payload.into())), } .map_err(EnvelopeError::InvalidItemPayload)?; diff --git a/sentry/Cargo.toml b/sentry/Cargo.toml index 7b06c427..ebfec0d8 100644 --- a/sentry/Cargo.toml +++ b/sentry/Cargo.toml @@ -22,6 +22,7 @@ rustdoc-args = ["--cfg", "doc_cfg"] [features] default = ["backtrace", "contexts", "debug-images", "panic", "transport"] +UNSTABLE_metrics = ["sentry-core/UNSTABLE_metrics"] # default integrations backtrace = ["sentry-backtrace", "sentry-tracing?/backtrace"] diff --git a/sentry/src/transports/ratelimit.rs b/sentry/src/transports/ratelimit.rs index 0aa36bb7..a64ce3a9 100644 --- a/sentry/src/transports/ratelimit.rs +++ b/sentry/src/transports/ratelimit.rs @@ -12,6 +12,7 @@ pub struct RateLimiter { session: Option, transaction: Option, attachment: Option, + statsd: Option, } impl RateLimiter { @@ -56,6 +57,7 @@ impl RateLimiter { "session" => self.session = new_time, "transaction" => self.transaction = new_time, "attachment" => self.attachment = new_time, + "statsd" => self.statsd = new_time, _ => {} } } @@ -89,6 +91,7 @@ impl RateLimiter { RateLimitingCategory::Session => self.session, RateLimitingCategory::Transaction => self.transaction, RateLimitingCategory::Attachment => self.attachment, + RateLimitingCategory::Statsd => self.statsd, }?; time_left.duration_since(SystemTime::now()).ok() } @@ -112,6 +115,8 @@ impl RateLimiter { } EnvelopeItem::Transaction(_) => RateLimitingCategory::Transaction, EnvelopeItem::Attachment(_) => RateLimitingCategory::Attachment, + #[cfg(feature = "UNSTABLE_metrics")] + EnvelopeItem::Statsd(_) => RateLimitingCategory::Statsd, _ => RateLimitingCategory::Any, }) }) @@ -131,6 +136,9 @@ pub enum RateLimitingCategory { Transaction, /// Rate Limit pertaining to Attachments. Attachment, + /// Rate Limit pertaining to metrics. + #[allow(unused)] + Statsd, } #[cfg(test)] @@ -149,8 +157,8 @@ mod tests { rl.update_from_sentry_header( r#" - 30::bar, - 120:invalid:invalid, + 30::bar, + 120:invalid:invalid, 4711:foo;bar;baz;security:project "#, );