From cdef864981f94518d787da8055b9afa040934063 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Tue, 8 Aug 2023 16:02:27 +0200 Subject: [PATCH] feat(appsignal sink): Normalize metrics Implement a normaliser for the AppSignal sink to convert absolute counter metrics to incremental counters, and incremental gauges to absolute gauges. The AppSignal API ignores absolute counters and incremental gauges, so this change adds support for absolute counters and incremental gauges. This normaliser is inspired by the DataDog normaliser. --- src/sinks/appsignal/mod.rs | 15 +++ src/sinks/appsignal/normalizer.rs | 199 ++++++++++++++++++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 src/sinks/appsignal/normalizer.rs diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 5b8b01edbff25e..5426e3ffc34a5d 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -9,12 +9,18 @@ #[cfg(all(test, feature = "appsignal-integration-tests"))] mod integration_tests; +mod normalizer; +use normalizer::AppsignalMetricsNormalizer; + use std::task::Poll; use futures::{ future, future::{BoxFuture, Ready}, }; + +use futures_util::future::ready; + use http::{header::AUTHORIZATION, Request, StatusCode, Uri}; use hyper::Body; use serde_json::{json, Value}; @@ -26,6 +32,7 @@ use crate::{ sinks::{ prelude::*, util::{ + buffer::metrics::MetricNormalizer, encoding::{as_tracked_write, Encoder}, http::HttpBatchService, http::HttpStatusRetryLogic, @@ -189,8 +196,16 @@ where { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let service = ServiceBuilder::new().service(self.service); + let mut normalizer = MetricNormalizer::::default(); input + .filter_map(move |event| { + ready(if let Event::Metric(metric) = event { + normalizer.normalize(metric).map(Event::Metric) + } else { + Some(event) + }) + }) .batched(self.batch_settings.into_byte_size_config()) .request_builder( None, diff --git a/src/sinks/appsignal/normalizer.rs b/src/sinks/appsignal/normalizer.rs new file mode 100644 index 00000000000000..c6d330d2f3a98e --- /dev/null +++ b/src/sinks/appsignal/normalizer.rs @@ -0,0 +1,199 @@ +use vector_core::event::{Metric, MetricValue}; + +use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; + +#[derive(Default)] +pub(crate) struct AppsignalMetricsNormalizer; + +impl MetricNormalize for AppsignalMetricsNormalizer { + fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option { + // We only care about making sure that counters are incremental, and that gauges are + // always absolute. Other metric types are currently unsupported. + match &metric.value() { + // We always send counters as incremental and gauges as absolute. Realistically, any + // system sending an incremental gauge update is kind of doing it wrong, but alas. + MetricValue::Counter { .. } => state.make_incremental(metric), + MetricValue::Gauge { .. } => state.make_absolute(metric), + // Otherwise, send it through as-is. + _ => Some(metric), + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use vector_core::event::{Metric, MetricKind, MetricValue}; + + use super::AppsignalMetricsNormalizer; + use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; + + fn get_counter(value: f64, kind: MetricKind) -> Metric { + Metric::new("counter", kind, MetricValue::Counter { value }) + } + + fn get_gauge(value: f64, kind: MetricKind) -> Metric { + Metric::new("gauge", kind, MetricValue::Gauge { value }) + } + + fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { + let mut metric_set = MetricSet::default(); + let mut normalizer = AppsignalMetricsNormalizer; + + for (input, expected) in inputs.into_iter().zip(expected_outputs) { + let result = normalizer.normalize(&mut metric_set, input); + assert_eq!(result, expected); + } + } + + #[test] + fn absolute_counter() { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Absolute), + ]; + + let expected_counters = vec![ + None, + Some(get_counter( + second_value - first_value, + MetricKind::Incremental, + )), + ]; + + run_comparisons(counters, expected_counters); + } + + #[test] + fn incremental_counter() { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Incremental), + ]; + + let expected_counters = counters + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(counters, expected_counters); + } + + #[test] + fn mixed_counter() { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Absolute), + get_counter(third_value, MetricKind::Absolute), + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Incremental), + get_counter(third_value, MetricKind::Incremental), + ]; + + let expected_counters = vec![ + Some(get_counter(first_value, MetricKind::Incremental)), + None, + Some(get_counter( + third_value - second_value, + MetricKind::Incremental, + )), + None, + Some(get_counter(second_value, MetricKind::Incremental)), + Some(get_counter(third_value, MetricKind::Incremental)), + ]; + + run_comparisons(counters, expected_counters); + } + + #[test] + fn absolute_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Absolute), + ]; + + let expected_gauges = gauges + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn incremental_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + ]; + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn mixed_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Absolute), + get_gauge(third_value, MetricKind::Absolute), + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Incremental), + get_gauge(third_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(second_value, MetricKind::Absolute)), + Some(get_gauge(third_value, MetricKind::Absolute)), + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + Some(get_gauge( + first_value + second_value + third_value, + MetricKind::Absolute, + )), + ]; + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn other_metrics() { + let metric = Metric::new( + "set", + MetricKind::Incremental, + MetricValue::Set { + values: BTreeSet::new(), + }, + ); + + run_comparisons(vec![metric], vec![None]); + } +}