diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index cad697774a2..e9068a4b936 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -18,10 +18,16 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( + "time" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +// now is used to return the current local time while allowing tests to +// override the the default time.Now function. +var now = time.Now + // Aggregator forms an aggregation from a collection of recorded measurements. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go index 348f35d21e0..dc4a0cd499e 100644 --- a/sdk/metric/internal/aggregator_example_test.go +++ b/sdk/metric/internal/aggregator_example_test.go @@ -48,7 +48,7 @@ func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Cou // and View configuration. Assume here these are determined to be a // cumulative sum. - aggregator := NewCumulativeSum[int64]() + aggregator := NewCumulativeSum[int64](true) count := inst{aggregateFunc: aggregator.Aggregate} p.aggregations = append(p.aggregations, aggregator.Aggregation()) diff --git a/sdk/metric/internal/aggregator_test.go b/sdk/metric/internal/aggregator_test.go index 645352153c1..e93d643b642 100644 --- a/sdk/metric/internal/aggregator_test.go +++ b/sdk/metric/internal/aggregator_test.go @@ -39,7 +39,8 @@ var ( bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false)) carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false)) - monoIncr = setMap{alice: 1, bob: 10, carol: 2} + monoIncr = setMap{alice: 1, bob: 10, carol: 2} + nonMonoIncr = setMap{alice: 1, bob: -1, carol: 2} // Sat Jan 01 2000 00:00:00 GMT+0000. staticTime = time.Unix(946684800, 0) diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index 1cfeeb59a91..48e1b426c76 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -25,10 +25,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -// now is used to return the current local time while allowing tests to -// override the the default time.Now function. -var now = time.Now - // datapoint is timestamped measurement data. type datapoint[N int64 | float64] struct { timestamp time.Time diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 839b4690127..b80dcd9c40b 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -18,63 +18,139 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( + "sync" + "time" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -// sum summarizes a set of measurements as their arithmetic sum. -type sum[N int64 | float64] struct { - // TODO(#2972): implement. +// valueMap is the storage for all sums. +type valueMap[N int64 | float64] struct { + sync.Mutex + values map[attribute.Set]N +} + +func newValueMap[N int64 | float64]() *valueMap[N] { + return &valueMap[N]{values: make(map[attribute.Set]N)} } -func (s *sum[N]) Aggregate(value N, attr attribute.Set) { - // TODO(#2972): implement. +func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) { + s.Lock() + s.values[attr] += value + s.Unlock() } // NewDeltaSum returns an Aggregator that summarizes a set of measurements as // their arithmetic sum. Each sum is scoped by attributes and the aggregation // cycle the measurements were made in. // +// The monotonic value is used to communicate the produced Aggregation is +// monotonic or not. The returned Aggregator does not make any guarantees this +// value is accurate. It is up to the caller to ensure it. +// // Each aggregation cycle is treated independently. When the returned -// Aggregator's Aggregations method is called it will reset all sums to zero. -func NewDeltaSum[N int64 | float64]() Aggregator[N] { - // TODO(#2972): implement. - return &deltaSum[N]{} +// Aggregator's Aggregation method is called it will reset all sums to zero. +func NewDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { + return &deltaSum[N]{ + valueMap: newValueMap[N](), + monotonic: monotonic, + start: now(), + } } // deltaSum summarizes a set of measurements made in a single aggregation // cycle as their arithmetic sum. type deltaSum[N int64 | float64] struct { - sum[N] + *valueMap[N] - // TODO(#2972): implement. + monotonic bool + start time.Time } func (s *deltaSum[N]) Aggregation() metricdata.Aggregation { - // TODO(#2972): implement. - return nil + out := metricdata.Sum[N]{ + Temporality: metricdata.DeltaTemporality, + IsMonotonic: s.monotonic, + } + + s.Lock() + defer s.Unlock() + + if len(s.values) == 0 { + return out + } + + t := now() + out.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.values)) + for attr, value := range s.values { + out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ + Attributes: attr, + StartTime: s.start, + Time: t, + Value: value, + }) + // Unused attribute sets do not report. + delete(s.values, attr) + } + // The delta collection cycle resets. + s.start = t + return out } // NewCumulativeSum returns an Aggregator that summarizes a set of -// measurements as their arithmetic sum. Each sum is scoped by attributes. +// measurements as their arithmetic sum. Each sum is scoped by attributes and +// the aggregation cycle the measurements were made in. +// +// The monotonic value is used to communicate the produced Aggregation is +// monotonic or not. The returned Aggregator does not make any guarantees this +// value is accurate. It is up to the caller to ensure it. // -// Each aggregation cycle builds from the previous, the sums are the -// arithmetic sum of all values aggregated since the returned Aggregator was -// created. -func NewCumulativeSum[N int64 | float64]() Aggregator[N] { - // TODO(#2972): implement. - return &cumulativeSum[N]{} +// Each aggregation cycle is treated independently. When the returned +// Aggregator's Aggregation method is called it will reset all sums to zero. +func NewCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { + return &cumulativeSum[N]{ + valueMap: newValueMap[N](), + monotonic: monotonic, + start: now(), + } } // cumulativeSum summarizes a set of measurements made over all aggregation // cycles as their arithmetic sum. type cumulativeSum[N int64 | float64] struct { - sum[N] + *valueMap[N] - // TODO(#2972): implement. + monotonic bool + start time.Time } func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { - // TODO(#2972): implement. - return nil + out := metricdata.Sum[N]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: s.monotonic, + } + + s.Lock() + defer s.Unlock() + + if len(s.values) == 0 { + return out + } + + t := now() + out.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.values)) + for attr, value := range s.values { + out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ + Attributes: attr, + StartTime: s.start, + Time: t, + Value: value, + }) + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + } + return out } diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go new file mode 100644 index 00000000000..668afd1a022 --- /dev/null +++ b/sdk/metric/internal/sum_test.go @@ -0,0 +1,135 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "testing" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func TestSum(t *testing.T) { + t.Cleanup(mockTime(now)) + t.Run("Int64", testSum[int64]) + t.Run("Float64", testSum[float64]) +} + +func testSum[N int64 | float64](t *testing.T) { + tester := &aggregatorTester[N]{ + GoroutineN: defaultGoroutines, + MeasurementN: defaultMeasurements, + CycleN: defaultCycles, + } + + t.Run("Delta", func(t *testing.T) { + incr, mono := monoIncr, true + eFunc := deltaExpecter[N](incr, mono) + t.Run("Monotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc)) + + incr, mono = nonMonoIncr, false + eFunc = deltaExpecter[N](incr, mono) + t.Run("NonMonotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc)) + }) + + t.Run("Cumulative", func(t *testing.T) { + incr, mono := monoIncr, true + eFunc := cumuExpecter[N](incr, mono) + t.Run("Monotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc)) + + incr, mono = nonMonoIncr, false + eFunc = cumuExpecter[N](incr, mono) + t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc)) + }) +} + +func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { + sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono} + return func(m int) metricdata.Aggregation { + sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) + for a, v := range incr { + sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*m))) + } + return sum + } +} + +func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { + var cycle int + sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono} + return func(m int) metricdata.Aggregation { + cycle++ + sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) + for a, v := range incr { + sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*cycle*m))) + } + return sum + } +} + +// point returns a DataPoint that started and ended now. +func point[N int64 | float64](a attribute.Set, v N) metricdata.DataPoint[N] { + return metricdata.DataPoint[N]{ + Attributes: a, + StartTime: now(), + Time: now(), + Value: N(v), + } +} + +func testDeltaSumReset[N int64 | float64](t *testing.T) { + t.Cleanup(mockTime(now)) + + expect := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality} + a := NewDeltaSum[N](false) + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) + + a.Aggregate(1, alice) + expect.DataPoints = []metricdata.DataPoint[N]{point[N](alice, 1)} + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) + + // The attr set should be forgotten once Aggregations is called. + expect.DataPoints = nil + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) + + // Aggregating another set should not affect the original (alice). + a.Aggregate(1, bob) + expect.DataPoints = []metricdata.DataPoint[N]{point[N](bob, 1)} + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) +} + +func TestDeltaSumReset(t *testing.T) { + t.Run("Int64", testDeltaSumReset[int64]) + t.Run("Float64", testDeltaSumReset[float64]) +} + +func BenchmarkSum(b *testing.B) { + b.Run("Int64", benchmarkSum[int64]) + b.Run("Float64", benchmarkSum[float64]) +} + +func benchmarkSum[N int64 | float64](b *testing.B) { + // The monotonic argument is only used to annotate the Sum returned from + // the Aggregation method. It should not have an effect on operational + // performance, therefore, only monotonic=false is benchmarked here. + factory := func() Aggregator[N] { return NewDeltaSum[N](false) } + b.Run("Delta", benchmarkAggregator(factory)) + factory = func() Aggregator[N] { return NewCumulativeSum[N](false) } + b.Run("Cumulative", benchmarkAggregator(factory)) +}