[exporter/prometheusexporter] accumulate delta temporality histograms (open-telemetry#23790)

**Description:**
This continues the work from the now-closed
[PR](open-telemetry#20530).
I have addressed the issues raised in the original PR by
- adding logic to handle timestamp misalignments
- adding a fix for an out-of-bounds bug

In addition, I have performed end-to-end testing in a local setup, and
confirmed that accumulated histogram time series are correct.
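
The decision rule for misaligned timestamps can be summarized as follows (a minimal standalone sketch with hypothetical names, not the exporter's actual code; the real logic lives in `accumulateHistogram` in the diff below):

```go
package main

import (
	"fmt"
	"time"
)

// alignmentAction is an illustrative helper: given the stored datapoint's
// end timestamp and the incoming delta datapoint's start timestamp, decide
// how the accumulator proceeds.
func alignmentAction(prevEnd, curStart time.Time) string {
	switch {
	case curStart.Equal(prevEnd):
		return "accumulate" // contiguous intervals: merge the delta in
	case curStart.After(prevEnd):
		return "reset" // gap after the stored interval: treat as a restart
	default:
		return "drop" // overlaps the past: likely single-writer violation
	}
}

func main() {
	prevEnd := time.Now()
	fmt.Println(alignmentAction(prevEnd, prevEnd.Add(time.Second))) // reset
}
```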

**Link to tracking Issue:**

open-telemetry#4968

open-telemetry#9006

open-telemetry#19153
**Testing:**
Added tests for the timestamp misalignment and out-of-bounds bugs
discovered in the previous PR, and performed end-to-end testing to ensure
the histogram bucket counts exported to Prometheus are correct (see the
sketch below for the expected bucket arithmetic).
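
As a concrete example of the arithmetic being verified (an illustrative sketch using the happy-path test values, not code from this change):

```go
package main

import "fmt"

// Two contiguous delta intervals over the same bucket bounds should be
// exported as their element-wise sum once accumulated to cumulative.
func main() {
	older := []uint64{1, 3, 1, 0, 0} // first delta interval, count 5
	newer := []uint64{1, 1, 2, 0, 0} // next delta interval, count 4
	merged := make([]uint64, len(newer))
	for i := range newer {
		merged[i] = older[i] + newer[i]
	}
	fmt.Println(merged) // [2 4 3 0 0], cumulative count 9
}
```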

---------

Signed-off-by: Loc Mai <[email protected]>
Signed-off-by: xchen <[email protected]>
Signed-off-by: stephenchen <[email protected]>
Co-authored-by: Lev Popov <[email protected]>
Co-authored-by: Lev Popov <[email protected]>
Co-authored-by: Anthony Mirabella <[email protected]>
Co-authored-by: Loc Mai <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
6 people authored and cparkins committed Jan 10, 2024
1 parent 61d9cdf commit 1f5677a
Showing 3 changed files with 283 additions and 59 deletions.
11 changes: 11 additions & 0 deletions .chloggen/promexp-delta.yaml
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Accumulate histograms with delta temporality

# One or more tracking issues related to the change
issues: [4968]
101 changes: 84 additions & 17 deletions exporter/prometheusexporter/accumulator.go
@@ -84,7 +84,7 @@ func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, il pcommon.Instr
case pmetric.MetricTypeSum:
return a.accumulateSum(metric, il, resourceAttrs, now)
case pmetric.MetricTypeHistogram:
return a.accumulateDoubleHistogram(metric, il, resourceAttrs, now)
return a.accumulateHistogram(metric, il, resourceAttrs, now)
case pmetric.MetricTypeSummary:
return a.accumulateSummary(metric, il, resourceAttrs, now)
default:
@@ -221,42 +221,70 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
return
}

func (a *lastValueAccumulator) accumulateDoubleHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
doubleHistogram := metric.Histogram()
func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
histogram := metric.Histogram()
a.logger.Debug("Accumulate histogram.....")
dps := histogram.DataPoints()

// Drop metrics with non-cumulative aggregations
if doubleHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
return
}

dps := doubleHistogram.DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs)
signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely identifies the time series this datapoint belongs to
if ip.Flags().NoRecordedValue() {
a.registeredMetrics.Delete(signature)
return 0
}

v, ok := a.registeredMetrics.Load(signature)
v, ok := a.registeredMetrics.Load(signature) // the accumulator stores values for all time series; load the one for this series
if !ok {
// first data point
m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
continue
}
mv := v.(*accumulatedValue)

if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
m := copyMetricMetadata(metric)
m.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

switch histogram.AggregationTemporality() {
case pmetric.AggregationTemporalityDelta:
pp := mv.value.Histogram().DataPoints().At(0) // previous aggregated value for time range
if ip.StartTimestamp().AsTime() != pp.Timestamp().AsTime() {
// treat the misalignment either as a restart (reset) or as a violation of the single-writer principle (drop)
a.logger.With(
zap.String("ip_start_time", ip.StartTimestamp().String()),
zap.String("pp_start_time", pp.StartTimestamp().String()),
zap.String("pp_timestamp", pp.Timestamp().String()),
zap.String("ip_timestamp", ip.Timestamp().String()),
).Warn("Misaligned starting timestamps")
if ip.StartTimestamp().AsTime().After(pp.Timestamp().AsTime()) {
a.logger.Debug("treating it like reset")
ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
} else {
a.logger.With(
zap.String("metric_name", metric.Name()),
).Warn("Dropped misaligned histogram datapoint")
continue
}
} else {
a.logger.Debug("Accumulate another histogram datapoint")
accumulateHistogramValues(pp, ip, m.Histogram().DataPoints().AppendEmpty())
}
case pmetric.AggregationTemporalityCumulative:
if ip.Timestamp().AsTime().Before(mv.value.Histogram().DataPoints().At(0).Timestamp().AsTime()) {
// only keep datapoint with latest timestamp
continue
}

ip.CopyTo(m.Histogram().DataPoints().AppendEmpty())
default:
// unsupported temporality
continue
}

m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scope: il, updated: now})
n++
}
@@ -316,3 +344,42 @@ func copyMetricMetadata(metric pmetric.Metric) pmetric.Metric {

return m
}

func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
dest.SetStartTimestamp(prev.StartTimestamp())

older := prev
newer := current
if current.Timestamp().AsTime().Before(prev.Timestamp().AsTime()) {
older = current
newer = prev
}

newer.Attributes().CopyTo(dest.Attributes())
dest.SetTimestamp(newer.Timestamp())

// check whether bucket boundaries align; if they differ, fall back to the newer datapoint's values
match := older.ExplicitBounds().Len() == newer.ExplicitBounds().Len()
for i := 0; match && i < newer.ExplicitBounds().Len(); i++ {
match = older.ExplicitBounds().At(i) == newer.ExplicitBounds().At(i)
}

if match {
dest.SetCount(newer.Count() + older.Count())
dest.SetSum(newer.Sum() + older.Sum())

counts := make([]uint64, newer.BucketCounts().Len())
for i := 0; i < newer.BucketCounts().Len(); i++ {
counts[i] = newer.BucketCounts().At(i) + older.BucketCounts().At(i)
}
dest.BucketCounts().FromRaw(counts)
} else {
// use new value if bucket bounds do not match
dest.SetCount(newer.Count())
dest.SetSum(newer.Sum())
dest.BucketCounts().FromRaw(newer.BucketCounts().AsRaw())
}

dest.ExplicitBounds().FromRaw(newer.ExplicitBounds().AsRaw())
}
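
A hedged usage sketch of the helper above (the values mirror the happy-path test; `pmetric` is go.opentelemetry.io/collector/pdata/pmetric, and the variable names are illustrative, not part of this change):

```go
// Merge two aligned delta datapoints; dest receives the element-wise sums.
prev := pmetric.NewHistogramDataPoint()
prev.SetCount(5)
prev.SetSum(2.5)
prev.BucketCounts().FromRaw([]uint64{1, 3, 1, 0, 0})
prev.ExplicitBounds().FromRaw([]float64{0.1, 0.5, 1, 10})

cur := pmetric.NewHistogramDataPoint()
cur.SetCount(4)
cur.SetSum(8.3)
cur.BucketCounts().FromRaw([]uint64{1, 1, 2, 0, 0})
cur.ExplicitBounds().FromRaw([]float64{0.1, 0.5, 1, 10})

dest := pmetric.NewHistogramDataPoint()
accumulateHistogramValues(prev, cur, dest)
// dest now reports Count() == 9, Sum() == 10.8, buckets [2 4 3 0 0],
// carrying cur's bounds and timestamp.
```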
230 changes: 188 additions & 42 deletions exporter/prometheusexporter/accumulator_test.go
@@ -15,48 +15,6 @@ import (
"go.uber.org/zap"
)

func TestAccumulateDeltaAggregation(t *testing.T) {
tests := []struct {
name string
fillMetric func(time.Time, pmetric.Metric)
}{
{
name: "Histogram",
fillMetric: func(ts time.Time, metric pmetric.Metric) {
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Histogram().DataPoints().AppendEmpty()
dp.BucketCounts().FromRaw([]uint64{5, 2})
dp.SetCount(7)
dp.ExplicitBounds().FromRaw([]float64{3.5, 10.0})
dp.SetSum(42.42)
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
tt.fillMetric(time.Now(), ilm.Metrics().AppendEmpty())

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 0, n)

signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), pcommon.NewMap(), pcommon.NewMap())
v, ok := a.registeredMetrics.Load(signature)
require.False(t, ok)
require.Nil(t, v)
})
}
}

func TestAccumulateMetrics(t *testing.T) {
tests := []struct {
name string
@@ -422,6 +380,194 @@ func TestAccumulateDeltaToCumulative(t *testing.T) {
}
}

func TestAccumulateDeltaToCumulativeHistogram(t *testing.T) {
appendDeltaHistogram := func(startTs time.Time, ts time.Time, count uint64, sum float64, counts []uint64, bounds []float64, metrics pmetric.MetricSlice) {
metric := metrics.AppendEmpty()
metric.SetName("test_metric")
metric.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
metric.SetDescription("test description")
dp := metric.Histogram().DataPoints().AppendEmpty()
dp.ExplicitBounds().FromRaw(bounds)
dp.BucketCounts().FromRaw(counts)
dp.SetCount(count)
dp.SetSum(sum)
dp.Attributes().PutStr("label_1", "1")
dp.Attributes().PutStr("label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTs))
}

t.Run("AccumulateHappyPath", func(t *testing.T) {
startTs := time.Now().Add(-5 * time.Second)
ts1 := time.Now().Add(-4 * time.Second)
ts2 := time.Now().Add(-3 * time.Second)
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
appendDeltaHistogram(ts1, ts2, 4, 8.3, []uint64{1, 1, 2, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())

m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap())

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 2, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m1.Sum()+m2.Sum(), v.Sum())
require.Equal(t, m1.Count()+m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m1.BucketCounts().At(i)+m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m2.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Ignore", func(t *testing.T) {
startTs := time.Now().Add(-5 * time.Second)
ts1 := time.Now().Add(-3 * time.Second)
ts2 := time.Now().Add(-4 * time.Second)
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
appendDeltaHistogram(startTs, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics())

m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0)
m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap())

// should ignore metric with different buckets from the past
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m1.Sum(), v.Sum())
require.Equal(t, m1.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m1.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m1.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("ResetBuckets/Perform", func(t *testing.T) {
// should reset when different buckets arrive
startTs := time.Now().Add(-5 * time.Second)
ts1 := time.Now().Add(-3 * time.Second)
ts2 := time.Now().Add(-2 * time.Second)
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
appendDeltaHistogram(startTs, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
appendDeltaHistogram(ts1, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics())

m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap())

// the newer datapoint with different buckets should replace the accumulated value
a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 2, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m2.Sum(), v.Sum())
require.Equal(t, m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m2.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("MisalignedTimestamps/Drop", func(t *testing.T) {
// should drop a datapoint whose start time is misaligned and earlier than the latest accumulated timestamp
startTs1 := time.Now().Add(-5 * time.Second)
startTs2 := time.Now().Add(-4 * time.Second)
ts1 := time.Now().Add(-3 * time.Second)
ts2 := time.Now().Add(-2 * time.Second)
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
appendDeltaHistogram(startTs1, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
appendDeltaHistogram(startTs2, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics())

m1 := ilm.Metrics().At(0).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m1.Attributes(), pcommon.NewMap())

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 1, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m1.Sum(), v.Sum())
require.Equal(t, m1.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m1.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m1.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
t.Run("MisalignedTimestamps/Reset", func(t *testing.T) {
// reset when start timestamp skips ahead
startTs1 := time.Now().Add(-5 * time.Second)
startTs2 := time.Now().Add(-2 * time.Second)
ts1 := time.Now().Add(-3 * time.Second)
ts2 := time.Now().Add(-1 * time.Second)
resourceMetrics := pmetric.NewResourceMetrics()
ilm := resourceMetrics.ScopeMetrics().AppendEmpty()
ilm.Scope().SetName("test")
appendDeltaHistogram(startTs1, ts1, 5, 2.5, []uint64{1, 3, 1, 0, 0}, []float64{0.1, 0.5, 1, 10}, ilm.Metrics())
appendDeltaHistogram(startTs2, ts2, 7, 5, []uint64{3, 1, 1, 0, 0}, []float64{0.1, 0.2, 1, 10}, ilm.Metrics())

m2 := ilm.Metrics().At(1).Histogram().DataPoints().At(0)
signature := timeseriesSignature(ilm.Scope().Name(), ilm.Metrics().At(0), m2.Attributes(), pcommon.NewMap())

a := newAccumulator(zap.NewNop(), 1*time.Hour).(*lastValueAccumulator)
n := a.Accumulate(resourceMetrics)
require.Equal(t, 2, n)

m, ok := a.registeredMetrics.Load(signature)
v := m.(*accumulatedValue).value.Histogram().DataPoints().At(0)
require.True(t, ok)

require.Equal(t, m2.Sum(), v.Sum())
require.Equal(t, m2.Count(), v.Count())

for i := 0; i < v.BucketCounts().Len(); i++ {
require.Equal(t, m2.BucketCounts().At(i), v.BucketCounts().At(i))
}

for i := 0; i < v.ExplicitBounds().Len(); i++ {
require.Equal(t, m2.ExplicitBounds().At(i), v.ExplicitBounds().At(i))
}
})
}

func TestAccumulateDroppedMetrics(t *testing.T) {
tests := []struct {
name string