Skip to content

Commit

Permalink
feat(appsignal sink): Normalize metrics
Browse files Browse the repository at this point in the history
Implement a normaliser for the AppSignal sink to convert absolute
counter metrics to incremental counters, and incremental gauges to
absolute gauges. The AppSignal API ignores absolute counters and
incremental gauges, so this change adds support for absolute counters
and incremental gauges.

This normaliser is inspired by the DataDog normaliser.
  • Loading branch information
unflxw committed Aug 11, 2023
1 parent 81cf6a7 commit cdef864
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@
#[cfg(all(test, feature = "appsignal-integration-tests"))]
mod integration_tests;

mod normalizer;
use normalizer::AppsignalMetricsNormalizer;

use std::task::Poll;

use futures::{
future,
future::{BoxFuture, Ready},
};

use futures_util::future::ready;

use http::{header::AUTHORIZATION, Request, StatusCode, Uri};
use hyper::Body;
use serde_json::{json, Value};
Expand All @@ -26,6 +32,7 @@ use crate::{
sinks::{
prelude::*,
util::{
buffer::metrics::MetricNormalizer,
encoding::{as_tracked_write, Encoder},
http::HttpBatchService,
http::HttpStatusRetryLogic,
Expand Down Expand Up @@ -189,8 +196,16 @@ where
{
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let service = ServiceBuilder::new().service(self.service);
let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();

input
.filter_map(move |event| {
ready(if let Event::Metric(metric) = event {
normalizer.normalize(metric).map(Event::Metric)
} else {
Some(event)
})
})
.batched(self.batch_settings.into_byte_size_config())
.request_builder(
None,
Expand Down
199 changes: 199 additions & 0 deletions src/sinks/appsignal/normalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use vector_core::event::{Metric, MetricValue};

use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

#[derive(Default)]
pub(crate) struct AppsignalMetricsNormalizer;

impl MetricNormalize for AppsignalMetricsNormalizer {
fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
// We only care about making sure that counters are incremental, and that gauges are
// always absolute. Other metric types are currently unsupported.
match &metric.value() {
// We always send counters as incremental and gauges as absolute. Realistically, any
// system sending an incremental gauge update is kind of doing it wrong, but alas.
MetricValue::Counter { .. } => state.make_incremental(metric),
MetricValue::Gauge { .. } => state.make_absolute(metric),
// Otherwise, send it through as-is.
_ => Some(metric),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use vector_core::event::{Metric, MetricKind, MetricValue};

use super::AppsignalMetricsNormalizer;
use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

fn get_counter(value: f64, kind: MetricKind) -> Metric {
Metric::new("counter", kind, MetricValue::Counter { value })
}

fn get_gauge(value: f64, kind: MetricKind) -> Metric {
Metric::new("gauge", kind, MetricValue::Gauge { value })
}

fn run_comparisons(inputs: Vec<Metric>, expected_outputs: Vec<Option<Metric>>) {
let mut metric_set = MetricSet::default();
let mut normalizer = AppsignalMetricsNormalizer;

for (input, expected) in inputs.into_iter().zip(expected_outputs) {
let result = normalizer.normalize(&mut metric_set, input);
assert_eq!(result, expected);
}
}

#[test]
fn absolute_counter() {
let first_value = 3.14;
let second_value = 8.675309;

let counters = vec![
get_counter(first_value, MetricKind::Absolute),
get_counter(second_value, MetricKind::Absolute),
];

let expected_counters = vec![
None,
Some(get_counter(
second_value - first_value,
MetricKind::Incremental,
)),
];

run_comparisons(counters, expected_counters);
}

#[test]
fn incremental_counter() {
let first_value = 3.14;
let second_value = 8.675309;

let counters = vec![
get_counter(first_value, MetricKind::Incremental),
get_counter(second_value, MetricKind::Incremental),
];

let expected_counters = counters
.clone()
.into_iter()
.map(Option::Some)
.collect::<Vec<_>>();

run_comparisons(counters, expected_counters);
}

#[test]
fn mixed_counter() {
let first_value = 3.14;
let second_value = 8.675309;
let third_value = 16.19;

let counters = vec![
get_counter(first_value, MetricKind::Incremental),
get_counter(second_value, MetricKind::Absolute),
get_counter(third_value, MetricKind::Absolute),
get_counter(first_value, MetricKind::Absolute),
get_counter(second_value, MetricKind::Incremental),
get_counter(third_value, MetricKind::Incremental),
];

let expected_counters = vec![
Some(get_counter(first_value, MetricKind::Incremental)),
None,
Some(get_counter(
third_value - second_value,
MetricKind::Incremental,
)),
None,
Some(get_counter(second_value, MetricKind::Incremental)),
Some(get_counter(third_value, MetricKind::Incremental)),
];

run_comparisons(counters, expected_counters);
}

#[test]
fn absolute_gauge() {
let first_value = 3.14;
let second_value = 8.675309;

let gauges = vec![
get_gauge(first_value, MetricKind::Absolute),
get_gauge(second_value, MetricKind::Absolute),
];

let expected_gauges = gauges
.clone()
.into_iter()
.map(Option::Some)
.collect::<Vec<_>>();

run_comparisons(gauges, expected_gauges);
}

#[test]
fn incremental_gauge() {
let first_value = 3.14;
let second_value = 8.675309;

let gauges = vec![
get_gauge(first_value, MetricKind::Incremental),
get_gauge(second_value, MetricKind::Incremental),
];

let expected_gauges = vec![
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(first_value + second_value, MetricKind::Absolute)),
];

run_comparisons(gauges, expected_gauges);
}

#[test]
fn mixed_gauge() {
let first_value = 3.14;
let second_value = 8.675309;
let third_value = 16.19;

let gauges = vec![
get_gauge(first_value, MetricKind::Incremental),
get_gauge(second_value, MetricKind::Absolute),
get_gauge(third_value, MetricKind::Absolute),
get_gauge(first_value, MetricKind::Absolute),
get_gauge(second_value, MetricKind::Incremental),
get_gauge(third_value, MetricKind::Incremental),
];

let expected_gauges = vec![
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(second_value, MetricKind::Absolute)),
Some(get_gauge(third_value, MetricKind::Absolute)),
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(first_value + second_value, MetricKind::Absolute)),
Some(get_gauge(
first_value + second_value + third_value,
MetricKind::Absolute,
)),
];

run_comparisons(gauges, expected_gauges);
}

#[test]
fn other_metrics() {
let metric = Metric::new(
"set",
MetricKind::Incremental,
MetricValue::Set {
values: BTreeSet::new(),
},
);

run_comparisons(vec![metric], vec![None]);
}
}

0 comments on commit cdef864

Please sign in to comment.