Skip to content

Commit

Permalink
Support Delta & Cumulative temporality for LastValue aggregates (open…
Browse files Browse the repository at this point in the history
…-telemetry#5305)

* Add delta/cumulative/precomputed LastValue agg

* Add cumulative testing

* Add precomputed testing

* Add changelog entry
  • Loading branch information
MrAlias authored May 10, 2024
1 parent 737f885 commit 69800ee
Show file tree
Hide file tree
Showing 6 changed files with 470 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- De-duplicate map attributes added to a `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5230)
- The `go.opentelemetry.io/otel/exporters/stdout/stdoutlog` exporter won't print `AttributeValueLengthLimit` and `AttributeCountLimit` fields now, instead it prints the `DroppedAttributes` field. (#5272)
- Improved performance in the `Stringer` implementation of `go.opentelemetry.io/otel/baggage.Member` by reducing the number of allocations. (#5286)
- Set the start time for last-value aggregates in `go.opentelemetry.io/otel/sdk/metric`. (#5305)
- The `Span` in `go.opentelemetry.io/otel/sdk/trace` will record links without span context if either non-empty `TraceState` or attributes are provided. (#5315)

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func BenchmarkInstrument(b *testing.B) {
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
in, _ := build.PrecomputedLastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
Expand All @@ -50,7 +50,7 @@ func BenchmarkInstrument(b *testing.B) {
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
in, _ := build.PrecomputedLastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
Expand Down
29 changes: 17 additions & 12 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,26 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
}

// LastValue returns a last-value aggregate function input and output.
//
// The Builder.Temporality is ignored and delta is use always.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
// reuse of the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
lv.computeAggregation(&gData.DataPoints)
*dest = gData

return len(gData.DataPoints)
// PrecomputedLastValue returns a last-value aggregate function input and
// output. The aggregation returned from the returned ComputeAggregation
// function will always only return values from the previous collection cycle.
func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}

Expand Down
88 changes: 85 additions & 3 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *la
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Distinct]datapoint[N]),
start: now(),
}
}

Expand All @@ -36,6 +37,7 @@ type lastValue[N int64 | float64] struct {
newRes func() exemplar.Reservoir
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
}

func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
Expand All @@ -58,23 +60,103 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
s.values[attr.Equivalent()] = d
}

func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = now()

*dest = gData

return n
}

func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
*dest = gData

return n
}

// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int {
n := len(s.values)
*dest = reset(*dest, n, n)

var i int
for _, v := range s.values {
(*dest)[i].Attributes = v.attrs
// The event time is the only meaningful timestamp, StartTime is
// ignored.
(*dest)[i].StartTime = s.start
(*dest)[i].Time = v.timestamp
(*dest)[i].Value = v.value
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
return n
}

// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}

// precomputedLastValue summarizes a set of observations as the last one made.
type precomputedLastValue[N int64 | float64] struct {
*lastValue[N]
}

func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = now()

*dest = gData

return n
}

func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])

s.Lock()
defer s.Unlock()

n := s.copyDpts(&gData.DataPoints)
// Do not report stale values.
clear(s.values)
*dest = gData

return n
}
Loading

0 comments on commit 69800ee

Please sign in to comment.