diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go
index d8f2514efb2..e5298e22d6f 100644
--- a/sdk/metric/internal/histogram.go
+++ b/sdk/metric/internal/histogram.go
@@ -18,19 +18,94 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
 
 import (
+	"sort"
+	"sync"
+	"time"
+
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/sdk/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
-// histogram summarizes a set of measurements as an histogram with
+type buckets struct {
+	counts   []uint64
+	count    uint64
+	sum      float64
+	min, max float64
+}
+
+// newBuckets returns buckets with n bins.
+func newBuckets(n int) *buckets {
+	return &buckets{counts: make([]uint64, n)}
+}
+
+func (b *buckets) bin(idx int, value float64) {
+	b.counts[idx]++
+	b.count++
+	b.sum += value
+	if value < b.min {
+		b.min = value
+	} else if value > b.max {
+		b.max = value
+	}
+}
+
+// histValues summarizes a set of measurements as an histValues with
 // explicitly defined buckets.
-type histogram[N int64 | float64] struct {
-	// TODO(#2970): implement.
+type histValues[N int64 | float64] struct {
+	bounds []float64
+
+	values   map[attribute.Set]*buckets
+	valuesMu sync.Mutex
 }
 
-func (s *histogram[N]) Aggregate(value N, attr attribute.Set) {
-	// TODO(#2970): implement.
+func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
+	// The responsibility of keeping all buckets correctly associated with the
+	// passed boundaries is ultimately this type's responsibility. Make a copy
+	// here so we can always guarantee this. Or, in the case of failure, have
+	// complete control over the fix.
+	b := make([]float64, len(bounds))
+	copy(b, bounds)
+	sort.Float64s(b)
+	return &histValues[N]{
+		bounds: b,
+		values: make(map[attribute.Set]*buckets),
+	}
+}
+
+// Aggregate records the measurement value, scoped by attr, and aggregates it
+// into a histogram.
+func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
+	// Accept all types to satisfy the Aggregator interface. However, since
+	// the Aggregation produced by this Aggregator is only float64, convert
+	// here to only use this type.
+	v := float64(value)
+
+	// This search will return an index in the range [0, len(s.bounds)], where
+	// it will return len(s.bounds) if value is greater than the last element
+	// of s.bounds. This aligns with the buckets in that the length of buckets
+	// is len(s.bounds)+1, with the last bucket representing:
+	// (s.bounds[len(s.bounds)-1], +∞).
+	idx := sort.SearchFloat64s(s.bounds, v)
+
+	s.valuesMu.Lock()
+	defer s.valuesMu.Unlock()
+
+	b, ok := s.values[attr]
+	if !ok {
+		// N+1 buckets. For example:
+		//
+		//   bounds = [0, 5, 10]
+		//
+		// Then,
+		//
+		//   buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
+		b = newBuckets(len(s.bounds) + 1)
+		// Ensure min and max are recorded values (not zero), for new buckets.
+		b.min, b.max = v, v
+		s.values[attr] = b
+	}
+	b.bin(idx, v)
 }
 
 // NewDeltaHistogram returns an Aggregator that summarizes a set of
@@ -41,20 +116,59 @@ func (s *histogram[N]) Aggregate(value N, attr attribute.Set) {
 // Aggregator's Aggregations method is called it will reset all histogram
 // counts to zero.
 func NewDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
-	return &deltaHistogram[N]{}
+	return &deltaHistogram[N]{
+		histValues: newHistValues[N](cfg.Boundaries),
+		noMinMax:   cfg.NoMinMax,
+		start:      now(),
+	}
 }
 
 // deltaHistogram summarizes a set of measurements made in a single
 // aggregation cycle as an histogram with explicitly defined buckets.
 type deltaHistogram[N int64 | float64] struct {
-	histogram[N]
+	*histValues[N]
 
-	// TODO(#2970): implement.
+	noMinMax bool
+	start    time.Time
 }
 
 func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
-	// TODO(#2970): implement.
-	return nil
+	h := metricdata.Histogram{Temporality: metricdata.DeltaTemporality}
+
+	s.valuesMu.Lock()
+	defer s.valuesMu.Unlock()
+
+	if len(s.values) == 0 {
+		return h
+	}
+
+	// Do not allow modification of our copy of bounds.
+	bounds := make([]float64, len(s.bounds))
+	copy(bounds, s.bounds)
+	t := now()
+	h.DataPoints = make([]metricdata.HistogramDataPoint, 0, len(s.values))
+	for a, b := range s.values {
+		hdp := metricdata.HistogramDataPoint{
+			Attributes:   a,
+			StartTime:    s.start,
+			Time:         t,
+			Count:        b.count,
+			Bounds:       bounds,
+			BucketCounts: b.counts,
+			Sum:          b.sum,
+		}
+		if !s.noMinMax {
+			hdp.Min = &b.min
+			hdp.Max = &b.max
+		}
+		h.DataPoints = append(h.DataPoints, hdp)
+
+		// Unused attribute sets do not report.
+		delete(s.values, a)
+	}
+	// The delta collection cycle resets.
+	s.start = t
+	return h
 }
 
 // NewCumulativeHistogram returns an Aggregator that summarizes a set of
@@ -64,18 +178,66 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
 // the bucketed counts of all values aggregated since the returned Aggregator
 // was created.
 func NewCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
-	return &cumulativeHistogram[N]{}
+	return &cumulativeHistogram[N]{
+		histValues: newHistValues[N](cfg.Boundaries),
+		noMinMax:   cfg.NoMinMax,
+		start:      now(),
+	}
 }
 
 // cumulativeHistogram summarizes a set of measurements made over all
 // aggregation cycles as an histogram with explicitly defined buckets.
 type cumulativeHistogram[N int64 | float64] struct {
-	histogram[N]
+	*histValues[N]
 
-	// TODO(#2970): implement.
+	noMinMax bool
+	start    time.Time
 }
 
 func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
-	// TODO(#2970): implement.
-	return nil
+	h := metricdata.Histogram{Temporality: metricdata.CumulativeTemporality}
+
+	s.valuesMu.Lock()
+	defer s.valuesMu.Unlock()
+
+	if len(s.values) == 0 {
+		return h
+	}
+
+	// Do not allow modification of our copy of bounds.
+	bounds := make([]float64, len(s.bounds))
+	copy(bounds, s.bounds)
+	t := now()
+	h.DataPoints = make([]metricdata.HistogramDataPoint, 0, len(s.values))
+	for a, b := range s.values {
+		// The HistogramDataPoint field values returned need to be copies of
+		// the buckets value as we will keep updating them.
+		//
+		// TODO (#3047): Making copies for bounds and counts incurs a large
+		// memory allocation footprint. Alternatives should be explored.
+		counts := make([]uint64, len(b.counts))
+		copy(counts, b.counts)
+
+		hdp := metricdata.HistogramDataPoint{
+			Attributes:   a,
+			StartTime:    s.start,
+			Time:         t,
+			Count:        b.count,
+			Bounds:       bounds,
+			BucketCounts: counts,
+			Sum:          b.sum,
+		}
+		if !s.noMinMax {
+			// Similar to counts, make a copy.
+			min, max := b.min, b.max
+			hdp.Min = &min
+			hdp.Max = &max
+		}
+		h.DataPoints = append(h.DataPoints, hdp)
+		// TODO (#3006): This will use an unbounded amount of memory if there
+		// are unbounded number of attribute sets being aggregated. Attribute
+		// sets that become "stale" need to be forgotten so this will not
+		// overload the system.
+	}
+	return h
 }
diff --git a/sdk/metric/internal/histogram_test.go b/sdk/metric/internal/histogram_test.go
new file mode 100644
index 00000000000..edeaf8a6945
--- /dev/null
+++ b/sdk/metric/internal/histogram_test.go
@@ -0,0 +1,203 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.18
+// +build go1.18
+
+package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/aggregation"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
+)
+
+var (
+	bounds   = []float64{1, 5}
+	histConf = aggregation.ExplicitBucketHistogram{
+		Boundaries: bounds,
+		NoMinMax:   false,
+	}
+)
+
+func TestHistogram(t *testing.T) {
+	t.Cleanup(mockTime(now))
+	t.Run("Int64", testHistogram[int64])
+	t.Run("Float64", testHistogram[float64])
+}
+
+func testHistogram[N int64 | float64](t *testing.T) {
+	tester := &aggregatorTester[N]{
+		GoroutineN:   defaultGoroutines,
+		MeasurementN: defaultMeasurements,
+		CycleN:       defaultCycles,
+	}
+
+	incr := monoIncr
+	eFunc := deltaHistExpecter(incr)
+	t.Run("Delta", tester.Run(NewDeltaHistogram[N](histConf), incr, eFunc))
+	eFunc = cumuHistExpecter(incr)
+	t.Run("Cumulative", tester.Run(NewCumulativeHistogram[N](histConf), incr, eFunc))
+}
+
+func deltaHistExpecter(incr setMap) expectFunc {
+	h := metricdata.Histogram{Temporality: metricdata.DeltaTemporality}
+	return func(m int) metricdata.Aggregation {
+		h.DataPoints = make([]metricdata.HistogramDataPoint, 0, len(incr))
+		for a, v := range incr {
+			h.DataPoints = append(h.DataPoints, hPoint(a, float64(v), uint64(m)))
+		}
+		return h
+	}
+}
+
+func cumuHistExpecter(incr setMap) expectFunc {
+	var cycle int
+	h := metricdata.Histogram{Temporality: metricdata.CumulativeTemporality}
+	return func(m int) metricdata.Aggregation {
+		cycle++
+		h.DataPoints = make([]metricdata.HistogramDataPoint, 0, len(incr))
+		for a, v := range incr {
+			h.DataPoints = append(h.DataPoints, hPoint(a, float64(v), uint64(cycle*m)))
+		}
+		return h
+	}
+}
+
+// hPoint returns an HistogramDataPoint that started and ended now with multi
+// number of measurements values v. It includes a min and max (set to v).
+func hPoint(a attribute.Set, v float64, multi uint64) metricdata.HistogramDataPoint {
+	idx := sort.SearchFloat64s(bounds, v)
+	counts := make([]uint64, len(bounds)+1)
+	counts[idx] += multi
+	return metricdata.HistogramDataPoint{
+		Attributes:   a,
+		StartTime:    now(),
+		Time:         now(),
+		Count:        multi,
+		Bounds:       bounds,
+		BucketCounts: counts,
+		Min:          &v,
+		Max:          &v,
+		Sum:          v * float64(multi),
+	}
+}
+
+func TestBucketsBin(t *testing.T) {
+	b := newBuckets(3)
+	assertB := func(counts []uint64, count uint64, sum, min, max float64) {
+		assert.Equal(t, counts, b.counts)
+		assert.Equal(t, count, b.count)
+		assert.Equal(t, sum, b.sum)
+		assert.Equal(t, min, b.min)
+		assert.Equal(t, max, b.max)
+	}
+
+	assertB([]uint64{0, 0, 0}, 0, 0, 0, 0)
+	b.bin(1, 2)
+	assertB([]uint64{0, 1, 0}, 1, 2, 0, 2)
+	b.bin(0, -1)
+	assertB([]uint64{1, 1, 0}, 2, 1, -1, 2)
+}
+
+func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) Aggregator[N], getBounds func(Aggregator[N]) []float64) func(t *testing.T) {
+	b := []float64{0, 1, 2}
+	cpB := make([]float64, len(b))
+	copy(cpB, b)
+
+	a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b})
+	return func(t *testing.T) {
+		require.Equal(t, cpB, getBounds(a))
+
+		b[0] = 10
+		assert.Equal(t, cpB, getBounds(a), "modifying the bounds argument should not change the bounds")
+
+		a.Aggregate(5, alice)
+		hdp := a.Aggregation().(metricdata.Histogram).DataPoints[0]
+		hdp.Bounds[1] = 10
+		assert.Equal(t, cpB, getBounds(a), "modifying the Aggregation bounds should not change the bounds")
+	}
+}
+
+func TestHistogramImmutableBounds(t *testing.T) {
+	t.Run("Delta", testHistImmutableBounds[int64](
+		NewDeltaHistogram[int64],
+		func(a Aggregator[int64]) []float64 {
+			deltaH := a.(*deltaHistogram[int64])
+			return deltaH.bounds
+		},
+	))
+
+	t.Run("Cumulative", testHistImmutableBounds[int64](
+		NewCumulativeHistogram[int64],
+		func(a Aggregator[int64]) []float64 {
+			cumuH := a.(*cumulativeHistogram[int64])
+			return cumuH.bounds
+		},
+	))
+}
+
+func TestCumulativeHistogramImutableCounts(t *testing.T) {
+	a := NewCumulativeHistogram[int64](histConf)
+	a.Aggregate(5, alice)
+	hdp := a.Aggregation().(metricdata.Histogram).DataPoints[0]
+
+	cumuH := a.(*cumulativeHistogram[int64])
+	require.Equal(t, hdp.BucketCounts, cumuH.values[alice].counts)
+
+	cpCounts := make([]uint64, len(hdp.BucketCounts))
+	copy(cpCounts, hdp.BucketCounts)
+	hdp.BucketCounts[0] = 10
+	assert.Equal(t, cpCounts, cumuH.values[alice].counts, "modifying the Aggregator bucket counts should not change the Aggregator")
+}
+
+func TestDeltaHistogramReset(t *testing.T) {
+	t.Cleanup(mockTime(now))
+
+	expect := metricdata.Histogram{Temporality: metricdata.DeltaTemporality}
+	a := NewDeltaHistogram[int64](histConf)
+	metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
+
+	a.Aggregate(1, alice)
+	expect.DataPoints = []metricdata.HistogramDataPoint{hPoint(alice, 1, 1)}
+	metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
+
+	// The attr set should be forgotten once Aggregations is called.
+	expect.DataPoints = nil
+	metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
+
+	// Aggregating another set should not affect the original (alice).
+	a.Aggregate(1, bob)
+	expect.DataPoints = []metricdata.HistogramDataPoint{hPoint(bob, 1, 1)}
+	metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
+}
+
+func BenchmarkHistogram(b *testing.B) {
+	b.Run("Int64", benchmarkHistogram[int64])
+	b.Run("Float64", benchmarkHistogram[float64])
+}
+
+func benchmarkHistogram[N int64 | float64](b *testing.B) {
+	factory := func() Aggregator[N] { return NewDeltaHistogram[N](histConf) }
+	b.Run("Delta", benchmarkAggregator(factory))
+	factory = func() Aggregator[N] { return NewCumulativeHistogram[N](histConf) }
+	b.Run("Cumulative", benchmarkAggregator(factory))
+}
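
As a quick illustration of the bucket indexing described in the Aggregate comment above, the following standalone sketch (not part of the change; the bounds and sample values are arbitrary) shows how sort.SearchFloat64s maps a measurement to one of the len(bounds)+1 buckets:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Same scheme as histValues.Aggregate: for bounds [0, 5, 10] there are
	// four buckets: (-∞, 0], (0, 5], (5, 10], (10, +∞).
	bounds := []float64{0, 5, 10}
	for _, v := range []float64{-1, 0, 3, 5, 7, 12} {
		// SearchFloat64s returns the smallest index i with bounds[i] >= v,
		// or len(bounds) if v is greater than every boundary.
		idx := sort.SearchFloat64s(bounds, v)
		fmt.Printf("value %g -> bucket index %d\n", v, idx)
	}
}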
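
The NewDeltaHistogram and NewCumulativeHistogram doc comments describe the temporality difference: the delta Aggregator resets its recorded values (and start time) on every Aggregation call, while the cumulative Aggregator keeps accumulating. A rough usage sketch of that difference follows; it assumes placement inside the module-internal package (the constructors are not importable from outside), and the config, attribute set, and values are illustrative only:

// Hypothetical snippet; would need to live in package internal.
cfg := aggregation.ExplicitBucketHistogram{Boundaries: []float64{0, 5, 10}}
attrs := attribute.NewSet(attribute.String("user", "alice"))

d := NewDeltaHistogram[int64](cfg)
c := NewCumulativeHistogram[int64](cfg)
for _, v := range []int64{1, 6} {
	d.Aggregate(v, attrs)
	c.Aggregate(v, attrs)
}

// First collection: both temporalities report one data point with Count == 2.
_ = d.Aggregation()
_ = c.Aggregation()

// Second collection with no new measurements: the delta histogram has reset
// and reports no data points, while the cumulative histogram still reports
// the accumulated Count == 2 for the same attribute set.
_ = d.Aggregation()
_ = c.Aggregation()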