diff --git a/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml b/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml index b8a0cb4fbf558..b04171757296c 100644 --- a/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml +++ b/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml @@ -27,7 +27,7 @@ type = "log_to_metric" inputs = ["remap"] [[transforms.log2metric.metrics]] - type = "gauge" + type = "counter" field = "procid" tags.hostname = "{{ hostname }}" tags.facility = "{{ facility }}" diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 5ceefc3c487d2..fccfe040fdd95 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -8,7 +8,6 @@ use futures_util::{ StreamExt, }; use tower::Service; -use vector_common::finalization::EventFinalizers; use vector_core::{ event::{Event, Metric, MetricValue}, partition::Partitioner, @@ -23,8 +22,8 @@ use super::{ use crate::{ internal_events::DatadogMetricsEncodingError, sinks::util::{ - buffer::metrics::sort::sort_for_compression, buffer::metrics::{AggregatedSummarySplitter, MetricSplitter}, + request_builder::default_request_builder_concurrency_limit, SinkBuilderExt, }, }; @@ -103,15 +102,18 @@ where // Aggregate counters with identical timestamps, otherwise identical counters (same // series and same timestamp, when rounded to whole seconds) will be dropped in a // last-write-wins situation when they hit the DD metrics intake. - .map(|((api_key, endpoint), metrics)| { - let collapsed_metrics = collapse_counters_by_series_and_timestamp(metrics); - ((api_key, endpoint), collapsed_metrics) - }) - // Sort metrics by name, which significantly improves HTTP compression. - .map(|((api_key, endpoint), mut metrics)| { - sort_for_compression(&mut metrics); - ((api_key, endpoint), metrics) - }) + // + // This also sorts metrics by name, which significantly improves HTTP compression. + .concurrent_map( + default_request_builder_concurrency_limit(), + |((api_key, endpoint), metrics)| { + Box::pin(async move { + let collapsed_metrics = + sort_and_collapse_counters_by_series_and_timestamp(metrics); + ((api_key, endpoint), collapsed_metrics) + }) + }, + ) // We build our requests "incrementally", which means that for a single batch of metrics, we might generate // N requests to send them all, as Datadog has API-level limits on payload size, so we keep adding metrics // to a request until we reach the limit, and then create a new request, and so on and so forth, until all @@ -159,142 +161,98 @@ where } } -fn collapse_counters_by_series_and_timestamp(mut metrics: Vec) -> Vec { - // NOTE: Astute observers may recognize that this behavior could also be achieved by using - // `Vec::dedup_by`, but the clincher is that `dedup_by` requires a sorted vector to begin with. - // - // This function is designed to collapse duplicate counters even if the metrics are unsorted, - // which leads to a measurable boost in performance, being nearly 35% faster than `dedup_by` - // when the inputs are sorted, and up to 50% faster when the inputs are unsorted. - // - // These numbers are based on sorting a newtype wrapper around the metric instead of the metric - // itself, which does involve allocating a string in our tests. _However_, sorting the `Metric` - // directly is not possible without a customized `PartialOrd` implementation, as some of the - // nested fields containing `f64` values makes it underivable, and I'm not 100% sure that we - // could/would want to have a narrowly-focused impl of `PartialOrd` on `Metric` to fit this use - // case (metric type -> metric name -> metric timestamp, nothing else) vs being able to sort - // metrics by name first, etc. Then there's the potential issue of the reordering of fields - // changing the ordering behavior of `Metric`... and it just felt easier to write this tailored - // algorithm for the use case at hand. - let mut idx = 0; +/// Collapses counters by series and timestamp, leaving all other metrics unmodified. +/// The return value is sorted by metric series, which is desirable for compression. A sorted vector +/// tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm). +/// +/// Note that the time complexity of this function is O(n log n) and the space complexity is O(1). +/// If needed, we can trade space for time by using a HashMap, which would be O(n) time and O(n) space. +fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec) -> Vec { let now_ts = Utc::now().timestamp(); - // For each metric, see if it's a counter. If so, we check the rest of the metrics - // _after_ it to see if they share the same series _and_ timestamp, when converted - // to a Unix timestamp. If they match, we take that counter's value and merge it - // with our "current" counter metric, and then drop the secondary one from the - // vector. - // - // For any non-counter, we simply ignore it and leave it as-is. - while idx < metrics.len() { - let curr_idx = idx; - let counter_ts = match metrics[curr_idx].value() { - MetricValue::Counter { .. } => metrics[curr_idx] - .data() - .timestamp() - .map(|dt| dt.timestamp()) - .unwrap_or(now_ts), - // If it's not a counter, we can skip it. - _ => { - idx += 1; - continue; - } - }; - - let mut accumulated_value = 0.0; - let mut accumulated_finalizers = EventFinalizers::default(); - - // Now go through each metric _after_ the current one to see if it matches the - // current metric: is a counter, with the same name and timestamp. If it is, we - // accumulate its value and then remove it. - // - // Otherwise, we skip it. - let mut is_disjoint = false; - let mut had_match = false; - let mut inner_idx = curr_idx + 1; - while inner_idx < metrics.len() { - let mut should_advance = true; - if let MetricValue::Counter { value } = metrics[inner_idx].value() { - let other_counter_ts = metrics[inner_idx] - .data() - .timestamp() - .map(|dt| dt.timestamp()) - .unwrap_or(now_ts); - if metrics[curr_idx].series() == metrics[inner_idx].series() - && counter_ts == other_counter_ts - { - had_match = true; - - // Collapse this counter by accumulating its value, and its - // finalizers, and removing it from the original vector of metrics. - accumulated_value += *value; - - let mut old_metric = metrics.swap_remove(inner_idx); - accumulated_finalizers.merge(old_metric.metadata_mut().take_finalizers()); - should_advance = false; - } else { - // We hit a counter that _doesn't_ match, but we can't just skip - // it because we also need to evaluate it against all the - // counters that come after it, so we only increment the index - // for this inner loop. - // - // As well, we mark ourselves to stop incrementing the outer - // index if we find more counters to accumulate, because we've - // hit a disjoint counter here. While we may be continuing to - // shrink the count of remaining metrics from accumulating, - // we have to ensure this counter we just visited is visited by - // the outer loop. - is_disjoint = true; - } - } - - if should_advance { - inner_idx += 1; - - if !is_disjoint { - idx += 1; - } - } + // Sort by series and timestamp which is required for the below dedupe to behave as desired. + // This also tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm). + // Note that `sort_unstable_by_key` would be simpler but results in lifetime errors without cloning. + metrics.sort_unstable_by(|a, b| { + ( + a.value().as_name(), + a.series(), + a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts), + ) + .cmp(&( + a.value().as_name(), + b.series(), + b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts), + )) + }); + + // Aggregate counters that share the same series and timestamp. + // While `coalesce` is semantically more fitting here than `dedupe_by`, we opt for the latter because + // they share the same functionality and `dedupe_by`'s implementation is more optimized, doing the + // operation in place. + metrics.dedup_by(|left, right| { + if left.series() != right.series() { + return false; } - // If we had matches during the accumulator phase, update our original counter. - if had_match { - let metric = metrics.get_mut(curr_idx).expect("current index must exist"); - match metric.value_mut() { - MetricValue::Counter { value } => { - *value += accumulated_value; - metric - .metadata_mut() - .merge_finalizers(accumulated_finalizers); - } - _ => unreachable!("current index must represent a counter"), - } + let left_ts = left.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts); + let right_ts = right.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts); + if left_ts != right_ts { + return false; } - idx += 1; - } + // Only aggregate counters. All other types can be skipped. + if let ( + MetricValue::Counter { value: left_value }, + MetricValue::Counter { value: right_value }, + ) = (left.value(), right.value_mut()) + { + // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then + // `left` is the element that gets removed. + *right_value += left_value; + right + .metadata_mut() + .merge_finalizers(left.metadata_mut().take_finalizers()); + + true + } else { + false + } + }); metrics } #[cfg(test)] mod tests { + use std::{collections::HashSet, time::Duration}; + use chrono::{DateTime, Utc}; use proptest::prelude::*; - use vector_core::event::{Metric, MetricKind, MetricValue}; + use vector_core::{ + event::{Metric, MetricKind, MetricValue}, + metric_tags, + }; - use super::collapse_counters_by_series_and_timestamp; + use super::sort_and_collapse_counters_by_series_and_timestamp; fn arb_collapsible_metrics() -> impl Strategy> { let ts = Utc::now(); any::>().prop_map(move |values| { + let mut unique_metrics = HashSet::new(); values .into_iter() .map(|(id, value)| { let name = format!("{}-{}", value.as_name(), id); Metric::new(name, MetricKind::Incremental, value).with_timestamp(Some(ts)) }) + // Filter out duplicates other than counters. We do this to prevent false positives. False positives would occur + // because we don't collapse other metric types and we can't sort metrics by their values. + .filter(|metric| { + matches!(metric.value(), MetricValue::Counter { .. }) + || unique_metrics.insert(metric.series().clone()) + }) .collect() }) } @@ -315,7 +273,7 @@ mod tests { fn collapse_no_metrics() { let input = Vec::new(); let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -324,7 +282,7 @@ mod tests { fn collapse_single_metric() { let input = vec![create_counter("basic", 42.0)]; let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -333,7 +291,7 @@ mod tests { fn collapse_identical_metrics_gauge() { let input = vec![create_gauge("basic", 42.0), create_gauge("basic", 42.0)]; let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); @@ -348,7 +306,7 @@ mod tests { create_gauge("basic", gauge_value), ]; let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -368,7 +326,91 @@ mod tests { let expected_counter_value = input.len() as f64 * counter_value; let expected = vec![create_counter("basic", expected_counter_value)]; - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); + + assert_eq!(expected, actual); + } + + #[test] + fn collapse_identical_metrics_counter_unsorted() { + let gauge_value = 1.0; + let counter_value = 42.0; + let input = vec![ + create_gauge("gauge", gauge_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_gauge("gauge", gauge_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + ]; + + let expected_counter_value = (input.len() - 2) as f64 * counter_value; + let expected = vec![ + create_counter("basic", expected_counter_value), + create_gauge("gauge", gauge_value), + create_gauge("gauge", gauge_value), + ]; + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); + + assert_eq!(expected, actual); + } + + #[test] + fn collapse_identical_metrics_multiple_timestamps() { + let ts_1 = Utc::now() - Duration::from_secs(5); + let ts_2 = ts_1 - Duration::from_secs(5); + let counter_value = 42.0; + let input = vec![ + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_timestamp(Some(ts_1)), + create_counter("basic", counter_value).with_timestamp(Some(ts_2)), + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_timestamp(Some(ts_2)), + create_counter("basic", counter_value).with_timestamp(Some(ts_1)), + create_counter("basic", counter_value), + ]; + + let expected = vec![ + create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_2)), + create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_1)), + create_counter("basic", counter_value * 3.), + ]; + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); + + assert_eq!(expected, actual); + } + + #[test] + fn collapse_identical_metrics_with_tags() { + let counter_value = 42.0; + let input = vec![ + create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))), + create_counter("basic", counter_value).with_tags(Some(metric_tags!( + "a" => "a", + "b" => "b", + ))), + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_tags(Some(metric_tags!( + "b" => "b", + "a" => "a", + ))), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))), + ]; + + let expected = vec![ + create_counter("basic", counter_value * 3.), + create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!("a" => "a"))), + create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!( + "a" => "a", + "b" => "b", + ))), + ]; + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -419,8 +461,7 @@ mod tests { expected_output.sort_by_cached_key(MetricCollapseSort::from_metric); expected_output.dedup_by(collapse_dedup_fn); - let mut actual_output = collapse_counters_by_series_and_timestamp(input); - actual_output.sort_by_cached_key(MetricCollapseSort::from_metric); + let actual_output = sort_and_collapse_counters_by_series_and_timestamp(input); prop_assert_eq!(expected_output, actual_output); } diff --git a/src/sinks/util/buffer/metrics/mod.rs b/src/sinks/util/buffer/metrics/mod.rs index 877cdc9c4bcc1..e66b3c2364140 100644 --- a/src/sinks/util/buffer/metrics/mod.rs +++ b/src/sinks/util/buffer/metrics/mod.rs @@ -1,5 +1,3 @@ -pub mod sort; - use std::cmp::Ordering; use vector_core::event::metric::{Metric, MetricValue, Sample}; diff --git a/src/sinks/util/buffer/metrics/sort.rs b/src/sinks/util/buffer/metrics/sort.rs deleted file mode 100644 index feaa563493789..0000000000000 --- a/src/sinks/util/buffer/metrics/sort.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::event::Metric; - -/// Sorts metrics in an order that is likely to achieve good compression. -pub fn sort_for_compression(metrics: &mut [Metric]) { - // This just sorts by series today. This tends to compress better than a random ordering by - // 2-3x (JSON encoded, deflate algorithm) - metrics.sort_unstable_by(|a, b| a.series().cmp(b.series())) -} - -#[cfg(test)] -mod test { - use crate::event::MetricValue; - use rand::prelude::SliceRandom; - use rand::thread_rng; - use vector_core::event::{Metric, MetricKind}; - use vector_core::metric_tags; - - // This just ensures the sorting does not change. `sort_for_compression` relies on - // the default `PartialOrd` on `MetricSeries`. - #[test] - fn test_compression_order() { - let sorted_metrics = vec![ - Metric::new( - "metric_1", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ), - Metric::new( - "metric_2", - MetricKind::Incremental, - MetricValue::Gauge { value: 0.0 }, - ), - Metric::new( - "metric_3", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!("z" => "z"))), - Metric::new( - "metric_4", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!("a" => "a"))), - Metric::new( - "metric_4", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!( - "a" => "a", - "b" => "b", - ))), - Metric::new( - "metric_4", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!("b" => "b"))), - ]; - - let mut rand_metrics = sorted_metrics.clone(); - rand_metrics.shuffle(&mut thread_rng()); - super::sort_for_compression(&mut rand_metrics); - assert_eq!(sorted_metrics, rand_metrics); - } -} diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index c51bf405dc4d1..617697ea84ec1 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -82,13 +82,13 @@ pub trait SinkBuilderExt: Stream { /// /// If the spawned future panics, the panic will be carried through and resumed on the task /// calling the stream. - fn concurrent_map(self, limit: Option, f: F) -> ConcurrentMap + fn concurrent_map(self, limit: NonZeroUsize, f: F) -> ConcurrentMap where Self: Sized, F: Fn(Self::Item) -> Pin + Send + 'static>> + Send + 'static, T: Send + 'static, { - ConcurrentMap::new(self, limit, f) + ConcurrentMap::new(self, Some(limit), f) } /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to @@ -114,7 +114,7 @@ pub trait SinkBuilderExt: Stream { { let builder = Arc::new(builder); - self.concurrent_map(Some(limit), move |input| { + self.concurrent_map(limit, move |input| { let builder = Arc::clone(&builder); Box::pin(async move {