Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delta/cumulative histogram implementation #3045

Merged
merged 10 commits into from
Aug 2, 2022
192 changes: 177 additions & 15 deletions sdk/metric/internal/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,94 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sort"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// histogram summarizes a set of measurements as an histogram with
type buckets struct {
counts []uint64
count uint64
sum float64
min, max float64
}

// newBuckets returns buckets with n bins.
func newBuckets(n int) *buckets {
return &buckets{counts: make([]uint64, n)}
}

func (b *buckets) bin(idx int, value float64) {
b.counts[idx]++
b.count++
b.sum += value
if value < b.min {
b.min = value
} else if value > b.max {
b.max = value
}
}

// histValues summarizes a set of measurements as an histValues with
// explicitly defined buckets.
type histogram[N int64 | float64] struct {
// TODO(#2970): implement.
type histValues[N int64 | float64] struct {
bounds []float64

values map[attribute.Set]*buckets
valuesMu sync.Mutex
}

func (s *histogram[N]) Aggregate(value N, attr attribute.Set) {
// TODO(#2970): implement.
func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
// complete control over the fix.
b := make([]float64, len(bounds))
copy(b, bounds)
sort.Float64s(b)
return &histValues[N]{
bounds: b,
values: make(map[attribute.Set]*buckets),
}
}

// Aggregate records the measurement value, scoped by attr, and aggregates it
// into a histogram.
func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
// Accept all types to satisfy the Aggregator interface. However, since
// the Aggregation produced by this Aggregator is only float64, convert
// here to only use this type.
v := float64(value)

// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, v)

s.valuesMu.Lock()
defer s.valuesMu.Unlock()

b, ok := s.values[attr]
if !ok {
// N+1 buckets. For example:
//
// bounds = [0, 5, 10]
//
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets(len(s.bounds) + 1)
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = v, v
s.values[attr] = b
}
b.bin(idx, v)
}

// NewDeltaHistogram returns an Aggregator that summarizes a set of
Expand All @@ -41,20 +116,59 @@ func (s *histogram[N]) Aggregate(value N, attr attribute.Set) {
// Aggregator's Aggregations method is called it will reset all histogram
// counts to zero.
func NewDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
return &deltaHistogram[N]{}
return &deltaHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries),
noMinMax: cfg.NoMinMax,
start: now(),
}
}

// deltaHistogram summarizes a set of measurements made in a single
// aggregation cycle as an histogram with explicitly defined buckets.
type deltaHistogram[N int64 | float64] struct {
histogram[N]
*histValues[N]

// TODO(#2970): implement.
noMinMax bool
start time.Time
}

func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
// TODO(#2970): implement.
return nil
h := metricdata.Histogram{Temporality: metricdata.DeltaTemporality}

s.valuesMu.Lock()
defer s.valuesMu.Unlock()

if len(s.values) == 0 {
return h
}

// Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds)
t := now()
h.DataPoints = make([]metricdata.HistogramDataPoint, 0, len(s.values))
for a, b := range s.values {
hdp := metricdata.HistogramDataPoint{
Attributes: a,
StartTime: s.start,
Time: t,
Count: b.count,
Bounds: bounds,
BucketCounts: b.counts,
Sum: b.sum,
}
if !s.noMinMax {
hdp.Min = &b.min
hdp.Max = &b.max
}
h.DataPoints = append(h.DataPoints, hdp)

// Unused attribute sets do not report.
delete(s.values, a)
}
// The delta collection cycle resets.
s.start = t
return h
}

// NewCumulativeHistogram returns an Aggregator that summarizes a set of
Expand All @@ -64,18 +178,66 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
// the bucketed counts of all values aggregated since the returned Aggregator
// was created.
func NewCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
return &cumulativeHistogram[N]{}
return &cumulativeHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries),
noMinMax: cfg.NoMinMax,
start: now(),
}
}

// cumulativeHistogram summarizes a set of measurements made over all
// aggregation cycles as an histogram with explicitly defined buckets.
type cumulativeHistogram[N int64 | float64] struct {
histogram[N]
*histValues[N]

// TODO(#2970): implement.
noMinMax bool
start time.Time
}

func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
// TODO(#2970): implement.
return nil
h := metricdata.Histogram{Temporality: metricdata.CumulativeTemporality}

s.valuesMu.Lock()
defer s.valuesMu.Unlock()

if len(s.values) == 0 {
return h
}

// Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds)
t := now()
h.DataPoints = make([]metricdata.HistogramDataPoint, 0, len(s.values))
for a, b := range s.values {
// The HistogramDataPoint field values returned need to be copies of
// the buckets value as we will keep updating them.
//
// TODO (#3047): Making copies for bounds and counts incurs a large
// memory allocation footprint. Alternatives should be explored.
counts := make([]uint64, len(b.counts))
copy(counts, b.counts)

hdp := metricdata.HistogramDataPoint{
Attributes: a,
StartTime: s.start,
Time: t,
Count: b.count,
Bounds: bounds,
BucketCounts: counts,
Sum: b.sum,
}
if !s.noMinMax {
// Similar to counts, make a copy.
min, max := b.min, b.max
hdp.Min = &min
hdp.Max = &max
}
h.DataPoints = append(h.DataPoints, hdp)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
return h
}
Loading