Skip to content

Commit

Permalink
feat(appsignal sink): Normalize metrics
Browse files Browse the repository at this point in the history
Implement a normaliser for the AppSignal sink to convert absolute
counter metrics to incremental counters, and incremental gauges to
absolute gauges. The AppSignal API ignores absolute counters and
incremental gauges, so this change adds support for absolute counters
and incremental gauges.

This normaliser is inspired by the DataDog normaliser.
  • Loading branch information
unflxw committed Aug 17, 2023
1 parent 7a1c49c commit dea1537
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
mod config;
mod encoder;
mod normalizer;
mod request_builder;
mod service;
mod sink;
Expand Down
199 changes: 199 additions & 0 deletions src/sinks/appsignal/normalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use vector_core::event::{Metric, MetricValue};

use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

#[derive(Default)]
pub(crate) struct AppsignalMetricsNormalizer;

impl MetricNormalize for AppsignalMetricsNormalizer {
fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
// We only care about making sure that counters are incremental, and that gauges are
// always absolute. Other metric types are currently unsupported.
match &metric.value() {
// We always send counters as incremental and gauges as absolute. Realistically, any
// system sending an incremental gauge update is kind of doing it wrong, but alas.
MetricValue::Counter { .. } => state.make_incremental(metric),
MetricValue::Gauge { .. } => state.make_absolute(metric),
// Otherwise, send it through as-is.
_ => Some(metric),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use vector_core::event::{Metric, MetricKind, MetricValue};

use super::AppsignalMetricsNormalizer;
use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

fn get_counter(value: f64, kind: MetricKind) -> Metric {
Metric::new("counter", kind, MetricValue::Counter { value })
}

fn get_gauge(value: f64, kind: MetricKind) -> Metric {
Metric::new("gauge", kind, MetricValue::Gauge { value })
}

fn run_comparisons(inputs: Vec<Metric>, expected_outputs: Vec<Option<Metric>>) {
let mut metric_set = MetricSet::default();
let mut normalizer = AppsignalMetricsNormalizer;

for (input, expected) in inputs.into_iter().zip(expected_outputs) {
let result = normalizer.normalize(&mut metric_set, input);
assert_eq!(result, expected);
}
}

#[test]
fn absolute_counter() {
let first_value = 3.14;
let second_value = 8.675309;

let counters = vec![
get_counter(first_value, MetricKind::Absolute),
get_counter(second_value, MetricKind::Absolute),
];

let expected_counters = vec![
None,
Some(get_counter(
second_value - first_value,
MetricKind::Incremental,
)),
];

run_comparisons(counters, expected_counters);
}

#[test]
fn incremental_counter() {
let first_value = 3.14;
let second_value = 8.675309;

let counters = vec![
get_counter(first_value, MetricKind::Incremental),
get_counter(second_value, MetricKind::Incremental),
];

let expected_counters = counters
.clone()
.into_iter()
.map(Option::Some)
.collect::<Vec<_>>();

run_comparisons(counters, expected_counters);
}

#[test]
fn mixed_counter() {
let first_value = 3.14;
let second_value = 8.675309;
let third_value = 16.19;

let counters = vec![
get_counter(first_value, MetricKind::Incremental),
get_counter(second_value, MetricKind::Absolute),
get_counter(third_value, MetricKind::Absolute),
get_counter(first_value, MetricKind::Absolute),
get_counter(second_value, MetricKind::Incremental),
get_counter(third_value, MetricKind::Incremental),
];

let expected_counters = vec![
Some(get_counter(first_value, MetricKind::Incremental)),
None,
Some(get_counter(
third_value - second_value,
MetricKind::Incremental,
)),
None,
Some(get_counter(second_value, MetricKind::Incremental)),
Some(get_counter(third_value, MetricKind::Incremental)),
];

run_comparisons(counters, expected_counters);
}

#[test]
fn absolute_gauge() {
let first_value = 3.14;
let second_value = 8.675309;

let gauges = vec![
get_gauge(first_value, MetricKind::Absolute),
get_gauge(second_value, MetricKind::Absolute),
];

let expected_gauges = gauges
.clone()
.into_iter()
.map(Option::Some)
.collect::<Vec<_>>();

run_comparisons(gauges, expected_gauges);
}

#[test]
fn incremental_gauge() {
let first_value = 3.14;
let second_value = 8.675309;

let gauges = vec![
get_gauge(first_value, MetricKind::Incremental),
get_gauge(second_value, MetricKind::Incremental),
];

let expected_gauges = vec![
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(first_value + second_value, MetricKind::Absolute)),
];

run_comparisons(gauges, expected_gauges);
}

#[test]
fn mixed_gauge() {
let first_value = 3.14;
let second_value = 8.675309;
let third_value = 16.19;

let gauges = vec![
get_gauge(first_value, MetricKind::Incremental),
get_gauge(second_value, MetricKind::Absolute),
get_gauge(third_value, MetricKind::Absolute),
get_gauge(first_value, MetricKind::Absolute),
get_gauge(second_value, MetricKind::Incremental),
get_gauge(third_value, MetricKind::Incremental),
];

let expected_gauges = vec![
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(second_value, MetricKind::Absolute)),
Some(get_gauge(third_value, MetricKind::Absolute)),
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(first_value + second_value, MetricKind::Absolute)),
Some(get_gauge(
first_value + second_value + third_value,
MetricKind::Absolute,
)),
];

run_comparisons(gauges, expected_gauges);
}

#[test]
fn other_metrics() {
let metric = Metric::new(
"set",
MetricKind::Incremental,
MetricValue::Set {
values: BTreeSet::new(),
},
);

run_comparisons(vec![metric], vec![None]);
}
}
15 changes: 13 additions & 2 deletions src/sinks/appsignal/sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::{stream::BoxStream, StreamExt};
use futures_util::future::ready;
use tower::{Service, ServiceBuilder};
use vector_core::{
event::Event,
Expand All @@ -7,12 +8,14 @@ use vector_core::{
};

use crate::{
codecs::Transformer, internal_events::SinkRequestBuildError,
sinks::util::builder::SinkBuilderExt, sinks::util::Compression,
codecs::Transformer,
internal_events::SinkRequestBuildError,
sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression},
};

use super::{
encoder::AppsignalEncoder,
normalizer::AppsignalMetricsNormalizer,
request_builder::{AppsignalRequest, AppsignalRequestBuilder},
};

Expand All @@ -32,8 +35,16 @@ where
{
pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let service = ServiceBuilder::new().service(self.service);
let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();

input
.filter_map(move |event| {
ready(if let Event::Metric(metric) = event {
normalizer.normalize(metric).map(Event::Metric)
} else {
Some(event)
})
})
.batched(self.batch_settings.into_byte_size_config())
.request_builder(
None,
Expand Down

0 comments on commit dea1537

Please sign in to comment.