From 3be51055587b89876324183aa83428f1b567cf83 Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Wed, 22 Nov 2023 14:11:41 -0800
Subject: [PATCH 1/5] Remove metric attribute key de-duplication

---
 opentelemetry-prometheus/src/lib.rs     |  3 +-
 .../tests/data/sanitized_labels.txt     |  2 +-
 .../tests/integration_test.rs           |  8 ++-
 opentelemetry-sdk/src/attributes/set.rs |  8 +--
 opentelemetry-sdk/src/metrics/mod.rs    | 60 +++++++++++++++++++
 5 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/opentelemetry-prometheus/src/lib.rs b/opentelemetry-prometheus/src/lib.rs
index bb637f52b2..592ea964c4 100644
--- a/opentelemetry-prometheus/src/lib.rs
+++ b/opentelemetry-prometheus/src/lib.rs
@@ -373,7 +373,8 @@ impl prometheus::core::Collector for Collector {
 /// Maps attributes into Prometheus-style label pairs.
 ///
 /// It sanitizes invalid characters and handles duplicate keys (due to
-/// sanitization) by sorting and concatenating the values following the spec.
+/// sanitization or when the user provides duplicate keys) by sorting and
+/// concatenating the values following the spec.
 fn get_attrs(kvs: &mut dyn Iterator<Item = (&Key, &Value)>, extra: &[LabelPair]) -> Vec<LabelPair> {
     let mut keys_map = BTreeMap::<String, Vec<String>>::new();
     for (key, value) in kvs {
diff --git a/opentelemetry-prometheus/tests/data/sanitized_labels.txt b/opentelemetry-prometheus/tests/data/sanitized_labels.txt
index 79ec70e61f..e80185df17 100644
--- a/opentelemetry-prometheus/tests/data/sanitized_labels.txt
+++ b/opentelemetry-prometheus/tests/data/sanitized_labels.txt
@@ -1,6 +1,6 @@
 # HELP foo_total a sanitary counter
 # TYPE foo_total counter
-foo_total{A_B="Q",C_D="Y;Z",otel_scope_name="testmeter",otel_scope_version="v0.1.0"} 24.3
+foo_total{A_B="Q;X",C_D="Y;Z",otel_scope_name="testmeter",otel_scope_version="v0.1.0"} 24.3
 # HELP otel_scope_info Instrumentation Scope metadata
 # TYPE otel_scope_info gauge
 otel_scope_info{otel_scope_name="testmeter",otel_scope_version="v0.1.0"} 1
diff --git a/opentelemetry-prometheus/tests/integration_test.rs b/opentelemetry-prometheus/tests/integration_test.rs
index aff4c43a99..47770c1d85 100644
--- a/opentelemetry-prometheus/tests/integration_test.rs
+++ b/opentelemetry-prometheus/tests/integration_test.rs
@@ -135,7 +135,13 @@ fn prometheus_exporter_integration() {
             builder: ExporterBuilder::default().without_units(),
             record_metrics: Box::new(|meter| {
                 let attrs = vec![
-                    // exact match, value should be overwritten
+                    // exact match, SDK will not do de-duplication,
+                    // values should be concatenated.
+                    // The order X;Q vs Q;X is not guaranteed.
+                    // We expect end-users to not produce duplicates in the
+                    // first place, but if that cannot be done,
+                    // we can offer an opt-in feature in the SDK to do it.
+                    // https://github.com/open-telemetry/opentelemetry-rust/issues/1300
                     Key::new("A.B").string("X"),
                     Key::new("A.B").string("Q"),
                     // unintended match due to sanitization, values should be concatenated
diff --git a/opentelemetry-sdk/src/attributes/set.rs b/opentelemetry-sdk/src/attributes/set.rs
index 06490879a1..c120128a3d 100644
--- a/opentelemetry-sdk/src/attributes/set.rs
+++ b/opentelemetry-sdk/src/attributes/set.rs
@@ -1,4 +1,3 @@
-use std::collections::HashSet;
 use std::{
     cmp::Ordering,
     hash::{Hash, Hasher},
@@ -109,16 +108,11 @@ pub struct AttributeSet(Vec<HashKeyValue>);
 impl From<&[KeyValue]> for AttributeSet {
     fn from(values: &[KeyValue]) -> Self {
-        let mut seen_keys = HashSet::with_capacity(values.len());
         let mut vec = values
             .iter()
             .rev()
             .filter_map(|kv| {
-                if seen_keys.insert(kv.key.clone()) {
-                    Some(HashKeyValue(kv.clone()))
-                } else {
-                    None
-                }
+                Some(HashKeyValue(kv.clone()))
             })
             .collect::<Vec<_>>();
         vec.sort_unstable();
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 9eb34d9303..1197d70464 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -157,6 +157,66 @@ mod tests {
         );
     }
 
+    // "multi_thread" tokio flavor must be used else flush won't
+    // be able to make progress!
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_aggregation_duplicate_attributes() {
+        // Run this test with stdout enabled to see output.
+        // cargo test counter_aggregation_duplicate_attributes --features=metrics,testing
+
+        // Arrange
+        let exporter = InMemoryMetricsExporter::default();
+        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
+        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
+
+        // Act
+        let meter = meter_provider.meter("test");
+        let counter = meter
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .init();
+        counter.add(1, &[KeyValue::new("key1", "value1"),KeyValue::new("key1", "value2")]);
+        counter.add(1, &[KeyValue::new("key1", "value1"),KeyValue::new("key1", "value2")]);
+
+        meter_provider.force_flush().unwrap();
+
+        // Assert
+        let resource_metrics = exporter
+            .get_finished_metrics()
+            .expect("metrics are expected to be exported.");
+        assert!(!resource_metrics.is_empty());
+        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
+        assert_eq!(metric.name, "my_counter");
+        assert_eq!(metric.unit.as_str(), "my_unit");
+        let sum = metric
+            .data
+            .as_any()
+            .downcast_ref::<data::Sum<u64>>()
+            .expect("Sum aggregation expected for Counter instruments by default");
+
+        // Expecting 1 time-series.
+        assert_eq!(sum.data_points.len(), 1);
+
+        // find and validate key1=value1,key1=value2 datapoint
+        let data_point = &sum.data_points[0];
+        assert_eq!(
+            data_point
+                .value,
+            2
+        );
+        assert_eq!(data_point.attributes.len(), 2, "Should have 2 attributes as sdk is not deduplicating attributes");
+        let key_value1_found = data_point
+            .attributes
+            .iter()
+            .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1");
+        let key_value2_found = data_point
+            .attributes
+            .iter()
+            .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2");
+
+        assert_eq!(key_value1_found && key_value2_found, true, "Should have found both key1=value1 and key1=value2 attributes as does not dedup");
+    }
+
     // "multi_thread" tokio flavor must be used else flush won't
     // be able to make progress!
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]

From 22e2c0d78b9d2a0e10857ab768a29a6c4f97ecfe Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Wed, 22 Nov 2023 14:30:11 -0800
Subject: [PATCH 2/5] fmt

---
 opentelemetry-prometheus/src/lib.rs     |  2 +-
 opentelemetry-sdk/src/attributes/set.rs |  4 +---
 opentelemetry-sdk/src/metrics/mod.rs    | 32 ++++++++++++++++++-------
 3 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/opentelemetry-prometheus/src/lib.rs b/opentelemetry-prometheus/src/lib.rs
index 592ea964c4..b31a8e81aa 100644
--- a/opentelemetry-prometheus/src/lib.rs
+++ b/opentelemetry-prometheus/src/lib.rs
@@ -373,7 +373,7 @@ impl prometheus::core::Collector for Collector {
 /// Maps attributes into Prometheus-style label pairs.
 ///
 /// It sanitizes invalid characters and handles duplicate keys (due to
-/// sanitization or when the user provides duplicate keys) by sorting and 
+/// sanitization or when the user provides duplicate keys) by sorting and
 /// concatenating the values following the spec.
 fn get_attrs(kvs: &mut dyn Iterator<Item = (&Key, &Value)>, extra: &[LabelPair]) -> Vec<LabelPair> {
     let mut keys_map = BTreeMap::<String, Vec<String>>::new();
diff --git a/opentelemetry-sdk/src/attributes/set.rs b/opentelemetry-sdk/src/attributes/set.rs
index c120128a3d..5b9ba4a5b0 100644
--- a/opentelemetry-sdk/src/attributes/set.rs
+++ b/opentelemetry-sdk/src/attributes/set.rs
@@ -111,9 +111,7 @@ impl From<&[KeyValue]> for AttributeSet {
         let mut vec = values
             .iter()
             .rev()
-            .filter_map(|kv| {
-                Some(HashKeyValue(kv.clone()))
-            })
+            .filter_map(|kv| Some(HashKeyValue(kv.clone())))
             .collect::<Vec<_>>();
         vec.sort_unstable();
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 1197d70464..f7a1401b5b 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -175,8 +175,20 @@ mod tests {
             .u64_counter("my_counter")
             .with_unit(Unit::new("my_unit"))
             .init();
-        counter.add(1, &[KeyValue::new("key1", "value1"),KeyValue::new("key1", "value2")]);
-        counter.add(1, &[KeyValue::new("key1", "value1"),KeyValue::new("key1", "value2")]);
+        counter.add(
+            1,
+            &[
+                KeyValue::new("key1", "value1"),
+                KeyValue::new("key1", "value2"),
+            ],
+        );
+        counter.add(
+            1,
+            &[
+                KeyValue::new("key1", "value1"),
+                KeyValue::new("key1", "value2"),
+            ],
+        );
 
         meter_provider.force_flush().unwrap();
 
@@ -197,14 +209,14 @@
         assert_eq!(sum.data_points.len(), 1);
 
-        // find and validate key1=value1,key1=value2 datapoint 
+        // find and validate key1=value1,key1=value2 datapoint
         let data_point = &sum.data_points[0];
+        assert_eq!(data_point.value, 2);
         assert_eq!(
-            data_point
-                .value,
-            2
+            data_point.attributes.len(),
+            2,
+            "Should have 2 attributes as sdk is not deduplicating attributes"
         );
-        assert_eq!(data_point.attributes.len(), 2, "Should have 2 attributes as sdk is not deduplicating attributes");
         let key_value1_found = data_point
             .attributes
             .iter()
             .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1");
         let key_value2_found = data_point
             .attributes
             .iter()
             .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2");
 
-        assert_eq!(key_value1_found && key_value2_found, true, "Should have found both key1=value1 and key1=value2 attributes as does not dedup");
+        assert_eq!(
+            key_value1_found && key_value2_found,
+            true,
+            "Should have found both key1=value1 and key1=value2 attributes as does not dedup"
+        );
     }
 
     // "multi_thread" tokio flavor must be used else flush won't

From a6850e54489694ae583b12ae08ea2e005c5aa9e2 Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Wed, 22 Nov 2023 14:31:33 -0800
Subject: [PATCH 3/5] clippy fix

---
 opentelemetry-sdk/src/metrics/mod.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index f7a1401b5b..30420e7491 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -226,9 +226,8 @@ mod tests {
             .iter()
             .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2");
 
-        assert_eq!(
+        assert!(
             key_value1_found && key_value2_found,
-            true,
             "Should have found both key1=value1 and key1=value2 attributes as does not dedup"
         );
     }

From 32f26bd731ce1a4c3fedb6ca185212ec6a217e68 Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Wed, 22 Nov 2023 14:32:59 -0800
Subject: [PATCH 4/5] map is fine

---
 opentelemetry-sdk/src/attributes/set.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opentelemetry-sdk/src/attributes/set.rs b/opentelemetry-sdk/src/attributes/set.rs
index 5b9ba4a5b0..97f34ad3d1 100644
--- a/opentelemetry-sdk/src/attributes/set.rs
+++ b/opentelemetry-sdk/src/attributes/set.rs
@@ -111,7 +111,7 @@ impl From<&[KeyValue]> for AttributeSet {
         let mut vec = values
             .iter()
             .rev()
-            .filter_map(|kv| Some(HashKeyValue(kv.clone())))
+            .map(|kv| HashKeyValue(kv.clone()))
             .collect::<Vec<_>>();
         vec.sort_unstable();

From 02a709c3587fa8f6887ec3897900caa1985aee0e Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Wed, 22 Nov 2023 14:36:57 -0800
Subject: [PATCH 5/5] add changelog

---
 opentelemetry-sdk/CHANGELOG.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md
index 1b5e0b76f9..028d5c8d43 100644
--- a/opentelemetry-sdk/CHANGELOG.md
+++ b/opentelemetry-sdk/CHANGELOG.md
@@ -24,6 +24,12 @@
 - **Breaking** Remove `TextMapCompositePropagator` [#1373](https://github.com/open-telemetry/opentelemetry-rust/pull/1373). Use `TextMapCompositePropagator` in opentelemetry API.
 - [#1375](https://github.com/open-telemetry/opentelemetry-rust/pull/1375/) Fix metric collections during PeriodicReader shutdown
+- **Breaking**
+  [#1397](https://github.com/open-telemetry/opentelemetry-rust/issues/1397)
+  Removes de-duplication of Metric attribute keys to achieve performance gains.
+  Please share [feedback
+  here](https://github.com/open-telemetry/opentelemetry-rust/issues/1300), if
+  you are affected.
 
 ## v0.21.1
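
The sketch below, adapted from the `counter_aggregation_duplicate_attributes` test added in PATCH 1, illustrates the user-visible effect of this series: duplicate attribute keys supplied by the caller are now exported as-is instead of being collapsed to a single entry. The import paths, the test name, and the tokio dev-dependency are assumptions based on the 0.21-era crate layout and may need adjusting for your version.

```rust
// Illustrative only: mirrors the test added in PATCH 1; import paths are
// assumptions and may differ across opentelemetry-rust versions.
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn duplicate_attribute_keys_are_exported_as_is() {
    let exporter = InMemoryMetricsExporter::default();
    let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

    let counter = meter_provider.meter("test").u64_counter("my_counter").init();

    // Before this change the SDK kept only one "key1" entry; now both are
    // passed through to the reader/exporter unchanged.
    counter.add(
        1,
        &[
            KeyValue::new("key1", "value1"),
            KeyValue::new("key1", "value2"),
        ],
    );

    meter_provider.force_flush().unwrap();

    let finished = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported");
    // The single exported data point now carries two attributes,
    // key1="value1" and key1="value2".
    assert!(!finished.is_empty());
}
```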