Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add implementation of Sum aggregators #3000

Merged
merged 42 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
405c377
Implement the sum aggregators
MrAlias Jul 7, 2022
a138c5f
Add unit tests for delta/cumulative sums
MrAlias Jul 7, 2022
4ba0c4c
Add benchmarks
MrAlias Jul 7, 2022
f4237c1
Merge sum tests into one
MrAlias Jul 7, 2022
1397f4c
Remove unused start time from cumulative sum
MrAlias Jul 7, 2022
31adb7f
Refactor benchmark tests
MrAlias Jul 7, 2022
3ad3c89
goimports
MrAlias Jul 8, 2022
4b78904
Move timestamp out of lock
MrAlias Jul 8, 2022
090fb3b
Refactor testing
MrAlias Jul 8, 2022
1166395
Fix spelling mistake
MrAlias Jul 8, 2022
0a01d06
Name param of expectFunc
MrAlias Jul 8, 2022
9e9a0ad
Reset delta sum to zero instead of delete
MrAlias Jul 8, 2022
2b1741b
Revert to deleting unused attr sets
MrAlias Jul 11, 2022
2399af3
Refactor testing to allow use across other aggs
MrAlias Jul 11, 2022
45b07ed
Add TODO to bound cumulative sum mem usage
MrAlias Jul 11, 2022
ee4ce7b
Fix misspelling
MrAlias Jul 11, 2022
f125d53
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 11, 2022
4311369
Unify aggregator benchmark code in aggregator_test
MrAlias Jul 11, 2022
02cdb3a
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 21, 2022
d483c2a
Use generic DataPoint value
MrAlias Jul 21, 2022
5f72d60
Fix assertion_fail_test.go
MrAlias Jul 22, 2022
b5732c0
Merge branch 'generic-metricdata' into sum-agg-impl
MrAlias Jul 22, 2022
17b4e8f
Use generic metricdata types
MrAlias Jul 22, 2022
e48ad68
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 25, 2022
dcbfd9a
Fix tests
MrAlias Jul 25, 2022
292898c
Fix benchmarks
MrAlias Jul 25, 2022
b911e60
Fix lint
MrAlias Jul 26, 2022
86971db
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 26, 2022
840ca2e
Update sum documentation
MrAlias Jul 26, 2022
bcf2fc5
Remove leftover encapsulating test run
MrAlias Jul 26, 2022
1fe3558
Use t.Cleanup to mock time
MrAlias Jul 26, 2022
16359dd
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 27, 2022
3c7a389
Consolidate expecter logic into funcs
MrAlias Jul 27, 2022
2745e1c
Move errNegVal closer to use
MrAlias Jul 27, 2022
c4ad8ac
Run the agg test
MrAlias Jul 27, 2022
e7b779f
Add tests for monotonic sum Aggregate err
MrAlias Jul 27, 2022
cfd7d87
Run make lint
MrAlias Jul 27, 2022
4db6b90
Make monotonic an arg of creation funcs
MrAlias Jul 27, 2022
f01d942
Remove Aggregate monotonic validation
MrAlias Jul 27, 2022
6fed240
Rename sum to valueMap
MrAlias Jul 27, 2022
b2486fa
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 28, 2022
672e8b2
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Aug 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/metric/internal/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregator_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Cou
// and View configuration. Assume here these are determined to be a
// cumulative sum.

aggregator := NewCumulativeSum[int64]()
aggregator := NewCumulativeSum[int64](true)
count := inst{aggregateFunc: aggregator.Aggregate}

p.aggregations = append(p.aggregations, aggregator.Aggregation())
Expand Down
3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ var (
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))

monoIncr = setMap{alice: 1, bob: 10, carol: 2}
monoIncr = setMap{alice: 1, bob: 10, carol: 2}
nonMonoIncr = setMap{alice: 1, bob: -1, carol: 2}

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
Expand Down
4 changes: 0 additions & 4 deletions sdk/metric/internal/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
var now = time.Now

// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
timestamp time.Time
Expand Down
124 changes: 100 additions & 24 deletions sdk/metric/internal/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,139 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// sum summarizes a set of measurements as their arithmetic sum.
type sum[N int64 | float64] struct {
// TODO(#2972): implement.
// valueMap is the storage for all sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]N
}

func newValueMap[N int64 | float64]() *valueMap[N] {
return &valueMap[N]{values: make(map[attribute.Set]N)}
}

func (s *sum[N]) Aggregate(value N, attr attribute.Set) {
// TODO(#2972): implement.
func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
s.values[attr] += value
s.Unlock()
}

// NewDeltaSum returns an Aggregator that summarizes a set of measurements as
// their arithmetic sum. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregations method is called it will reset all sums to zero.
func NewDeltaSum[N int64 | float64]() Aggregator[N] {
// TODO(#2972): implement.
return &deltaSum[N]{}
// Aggregator's Aggregation method is called it will reset all sums to zero.
func NewDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &deltaSum[N]{
valueMap: newValueMap[N](),
monotonic: monotonic,
start: now(),
}
}

// deltaSum summarizes a set of measurements made in a single aggregation
// cycle as their arithmetic sum.
type deltaSum[N int64 | float64] struct {
sum[N]
*valueMap[N]

// TODO(#2972): implement.
monotonic bool
start time.Time
}

func (s *deltaSum[N]) Aggregation() metricdata.Aggregation {
// TODO(#2972): implement.
return nil
out := metricdata.Sum[N]{
Temporality: metricdata.DeltaTemporality,
IsMonotonic: s.monotonic,
}

s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
return out
}

t := now()
out.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.values))
for attr, value := range s.values {
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value,
})
// Unused attribute sets do not report.
delete(s.values, attr)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}
// The delta collection cycle resets.
s.start = t
return out
}

// NewCumulativeSum returns an Aggregator that summarizes a set of
// measurements as their arithmetic sum. Each sum is scoped by attributes.
// measurements as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// Each aggregation cycle builds from the previous, the sums are the
// arithmetic sum of all values aggregated since the returned Aggregator was
// created.
func NewCumulativeSum[N int64 | float64]() Aggregator[N] {
// TODO(#2972): implement.
return &cumulativeSum[N]{}
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregation method is called it will reset all sums to zero.
func NewCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &cumulativeSum[N]{
valueMap: newValueMap[N](),
monotonic: monotonic,
start: now(),
}
}

// cumulativeSum summarizes a set of measurements made over all aggregation
// cycles as their arithmetic sum.
type cumulativeSum[N int64 | float64] struct {
sum[N]
*valueMap[N]

// TODO(#2972): implement.
monotonic bool
start time.Time
}

func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
// TODO(#2972): implement.
return nil
out := metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: s.monotonic,
}

s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
return out
}

t := now()
out.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.values))
for attr, value := range s.values {
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value,
})
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
return out
}
135 changes: 135 additions & 0 deletions sdk/metric/internal/sum_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"testing"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

func TestSum(t *testing.T) {
t.Cleanup(mockTime(now))
t.Run("Int64", testSum[int64])
t.Run("Float64", testSum[float64])
}

func testSum[N int64 | float64](t *testing.T) {
tester := &aggregatorTester[N]{
GoroutineN: defaultGoroutines,
MeasurementN: defaultMeasurements,
CycleN: defaultCycles,
}

t.Run("Delta", func(t *testing.T) {
incr, mono := monoIncr, true
eFunc := deltaExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
eFunc = deltaExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))
})

t.Run("Cumulative", func(t *testing.T) {
incr, mono := monoIncr, true
eFunc := cumuExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
eFunc = cumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
})
}

func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*m)))
}
return sum
}
}

func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
var cycle int
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
cycle++
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*cycle*m)))
}
return sum
}
}

// point returns a DataPoint that started and ended now.
func point[N int64 | float64](a attribute.Set, v N) metricdata.DataPoint[N] {
return metricdata.DataPoint[N]{
Attributes: a,
StartTime: now(),
Time: now(),
Value: N(v),
}
}

func testDeltaSumReset[N int64 | float64](t *testing.T) {
t.Cleanup(mockTime(now))

expect := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality}
a := NewDeltaSum[N](false)
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

a.Aggregate(1, alice)
expect.DataPoints = []metricdata.DataPoint[N]{point[N](alice, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

// The attr set should be forgotten once Aggregations is called.
expect.DataPoints = nil
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

// Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob)
expect.DataPoints = []metricdata.DataPoint[N]{point[N](bob, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
}

func TestDeltaSumReset(t *testing.T) {
t.Run("Int64", testDeltaSumReset[int64])
t.Run("Float64", testDeltaSumReset[float64])
}

func BenchmarkSum(b *testing.B) {
b.Run("Int64", benchmarkSum[int64])
b.Run("Float64", benchmarkSum[float64])
}

func benchmarkSum[N int64 | float64](b *testing.B) {
// The monotonic argument is only used to annotate the Sum returned from
// the Aggregation method. It should not have an effect on operational
// performance, therefore, only monotonic=false is benchmarked here.
factory := func() Aggregator[N] { return NewDeltaSum[N](false) }
b.Run("Delta", benchmarkAggregator(factory))
factory = func() Aggregator[N] { return NewCumulativeSum[N](false) }
b.Run("Cumulative", benchmarkAggregator(factory))
}