Skip to content

Commit

Permalink
feat(tag_cardinality_limit transform): enable per metric limits for `…
Browse files Browse the repository at this point in the history
…tag_cardinality_limit`

This adds the ability to add per metric limits in `tag_cardinality_limit` transform, besides the
global configuration that applies to all metrics. It supports matching metrics by name and
optionally by namespace too.

Closes: vectordotdev#15743
  • Loading branch information
esensar committed Dec 23, 2024
1 parent 7c6d0c9 commit c2fcd7a
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 32 deletions.
37 changes: 34 additions & 3 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ use vector_lib::configurable::configurable_component;
))]
#[derive(Clone, Debug)]
pub struct TagCardinalityLimitConfig {
#[serde(flatten)]
pub global: TagCardinalityLimitInnerConfig,

/// Tag cardinality limits configuration per metric name.
#[configurable(derived)]
#[serde(default)]
pub per_metric_limits: Vec<PerMetricConfig>,
}

/// Configuration for the `tag_cardinality_limit` transform for a specific group of metrics.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TagCardinalityLimitInnerConfig {
/// How many distinct values to accept for any given key.
#[serde(default = "default_value_limit")]
pub value_limit: usize,
Expand Down Expand Up @@ -77,6 +90,21 @@ pub enum LimitExceededAction {
DropEvent,
}

/// Tag cardinality limit configuration per metric name.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct PerMetricConfig {
/// Name of the metric this configuration refers to.
pub name: String,

/// Namespace of the metric this configuration refers to.
#[serde(default)]
pub namespace: Option<String>,

#[serde(flatten)]
pub config: TagCardinalityLimitInnerConfig,
}

const fn default_limit_exceeded_action() -> LimitExceededAction {
LimitExceededAction::DropTag
}
Expand All @@ -92,9 +120,12 @@ pub(crate) const fn default_cache_size() -> usize {
impl GenerateConfig for TagCardinalityLimitConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
mode: Mode::Exact,
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
global: TagCardinalityLimitInnerConfig {
mode: Mode::Exact,
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
},
per_metric_limits: Vec::default(),
})
.unwrap()
}
Expand Down
88 changes: 69 additions & 19 deletions src/transforms/tag_cardinality_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ mod tag_value_set;
mod tests;

use crate::event::metric::TagValueSet;
pub use config::TagCardinalityLimitConfig;
pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig};
use tag_value_set::AcceptedTagValueSet;

type MetricId = (Option<String>, String);

#[derive(Debug)]
pub struct TagCardinalityLimit {
config: TagCardinalityLimitConfig,
accepted_tags: HashMap<String, AcceptedTagValueSet>,
accepted_tags: HashMap<Option<MetricId>, HashMap<String, AcceptedTagValueSet>>,
}

impl TagCardinalityLimit {
Expand All @@ -36,6 +38,22 @@ impl TagCardinalityLimit {
}
}

fn get_config_for_metric(
&self,
metric_key: &Option<MetricId>,
) -> &TagCardinalityLimitInnerConfig {
match metric_key {
Some(id) => self
.config
.per_metric_limits
.iter()
.find(|c| c.name == id.1 && (c.namespace.is_none() || c.namespace == id.0))
.map(|c| &c.config)
.unwrap_or(&self.config.global),
None => &self.config.global,
}
}

/// Takes in key and a value corresponding to a tag on an incoming Metric
/// Event. If that value is already part of set of accepted values for that
/// key, then simply returns true. If that value is not yet part of the
Expand All @@ -44,22 +62,30 @@ impl TagCardinalityLimit {
/// for the key and returns true, otherwise returns false. A false return
/// value indicates to the caller that the value is not accepted for this
/// key, and the configured limit_exceeded_action should be taken.
fn try_accept_tag(&mut self, key: &str, value: &TagValueSet) -> bool {
let tag_value_set = self.accepted_tags.entry_ref(key).or_insert_with(|| {
AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode)
});
fn try_accept_tag(
&mut self,
metric_key: &Option<MetricId>,
key: &str,
value: &TagValueSet,
) -> bool {
let config = self.get_config_for_metric(metric_key).clone();
info!("try_accept_tag using config: {:?}", config);
let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default();
let tag_value_set = metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode));

if tag_value_set.contains(value) {
// Tag value has already been accepted, nothing more to do.
return true;
}

// Tag value not yet part of the accepted set.
if tag_value_set.len() < self.config.value_limit {
if tag_value_set.len() < config.value_limit {
// accept the new value
tag_value_set.insert(value.clone());

if tag_value_set.len() == self.config.value_limit {
if tag_value_set.len() == config.value_limit {
emit!(TagCardinalityValueLimitReached { key });
}

Expand All @@ -72,34 +98,58 @@ impl TagCardinalityLimit {

/// Checks if recording a key and value corresponding to a tag on an incoming Metric would
/// exceed the cardinality limit.
fn tag_limit_exceeded(&self, key: &str, value: &TagValueSet) -> bool {
fn tag_limit_exceeded(
&self,
metric_key: &Option<MetricId>,
key: &str,
value: &TagValueSet,
) -> bool {
self.accepted_tags
.get(key)
.map(|value_set| {
!value_set.contains(value) && value_set.len() >= self.config.value_limit
.get(metric_key)
.and_then(|metric_accepted_tags| {
metric_accepted_tags.get(key).map(|value_set| {
!value_set.contains(value)
&& value_set.len() >= self.get_config_for_metric(metric_key).value_limit
})
})
.unwrap_or(false)
}

/// Record a key and value corresponding to a tag on an incoming Metric.
fn record_tag_value(&mut self, key: &str, value: &TagValueSet) {
self.accepted_tags
fn record_tag_value(&mut self, metric_key: &Option<MetricId>, key: &str, value: &TagValueSet) {
let config = self.get_config_for_metric(metric_key).clone();
let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default();
metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode))
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode))
.insert(value.clone());
}

fn transform_one(&mut self, mut event: Event) -> Option<Event> {
let metric = event.as_mut_metric();
let metric_name = metric.name().to_string();
let metric_namespace = metric.namespace().map(|n| n.to_string());
info!("The config: {:?}", self.config);
let has_per_metric_config = self.config.per_metric_limits.iter().any(|c| {
c.name == metric_name && (c.namespace.is_none() || c.namespace == metric_namespace)
});
let metric_key = if has_per_metric_config {
info!("Metric specific config has been found!");
Some((metric_namespace, metric_name.clone()))
} else {
None
};
if let Some(tags_map) = metric.tags_mut() {
match self.config.limit_exceeded_action {
match self
.get_config_for_metric(&metric_key)
.limit_exceeded_action
{
LimitExceededAction::DropEvent => {
// This needs to check all the tags, to ensure that the ordering of tag names
// doesn't change the behavior of the check.

for (key, value) in tags_map.iter_sets() {
if self.tag_limit_exceeded(key, value) {
if self.tag_limit_exceeded(&metric_key, key, value) {
emit!(TagCardinalityLimitRejectingEvent {
metric_name: &metric_name,
tag_key: key,
Expand All @@ -109,12 +159,12 @@ impl TagCardinalityLimit {
}
}
for (key, value) in tags_map.iter_sets() {
self.record_tag_value(key, value);
self.record_tag_value(&metric_key, key, value);
}
}
LimitExceededAction::DropTag => {
tags_map.retain(|key, value| {
if self.try_accept_tag(key, value) {
if self.try_accept_tag(&metric_key, key, value) {
true
} else {
emit!(TagCardinalityLimitRejectingTag {
Expand Down
Loading

0 comments on commit c2fcd7a

Please sign in to comment.