Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add implementation of last-value aggregator #3008

Merged
merged 9 commits into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions sdk/metric/internal/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"strconv"
"sync"
"testing"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

const (
defaultGoroutines = 5
defaultMeasurements = 30
defaultCycles = 3
)

var (
alice = attribute.NewSet(attribute.String("user", "alice"), attribute.Bool("admin", true))
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))

monoIncr = setMap{alice: 1, bob: 10, carol: 2}

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
// Pass to t.Cleanup to override the now function with staticNowFunc and
// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
mockTime = func(orig func() time.Time) (cleanup func()) {
now = staticNowFunc
return func() { now = orig }
}
)

// setMap maps attribute sets to a number.
type setMap map[attribute.Set]int

// expectFunc is a function that returns an Aggregation of expected values for
// a cycle that contains m measurements (total across all goroutines). Each
// call advances the cycle.
type expectFunc func(m int) metricdata.Aggregation

// aggregatorTester runs an acceptance test on an Aggregator. It will ask an
// Aggregator to aggregate a set of values as if they were real measurements
// made MeasurementN number of times. This will be done in GoroutineN number
// of different goroutines. After the Aggregator has been asked to aggregate
// all these measurements, it is validated using a passed expecterFunc. This
// set of operation is a signle cycle, and the the aggregatorTester will run
// CycleN number of cycles.
type aggregatorTester[N int64 | float64] struct {
// GoroutineN is the number of goroutines aggregatorTester will use to run
// the test with.
GoroutineN int
// MeasurementN is the number of measurements that are made each cycle a
// goroutine runs the test.
MeasurementN int
// CycleN is the number of times a goroutine will make a set of
// measurements.
CycleN int
}

func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
for i := 0; i < at.CycleN; i++ {
var wg sync.WaitGroup
wg.Add(at.GoroutineN)
for i := 0; i < at.GoroutineN; i++ {
go func() {
defer wg.Done()
for j := 0; j < at.MeasurementN; j++ {
for attrs, n := range incr {
a.Aggregate(N(n), attrs)
}
}
}()
}
wg.Wait()

metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
}
}
}

var bmarkResults metricdata.Aggregation

func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggregator[N], count int) {
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}

b.Run("Aggregate", func(b *testing.B) {
agg := factory()
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
for _, attr := range attrs {
agg.Aggregate(1, attr)
}
}
bmarkResults = agg.Aggregation()
})

b.Run("Aggregations", func(b *testing.B) {
aggs := make([]Aggregator[N], b.N)
for n := range aggs {
a := factory()
for _, attr := range attrs {
a.Aggregate(1, attr)
}
aggs[n] = a
}

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
bmarkResults = aggs[n].Aggregation()
}
})
}

func benchmarkAggregator[N int64 | float64](factory func() Aggregator[N]) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkAggregatorN(b, factory, n)
})
}
}
}
48 changes: 43 additions & 5 deletions sdk/metric/internal/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,64 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
var now = time.Now

// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
timestamp time.Time
value N
}

// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
// TODO(#2971): implement.
sync.Mutex

values map[attribute.Set]datapoint[N]
}

// NewLastValue returns an Aggregator that summarizes a set of measurements as
// the last one made.
func NewLastValue[N int64 | float64]() Aggregator[N] {
return &lastValue[N]{}
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
}

func (s *lastValue[N]) Aggregate(value N, attr attribute.Set) {
// TODO(#2971): implement.
d := datapoint[N]{timestamp: now(), value: value}
s.Lock()
s.values[attr] = d
s.Unlock()
}

func (s *lastValue[N]) Aggregation() metricdata.Aggregation {
// TODO(#2971): implement.
return nil
gauge := metricdata.Gauge[N]{}

s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
return gauge
}

gauge.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.values))
for a, v := range s.values {
gauge.DataPoints = append(gauge.DataPoints, metricdata.DataPoint[N]{
Attributes: a,
// The event time is the only meaningful timestamp, StartTime is
// ignored.
Time: v.timestamp,
Value: v.value,
})
// Do not report stale values.
delete(s.values, a)
}
return gauge
}
91 changes: 91 additions & 0 deletions sdk/metric/internal/lastvalue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"testing"

"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

func TestLastValue(t *testing.T) {
t.Cleanup(mockTime(now))

t.Run("Int64", testLastValue[int64]())
t.Run("Float64", testLastValue[float64]())
}

func testLastValue[N int64 | float64]() func(*testing.T) {
tester := &aggregatorTester[N]{
GoroutineN: defaultGoroutines,
MeasurementN: defaultMeasurements,
CycleN: defaultCycles,
}

eFunc := func(increments setMap) expectFunc {
data := make([]metricdata.DataPoint[N], 0, len(increments))
for a, v := range increments {
point := metricdata.DataPoint[N]{Attributes: a, Time: now(), Value: N(v)}
data = append(data, point)
}
gauge := metricdata.Gauge[N]{DataPoints: data}
return func(int) metricdata.Aggregation { return gauge }
}
incr := monoIncr
return tester.Run(NewLastValue[N](), incr, eFunc(incr))
}

func testLastValueReset[N int64 | float64](t *testing.T) {
t.Cleanup(mockTime(now))

a := NewLastValue[N]()
expect := metricdata.Gauge[N]{}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

a.Aggregate(1, alice)
expect.DataPoints = []metricdata.DataPoint[N]{{
Attributes: alice,
Time: now(),
Value: 1,
}}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

// The attr set should be forgotten once Aggregations is called.
expect.DataPoints = nil
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

// Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob)
expect.DataPoints = []metricdata.DataPoint[N]{{
Attributes: bob,
Time: now(),
Value: 1,
}}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
}

func TestLastValueReset(t *testing.T) {
t.Run("Int64", testLastValueReset[int64])
t.Run("Float64", testLastValueReset[float64])
}

func BenchmarkLastValue(b *testing.B) {
b.Run("Int64", benchmarkAggregator(NewLastValue[int64]))
b.Run("Float64", benchmarkAggregator(NewLastValue[float64]))
}