Skip to content

Commit

Permalink
Fix delta aggregation metric reuse (#1434)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtescher committed Jan 4, 2024
1 parent 3ed8998 commit 43cd4ac
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 294 deletions.
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## vNext

- Fix delta aggregation metric reuse. (#1434)

## v0.21.1

### Fixed

- Fix metric export corruption if gauges have not received a last value. (#1363)
Expand Down
236 changes: 236 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,239 @@ impl<T: Number<T>> AggregateBuilder<T> {
)
}
}

#[cfg(test)]
mod tests {
use crate::metrics::data::{
DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
Histogram, HistogramDataPoint, Sum,
};
use std::time::SystemTime;

use super::*;

#[test]
fn last_value_aggregation() {
let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
let mut a = Gauge {
data_points: vec![DataPoint {
attributes: AttributeSet::from(&[KeyValue::new("a", 1)][..]),
start_time: Some(SystemTime::now()),
time: Some(SystemTime::now()),
value: 1u64,
exemplars: vec![],
}],
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(2, AttributeSet::from(&new_attributes[..]));

let (count, new_agg) = agg.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
assert_eq!(a.data_points.len(), 1);
assert_eq!(
a.data_points[0].attributes,
AttributeSet::from(&new_attributes[..])
);
assert_eq!(a.data_points[0].value, 2);
}

#[test]
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) =
AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
DataPoint {
attributes: AttributeSet::from(&[KeyValue::new("a1", 1)][..]),
start_time: Some(SystemTime::now()),
time: Some(SystemTime::now()),
value: 1u64,
exemplars: vec![],
},
DataPoint {
attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
start_time: Some(SystemTime::now()),
time: Some(SystemTime::now()),
value: 2u64,
exemplars: vec![],
},
],
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Temporality::Delta
},
is_monotonic: false,
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, AttributeSet::from(&new_attributes[..]));

let (count, new_agg) = agg.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
assert_eq!(a.temporality, temporality);
assert!(a.is_monotonic);
assert_eq!(a.data_points.len(), 1);
assert_eq!(
a.data_points[0].attributes,
AttributeSet::from(&new_attributes[..])
);
assert_eq!(a.data_points[0].value, 3);
}
}

#[test]
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
let mut a = Sum {
data_points: vec![
DataPoint {
attributes: AttributeSet::from(&[KeyValue::new("a1", 1)][..]),
start_time: Some(SystemTime::now()),
time: Some(SystemTime::now()),
value: 1u64,
exemplars: vec![],
},
DataPoint {
attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
start_time: Some(SystemTime::now()),
time: Some(SystemTime::now()),
value: 2u64,
exemplars: vec![],
},
],
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Temporality::Delta
},
is_monotonic: false,
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, AttributeSet::from(&new_attributes[..]));

let (count, new_agg) = agg.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
assert_eq!(a.temporality, temporality);
assert!(a.is_monotonic);
assert_eq!(a.data_points.len(), 1);
assert_eq!(
a.data_points[0].attributes,
AttributeSet::from(&new_attributes[..])
);
assert_eq!(a.data_points[0].value, 3);
}
}

#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
start_time: SystemTime::now(),
time: SystemTime::now(),
count: 2,
bounds: vec![1.0, 2.0],
bucket_counts: vec![0, 1, 1],
min: None,
max: None,
sum: 3u64,
exemplars: vec![],
}],
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Temporality::Delta
},
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, AttributeSet::from(&new_attributes[..]));

let (count, new_agg) = agg.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
assert_eq!(a.temporality, temporality);
assert_eq!(a.data_points.len(), 1);
assert_eq!(
a.data_points[0].attributes,
AttributeSet::from(&new_attributes[..])
);
assert_eq!(a.data_points[0].count, 1);
assert_eq!(a.data_points[0].bounds, vec![1.0]);
assert_eq!(a.data_points[0].bucket_counts, vec![0, 1]);
assert_eq!(a.data_points[0].min, Some(3));
assert_eq!(a.data_points[0].max, Some(3));
assert_eq!(a.data_points[0].sum, 3);
}
}

#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
start_time: SystemTime::now(),
time: SystemTime::now(),
count: 2,
min: None,
max: None,
sum: 3u64,
scale: 10,
zero_count: 1,
positive_bucket: ExponentialBucket {
offset: 1,
counts: vec![1],
},
negative_bucket: ExponentialBucket {
offset: 1,
counts: vec![1],
},
zero_threshold: 1.0,
exemplars: vec![],
}],
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Temporality::Delta
},
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(3, AttributeSet::from(&new_attributes[..]));

let (count, new_agg) = agg.call(Some(&mut a));

assert_eq!(count, 1);
assert!(new_agg.is_none());
assert_eq!(a.temporality, temporality);
assert_eq!(a.data_points.len(), 1);
assert_eq!(
a.data_points[0].attributes,
AttributeSet::from(&new_attributes[..])
);
assert_eq!(a.data_points[0].count, 1);
assert_eq!(a.data_points[0].min, Some(3));
assert_eq!(a.data_points[0].max, Some(3));
assert_eq!(a.data_points[0].sum, 3);
assert_eq!(a.data_points[0].zero_count, 0);
assert_eq!(a.data_points[0].zero_threshold, 0.0);
assert_eq!(a.data_points[0].positive_bucket.offset, 1661953);
assert_eq!(a.data_points[0].positive_bucket.counts, vec![1]);
assert_eq!(a.data_points[0].negative_bucket.offset, 0);
assert!(a.data_points[0].negative_bucket.counts.is_empty());
}
}
}
Loading

0 comments on commit 43cd4ac

Please sign in to comment.