diff --git a/src/sinks/appsignal/integration_tests.rs b/src/sinks/appsignal/integration_tests.rs index 1c28032caf40d..8054da4eb92cc 100644 --- a/src/sinks/appsignal/integration_tests.rs +++ b/src/sinks/appsignal/integration_tests.rs @@ -98,14 +98,23 @@ async fn metrics_real_endpoint() { #[tokio::test] async fn metrics_shape() { let events: Vec<_> = (0..5) - .map(|index| { - Event::Metric(Metric::new( - format!("counter_{}", index), - MetricKind::Absolute, - MetricValue::Counter { - value: index as f64, - }, - )) + .flat_map(|index| { + vec![ + Event::Metric(Metric::new( + format!("counter_{}", index), + MetricKind::Absolute, + MetricValue::Counter { + value: index as f64, + }, + )), + Event::Metric(Metric::new( + format!("counter_{}", index), + MetricKind::Absolute, + MetricValue::Counter { + value: (index + index) as f64, + }, + )), + ] }) .collect(); let api_key = push_api_key(); @@ -146,11 +155,11 @@ async fn metrics_shape() { .collect(); assert_eq!( vec![ - ("counter_0", "absolute", 0.0), - ("counter_1", "absolute", 1.0), - ("counter_2", "absolute", 2.0), - ("counter_3", "absolute", 3.0), - ("counter_4", "absolute", 4.0), + ("counter_0", "incremental", 0.0), + ("counter_1", "incremental", 1.0), + ("counter_2", "incremental", 2.0), + ("counter_3", "incremental", 3.0), + ("counter_4", "incremental", 4.0), ], metrics ); @@ -231,11 +240,18 @@ async fn error_scenario_real_endpoint() { let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); - let events = vec![Event::Metric(Metric::new( - "counter", - MetricKind::Absolute, - MetricValue::Counter { value: 1.0 }, - ))]; + let events = vec![ + Event::Metric(Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { value: 1.0 }, + )), + Event::Metric(Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { value: 2.0 }, + )), + ]; let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); sink.run(stream).await.unwrap(); diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 175c456bf6cbc..ecfa404b33dbf 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -8,6 +8,7 @@ mod config; mod encoder; +mod normalizer; mod request_builder; mod service; mod sink; diff --git a/src/sinks/appsignal/normalizer.rs b/src/sinks/appsignal/normalizer.rs new file mode 100644 index 0000000000000..0108de6516b3f --- /dev/null +++ b/src/sinks/appsignal/normalizer.rs @@ -0,0 +1,78 @@ +use vector_core::event::{Metric, MetricValue}; + +use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; + +#[derive(Default)] +pub(crate) struct AppsignalMetricsNormalizer; + +impl MetricNormalize for AppsignalMetricsNormalizer { + fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option { + // We only care about making sure that counters are incremental, and that gauges are + // always absolute. Other metric types are currently unsupported. + match &metric.value() { + // We always send counters as incremental and gauges as absolute. Realistically, any + // system sending an incremental gauge update is kind of doing it wrong, but alas. + MetricValue::Counter { .. } => state.make_incremental(metric), + MetricValue::Gauge { .. } => state.make_absolute(metric), + // Otherwise, send it through as-is. + _ => Some(metric), + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use crate::event::{Metric, MetricKind, MetricValue}; + + use super::AppsignalMetricsNormalizer; + use crate::test_util::metrics::{assert_normalize, tests}; + + #[test] + fn absolute_counter() { + tests::absolute_counter_normalize_to_incremental(AppsignalMetricsNormalizer); + } + + #[test] + fn incremental_counter() { + tests::incremental_counter_normalize_to_incremental(AppsignalMetricsNormalizer); + } + + #[test] + fn mixed_counter() { + tests::mixed_counter_normalize_to_incremental(AppsignalMetricsNormalizer); + } + + #[test] + fn absolute_gauge() { + tests::absolute_gauge_normalize_to_absolute(AppsignalMetricsNormalizer); + } + + #[test] + fn incremental_gauge() { + tests::incremental_gauge_normalize_to_absolute(AppsignalMetricsNormalizer); + } + + #[test] + fn mixed_gauge() { + tests::mixed_gauge_normalize_to_absolute(AppsignalMetricsNormalizer); + } + + #[test] + fn other_metrics() { + let metric = Metric::new( + "set", + MetricKind::Incremental, + MetricValue::Set { + values: BTreeSet::new(), + }, + ); + + assert_normalize( + AppsignalMetricsNormalizer, + vec![metric.clone()], + vec![Some(metric)], + ); + } +} diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs index ab9b135829abb..b908ed757a45a 100644 --- a/src/sinks/appsignal/sink.rs +++ b/src/sinks/appsignal/sink.rs @@ -1,4 +1,5 @@ use futures::{stream::BoxStream, StreamExt}; +use futures_util::future::ready; use tower::{Service, ServiceBuilder}; use vector_core::{ event::Event, @@ -7,12 +8,14 @@ use vector_core::{ }; use crate::{ - codecs::Transformer, internal_events::SinkRequestBuildError, - sinks::util::builder::SinkBuilderExt, sinks::util::Compression, + codecs::Transformer, + internal_events::SinkRequestBuildError, + sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression}, }; use super::{ encoder::AppsignalEncoder, + normalizer::AppsignalMetricsNormalizer, request_builder::{AppsignalRequest, AppsignalRequestBuilder}, }; @@ -32,8 +35,16 @@ where { pub(super) async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let service = ServiceBuilder::new().service(self.service); + let mut normalizer = MetricNormalizer::::default(); input + .filter_map(move |event| { + ready(if let Event::Metric(metric) = event { + normalizer.normalize(metric).map(Event::Metric) + } else { + Some(event) + }) + }) .batched(self.batch_settings.into_byte_size_config()) .request_builder( None, diff --git a/src/sinks/datadog/metrics/normalizer.rs b/src/sinks/datadog/metrics/normalizer.rs index d4e6430250e1c..6b6d27b1c0649 100644 --- a/src/sinks/datadog/metrics/normalizer.rs +++ b/src/sinks/datadog/metrics/normalizer.rs @@ -38,131 +38,16 @@ impl MetricNormalize for DatadogMetricsNormalizer { #[cfg(test)] mod tests { - use std::fmt; - use vector_core::{ - event::{ - metric::{Bucket, MetricSketch, Sample}, - Metric, MetricKind, MetricValue, StatisticKind, - }, + event::{metric::MetricSketch, Metric, MetricKind, MetricValue}, metrics::AgentDDSketch, }; use super::DatadogMetricsNormalizer; - use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; - - fn buckets_from_samples(values: &[f64]) -> (Vec, f64, u64) { - // Generate buckets, and general statistics, for an input set of data. We only use this in - // tests, and so we have some semi-realistic buckets here, but mainly we use them for testing, - // not for most accurately/efficiently representing the input samples. - let bounds = &[ - 1.0, - 2.0, - 4.0, - 8.0, - 16.0, - 32.0, - 64.0, - 128.0, - 256.0, - 512.0, - 1024.0, - f64::INFINITY, - ]; - let mut buckets = bounds - .iter() - .map(|b| Bucket { - upper_limit: *b, - count: 0, - }) - .collect::>(); - - let mut sum = 0.0; - let mut count = 0; - for value in values { - for bucket in buckets.iter_mut() { - if *value <= bucket.upper_limit { - bucket.count += 1; - } - } - - sum += *value; - count += 1; - } - - (buckets, sum, count) - } - - fn generate_f64s(start: u16, end: u16) -> Vec { - assert!(start <= end); - let mut samples = Vec::new(); - for n in start..=end { - samples.push(f64::from(n)); - } - samples - } - - fn get_counter(value: f64, kind: MetricKind) -> Metric { - Metric::new("counter", kind, MetricValue::Counter { value }) - } - - fn get_gauge(value: f64, kind: MetricKind) -> Metric { - Metric::new("gauge", kind, MetricValue::Gauge { value }) - } - fn get_set(values: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: fmt::Display, - { - Metric::new( - "set", - kind, - MetricValue::Set { - values: values.into_iter().map(|i| i.to_string()).collect(), - }, - ) - } - - fn get_distribution(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - Metric::new( - "distribution", - kind, - MetricValue::Distribution { - samples: samples - .into_iter() - .map(|n| Sample { - value: n.into(), - rate: 1, - }) - .collect(), - statistic: StatisticKind::Histogram, - }, - ) - } - - fn get_aggregated_histogram(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - let samples = samples.into_iter().map(Into::into).collect::>(); - let (buckets, sum, count) = buckets_from_samples(&samples); - - Metric::new( - "agg_histogram", - kind, - MetricValue::AggregatedHistogram { - buckets, - count, - sum, - }, - ) - } + use crate::test_util::metrics::{ + assert_normalize, generate_f64s, get_aggregated_histogram, get_distribution, tests, + }; fn get_sketch(name: N, samples: S, kind: MetricKind) -> Metric where @@ -183,199 +68,49 @@ mod tests { ) } - fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { - let mut metric_set = MetricSet::default(); - let mut normalizer = DatadogMetricsNormalizer; - - for (input, expected) in inputs.into_iter().zip(expected_outputs) { - let result = normalizer.normalize(&mut metric_set, input); - assert_eq!(result, expected); - } - } - #[test] fn absolute_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Absolute), - ]; - - let expected_counters = vec![ - None, - Some(get_counter( - second_value - first_value, - MetricKind::Incremental, - )), - ]; - - run_comparisons(counters, expected_counters); + tests::absolute_counter_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn incremental_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Incremental), - ]; - - let expected_counters = counters - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(counters, expected_counters); + tests::incremental_counter_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn mixed_counter() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Absolute), - get_counter(third_value, MetricKind::Absolute), - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Incremental), - get_counter(third_value, MetricKind::Incremental), - ]; - - let expected_counters = vec![ - Some(get_counter(first_value, MetricKind::Incremental)), - None, - Some(get_counter( - third_value - second_value, - MetricKind::Incremental, - )), - None, - Some(get_counter(second_value, MetricKind::Incremental)), - Some(get_counter(third_value, MetricKind::Incremental)), - ]; - - run_comparisons(counters, expected_counters); + tests::mixed_counter_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn absolute_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Absolute), - ]; - - let expected_gauges = gauges - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(gauges, expected_gauges); + tests::absolute_gauge_normalize_to_absolute(DatadogMetricsNormalizer); } #[test] fn incremental_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Incremental), - get_gauge(second_value, MetricKind::Incremental), - ]; - - let expected_gauges = vec![ - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(first_value + second_value, MetricKind::Absolute)), - ]; - - run_comparisons(gauges, expected_gauges); + tests::incremental_gauge_normalize_to_absolute(DatadogMetricsNormalizer); } #[test] fn mixed_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Incremental), - get_gauge(second_value, MetricKind::Absolute), - get_gauge(third_value, MetricKind::Absolute), - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Incremental), - get_gauge(third_value, MetricKind::Incremental), - ]; - - let expected_gauges = vec![ - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(second_value, MetricKind::Absolute)), - Some(get_gauge(third_value, MetricKind::Absolute)), - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(first_value + second_value, MetricKind::Absolute)), - Some(get_gauge( - first_value + second_value + third_value, - MetricKind::Absolute, - )), - ]; - - run_comparisons(gauges, expected_gauges); + tests::mixed_gauge_normalize_to_absolute(DatadogMetricsNormalizer); } #[test] fn absolute_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - ]; - - let expected_sets = vec![None, Some(get_set(21..=25, MetricKind::Incremental))]; - - run_comparisons(sets, expected_sets); + tests::absolute_set_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn incremental_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(15..=25, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - Some(get_set(15..=25, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::incremental_set_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn mixed_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(10..=16, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - get_set(1..5, MetricKind::Incremental), - get_set(3..=42, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - None, - Some(get_set(17..=25, MetricKind::Incremental)), - Some(get_set(1..5, MetricKind::Incremental)), - Some(get_set(3..=42, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::mixed_set_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] @@ -401,7 +136,7 @@ mod tests { )), ]; - run_comparisons(distributions, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, distributions, expected_sketches); } #[test] @@ -429,7 +164,7 @@ mod tests { )), ]; - run_comparisons(distributions, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, distributions, expected_sketches); } #[test] @@ -476,7 +211,7 @@ mod tests { )), ]; - run_comparisons(distributions, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, distributions, expected_sketches); } #[test] @@ -501,7 +236,7 @@ mod tests { ), ]; - run_comparisons(agg_histograms, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, agg_histograms, expected_sketches); } #[test] @@ -533,7 +268,7 @@ mod tests { ), ]; - run_comparisons(agg_histograms, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, agg_histograms, expected_sketches); } #[test] @@ -588,6 +323,6 @@ mod tests { ), ]; - run_comparisons(agg_histograms, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, agg_histograms, expected_sketches); } } diff --git a/src/sinks/statsd/normalizer.rs b/src/sinks/statsd/normalizer.rs index e497d074dfb16..2592292e7023c 100644 --- a/src/sinks/statsd/normalizer.rs +++ b/src/sinks/statsd/normalizer.rs @@ -20,226 +20,33 @@ impl MetricNormalize for StatsdNormalizer { #[cfg(test)] mod tests { - use std::fmt; - - use vector_core::event::{ - metric::{Bucket, Sample}, - Metric, MetricKind, MetricValue, StatisticKind, - }; + use vector_core::event::MetricKind; use super::StatsdNormalizer; - use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; - - fn buckets_from_samples(values: &[f64]) -> (Vec, f64, u64) { - // Generate buckets, and general statistics, for an input set of data. We only use this in - // tests, and so we have some semi-realistic buckets here, but mainly we use them for testing, - // not for most accurately/efficiently representing the input samples. - let bounds = &[ - 1.0, - 2.0, - 4.0, - 8.0, - 16.0, - 32.0, - 64.0, - 128.0, - 256.0, - 512.0, - 1024.0, - f64::INFINITY, - ]; - let mut buckets = bounds - .iter() - .map(|b| Bucket { - upper_limit: *b, - count: 0, - }) - .collect::>(); - - let mut sum = 0.0; - let mut count = 0; - for value in values { - for bucket in buckets.iter_mut() { - if *value <= bucket.upper_limit { - bucket.count += 1; - } - } - - sum += *value; - count += 1; - } - - (buckets, sum, count) - } - - fn generate_f64s(start: u16, end: u16) -> Vec { - assert!(start <= end); - let mut samples = Vec::new(); - for n in start..=end { - samples.push(f64::from(n)); - } - samples - } - - fn get_counter(value: f64, kind: MetricKind) -> Metric { - Metric::new("counter", kind, MetricValue::Counter { value }) - } - - fn get_gauge(value: f64, kind: MetricKind) -> Metric { - Metric::new("gauge", kind, MetricValue::Gauge { value }) - } - - fn get_set(values: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: fmt::Display, - { - Metric::new( - "set", - kind, - MetricValue::Set { - values: values.into_iter().map(|i| i.to_string()).collect(), - }, - ) - } - - fn get_distribution(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - Metric::new( - "distribution", - kind, - MetricValue::Distribution { - samples: samples - .into_iter() - .map(|n| Sample { - value: n.into(), - rate: 1, - }) - .collect(), - statistic: StatisticKind::Histogram, - }, - ) - } - - fn get_aggregated_histogram(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - let samples = samples.into_iter().map(Into::into).collect::>(); - let (buckets, sum, count) = buckets_from_samples(&samples); - - Metric::new( - "agg_histogram", - kind, - MetricValue::AggregatedHistogram { - buckets, - count, - sum, - }, - ) - } - fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { - let mut metric_set = MetricSet::default(); - let mut normalizer = StatsdNormalizer; - - for (input, expected) in inputs.into_iter().zip(expected_outputs) { - let result = normalizer.normalize(&mut metric_set, input); - assert_eq!(result, expected); - } - } + use crate::test_util::metrics::{ + assert_normalize, generate_f64s, get_aggregated_histogram, get_distribution, get_gauge, + tests, + }; #[test] fn absolute_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Absolute), - ]; - - let expected_counters = vec![ - None, - Some(get_counter( - second_value - first_value, - MetricKind::Incremental, - )), - ]; - - run_comparisons(counters, expected_counters); + tests::absolute_counter_normalize_to_incremental(StatsdNormalizer); } #[test] fn incremental_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Incremental), - ]; - - let expected_counters = counters - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(counters, expected_counters); + tests::incremental_counter_normalize_to_incremental(StatsdNormalizer); } #[test] fn mixed_counter() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Absolute), - get_counter(third_value, MetricKind::Absolute), - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Incremental), - get_counter(third_value, MetricKind::Incremental), - ]; - - let expected_counters = vec![ - Some(get_counter(first_value, MetricKind::Incremental)), - None, - Some(get_counter( - third_value - second_value, - MetricKind::Incremental, - )), - None, - Some(get_counter(second_value, MetricKind::Incremental)), - Some(get_counter(third_value, MetricKind::Incremental)), - ]; - - run_comparisons(counters, expected_counters); + tests::mixed_counter_normalize_to_incremental(StatsdNormalizer); } #[test] fn absolute_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Absolute), - ]; - - let expected_gauges = gauges - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(gauges, expected_gauges); + tests::absolute_gauge_normalize_to_absolute(StatsdNormalizer); } #[test] @@ -258,7 +65,7 @@ mod tests { .map(Option::Some) .collect::>(); - run_comparisons(gauges, expected_gauges); + assert_normalize(StatsdNormalizer, gauges, expected_gauges); } #[test] @@ -282,55 +89,22 @@ mod tests { .map(Option::Some) .collect::>(); - run_comparisons(gauges, expected_gauges); + assert_normalize(StatsdNormalizer, gauges, expected_gauges); } #[test] fn absolute_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - ]; - - let expected_sets = vec![None, Some(get_set(21..=25, MetricKind::Incremental))]; - - run_comparisons(sets, expected_sets); + tests::absolute_set_normalize_to_incremental(StatsdNormalizer); } #[test] fn incremental_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(15..=25, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - Some(get_set(15..=25, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::incremental_set_normalize_to_incremental(StatsdNormalizer); } #[test] fn mixed_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(10..=16, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - get_set(1..5, MetricKind::Incremental), - get_set(3..=42, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - None, - Some(get_set(17..=25, MetricKind::Incremental)), - Some(get_set(1..5, MetricKind::Incremental)), - Some(get_set(3..=42, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::mixed_set_normalize_to_incremental(StatsdNormalizer); } #[test] @@ -349,7 +123,7 @@ mod tests { Some(get_distribution(expected_samples, MetricKind::Incremental)), ]; - run_comparisons(distributions, expected_distributions); + assert_normalize(StatsdNormalizer, distributions, expected_distributions); } #[test] @@ -364,7 +138,7 @@ mod tests { let expected_distributions = distributions.iter().cloned().map(Some).collect(); - run_comparisons(distributions, expected_distributions); + assert_normalize(StatsdNormalizer, distributions, expected_distributions); } #[test] @@ -394,7 +168,7 @@ mod tests { Some(distributions[4].clone()), ]; - run_comparisons(distributions, expected_distributions); + assert_normalize(StatsdNormalizer, distributions, expected_distributions); } #[test] @@ -409,7 +183,7 @@ mod tests { let expected_agg_histograms = vec![]; - run_comparisons(agg_histograms, expected_agg_histograms); + assert_normalize(StatsdNormalizer, agg_histograms, expected_agg_histograms); } #[test] @@ -428,7 +202,7 @@ mod tests { .map(Option::Some) .collect::>(); - run_comparisons(agg_histograms, expected_agg_histograms); + assert_normalize(StatsdNormalizer, agg_histograms, expected_agg_histograms); } #[test] @@ -449,6 +223,6 @@ mod tests { let expected_agg_histograms = vec![]; - run_comparisons(agg_histograms, expected_agg_histograms); + assert_normalize(StatsdNormalizer, agg_histograms, expected_agg_histograms); } } diff --git a/src/test_util/metrics.rs b/src/test_util/metrics.rs index 9a3bdca726b71..749df9d736ce9 100644 --- a/src/test_util/metrics.rs +++ b/src/test_util/metrics.rs @@ -1,10 +1,13 @@ use std::collections::{HashMap, HashSet}; +use std::fmt::Display; + use vector_core::event::{ - metric::{MetricData, MetricSeries, Sample}, - Event, EventMetadata, Metric, MetricValue, + metric::{Bucket, MetricData, MetricSeries, Sample}, + Event, EventMetadata, Metric, MetricValue, StatisticKind, }; +use crate::event::MetricKind; use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; type SplitMetrics = HashMap; @@ -246,3 +249,309 @@ pub fn assert_set(metrics: &SplitMetrics, series: MetricSeries, expected_values: assert_eq!(actual_values, expected_values); } + +fn buckets_from_samples(values: &[f64]) -> (Vec, f64, u64) { + // Generate buckets, and general statistics, for an input set of data. We only use this in + // tests, and so we have some semi-realistic buckets here, but mainly we use them for testing, + // not for most accurately/efficiently representing the input samples. + let bounds = &[ + 1.0, + 2.0, + 4.0, + 8.0, + 16.0, + 32.0, + 64.0, + 128.0, + 256.0, + 512.0, + 1024.0, + f64::INFINITY, + ]; + let mut buckets = bounds + .iter() + .map(|b| Bucket { + upper_limit: *b, + count: 0, + }) + .collect::>(); + + let mut sum = 0.0; + let mut count = 0; + for value in values { + for bucket in buckets.iter_mut() { + if *value <= bucket.upper_limit { + bucket.count += 1; + } + } + + sum += *value; + count += 1; + } + + (buckets, sum, count) +} + +pub fn generate_f64s(start: u16, end: u16) -> Vec { + assert!(start <= end); + let mut samples = Vec::new(); + for n in start..=end { + samples.push(f64::from(n)); + } + samples +} + +pub fn get_set(values: S, kind: MetricKind) -> Metric +where + S: IntoIterator, + V: Display, +{ + Metric::new( + "set", + kind, + MetricValue::Set { + values: values.into_iter().map(|i| i.to_string()).collect(), + }, + ) +} + +pub fn get_distribution(samples: S, kind: MetricKind) -> Metric +where + S: IntoIterator, + V: Into, +{ + Metric::new( + "distribution", + kind, + MetricValue::Distribution { + samples: samples + .into_iter() + .map(|n| Sample { + value: n.into(), + rate: 1, + }) + .collect(), + statistic: StatisticKind::Histogram, + }, + ) +} + +pub fn get_aggregated_histogram(samples: S, kind: MetricKind) -> Metric +where + S: IntoIterator, + V: Into, +{ + let samples = samples.into_iter().map(Into::into).collect::>(); + let (buckets, sum, count) = buckets_from_samples(&samples); + + Metric::new( + "agg_histogram", + kind, + MetricValue::AggregatedHistogram { + buckets, + count, + sum, + }, + ) +} + +pub fn get_counter(value: f64, kind: MetricKind) -> Metric { + Metric::new("counter", kind, MetricValue::Counter { value }) +} + +pub fn get_gauge(value: f64, kind: MetricKind) -> Metric { + Metric::new("gauge", kind, MetricValue::Gauge { value }) +} + +pub fn assert_normalize( + mut normalizer: N, + inputs: Vec, + expected_outputs: Vec>, +) { + let mut metric_set = MetricSet::default(); + + for (input, expected) in inputs.into_iter().zip(expected_outputs) { + let result = normalizer.normalize(&mut metric_set, input); + assert_eq!(result, expected); + } +} + +pub mod tests { + use super::*; + + pub fn absolute_counter_normalize_to_incremental(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Absolute), + ]; + + let expected_counters = vec![ + None, + Some(get_counter( + second_value - first_value, + MetricKind::Incremental, + )), + ]; + + assert_normalize(normalizer, counters, expected_counters); + } + + pub fn incremental_counter_normalize_to_incremental(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Incremental), + ]; + + let expected_counters = counters + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + assert_normalize(normalizer, counters, expected_counters); + } + + pub fn mixed_counter_normalize_to_incremental(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Absolute), + get_counter(third_value, MetricKind::Absolute), + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Incremental), + get_counter(third_value, MetricKind::Incremental), + ]; + + let expected_counters = vec![ + Some(get_counter(first_value, MetricKind::Incremental)), + None, + Some(get_counter( + third_value - second_value, + MetricKind::Incremental, + )), + None, + Some(get_counter(second_value, MetricKind::Incremental)), + Some(get_counter(third_value, MetricKind::Incremental)), + ]; + + assert_normalize(normalizer, counters, expected_counters); + } + + pub fn absolute_gauge_normalize_to_absolute(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Absolute), + ]; + + let expected_gauges = gauges + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + assert_normalize(normalizer, gauges, expected_gauges); + } + + pub fn incremental_gauge_normalize_to_absolute(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + ]; + + assert_normalize(normalizer, gauges, expected_gauges); + } + + pub fn mixed_gauge_normalize_to_absolute(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Absolute), + get_gauge(third_value, MetricKind::Absolute), + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Incremental), + get_gauge(third_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(second_value, MetricKind::Absolute)), + Some(get_gauge(third_value, MetricKind::Absolute)), + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + Some(get_gauge( + first_value + second_value + third_value, + MetricKind::Absolute, + )), + ]; + + assert_normalize(normalizer, gauges, expected_gauges); + } + + pub fn absolute_set_normalize_to_incremental(normalizer: N) { + let sets = vec![ + get_set(1..=20, MetricKind::Absolute), + get_set(15..=25, MetricKind::Absolute), + ]; + + let expected_sets = vec![None, Some(get_set(21..=25, MetricKind::Incremental))]; + + assert_normalize(normalizer, sets, expected_sets); + } + + pub fn incremental_set_normalize_to_incremental(normalizer: N) { + let sets = vec![ + get_set(1..=20, MetricKind::Incremental), + get_set(15..=25, MetricKind::Incremental), + ]; + + let expected_sets = vec![ + Some(get_set(1..=20, MetricKind::Incremental)), + Some(get_set(15..=25, MetricKind::Incremental)), + ]; + + assert_normalize(normalizer, sets, expected_sets); + } + + pub fn mixed_set_normalize_to_incremental(normalizer: N) { + let sets = vec![ + get_set(1..=20, MetricKind::Incremental), + get_set(10..=16, MetricKind::Absolute), + get_set(15..=25, MetricKind::Absolute), + get_set(1..5, MetricKind::Incremental), + get_set(3..=42, MetricKind::Incremental), + ]; + + let expected_sets = vec![ + Some(get_set(1..=20, MetricKind::Incremental)), + None, + Some(get_set(17..=25, MetricKind::Incremental)), + Some(get_set(1..5, MetricKind::Incremental)), + Some(get_set(3..=42, MetricKind::Incremental)), + ]; + + assert_normalize(normalizer, sets, expected_sets); + } +}