From 43cd4acc76950809e2ea069fb69bcea295cfcfa6 Mon Sep 17 00:00:00 2001
From: Julian Tescher
Date: Tue, 12 Dec 2023 21:29:07 -0500
Subject: [PATCH] Fix delta aggregation metric reuse (#1434)

Collection previously reused any data points already present in the
destination aggregation, overwriting them index by index. When an
exporter reuses the same metrics object across collect calls, this
could leave stale data points (and, for precomputed sums, stale
temporality and monotonicity flags) in the exported batch. Each
collect now clears the destination's data points up front and
rebuilds them, sizing the vector with `reserve_exact`.

---
 opentelemetry-sdk/CHANGELOG.md                |   4 +
 .../src/metrics/internal/aggregate.rs         | 236 ++++++++++++++++++
 .../metrics/internal/exponential_histogram.rs | 181 +++++---------
 .../src/metrics/internal/histogram.rs         | 122 +++-------
 .../src/metrics/internal/last_value.rs        |  34 +--
 opentelemetry-sdk/src/metrics/internal/sum.rs | 113 ++++-----
 6 files changed, 396 insertions(+), 294 deletions(-)

diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md
index 5a5e49b80c..a0660e3db0 100644
--- a/opentelemetry-sdk/CHANGELOG.md
+++ b/opentelemetry-sdk/CHANGELOG.md
@@ -2,6 +2,10 @@

 ## vNext

+- Fix delta aggregation metric reuse. (#1434)
+
+## v0.21.1
+
 ### Fixed

 - Fix metric export corruption if gauges have not received a last value. (#1363)
diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
index ccd0f26f88..08d6feec04 100644
--- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs
+++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -210,3 +210,239 @@ impl<T: Number<T>> AggregateBuilder<T> {
         )
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::metrics::data::{
+        DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
+        Gauge, Histogram, HistogramDataPoint, Sum,
+    };
+    use std::time::SystemTime;
+
+    use super::*;
+
+    #[test]
+    fn last_value_aggregation() {
+        let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
+        let mut a = Gauge {
+            data_points: vec![DataPoint {
+                attributes: AttributeSet::from(&[KeyValue::new("a", 1)][..]),
+                start_time: Some(SystemTime::now()),
+                time: Some(SystemTime::now()),
+                value: 1u64,
+                exemplars: vec![],
+            }],
+        };
+        let new_attributes = [KeyValue::new("b", 2)];
+        measure.call(2, AttributeSet::from(&new_attributes[..]));
+
+        let (count, new_agg) = agg.call(Some(&mut a));
+
+        assert_eq!(count, 1);
+        assert!(new_agg.is_none());
+        assert_eq!(a.data_points.len(), 1);
+        assert_eq!(
+            a.data_points[0].attributes,
+            AttributeSet::from(&new_attributes[..])
+        );
+        assert_eq!(a.data_points[0].value, 2);
+    }
+
+    #[test]
+    fn precomputed_sum_aggregation() {
+        for temporality in [Temporality::Delta, Temporality::Cumulative] {
+            let (measure, agg) =
+                AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
+            let mut a = Sum {
+                data_points: vec![
+                    DataPoint {
+                        attributes: AttributeSet::from(&[KeyValue::new("a1", 1)][..]),
+                        start_time: Some(SystemTime::now()),
+                        time: Some(SystemTime::now()),
+                        value: 1u64,
+                        exemplars: vec![],
+                    },
+                    DataPoint {
+                        attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
+                        start_time: Some(SystemTime::now()),
+                        time: Some(SystemTime::now()),
+                        value: 2u64,
+                        exemplars: vec![],
+                    },
+                ],
+                temporality: if temporality == Temporality::Delta {
+                    Temporality::Cumulative
+                } else {
+                    Temporality::Delta
+                },
+                is_monotonic: false,
+            };
+            let new_attributes = [KeyValue::new("b", 2)];
+            measure.call(3, AttributeSet::from(&new_attributes[..]));
+
+            let (count, new_agg) = agg.call(Some(&mut a));
+
+            assert_eq!(count, 1);
+            assert!(new_agg.is_none());
+            assert_eq!(a.temporality, temporality);
+            assert!(a.is_monotonic);
+            assert_eq!(a.data_points.len(), 1);
+            assert_eq!(
+                a.data_points[0].attributes,
+                AttributeSet::from(&new_attributes[..])
+            );
+            assert_eq!(a.data_points[0].value, 3);
+        }
+    }
+
+    #[test]
+    fn sum_aggregation() {
+        for temporality in [Temporality::Delta, Temporality::Cumulative] {
+            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
+            let mut a = Sum {
+                data_points: vec![
+                    DataPoint {
+                        attributes: AttributeSet::from(&[KeyValue::new("a1", 1)][..]),
+                        start_time: Some(SystemTime::now()),
+                        time: Some(SystemTime::now()),
+                        value: 1u64,
+                        exemplars: vec![],
+                    },
+                    DataPoint {
+                        attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
+                        start_time: Some(SystemTime::now()),
+                        time: Some(SystemTime::now()),
+                        value: 2u64,
+                        exemplars: vec![],
+                    },
+                ],
+                temporality: if temporality == Temporality::Delta {
+                    Temporality::Cumulative
+                } else {
+                    Temporality::Delta
+                },
+                is_monotonic: false,
+            };
+            let new_attributes = [KeyValue::new("b", 2)];
+            measure.call(3, AttributeSet::from(&new_attributes[..]));
+
+            let (count, new_agg) = agg.call(Some(&mut a));
+
+            assert_eq!(count, 1);
+            assert!(new_agg.is_none());
+            assert_eq!(a.temporality, temporality);
+            assert!(a.is_monotonic);
+            assert_eq!(a.data_points.len(), 1);
+            assert_eq!(
+                a.data_points[0].attributes,
+                AttributeSet::from(&new_attributes[..])
+            );
+            assert_eq!(a.data_points[0].value, 3);
+        }
+    }
+
+    #[test]
+    fn explicit_bucket_histogram_aggregation() {
+        for temporality in [Temporality::Delta, Temporality::Cumulative] {
+            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
+                .explicit_bucket_histogram(vec![1.0], true, true);
+            let mut a = Histogram {
+                data_points: vec![HistogramDataPoint {
+                    attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
+                    start_time: SystemTime::now(),
+                    time: SystemTime::now(),
+                    count: 2,
+                    bounds: vec![1.0, 2.0],
+                    bucket_counts: vec![0, 1, 1],
+                    min: None,
+                    max: None,
+                    sum: 3u64,
+                    exemplars: vec![],
+                }],
+                temporality: if temporality == Temporality::Delta {
+                    Temporality::Cumulative
+                } else {
+                    Temporality::Delta
+                },
+            };
+            let new_attributes = [KeyValue::new("b", 2)];
+            measure.call(3, AttributeSet::from(&new_attributes[..]));
+
+            let (count, new_agg) = agg.call(Some(&mut a));
+
+            assert_eq!(count, 1);
+            assert!(new_agg.is_none());
+            assert_eq!(a.temporality, temporality);
+            assert_eq!(a.data_points.len(), 1);
+            assert_eq!(
+                a.data_points[0].attributes,
+                AttributeSet::from(&new_attributes[..])
+            );
+            assert_eq!(a.data_points[0].count, 1);
+            assert_eq!(a.data_points[0].bounds, vec![1.0]);
+            assert_eq!(a.data_points[0].bucket_counts, vec![0, 1]);
+            assert_eq!(a.data_points[0].min, Some(3));
+            assert_eq!(a.data_points[0].max, Some(3));
+            assert_eq!(a.data_points[0].sum, 3);
+        }
+    }
+
+    #[test]
+    fn exponential_histogram_aggregation() {
+        for temporality in [Temporality::Delta, Temporality::Cumulative] {
+            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
+                .exponential_bucket_histogram(4, 20, true, true);
+            let mut a = ExponentialHistogram {
+                data_points: vec![ExponentialHistogramDataPoint {
+                    attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]),
+                    start_time: SystemTime::now(),
+                    time: SystemTime::now(),
+                    count: 2,
+                    min: None,
+                    max: None,
+                    sum: 3u64,
+                    scale: 10,
+                    zero_count: 1,
+                    positive_bucket: ExponentialBucket {
+                        offset: 1,
+                        counts: vec![1],
+                    },
+                    negative_bucket: ExponentialBucket {
+                        offset: 1,
+                        counts: vec![1],
+                    },
+                    zero_threshold: 1.0,
+                    exemplars: vec![],
+                }],
+                temporality: if temporality == Temporality::Delta {
+                    Temporality::Cumulative
+                } else {
+                    Temporality::Delta
+                },
+            };
+            let new_attributes = [KeyValue::new("b", 2)];
+            measure.call(3, AttributeSet::from(&new_attributes[..]));
+
+            let (count, new_agg) = agg.call(Some(&mut a));
+
+            assert_eq!(count, 1);
+            assert!(new_agg.is_none());
+            assert_eq!(a.temporality, temporality);
+            assert_eq!(a.data_points.len(), 1);
+            assert_eq!(
+                a.data_points[0].attributes,
+                AttributeSet::from(&new_attributes[..])
+            );
+            assert_eq!(a.data_points[0].count, 1);
+            assert_eq!(a.data_points[0].min, Some(3));
+            assert_eq!(a.data_points[0].max, Some(3));
+            assert_eq!(a.data_points[0].sum, 3);
+            assert_eq!(a.data_points[0].zero_count, 0);
+            assert_eq!(a.data_points[0].zero_threshold, 0.0);
+            assert_eq!(a.data_points[0].positive_bucket.offset, 1661953);
+            assert_eq!(a.data_points[0].positive_bucket.counts, vec![1]);
+            assert_eq!(a.data_points[0].negative_bucket.offset, 0);
+            assert!(a.data_points[0].negative_bucket.counts.is_empty());
+        }
+    }
+}
diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
index ec0b70d003..189b61c553 100644
--- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -1,9 +1,4 @@
-use std::{
-    collections::HashMap,
-    f64::consts::LOG2_E,
-    sync::Mutex,
-    time::{SystemTime, UNIX_EPOCH},
-};
+use std::{collections::HashMap, f64::consts::LOG2_E, sync::Mutex, time::SystemTime};

 use once_cell::sync::Lazy;
 use opentelemetry::metrics::MetricsError;
@@ -387,6 +382,7 @@ impl<T: Number<T>> ExpoHistogram<T> {
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Delta;
+        h.data_points.clear();

         let mut values = match self.values.lock() {
             Ok(g) => g,
@@ -395,62 +391,39 @@ impl<T: Number<T>> ExpoHistogram<T> {

         let n = values.len();
         if n > h.data_points.capacity() {
-            h.data_points.reserve(n - h.data_points.capacity());
+            h.data_points.reserve_exact(n - h.data_points.capacity());
         }

-        for (i, (a, mut b)) in values.drain().enumerate() {
-            let el = match h.data_points.get_mut(i) {
-                Some(el) => el,
-                None => {
-                    h.data_points.push(data::ExponentialHistogramDataPoint {
-                        attributes: AttributeSet::default(),
-                        start_time: UNIX_EPOCH,
-                        time: UNIX_EPOCH,
-                        count: 0,
-                        min: None,
-                        max: None,
-                        sum: T::default(),
-                        scale: 0,
-                        zero_count: 0,
-                        positive_bucket: data::ExponentialBucket {
-                            offset: 0,
-                            counts: vec![],
-                        },
-                        negative_bucket: data::ExponentialBucket {
-                            offset: 0,
-                            counts: vec![],
-                        },
-                        zero_threshold: 0.0,
-                        exemplars: vec![],
-                    });
-                    h.data_points.get_mut(i).unwrap()
-                }
-            };
-            el.attributes = a;
-            el.start_time = start;
-            el.time = t;
-            el.count = b.count;
-            el.scale = b.scale;
-            el.zero_count = b.zero_count;
-            el.zero_threshold = 0.0;
-
-            el.positive_bucket.offset = b.pos_buckets.start_bin;
-            el.positive_bucket.counts.clear();
-            el.positive_bucket.counts.append(&mut b.pos_buckets.counts);
-
-            el.negative_bucket.offset = b.neg_buckets.start_bin;
-            el.negative_bucket.counts.clear();
-            el.negative_bucket.counts.append(&mut b.neg_buckets.counts);
-
-            el.sum = if self.record_sum { b.sum } else { T::default() };
-
-            if self.record_min_max {
-                el.min = Some(b.min);
-                el.max = Some(b.max);
-            } else {
-                el.min = None;
-                el.max = None;
-            }
+        for (a, b) in values.drain() {
+            h.data_points.push(data::ExponentialHistogramDataPoint {
+                attributes: a,
+                start_time: start,
+                time: t,
+                count: b.count,
+                min: if self.record_min_max {
+                    Some(b.min)
+                } else {
+                    None
+                },
+                max: if self.record_min_max {
+                    Some(b.max)
+                } else {
+                    None
+                },
+                sum: if self.record_sum { b.sum } else { T::default() },
+                scale: b.scale,
+                zero_count: b.zero_count,
+                positive_bucket: data::ExponentialBucket {
+                    offset: b.pos_buckets.start_bin,
+                    counts: b.pos_buckets.counts.clone(),
+                },
+                negative_bucket: data::ExponentialBucket {
+                    offset: b.neg_buckets.start_bin,
+                    counts: b.neg_buckets.counts.clone(),
+                },
+                zero_threshold: 0.0,
+                exemplars: vec![],
+            });
         }

         // The delta collection cycle resets.
@@ -488,71 +461,47 @@ impl<T: Number<T>> ExpoHistogram<T> {
             Ok(g) => g,
             Err(_) => return (0, None),
         };
+        h.data_points.clear();

         let n = values.len();
         if n > h.data_points.capacity() {
-            h.data_points.reserve(n - h.data_points.capacity());
+            h.data_points.reserve_exact(n - h.data_points.capacity());
         }

         // TODO: This will use an unbounded amount of memory if there
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
         // overload the system.
-        for (i, (a, b)) in values.iter().enumerate() {
-            let el = match h.data_points.get_mut(i) {
-                Some(el) => el,
-                None => {
-                    h.data_points.push(data::ExponentialHistogramDataPoint {
-                        attributes: AttributeSet::default(),
-                        start_time: UNIX_EPOCH,
-                        time: UNIX_EPOCH,
-                        count: 0,
-                        min: None,
-                        max: None,
-                        sum: T::default(),
-                        scale: 0,
-                        zero_count: 0,
-                        positive_bucket: data::ExponentialBucket {
-                            offset: 0,
-                            counts: vec![],
-                        },
-                        negative_bucket: data::ExponentialBucket {
-                            offset: 0,
-                            counts: vec![],
-                        },
-                        zero_threshold: 0.0,
-                        exemplars: vec![],
-                    });
-                    h.data_points.get_mut(i).unwrap()
-                }
-            };
-            el.attributes = a.clone();
-            el.start_time = start;
-            el.time = t;
-            el.count = b.count;
-            el.scale = b.scale;
-            el.zero_count = b.zero_count;
-            el.zero_threshold = 0.0;
-
-            el.positive_bucket.offset = b.pos_buckets.start_bin;
-            el.positive_bucket.counts.clear();
-            el.positive_bucket
-                .counts
-                .extend_from_slice(&b.pos_buckets.counts);
-
-            el.negative_bucket.offset = b.neg_buckets.start_bin;
-            el.negative_bucket.counts.clear();
-            el.negative_bucket
-                .counts
-                .extend_from_slice(&b.neg_buckets.counts);
-
-            if self.record_sum {
-                el.sum = b.sum;
-            }
-            if self.record_min_max {
-                el.min = Some(b.min);
-                el.max = Some(b.max);
-            }
+        for (a, b) in values.iter() {
+            h.data_points.push(data::ExponentialHistogramDataPoint {
+                attributes: a.clone(),
+                start_time: start,
+                time: t,
+                count: b.count,
+                min: if self.record_min_max {
+                    Some(b.min)
+                } else {
+                    None
+                },
+                max: if self.record_min_max {
+                    Some(b.max)
+                } else {
+                    None
+                },
+                sum: if self.record_sum { b.sum } else { T::default() },
+                scale: b.scale,
+                zero_count: b.zero_count,
+                positive_bucket: data::ExponentialBucket {
+                    offset: b.pos_buckets.start_bin,
+                    counts: b.pos_buckets.counts.clone(),
+                },
+                negative_bucket: data::ExponentialBucket {
+                    offset: b.neg_buckets.start_bin,
+                    counts: b.neg_buckets.counts.clone(),
+                },
+                zero_threshold: 0.0,
+                exemplars: vec![],
+            });
         }

         (n, new_agg.map(|a| Box::new(a) as Box<_>))
diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs
index 535b9a8586..45ac569e2b 100644
--- a/opentelemetry-sdk/src/metrics/internal/histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs
@@ -156,62 +156,38 @@ impl<T: Number<T>> Histogram<T> {
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Delta;
+        h.data_points.clear();

         let n = values.len();
         if n > h.data_points.capacity() {
             h.data_points.reserve_exact(n - h.data_points.capacity());
         }

-        for (i, (a, b)) in values.drain().enumerate() {
-            if let Some(dp) = h.data_points.get_mut(i) {
-                dp.attributes = a;
-                dp.start_time = start;
-                dp.time = t;
-                dp.count = b.count;
-                dp.bounds = self.hist_values.bounds.clone();
-                dp.bucket_counts = b.counts.clone();
-                dp.sum = if self.hist_values.record_sum {
+        for (a, b) in values.drain() {
+            h.data_points.push(HistogramDataPoint {
+                attributes: a,
+                start_time: start,
+                time: t,
+                count: b.count,
+                bounds: self.hist_values.bounds.clone(),
+                bucket_counts: b.counts.clone(),
+                sum: if self.hist_values.record_sum {
                     b.total
                 } else {
                     T::default()
-                };
-                dp.min = if self.record_min_max {
+                },
+                min: if self.record_min_max {
                     Some(b.min)
                 } else {
                     None
-                };
-                dp.max = if self.record_min_max {
+                },
+                max: if self.record_min_max {
                     Some(b.max)
                 } else {
                     None
-                };
-                dp.exemplars.clear();
-            } else {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: a,
-                    start_time: start,
-                    time: t,
-                    count: b.count,
-                    bounds: self.hist_values.bounds.clone(),
-                    bucket_counts: b.counts.clone(),
-                    sum: if self.hist_values.record_sum {
-                        b.total
-                    } else {
-                        T::default()
-                    },
-                    min: if self.record_min_max {
-                        Some(b.min)
-                    } else {
-                        None
-                    },
-                    max: if self.record_min_max {
-                        Some(b.max)
-                    } else {
-                        None
-                    },
-                    exemplars: vec![],
-                });
-            };
+                },
+                exemplars: vec![],
+            });
         }

         // The delta collection cycle resets.
@@ -219,8 +195,6 @@ impl<T: Number<T>> Histogram<T> {
             *start = t;
         }

-        h.data_points.truncate(n);
-
         (n, new_agg.map(|a| Box::new(a) as Box<_>))
     }

@@ -249,70 +223,44 @@ impl<T: Number<T>> Histogram<T> {
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Cumulative;
+        h.data_points.clear();

         let n = values.len();
         if n > h.data_points.capacity() {
-            h.data_points.reserve(n - h.data_points.capacity());
+            h.data_points.reserve_exact(n - h.data_points.capacity());
         }

         // TODO: This will use an unbounded amount of memory if there
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
         // overload the system.
-        for (i, (a, b)) in values.iter().enumerate() {
-            if let Some(dp) = h.data_points.get_mut(i) {
-                dp.attributes = a.clone();
-                dp.start_time = start;
-                dp.time = t;
-                dp.count = b.count;
-                dp.bounds = self.hist_values.bounds.clone();
-                dp.bucket_counts = b.counts.clone();
-                dp.sum = if self.hist_values.record_sum {
+        for (a, b) in values.iter() {
+            h.data_points.push(HistogramDataPoint {
+                attributes: a.clone(),
+                start_time: start,
+                time: t,
+                count: b.count,
+                bounds: self.hist_values.bounds.clone(),
+                bucket_counts: b.counts.clone(),
+                sum: if self.hist_values.record_sum {
                     b.total
                 } else {
                     T::default()
-                };
-                dp.min = if self.record_min_max {
+                },
+                min: if self.record_min_max {
                     Some(b.min)
                 } else {
                     None
-                };
-                dp.max = if self.record_min_max {
+                },
+                max: if self.record_min_max {
                     Some(b.max)
                 } else {
                     None
-                };
-                dp.exemplars.clear();
-            } else {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: a.clone(),
-                    start_time: start,
-                    time: t,
-                    count: b.count,
-                    bounds: self.hist_values.bounds.clone(),
-                    bucket_counts: b.counts.clone(),
-                    sum: if self.hist_values.record_sum {
-                        b.total
-                    } else {
-                        T::default()
-                    },
-                    min: if self.record_min_max {
-                        Some(b.min)
-                    } else {
-                        None
-                    },
-                    max: if self.record_min_max {
-                        Some(b.max)
-                    } else {
-                        None
-                    },
-                    exemplars: vec![],
-                });
-            };
+                },
+                exemplars: vec![],
+            });
         }

-        h.data_points.truncate(n);
-
         (n, new_agg.map(|a| Box::new(a) as Box<_>))
     }
 }
diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs
index 57f35fc166..e5b2364b5b 100644
--- a/opentelemetry-sdk/src/metrics/internal/last_value.rs
+++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs
@@ -53,37 +53,25 @@ impl<T: Number<T>> LastValue<T> {
     }

     pub(crate) fn compute_aggregation(&self, dest: &mut Vec<DataPoint<T>>) {
+        dest.clear();
         let mut values = match self.values.lock() {
             Ok(guard) if !guard.is_empty() => guard,
-            _ => {
-                dest.clear(); // poisoned or no values recorded yet
-                return;
-            }
+            _ => return,
         };

         let n = values.len();
         if n > dest.capacity() {
-            dest.reserve(n - dest.capacity());
+            dest.reserve_exact(n - dest.capacity());
         }

-        for (i, (attrs, value)) in values.drain().enumerate() {
-            if let Some(dp) = dest.get_mut(i) {
-                dp.attributes = attrs;
-                dp.time = Some(value.timestamp);
-                dp.value = value.value;
-                dp.start_time = None;
-                dp.exemplars.clear();
-            } else {
-                dest.push(DataPoint {
-                    attributes: attrs,
-                    time: Some(value.timestamp),
-                    value: value.value,
-                    start_time: None,
-                    exemplars: vec![],
-                });
-            }
+        for (attrs, value) in values.drain() {
+            dest.push(DataPoint {
+                attributes: attrs,
+                time: Some(value.timestamp),
+                value: value.value,
+                start_time: None,
+                exemplars: vec![],
+            });
         }
-
-        dest.truncate(n)
     }
 }
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs
index 3adcd24d23..3fac77c459 100644
--- a/opentelemetry-sdk/src/metrics/internal/sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -96,6 +96,7 @@ impl<T: Number<T>> Sum<T> {
         let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
         s_data.temporality = Temporality::Delta;
         s_data.is_monotonic = self.monotonic;
+        s_data.data_points.clear();

         let mut values = match self.value_map.values.lock() {
             Ok(v) => v,
@@ -106,27 +107,20 @@ impl<T: Number<T>> Sum<T> {
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
-                .reserve(n - s_data.data_points.capacity());
+                .reserve_exact(n - s_data.data_points.capacity());
         }

         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
-        for (i, (attrs, value)) in values.drain().enumerate() {
-            if let Some(dp) = s_data.data_points.get_mut(i) {
-                dp.attributes = attrs;
-                dp.start_time = Some(prev_start);
-                dp.time = Some(t);
-                dp.value = value;
-                dp.exemplars.clear()
-            } else {
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs,
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value,
-                    exemplars: vec![],
-                });
-            }
+        for (attrs, value) in values.drain() {
+            s_data.data_points.push(DataPoint {
+                attributes: attrs,
+                start_time: Some(prev_start),
+                time: Some(t),
+                value,
+                exemplars: vec![],
+            });
         }
+
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
@@ -154,6 +148,7 @@ impl<T: Number<T>> Sum<T> {
         let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
         s_data.temporality = Temporality::Cumulative;
         s_data.is_monotonic = self.monotonic;
+        s_data.data_points.clear();

         let values = match self.value_map.values.lock() {
             Ok(v) => v,
@@ -164,7 +159,7 @@ impl<T: Number<T>> Sum<T> {
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
-                .reserve(n - s_data.data_points.capacity());
+                .reserve_exact(n - s_data.data_points.capacity());
         }

         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
@@ -172,22 +167,14 @@ impl<T: Number<T>> Sum<T> {
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
         // overload the system.
-        for (i, (attrs, value)) in values.iter().enumerate() {
-            if let Some(dp) = s_data.data_points.get_mut(i) {
-                dp.attributes = attrs.clone();
-                dp.start_time = Some(prev_start);
-                dp.time = Some(t);
-                dp.value = *value;
-                dp.exemplars.clear()
-            } else {
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: *value,
-                    exemplars: vec![],
-                });
-            }
+        for (attrs, value) in values.iter() {
+            s_data.data_points.push(DataPoint {
+                attributes: attrs.clone(),
+                start_time: Some(prev_start),
+                time: Some(t),
+                value: *value,
+                exemplars: vec![],
+            });
         }

         (n, new_agg.map(|a| Box::new(a) as Box<_>))
@@ -234,6 +221,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
             None
         };
         let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
+        s_data.data_points.clear();
+        s_data.temporality = Temporality::Delta;
+        s_data.is_monotonic = self.monotonic;

         let mut values = match self.value_map.values.lock() {
             Ok(v) => v,
@@ -244,7 +234,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
-                .reserve(n - s_data.data_points.capacity());
+                .reserve_exact(n - s_data.data_points.capacity());
         }
         let mut new_reported = HashMap::with_capacity(n);
         let mut reported = match self.reported.lock() {
             Ok(r) => r,
@@ -253,26 +243,18 @@ impl<T: Number<T>> PrecomputedSum<T> {
         let default = T::default();

-        for (i, (attrs, value)) in values.drain().enumerate() {
+        for (attrs, value) in values.drain() {
             let delta = value - *reported.get(&attrs).unwrap_or(&default);
             if delta != default {
                 new_reported.insert(attrs.clone(), value);
             }
-            if let Some(dp) = s_data.data_points.get_mut(i) {
-                dp.attributes = attrs.clone();
-                dp.start_time = Some(prev_start);
-                dp.time = Some(t);
-                dp.value = delta;
-                dp.exemplars.clear();
-            } else {
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: delta,
-                    exemplars: vec![],
-                });
-            }
+            s_data.data_points.push(DataPoint {
+                attributes: attrs.clone(),
+                start_time: Some(prev_start),
+                time: Some(t),
+                value: delta,
+                exemplars: vec![],
+            });
         }
         // The delta collection cycle resets.
@@ -304,6 +286,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
             None
         };
         let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
+        s_data.data_points.clear();
+        s_data.temporality = Temporality::Cumulative;
+        s_data.is_monotonic = self.monotonic;

         let values = match self.value_map.values.lock() {
             Ok(v) => v,
@@ -314,7 +299,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
-                .reserve(n - s_data.data_points.capacity());
+                .reserve_exact(n - s_data.data_points.capacity());
         }
         let mut new_reported = HashMap::with_capacity(n);
         let mut reported = match self.reported.lock() {
             Ok(r) => r,
@@ -323,26 +308,18 @@ impl<T: Number<T>> PrecomputedSum<T> {
         let default = T::default();

-        for (i, (attrs, value)) in values.iter().enumerate() {
+        for (attrs, value) in values.iter() {
             let delta = *value - *reported.get(attrs).unwrap_or(&default);
             if delta != default {
                 new_reported.insert(attrs.clone(), *value);
             }
-            if let Some(dp) = s_data.data_points.get_mut(i) {
-                dp.attributes = attrs.clone();
-                dp.start_time = Some(prev_start);
-                dp.time = Some(t);
-                dp.value = delta;
-                dp.exemplars.clear();
-            } else {
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: delta,
-                    exemplars: vec![],
-                });
-            }
+            s_data.data_points.push(DataPoint {
+                attributes: attrs.clone(),
+                start_time: Some(prev_start),
+                time: Some(t),
+                value: delta,
+                exemplars: vec![],
+            });
         }

         *reported = new_reported;
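
To illustrate the failure mode this patch removes: the old collect paths rewrote the destination vector in place by index, and the delta-sum path had no final truncate, so a cycle that observed fewer attribute sets than the previous one could re-export the previous cycle's tail. Below is a minimal standalone sketch of that pattern versus the clear-and-rebuild approach the patch adopts; `Point`, `collect_in_place`, and `collect_rebuilt` are hypothetical stand-ins, not the SDK's actual types or functions.

// Sketch of the reuse bug; `Point` stands in for the SDK's DataPoint.
#[derive(Clone, Debug, PartialEq)]
struct Point {
    attrs: &'static str,
    value: u64,
}

// Old pattern: overwrite existing slots by index. Without a final
// truncate (as in the old delta-sum path), slots beyond
// `current.len()` keep the previous cycle's data.
fn collect_in_place(dest: &mut Vec<Point>, current: &[Point]) {
    for (i, p) in current.iter().enumerate() {
        if let Some(dp) = dest.get_mut(i) {
            *dp = p.clone();
        } else {
            dest.push(p.clone());
        }
    }
}

// New pattern: clear up front and rebuild, as this patch does.
fn collect_rebuilt(dest: &mut Vec<Point>, current: &[Point]) {
    dest.clear();
    if current.len() > dest.capacity() {
        // reserve_exact avoids over-allocating for a known final size.
        dest.reserve_exact(current.len() - dest.capacity());
    }
    dest.extend_from_slice(current);
}

fn main() {
    let last_cycle = [
        Point { attrs: "a", value: 1 },
        Point { attrs: "b", value: 2 },
    ];
    let this_cycle = [Point { attrs: "c", value: 3 }];

    // In-place reuse leaves a stale point from the previous cycle.
    let mut dest = last_cycle.to_vec();
    collect_in_place(&mut dest, &this_cycle);
    assert_eq!(dest.len(), 2);
    assert_eq!(dest[1].attrs, "b"); // stale data would be re-exported

    // Clearing first exports only the current cycle's points.
    let mut dest = last_cycle.to_vec();
    collect_rebuilt(&mut dest, &this_cycle);
    assert_eq!(dest.len(), 1);
}

Clearing at the start of collection also covers early-exit paths (such as a poisoned lock returning `(0, None)`), where the old code would leave the previously exported points untouched in the reused aggregation.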