From c2fcd7a6433d461d41f53e9360ab33d6da9a2092 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 23 Dec 2024 17:25:18 +0100 Subject: [PATCH] feat(tag_cardinality_limit transform): enable per metric limits for `tag_cardinality_limit` This adds the ability to add per metric limits in `tag_cardinality_limit` transform, besides the global configuration that applies to all metrics. It supports matching metrics by name and optionally by namespace too. Closes: #15743 --- .../tag_cardinality_limit/config.rs | 37 ++- src/transforms/tag_cardinality_limit/mod.rs | 88 +++++-- src/transforms/tag_cardinality_limit/tests.rs | 238 +++++++++++++++++- 3 files changed, 331 insertions(+), 32 deletions(-) diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index 6e52396656427..237520b42b83f 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -16,6 +16,19 @@ use vector_lib::configurable::configurable_component; ))] #[derive(Clone, Debug)] pub struct TagCardinalityLimitConfig { + #[serde(flatten)] + pub global: TagCardinalityLimitInnerConfig, + + /// Tag cardinality limits configuration per metric name. + #[configurable(derived)] + #[serde(default)] + pub per_metric_limits: Vec, +} + +/// Configuration for the `tag_cardinality_limit` transform for a specific group of metrics. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct TagCardinalityLimitInnerConfig { /// How many distinct values to accept for any given key. #[serde(default = "default_value_limit")] pub value_limit: usize, @@ -77,6 +90,21 @@ pub enum LimitExceededAction { DropEvent, } +/// Tag cardinality limit configuration per metric name. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct PerMetricConfig { + /// Name of the metric this configuration refers to. + pub name: String, + + /// Namespace of the metric this configuration refers to. + #[serde(default)] + pub namespace: Option, + + #[serde(flatten)] + pub config: TagCardinalityLimitInnerConfig, +} + const fn default_limit_exceeded_action() -> LimitExceededAction { LimitExceededAction::DropTag } @@ -92,9 +120,12 @@ pub(crate) const fn default_cache_size() -> usize { impl GenerateConfig for TagCardinalityLimitConfig { fn generate_config() -> toml::Value { toml::Value::try_from(Self { - mode: Mode::Exact, - value_limit: default_value_limit(), - limit_exceeded_action: default_limit_exceeded_action(), + global: TagCardinalityLimitInnerConfig { + mode: Mode::Exact, + value_limit: default_value_limit(), + limit_exceeded_action: default_limit_exceeded_action(), + }, + per_metric_limits: Vec::default(), }) .unwrap() } diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 14b18b457a6c0..5156cb0c48a84 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -19,13 +19,15 @@ mod tag_value_set; mod tests; use crate::event::metric::TagValueSet; -pub use config::TagCardinalityLimitConfig; +pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig}; use tag_value_set::AcceptedTagValueSet; +type MetricId = (Option, String); + #[derive(Debug)] pub struct TagCardinalityLimit { config: TagCardinalityLimitConfig, - accepted_tags: HashMap, + accepted_tags: HashMap, HashMap>, } impl TagCardinalityLimit { @@ -36,6 +38,22 @@ impl TagCardinalityLimit { } } + fn get_config_for_metric( + &self, + metric_key: &Option, + ) -> &TagCardinalityLimitInnerConfig { + match metric_key { + Some(id) => self + .config + .per_metric_limits + .iter() + .find(|c| c.name == id.1 && (c.namespace.is_none() || c.namespace == id.0)) + .map(|c| &c.config) + .unwrap_or(&self.config.global), + None => &self.config.global, + } + } + /// Takes in key and a value corresponding to a tag on an incoming Metric /// Event. If that value is already part of set of accepted values for that /// key, then simply returns true. If that value is not yet part of the @@ -44,10 +62,18 @@ impl TagCardinalityLimit { /// for the key and returns true, otherwise returns false. A false return /// value indicates to the caller that the value is not accepted for this /// key, and the configured limit_exceeded_action should be taken. - fn try_accept_tag(&mut self, key: &str, value: &TagValueSet) -> bool { - let tag_value_set = self.accepted_tags.entry_ref(key).or_insert_with(|| { - AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode) - }); + fn try_accept_tag( + &mut self, + metric_key: &Option, + key: &str, + value: &TagValueSet, + ) -> bool { + let config = self.get_config_for_metric(metric_key).clone(); + info!("try_accept_tag using config: {:?}", config); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default(); + let tag_value_set = metric_accepted_tags + .entry_ref(key) + .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)); if tag_value_set.contains(value) { // Tag value has already been accepted, nothing more to do. @@ -55,11 +81,11 @@ impl TagCardinalityLimit { } // Tag value not yet part of the accepted set. - if tag_value_set.len() < self.config.value_limit { + if tag_value_set.len() < config.value_limit { // accept the new value tag_value_set.insert(value.clone()); - if tag_value_set.len() == self.config.value_limit { + if tag_value_set.len() == config.value_limit { emit!(TagCardinalityValueLimitReached { key }); } @@ -72,34 +98,58 @@ impl TagCardinalityLimit { /// Checks if recording a key and value corresponding to a tag on an incoming Metric would /// exceed the cardinality limit. - fn tag_limit_exceeded(&self, key: &str, value: &TagValueSet) -> bool { + fn tag_limit_exceeded( + &self, + metric_key: &Option, + key: &str, + value: &TagValueSet, + ) -> bool { self.accepted_tags - .get(key) - .map(|value_set| { - !value_set.contains(value) && value_set.len() >= self.config.value_limit + .get(metric_key) + .and_then(|metric_accepted_tags| { + metric_accepted_tags.get(key).map(|value_set| { + !value_set.contains(value) + && value_set.len() >= self.get_config_for_metric(metric_key).value_limit + }) }) .unwrap_or(false) } /// Record a key and value corresponding to a tag on an incoming Metric. - fn record_tag_value(&mut self, key: &str, value: &TagValueSet) { - self.accepted_tags + fn record_tag_value(&mut self, metric_key: &Option, key: &str, value: &TagValueSet) { + let config = self.get_config_for_metric(metric_key).clone(); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default(); + metric_accepted_tags .entry_ref(key) - .or_insert_with(|| AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode)) + .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)) .insert(value.clone()); } fn transform_one(&mut self, mut event: Event) -> Option { let metric = event.as_mut_metric(); let metric_name = metric.name().to_string(); + let metric_namespace = metric.namespace().map(|n| n.to_string()); + info!("The config: {:?}", self.config); + let has_per_metric_config = self.config.per_metric_limits.iter().any(|c| { + c.name == metric_name && (c.namespace.is_none() || c.namespace == metric_namespace) + }); + let metric_key = if has_per_metric_config { + info!("Metric specific config has been found!"); + Some((metric_namespace, metric_name.clone())) + } else { + None + }; if let Some(tags_map) = metric.tags_mut() { - match self.config.limit_exceeded_action { + match self + .get_config_for_metric(&metric_key) + .limit_exceeded_action + { LimitExceededAction::DropEvent => { // This needs to check all the tags, to ensure that the ordering of tag names // doesn't change the behavior of the check. for (key, value) in tags_map.iter_sets() { - if self.tag_limit_exceeded(key, value) { + if self.tag_limit_exceeded(&metric_key, key, value) { emit!(TagCardinalityLimitRejectingEvent { metric_name: &metric_name, tag_key: key, @@ -109,12 +159,12 @@ impl TagCardinalityLimit { } } for (key, value) in tags_map.iter_sets() { - self.record_tag_value(key, value); + self.record_tag_value(&metric_key, key, value); } } LimitExceededAction::DropTag => { tags_map.retain(|key, value| { - if self.try_accept_tag(key, value) { + if self.try_accept_tag(&metric_key, key, value) { true } else { emit!(TagCardinalityLimitRejectingTag { diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index d4e63b0820ec8..d8bb4f974c012 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use config::PerMetricConfig; use vector_lib::config::ComponentKey; use vector_lib::config::OutputId; use vector_lib::event::EventMetadata; @@ -24,12 +25,12 @@ fn generate_config() { crate::test_util::test_generate_config::(); } -fn make_metric(tags: MetricTags) -> Event { +fn make_metric_with_name(tags: MetricTags, name: &str) -> Event { let event_metadata = EventMetadata::default().with_source_type("unit_test_stream"); Event::Metric( Metric::new_with_metadata( - "event", + name, metric::MetricKind::Incremental, metric::MetricValue::Counter { value: 1.0 }, event_metadata, @@ -38,14 +39,21 @@ fn make_metric(tags: MetricTags) -> Event { ) } +fn make_metric(tags: MetricTags) -> Event { + make_metric_with_name(tags, "event") +} + const fn make_transform_hashset( value_limit: usize, limit_exceeded_action: LimitExceededAction, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { - value_limit, - limit_exceeded_action, - mode: Mode::Exact, + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Exact, + }, + per_metric_limits: Vec::new(), } } @@ -54,11 +62,46 @@ const fn make_transform_bloom( limit_exceeded_action: LimitExceededAction, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { - value_limit, - limit_exceeded_action, - mode: Mode::Probabilistic(BloomFilterConfig { - cache_size_per_key: default_cache_size(), - }), + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }), + }, + per_metric_limits: Vec::new(), + } +} + +const fn make_transform_hashset_with_per_metric_limits( + value_limit: usize, + limit_exceeded_action: LimitExceededAction, + per_metric_limits: Vec, +) -> TagCardinalityLimitConfig { + TagCardinalityLimitConfig { + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Exact, + }, + per_metric_limits, + } +} + +const fn make_transform_bloom_with_per_metric_limits( + value_limit: usize, + limit_exceeded_action: LimitExceededAction, + per_metric_limits: Vec, +) -> TagCardinalityLimitConfig { + TagCardinalityLimitConfig { + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }), + }, + per_metric_limits, } } @@ -359,3 +402,178 @@ fn drop_event_checks_all_tags(make_tags: impl Fn(&str, &str) -> MetricTags) { assert_eq!(new_event3, None); assert_eq!(new_event4, Some(event4)); } + +#[tokio::test] +async fn tag_cardinality_limit_separate_value_limit_per_metric_name_hashset() { + separate_value_limit_per_metric_name(make_transform_hashset_with_per_metric_limits( + 2, + LimitExceededAction::DropTag, + vec![ + PerMetricConfig { + name: "metricA".to_string(), + namespace: None, + config: make_transform_hashset(1, LimitExceededAction::DropTag).global, + }, + PerMetricConfig { + name: "metricB".to_string(), + namespace: None, + config: make_transform_hashset(5, LimitExceededAction::DropTag).global, + }, + ], + )) + .await; +} + +#[tokio::test] +async fn tag_cardinality_limit_separate_value_limit_per_metric_name_bloom() { + separate_value_limit_per_metric_name(make_transform_bloom_with_per_metric_limits( + 2, + LimitExceededAction::DropTag, + vec![ + PerMetricConfig { + name: "metricA".to_string(), + namespace: None, + config: make_transform_bloom(1, LimitExceededAction::DropTag).global, + }, + PerMetricConfig { + name: "metricB".to_string(), + namespace: None, + config: make_transform_bloom(5, LimitExceededAction::DropTag).global, + }, + ], + )) + .await; +} + +/// Test that hitting the value limit on one tag does not affect the ability to take new +/// values for other tags. +async fn separate_value_limit_per_metric_name(config: TagCardinalityLimitConfig) { + assert_transform_compliance(async move { + let mut event_a1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricA"); + + // The limit for tag1 should already be reached here + let mut event_a2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val1"), "metricA"); + + // The limit for tag2 should be reached here + let mut event_a3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val2"), "metricA"); + + // MetricB should have all of its tags kept due to higher limit + let mut event_b1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricB"); + let mut event_b2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val1"), "metricB"); + let mut event_b3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val2"), "metricB"); + + // MetricC has no specific config, so it uses the global config, which allows 2 values + let mut event_c1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricC"); + let mut event_c2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val2"), "metricC"); + // The limit for tag2 should be reached here + let mut event_c3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val3"), "metricC"); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let events = vec![ + &mut event_a1, + &mut event_a2, + &mut event_a3, + &mut event_b1, + &mut event_b2, + &mut event_b3, + &mut event_c1, + &mut event_c2, + &mut event_c3, + ]; + + for event in &events { + tx.send((*event).clone()).await.unwrap(); + } + + let new_event_a1 = out.recv().await; + let new_event_a2 = out.recv().await; + let new_event_a3 = out.recv().await; + let new_event_b1 = out.recv().await; + let new_event_b2 = out.recv().await; + let new_event_b3 = out.recv().await; + let new_event_c1 = out.recv().await; + let new_event_c2 = out.recv().await; + let new_event_c3 = out.recv().await; + + drop(tx); + topology.stop().await; + + for event in events { + event.set_source_id(Arc::new(ComponentKey::from("in"))); + event.set_upstream_id(Arc::new(OutputId::from("transform"))); + event.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + } + + assert_eq!(new_event_a1, Some(event_a1)); + // The second event should have been modified to remove "tag1" + let new_event_a2 = new_event_a2.unwrap(); + assert!(!new_event_a2 + .as_metric() + .tags() + .unwrap() + .contains_key("tag1")); + assert_eq!( + "val1", + new_event_a2 + .as_metric() + .tags() + .unwrap() + .get("tag2") + .unwrap() + ); + + // The third event should have been modified to remove "tag2" + let new_event_a3 = new_event_a3.unwrap(); + assert!(!new_event_a3 + .as_metric() + .tags() + .unwrap() + .contains_key("tag2")); + assert_eq!( + "val1", + new_event_a3 + .as_metric() + .tags() + .unwrap() + .get("tag1") + .unwrap() + ); + + assert_eq!(new_event_b1, Some(event_b1)); + assert_eq!(new_event_b2, Some(event_b2)); + assert_eq!(new_event_b3, Some(event_b3)); + + assert_eq!(new_event_c1, Some(event_c1)); + assert_eq!(new_event_c2, Some(event_c2)); + // The third event should have been modified to remove "tag2" + let new_event_c3 = new_event_c3.unwrap(); + assert!(!new_event_c3 + .as_metric() + .tags() + .unwrap() + .contains_key("tag2")); + assert_eq!( + "val1", + new_event_c3 + .as_metric() + .tags() + .unwrap() + .get("tag1") + .unwrap() + ); + }) + .await; +}