fix(datadog_metrics sink): improve aggregation performance #18759

Merged: 7 commits, merged on Oct 5, 2023
Changes from 1 commit
@@ -27,7 +27,7 @@ type = "log_to_metric"
inputs = ["remap"]

[[transforms.log2metric.metrics]]
type = "gauge"
type = "counter"
Contributor Author: Changed this from a gauge to a counter to induce the worst-case behavior in terms of performance.

field = "procid"
tags.hostname = "{{ hostname }}"
tags.facility = "{{ facility }}"
233 changes: 110 additions & 123 deletions src/sinks/datadog/metrics/sink.rs
@@ -8,7 +8,6 @@ use futures_util::{
StreamExt,
};
use tower::Service;
use vector_common::finalization::EventFinalizers;
use vector_core::{
event::{Event, Metric, MetricValue},
partition::Partitioner,
@@ -23,7 +22,6 @@ use super::{
use crate::{
internal_events::DatadogMetricsEncodingError,
sinks::util::{
buffer::metrics::sort::sort_for_compression,
buffer::metrics::{AggregatedSummarySplitter, MetricSplitter},
SinkBuilderExt,
},
@@ -103,15 +101,12 @@ where
// Aggregate counters with identical timestamps, otherwise identical counters (same
// series and same timestamp, when rounded to whole seconds) will be dropped in a
// last-write-wins situation when they hit the DD metrics intake.
//
// This also sorts metrics by name, which significantly improves HTTP compression.
.map(|((api_key, endpoint), metrics)| {
let collapsed_metrics = collapse_counters_by_series_and_timestamp(metrics);
let collapsed_metrics = sort_and_collapse_counters_by_series_and_timestamp(metrics);
((api_key, endpoint), collapsed_metrics)
})
// Sort metrics by name, which significantly improves HTTP compression.
.map(|((api_key, endpoint), mut metrics)| {
sort_for_compression(&mut metrics);
((api_key, endpoint), metrics)
})
// We build our requests "incrementally", which means that for a single batch of metrics, we might generate
// N requests to send them all, as Datadog has API-level limits on payload size, so we keep adding metrics
// to a request until we reach the limit, and then create a new request, and so on and so forth, until all
@@ -159,131 +154,72 @@ where
}
}

fn collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Metric> {
// NOTE: Astute observers may recognize that this behavior could also be achieved by using
// `Vec::dedup_by`, but the clincher is that `dedup_by` requires a sorted vector to begin with.
//
// This function is designed to collapse duplicate counters even if the metrics are unsorted,
// which leads to a measurable boost in performance, being nearly 35% faster than `dedup_by`
// when the inputs are sorted, and up to 50% faster when the inputs are unsorted.
//
// These numbers are based on sorting a newtype wrapper around the metric instead of the metric
// itself, which does involve allocating a string in our tests. _However_, sorting the `Metric`
// directly is not possible without a customized `PartialOrd` implementation, as some of the
// nested fields containing `f64` values makes it underivable, and I'm not 100% sure that we
// could/would want to have a narrowly-focused impl of `PartialOrd` on `Metric` to fit this use
// case (metric type -> metric name -> metric timestamp, nothing else) vs being able to sort
// metrics by name first, etc. Then there's the potential issue of the reordering of fields
// changing the ordering behavior of `Metric`... and it just felt easier to write this tailored
// algorithm for the use case at hand.
let mut idx = 0;
/// Collapses counters by series and timestamp, leaving all other metrics unmodified.
/// The return value is sorted by metric series, which is desirable for compression. A sorted vector
/// tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm).
///
/// Note that the time complexity of this function is O(n log n) and the space complexity is O(1).
/// If needed, we can trade space for time by using a HashMap, which would be O(n) time and O(n) space.
fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Metric> {
let now_ts = Utc::now().timestamp();

// For each metric, see if it's a counter. If so, we check the rest of the metrics
// _after_ it to see if they share the same series _and_ timestamp, when converted
// to a Unix timestamp. If they match, we take that counter's value and merge it
// with our "current" counter metric, and then drop the secondary one from the
// vector.
//
// For any non-counter, we simply ignore it and leave it as-is.
while idx < metrics.len() {
let curr_idx = idx;
let counter_ts = match metrics[curr_idx].value() {
MetricValue::Counter { .. } => metrics[curr_idx]
.data()
.timestamp()
.map(|dt| dt.timestamp())
.unwrap_or(now_ts),
// If it's not a counter, we can skip it.
_ => {
idx += 1;
continue;
}
// Sort by series and timestamp which is required for the below dedupe to behave as desired.
// This also tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm).
metrics.sort_unstable_by(|a, b| {
Contributor Author: Note that we were already doing this sort previously, but after the aggregation step. Moving the sort before the aggregation means that we may now be sorting a larger array, but the subsequent aggregation step is O(n) instead of O(n^2).

(
a.series(),
a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
)
.cmp(&(
b.series(),
b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
))
});
Member: This might be simpler as a sort_unstable_by_key.

Contributor Author: I tried changing this but ran into a lifetime error when using sort_unstable_by_key without cloning. I think we should keep it as-is to avoid extra clones.
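As an aside (not part of the diff), a minimal sketch of the lifetime issue described above, using a hypothetical Item type rather than Vector's Metric: the key returned by sort_unstable_by_key must own its data, so a key that borrows the series out of each element is rejected, and avoiding that requires cloning, whereas the comparator form used in the PR only borrows both elements for the duration of each comparison.

// Hypothetical stand-in for Metric, for illustration only.
struct Item {
    series: String,
    ts: i64,
}

fn sort_items(items: &mut Vec<Item>) {
    // Does not compile: `sort_unstable_by_key` cannot return a key that borrows
    // from the element, so a `(&str, i64)` key hits a lifetime error.
    // items.sort_unstable_by_key(|i| (i.series.as_str(), i.ts));

    // Compiles, but clones the series string for every key it builds.
    // items.sort_unstable_by_key(|i| (i.series.clone(), i.ts));

    // Comparator form: borrows both elements per comparison, no clones needed.
    items.sort_unstable_by(|a, b| (a.series.as_str(), a.ts).cmp(&(b.series.as_str(), b.ts)));
}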


metrics.dedup_by(|left, right| {
Member: Semantically, it might make a little more sense to use coalesce for this instead of dedup_by. Hopefully they're about equivalent in terms of allocations, etc.

Another option could be group_by/into_grouping_map_by with the same key as the sort above, followed by a merge of each group.

Contributor Author (dsmith3197), Oct 4, 2023: I agree. However, the dedup_by implementation is slightly more optimized, as it does the operation in place rather than creating an iterator and then collecting into a new vector. Are you okay keeping it as dedup_by? I'm happy to create a benchmark and/or change it if desired.

Member: I am curious now what the difference would be, since in this case we could theoretically coalesce in place, but I'm fine keeping it as dedup_by 😄. A small explanatory comment might be a good addition though, just to emphasize that our intent here is not actually to "dedupe".
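As an aside (not part of the diff), a rough sketch of what the coalesce alternative mentioned above could look like, using itertools and a simplified Counter stand-in rather than Vector's Metric, with finalizer handling omitted. Unlike dedup_by, this consumes the vector and collects into a new one, which is the extra allocation the author refers to.

use itertools::Itertools;

struct Counter {
    series: String,
    ts: i64,
    value: f64,
}

// Assumes the input is already sorted by (series, ts), as in the PR.
fn collapse_with_coalesce(sorted: Vec<Counter>) -> Vec<Counter> {
    sorted
        .into_iter()
        .coalesce(|mut prev, curr| {
            if prev.series == curr.series && prev.ts == curr.ts {
                // Merge adjacent counters for the same series and timestamp.
                prev.value += curr.value;
                Ok(prev)
            } else {
                // Keep `prev` as-is and continue coalescing from `curr`.
                Err((prev, curr))
            }
        })
        .collect()
}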

// Only aggregate counters. All other types can be skipped.
let MetricValue::Counter { value: left_value } = left.value() else {
return false;
};

let mut accumulated_value = 0.0;
let mut accumulated_finalizers = EventFinalizers::default();

// Now go through each metric _after_ the current one to see if it matches the
// current metric: is a counter, with the same name and timestamp. If it is, we
// accumulate its value and then remove it.
//
// Otherwise, we skip it.
let mut is_disjoint = false;
let mut had_match = false;
let mut inner_idx = curr_idx + 1;
while inner_idx < metrics.len() {
let mut should_advance = true;
if let MetricValue::Counter { value } = metrics[inner_idx].value() {
let other_counter_ts = metrics[inner_idx]
.data()
.timestamp()
.map(|dt| dt.timestamp())
.unwrap_or(now_ts);
if metrics[curr_idx].series() == metrics[inner_idx].series()
&& counter_ts == other_counter_ts
{
had_match = true;

// Collapse this counter by accumulating its value, and its
// finalizers, and removing it from the original vector of metrics.
accumulated_value += *value;

let mut old_metric = metrics.swap_remove(inner_idx);
accumulated_finalizers.merge(old_metric.metadata_mut().take_finalizers());
should_advance = false;
} else {
// We hit a counter that _doesn't_ match, but we can't just skip
// it because we also need to evaluate it against all the
// counters that come after it, so we only increment the index
// for this inner loop.
//
// As well, we mark ourselves to stop incrementing the outer
// index if we find more counters to accumulate, because we've
// hit a disjoint counter here. While we may be continuing to
// shrink the count of remaining metrics from accumulating,
// we have to ensure this counter we just visited is visited by
// the outer loop.
is_disjoint = true;
}
}

if should_advance {
inner_idx += 1;

if !is_disjoint {
idx += 1;
}
}
if !matches!(right.value(), MetricValue::Counter { .. }) {
return false;
}
if left.series() != right.series() {
return false;
}

// If we had matches during the accumulator phase, update our original counter.
if had_match {
let metric = metrics.get_mut(curr_idx).expect("current index must exist");
match metric.value_mut() {
MetricValue::Counter { value } => {
*value += accumulated_value;
metric
.metadata_mut()
.merge_finalizers(accumulated_finalizers);
}
_ => unreachable!("current index must represent a counter"),
}
let left_ts = left.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
let right_ts = right.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
if left_ts != right_ts {
return false;
}

idx += 1;
}
// If we reach this point, the counters are for the same series and have the same timestamp.
let MetricValue::Counter { value: right_value } = right.value_mut() else {
return false;
};
// NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then
// `left` is the element that gets removed.
*right_value += left_value;
right
.metadata_mut()
.merge_finalizers(left.metadata_mut().take_finalizers());

true
});

metrics
}
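Aside (not part of the diff): the NOTE in the closure above relies on a subtle detail of Vec::dedup_by: the closure is passed the later element first and the earlier element second, and returning true removes the later one, so accumulated state has to be folded into the second ("right") argument. A small standalone check of that behavior, using plain (series_id, value) tuples instead of Metric:

fn main() {
    // (series_id, counter value) pairs, already sorted by series_id.
    let mut v = vec![(1, 10.0), (1, 5.0), (2, 1.0), (3, 2.0), (3, 2.0)];
    v.dedup_by(|later, earlier| {
        if later.0 == earlier.0 {
            // `earlier` is the element that survives, so fold the value into it.
            earlier.1 += later.1;
            true
        } else {
            false
        }
    });
    assert_eq!(v, vec![(1, 15.0), (2, 1.0), (3, 4.0)]);
}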

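Aside (not part of the diff): the doc comment on sort_and_collapse_counters_by_series_and_timestamp notes that a HashMap could trade space for time, i.e. O(n) time at the cost of O(n) extra space. A rough sketch of that alternative, again with a simplified Counter stand-in and with finalizers ignored; its output is in arbitrary order and would still need a sort to keep the compression benefit, which is presumably part of why the PR sticks with the sort-based approach.

use std::collections::HashMap;

struct Counter {
    series: String,
    ts: i64,
    value: f64,
}

fn collapse_with_map(metrics: Vec<Counter>) -> Vec<Counter> {
    // Sum counter values per (series, ts) key in a single O(n) pass.
    let mut sums: HashMap<(String, i64), f64> = HashMap::new();
    for m in metrics {
        *sums.entry((m.series, m.ts)).or_insert(0.0) += m.value;
    }
    // Rebuild the vector; ordering is arbitrary here, unlike the sorted output
    // produced by the function above.
    sums.into_iter()
        .map(|((series, ts), value)| Counter { series, ts, value })
        .collect()
}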
#[cfg(test)]
mod tests {
use std::time::Duration;

use chrono::{DateTime, Utc};
use proptest::prelude::*;
use vector_core::event::{Metric, MetricKind, MetricValue};

use super::collapse_counters_by_series_and_timestamp;
use super::sort_and_collapse_counters_by_series_and_timestamp;

fn arb_collapsible_metrics() -> impl Strategy<Value = Vec<Metric>> {
let ts = Utc::now();
@@ -315,7 +251,7 @@ mod tests {
fn collapse_no_metrics() {
let input = Vec::new();
let expected = input.clone();
let actual = collapse_counters_by_series_and_timestamp(input);
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

assert_eq!(expected, actual);
}
@@ -324,7 +260,7 @@
fn collapse_single_metric() {
let input = vec![create_counter("basic", 42.0)];
let expected = input.clone();
let actual = collapse_counters_by_series_and_timestamp(input);
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

assert_eq!(expected, actual);
}
@@ -333,7 +269,7 @@
fn collapse_identical_metrics_gauge() {
let input = vec![create_gauge("basic", 42.0), create_gauge("basic", 42.0)];
let expected = input.clone();
let actual = collapse_counters_by_series_and_timestamp(input);
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

assert_eq!(expected, actual);

@@ -348,7 +284,7 @@
create_gauge("basic", gauge_value),
];
let expected = input.clone();
let actual = collapse_counters_by_series_and_timestamp(input);
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

assert_eq!(expected, actual);
}
@@ -368,7 +304,59 @@

let expected_counter_value = input.len() as f64 * counter_value;
let expected = vec![create_counter("basic", expected_counter_value)];
let actual = collapse_counters_by_series_and_timestamp(input);
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

assert_eq!(expected, actual);
}

#[test]
fn collapse_identical_metrics_counter_unsorted() {
let gauge_value = 1.0;
let counter_value = 42.0;
let input = vec![
create_counter("gauge", gauge_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("gauge", gauge_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
];

let expected_counter_value = input.len() as f64 * counter_value;
let expected = vec![
create_counter("basic", expected_counter_value),
create_counter("gauge", gauge_value),
create_counter("gauge", gauge_value),
];
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

assert_eq!(expected, actual);
}

#[test]
fn collapse_identical_metrics_multiple_timestamps() {
let ts_1 = Utc::now() - Duration::from_secs(5);
let ts_2 = ts_1 - Duration::from_secs(5);
let counter_value = 42.0;
let input = vec![
create_counter("basic", counter_value),
create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
create_counter("basic", counter_value),
create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
create_counter("basic", counter_value),
];

let expected = vec![
create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_2)),
create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_1)),
create_counter("basic", counter_value * 3.),
];
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

assert_eq!(expected, actual);
}
@@ -419,8 +407,7 @@ mod tests {
expected_output.sort_by_cached_key(MetricCollapseSort::from_metric);
expected_output.dedup_by(collapse_dedup_fn);

let mut actual_output = collapse_counters_by_series_and_timestamp(input);
actual_output.sort_by_cached_key(MetricCollapseSort::from_metric);
let actual_output = sort_and_collapse_counters_by_series_and_timestamp(input);

prop_assert_eq!(expected_output, actual_output);
}
2 changes: 0 additions & 2 deletions src/sinks/util/buffer/metrics/mod.rs
@@ -1,5 +1,3 @@
pub mod sort;

use std::cmp::Ordering;

use vector_core::event::metric::{Metric, MetricValue, Sample};