fix(tag_cardinality_limit transform): mitigate performance bottleneck (#19281)

* fix(tag_cardinality_limit transform): mitigate performance bottleneck

* clippy

* OPW-225
dsmith3197 authored Dec 1, 2023
1 parent 1cc16f8 commit 23d828a
Showing 7 changed files with 151 additions and 5 deletions.
Empty file.
@@ -0,0 +1 @@
optimization_goal: ingress_throughput
@@ -0,0 +1,8 @@
generator:
  - tcp:
      seed: [2, 3, 5, 7, 11, 13, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137]
      addr: "localhost:8282"
      variant: "syslog5424"
      bytes_per_second: "500 Mb"
      block_sizes: ["1Kb", "2Kb", "4Kb", "8Kb", "256Kb", "512Kb", "1024Kb"]
      maximum_prebuild_cache_size_bytes: "256 Mb"
@@ -0,0 +1,60 @@
data_dir: "/var/lib/vector"

##
## Sources
##

sources:
  internal_metrics:
    type: "internal_metrics"

  syslog:
    type: "syslog"
    address: "0.0.0.0:8282"
    max_length: 1500000
    mode: "tcp"

##
## Transforms
##

transforms:
  remap:
    type: "remap"
    inputs: [ "syslog" ]
    source: |-
      .cardinality_1000 = random_int(0, 1000)
  log2metric:
    type: "log_to_metric"
    inputs: [ "remap" ]
    metrics:
      - type: "counter"
        field: "procid"
        tags:
          hostname: "{{ host }}"
          facility: "{{ facility }}"
          severity: "{{ severity }}"
          cardinality_1000: "{{ cardinality_1000 }}"

  cardinality_control:
    type: "tag_cardinality_limit"
    inputs: [ "log2metric" ]
    limit_exceeded_action: "drop_tag"
    mode: "probabilistic"
    value_limit: 100

##
## Sinks
##

sinks:
  prometheus:
    type: "prometheus_exporter"
    inputs: [ "internal_metrics" ]
    address: "0.0.0.0:9090"

  blackhole:
    type: "blackhole"
    print_interval_secs: 0
    inputs: [ "cardinality_control" ]
2 changes: 1 addition & 1 deletion src/transforms/tag_cardinality_limit/config.rs
@@ -86,7 +86,7 @@ const fn default_value_limit() -> usize {
 }
 
 pub(crate) const fn default_cache_size() -> usize {
-    5000 * 1024 // 5KB
+    5 * 1024 // 5KB
 }
 
 impl GenerateConfig for TagCardinalityLimitConfig {
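Side note: the previous default of 5000 * 1024 bytes is roughly 5 MB despite its `// 5KB` comment, while the new 5 * 1024 bytes really is 5 KiB and lines up with the documented `5120` default further down. A quick check of the arithmetic:

fn main() {
    let old_default: usize = 5000 * 1024; // 5_120_000 bytes, roughly 5 MB
    let new_default: usize = 5 * 1024; // 5_120 bytes, i.e. 5 KiB
    assert_eq!(old_default, 5_120_000);
    assert_eq!(new_default, 5_120);
}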
83 changes: 80 additions & 3 deletions src/transforms/tag_cardinality_limit/tag_value_set.rs
@@ -12,7 +12,41 @@ pub struct AcceptedTagValueSet {
 
 enum TagValueSetStorage {
     Set(HashSet<TagValueSet>),
-    Bloom(BloomFilter<TagValueSet>),
+    Bloom(BloomFilterStorage),
 }
 
+/// A bloom filter that tracks the number of items inserted into it.
+struct BloomFilterStorage {
+    inner: BloomFilter<TagValueSet>,
+
+    /// Count of items inserted into the bloom filter.
+    /// We manually track this because `BloomFilter::count` has O(n) time complexity.
+    count: usize,
+}
+
+impl BloomFilterStorage {
+    fn new(size: usize) -> Self {
+        Self {
+            inner: BloomFilter::with_size(size),
+            count: 0,
+        }
+    }
+
+    fn insert(&mut self, value: &TagValueSet) {
+        // Only update the count if the value is not already in the bloom filter.
+        if !self.inner.contains(value) {
+            self.inner.insert(value);
+            self.count += 1;
+        }
+    }
+
+    fn contains(&self, value: &TagValueSet) -> bool {
+        self.inner.contains(value)
+    }
+
+    const fn count(&self) -> usize {
+        self.count
+    }
+}
+
 impl fmt::Debug for TagValueSetStorage {
@@ -29,8 +63,7 @@ impl AcceptedTagValueSet {
         let storage = match &mode {
             Mode::Exact => TagValueSetStorage::Set(HashSet::with_capacity(value_limit)),
             Mode::Probabilistic(config) => {
-                let num_bits = config.cache_size_per_key / 8; // Convert bytes to bits
-                TagValueSetStorage::Bloom(BloomFilter::with_size(num_bits))
+                TagValueSetStorage::Bloom(BloomFilterStorage::new(config.cache_size_per_key))
             }
         };
         Self { storage }
@@ -59,3 +92,47 @@ impl AcceptedTagValueSet {
         };
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::event::metric::TagValueSet;
+    use crate::transforms::tag_cardinality_limit::config::{
+        default_cache_size, BloomFilterConfig, Mode,
+    };
+
+    #[test]
+    fn test_accepted_tag_value_set_exact() {
+        let mut accepted_tag_value_set = AcceptedTagValueSet::new(2, &Mode::Exact);
+
+        assert!(!accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()])));
+        assert_eq!(accepted_tag_value_set.len(), 0);
+
+        accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()]));
+        assert_eq!(accepted_tag_value_set.len(), 1);
+        assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()])));
+
+        accepted_tag_value_set.insert(TagValueSet::from(["value2".to_string()]));
+        assert_eq!(accepted_tag_value_set.len(), 2);
+        assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value2".to_string()])));
+    }
+
+    #[test]
+    fn test_accepted_tag_value_set_probabilistic() {
+        // Construct the probabilistic (bloom filter) mode with the default cache size.
+        let config = BloomFilterConfig {
+            cache_size_per_key: default_cache_size(),
+        };
+        let mut accepted_tag_value_set = AcceptedTagValueSet::new(2, &Mode::Probabilistic(config));
+
+        assert!(!accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()])));
+        assert_eq!(accepted_tag_value_set.len(), 0);
+
+        accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()]));
+        assert_eq!(accepted_tag_value_set.len(), 1);
+        assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()])));
+
+        // Inserting the same value again should not increase the count.
+        accepted_tag_value_set.insert(TagValueSet::from(["value1".to_string()]));
+        assert_eq!(accepted_tag_value_set.len(), 1);
+        assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()])));
+
+        accepted_tag_value_set.insert(TagValueSet::from(["value2".to_string()]));
+        assert_eq!(accepted_tag_value_set.len(), 2);
+        assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value2".to_string()])));
+    }
+}
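The core of the fix above is to stop relying on the bloom filter's O(n) count and instead maintain the count at insert time. Here is a minimal, self-contained sketch of that counting-wrapper pattern; it uses a HashSet as a stand-in for the bloom filter so it compiles without the crate Vector uses, and all names in it are illustrative.

use std::collections::HashSet;

// Keep the number of accepted items alongside the storage so reading the
// count is O(1) instead of rescanning the underlying structure.
struct CountingSet {
    inner: HashSet<String>, // stand-in for the bloom filter
    count: usize,
}

impl CountingSet {
    fn new() -> Self {
        Self {
            inner: HashSet::new(),
            count: 0,
        }
    }

    fn insert(&mut self, value: &str) {
        // Mirror the diff: only bump the counter for values not already present.
        if !self.inner.contains(value) {
            self.inner.insert(value.to_string());
            self.count += 1;
        }
    }

    fn count(&self) -> usize {
        self.count // O(1)
    }
}

fn main() {
    let mut set = CountingSet::new();
    set.insert("value1");
    set.insert("value1"); // duplicate: count stays at 1
    set.insert("value2");
    assert_eq!(set.count(), 2);
}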
@@ -10,7 +10,7 @@ base: components: transforms: tag_cardinality_limit: configuration: {
 		"""
 		relevant_when: "mode = \"probabilistic\""
 		required: false
-		type: uint: default: 5120000
+		type: uint: default: 5120
 	}
 	limit_exceeded_action: {
 		description: """
