diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 952e9a4a8bdc..87b8dd0d3570 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -27,8 +27,8 @@ var now = time.Now // Aggregator forms an aggregation from a collection of recorded measurements. // -// Aggregators need to be comparable so they can be de-duplicated by the SDK when -// it creates them for multiple views. +// Aggregators need to be comparable so they can be de-duplicated by the SDK +// when it creates them for multiple views. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. @@ -38,3 +38,22 @@ type Aggregator[N int64 | float64] interface { // measurements made and ends an aggregation cycle. Aggregation() metricdata.Aggregation } + +// precomputeAggregator is an Aggregator that receives values to aggregate that +// have been pre-computed by the caller. +type precomputeAggregator[N int64 | float64] interface { + // The Aggregate method of the embedded Aggregator is used to record + // pre-computed measurements, scoped by attributes that have not been + // filtered by an attribute filter. + Aggregator[N] + + // aggregateFiltered records measurements scoped by attributes that have + // been filtered by an attribute filter. + // + // Pre-computed measurements of filtered attributes need to be recorded + // separately from those that haven't been filtered so they can be added to + // the non-filtered pre-computed measurements in a collection cycle and + // then reset after the cycle (the non-filtered pre-computed measurements + // are not reset). 
+ aggregateFiltered(N, attribute.Set) +} diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go index a08c61e3d823..4d24b62819a6 100644 --- a/sdk/metric/internal/filter.go +++ b/sdk/metric/internal/filter.go @@ -21,26 +21,26 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -type filterAgg[N int64 | float64] interface { - Aggregator[N] - - // filtered records values for attributes that have been filtered. - filtered(N, attribute.Set) -} - -// NewFilter wraps an Aggregator with an attribute filtering function. +// NewFilter returns an Aggregator that wraps an agg with an attribute +// filtering function. Both pre-computed and non-pre-computed Aggregators can be +// passed for agg. An appropriate Aggregator will be returned for the detected +// type. func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] { if fn == nil { return agg } - if fa, ok := agg.(filterAgg[N]); ok { + if fa, ok := agg.(precomputeAggregator[N]); ok { return newPrecomputedFilter(fa, fn) } return newFilter(agg, fn) } -// filter is an aggregator that applies attribute filter when Aggregating. filters -// do not have any backing memory, and must be constructed with a backing Aggregator. +// filter wraps an aggregator with an attribute filter. All recorded +// measurements will have their attributes filtered before they are passed to +// the underlying aggregator's Aggregate method. +// +// This should not be used to wrap a pre-computed Aggregator. Use a +// precomputedFilter instead. type filter[N int64 | float64] struct { filter attribute.Filter aggregator Aggregator[N] @@ -49,6 +49,11 @@ type filter[N int64 | float64] struct { seen map[attribute.Set]attribute.Set } +// newFilter returns a filter Aggregator that wraps agg with the attribute +// filter fn. +// +// This should not be used to wrap a pre-computed Aggregator. Use a +// precomputedFilter instead. 
func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] { return &filter[N]{ filter: fn, @@ -78,25 +83,33 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation { } // precomputedFilter is an aggregator that applies attribute filter when -// Aggregating for precomputed Aggregations. The precomputed Aggregations need -// to operate normally when no attribute filtering is done (for sums this means -// setting the value), but when attribute filtering is done it needs to be -// added to any set value. +// Aggregating for pre-computed Aggregations. The pre-computed Aggregations +// need to operate normally when no attribute filtering is done (for sums this +// means setting the value), but when attribute filtering is done it needs to +// be added to any set value. type precomputedFilter[N int64 | float64] struct { filter attribute.Filter - aggregator filterAgg[N] + aggregator precomputeAggregator[N] sync.Mutex seen map[attribute.Set]attribute.Set } -func newPrecomputedFilter[N int64 | float64](agg filterAgg[N], fn attribute.Filter) *precomputedFilter[N] { +// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg +// with the attribute filter fn. +// +// This should not be used to wrap a non-pre-computed Aggregator. Use a +// filter instead. +func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] { return &precomputedFilter[N]{ filter: fn, aggregator: agg, seen: make(map[attribute.Set]attribute.Set), } } + +// Aggregate records the measurement, scoped by attr, and aggregates it +// into an aggregation. func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) { // TODO (#3006): drop stale attributes from seen. f.Lock() @@ -110,10 +123,12 @@ func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) { // No filtering done. 
f.aggregator.Aggregate(measurement, fAttr) } else { - f.aggregator.filtered(measurement, fAttr) + f.aggregator.aggregateFiltered(measurement, fAttr) } } +// Aggregation returns an Aggregation, for all the aggregated +// measurements made and ends an aggregation cycle. func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation { return f.aggregator.Aggregation() } diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 2fff3ba00d8b..7783dc66490c 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -158,14 +158,17 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { return out } +// precomputedValue is the recorded measurement value for a set of attributes. type precomputedValue[N int64 | float64] struct { - // measured is the value directly measured. + // measured is the last value measured for a set of attributes that were + // not filtered. measured N - // filtered is the sum of values from spatially aggregations. + // filtered is the sum of values from measurements that had their + // attributes filtered. filtered N } -// valueMap is the storage for precomputed sums. +// precomputedMap is the storage for precomputed sums. type precomputedMap[N int64 | float64] struct { sync.Mutex values map[attribute.Set]precomputedValue[N] @@ -177,7 +180,14 @@ func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] { } } -// Aggregate records value as a cumulative sum for attr. +// Aggregate records value with the unfiltered attributes attr. +// +// If a previous measurement was made for the same attribute set: +// +// - If that measurement's attributes were not filtered, this value overwrites +// that value. +// - If that measurement's attributes were filtered, this value will be +// recorded alongside that value. 
func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) { s.Lock() v := s.values[attr] @@ -186,8 +196,18 @@ func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) { s.Unlock() } -// filtered records value with spatially re-aggregated attrs. -func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered. +// aggregateFiltered records value with the filtered attributes attr. +// +// If a previous measurement was made for the same attribute set: +// +// - If that measurement's attributes were not filtered, this value will be +// recorded alongside that value. +// - If that measurement's attributes were filtered, this value will be +// added to it. +// +// This method should not be used if attr has not been reduced by an attribute +// filter. +func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered. s.Lock() v := s.values[attr] v.filtered += value @@ -196,15 +216,14 @@ func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: u } // NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of -// measurements as their pre-computed arithmetic sum. Each sum is scoped by -// attributes and the aggregation cycle the measurements were made in. +// pre-computed sums. Each sum is scoped by attributes and the aggregation +// cycle the measurements were made in. // // The monotonic value is used to communicate the produced Aggregation is // monotonic or not. The returned Aggregator does not make any guarantees this // value is accurate. It is up to the caller to ensure it. // -// The output Aggregation will report recorded values as delta temporality. It -// is up to the caller to ensure this is accurate. +// The output Aggregation will report recorded values as delta temporality. 
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { return &precomputedDeltaSum[N]{ precomputedMap: newPrecomputedMap[N](), @@ -214,8 +233,8 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { } } -// precomputedDeltaSum summarizes a set of measurements recorded over all -// aggregation cycles as the delta arithmetic sum. +// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all +// aggregation cycles as the delta of these sums. type precomputedDeltaSum[N int64 | float64] struct { *precomputedMap[N] @@ -225,6 +244,16 @@ type precomputedDeltaSum[N int64 | float64] struct { start time.Time } +// Aggregation returns the recorded pre-computed sums as an Aggregation. The +// sum values are expressed as the delta between what was measured this +// collection cycle and the previous. +// +// All pre-computed sums that were recorded for attribute sets reduced by an +// attribute filter (filtered-sums) are summed together and added to any +// pre-computed sum value recorded directly for the resulting attribute set +// (unfiltered-sum). The filtered-sums are reset to zero for the next +// collection cycle, and the unfiltered-sum is kept for the next collection +// cycle. func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { s.Lock() defer s.Unlock() @@ -264,15 +293,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { } // NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of -// measurements as their pre-computed arithmetic sum. Each sum is scoped by -// attributes and the aggregation cycle the measurements were made in. +// pre-computed sums. Each sum is scoped by attributes and the aggregation +// cycle the measurements were made in. // // The monotonic value is used to communicate the produced Aggregation is // monotonic or not. The returned Aggregator does not make any guarantees this // value is accurate. 
It is up to the caller to ensure it. // // The output Aggregation will report recorded values as cumulative -// temporality. It is up to the caller to ensure this is accurate. +// temporality. func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { return &precomputedCumulativeSum[N]{ precomputedMap: newPrecomputedMap[N](), @@ -281,8 +310,7 @@ func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N } } -// precomputedCumulativeSum summarizes a set of measurements recorded over all -// aggregation cycles directly as the cumulative arithmetic sum. +// precomputedCumulativeSum directly records and reports a set of pre-computed sums. type precomputedCumulativeSum[N int64 | float64] struct { *precomputedMap[N] @@ -290,6 +318,16 @@ type precomputedCumulativeSum[N int64 | float64] struct { start time.Time } +// Aggregation returns the recorded pre-computed sums as an Aggregation. The +// sum values are expressed directly as they are assumed to be recorded as the +// cumulative sum of some measured phenomenon. +// +// All pre-computed sums that were recorded for attribute sets reduced by an +// attribute filter (filtered-sums) are summed together and added to any +// pre-computed sum value recorded directly for the resulting attribute set +// (unfiltered-sum). The filtered-sums are reset to zero for the next +// collection cycle, and the unfiltered-sum is kept for the next collection +// cycle. 
func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { s.Lock() defer s.Unlock() diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go index b3e18d764da2..cde79aaa92ba 100644 --- a/sdk/metric/internal/sum_test.go +++ b/sdk/metric/internal/sum_test.go @@ -167,7 +167,7 @@ func TestDeltaSumReset(t *testing.T) { func TestPreComputedDeltaSum(t *testing.T) { var mono bool agg := NewPrecomputedDeltaSum[int64](mono) - require.Implements(t, (*filterAgg[int64])(nil), agg) + require.Implements(t, (*precomputeAggregator[int64])(nil), agg) attrs := attribute.NewSet(attribute.String("key", "val")) agg.Aggregate(1, attrs) @@ -185,7 +185,7 @@ func TestPreComputedDeltaSum(t *testing.T) { want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) - agg.(filterAgg[int64]).filtered(1, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) got = agg.Aggregation() // measured(+): 1, previous(-): 1, filtered(+): 1 want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} @@ -205,8 +205,8 @@ func TestPreComputedDeltaSum(t *testing.T) { agg.Aggregate(2, attrs) agg.Aggregate(5, attrs) // Filtered should add. - agg.(filterAgg[int64]).filtered(3, attrs) - agg.(filterAgg[int64]).filtered(10, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() // measured(+): 5, previous(-): 1, filtered(+): 13 want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)} @@ -221,9 +221,9 @@ func TestPreComputedDeltaSum(t *testing.T) { // Order should not affect measure. // Filtered should add. 
- agg.(filterAgg[int64]).filtered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) agg.Aggregate(7, attrs) - agg.(filterAgg[int64]).filtered(10, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() // measured(+): 7, previous(-): 5, filtered(+): 13 want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)} @@ -238,7 +238,7 @@ func TestPreComputedDeltaSum(t *testing.T) { func TestPreComputedCumulativeSum(t *testing.T) { var mono bool agg := NewPrecomputedCumulativeSum[int64](mono) - require.Implements(t, (*filterAgg[int64])(nil), agg) + require.Implements(t, (*precomputeAggregator[int64])(nil), agg) attrs := attribute.NewSet(attribute.String("key", "val")) agg.Aggregate(1, attrs) @@ -255,7 +255,7 @@ func TestPreComputedCumulativeSum(t *testing.T) { got = agg.Aggregation() metricdatatest.AssertAggregationsEqual(t, want, got, opt) - agg.(filterAgg[int64]).filtered(1, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) @@ -268,8 +268,8 @@ func TestPreComputedCumulativeSum(t *testing.T) { // Override set value. agg.Aggregate(5, attrs) // Filtered should add. - agg.(filterAgg[int64]).filtered(3, attrs) - agg.(filterAgg[int64]).filtered(10, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) @@ -281,9 +281,9 @@ func TestPreComputedCumulativeSum(t *testing.T) { // Order should not affect measure. // Filtered should add. 
- agg.(filterAgg[int64]).filtered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) agg.Aggregate(7, attrs) - agg.(filterAgg[int64]).filtered(10, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)} metricdatatest.AssertAggregationsEqual(t, want, got, opt)