Skip to content

Commit

Permalink
Merge branch 'new_sdk/main' into stdoutmetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias authored Aug 4, 2022
2 parents 54a3e24 + eefb277 commit e868287
Show file tree
Hide file tree
Showing 8 changed files with 513 additions and 30 deletions.
6 changes: 6 additions & 0 deletions sdk/metric/internal/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregator_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Cou
// and View configuration. Assume here these are determined to be a
// cumulative sum.

aggregator := NewCumulativeSum[int64]()
aggregator := NewCumulativeSum[int64](true)
count := inst{aggregateFunc: aggregator.Aggregate}

p.aggregations = append(p.aggregations, aggregator.Aggregation())
Expand Down
3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ var (
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))

monoIncr = setMap{alice: 1, bob: 10, carol: 2}
monoIncr = setMap{alice: 1, bob: 10, carol: 2}
nonMonoIncr = setMap{alice: 1, bob: -1, carol: 2}

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
Expand Down
67 changes: 67 additions & 0 deletions sdk/metric/internal/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// filter is an aggregator that applies attribute filter when Aggregating. filters
// do not have any backing memory, and must be constructed with a backing Aggregator.
type filter[N int64 | float64] struct {
filter func(attribute.Set) attribute.Set
aggregator Aggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn func(attribute.Set) attribute.Set) Aggregator[N] {
if fn == nil {
return agg
}
return &filter[N]{
filter: fn,
aggregator: agg,
seen: map[attribute.Set]attribute.Set{},
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr = f.filter(attr)
f.seen[attr] = fAttr
}
f.aggregator.Aggregate(measurement, fAttr)
}

// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
202 changes: 202 additions & 0 deletions sdk/metric/internal/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// This is an aggregator that has a stable output, used for testing. It does not
// follow any spec prescribed aggregation.
type testStableAggregator[N int64 | float64] struct {
sync.Mutex
values []metricdata.DataPoint[N]
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (a *testStableAggregator[N]) Aggregate(measurement N, attr attribute.Set) {
a.Lock()
defer a.Unlock()

a.values = append(a.values, metricdata.DataPoint[N]{
Attributes: attr,
Value: measurement,
})
}

// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (a *testStableAggregator[N]) Aggregation() metricdata.Aggregation {
return metricdata.Gauge[N]{
DataPoints: a.values,
}
}

func testNewFilterNoFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) {
filter := NewFilter(agg, nil)
assert.Equal(t, agg, filter)
}

func testNewFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) {
f := NewFilter(agg, testAttributeFilter)
require.IsType(t, &filter[N]{}, f)
filt := f.(*filter[N])
assert.Equal(t, agg, filt.aggregator)
}

func testAttributeFilter(input attribute.Set) attribute.Set {
out, _ := input.Filter(func(kv attribute.KeyValue) bool {
return kv.Key == "power-level"
})
return out
}

func TestNewFilter(t *testing.T) {
t.Run("int64", func(t *testing.T) {
agg := &testStableAggregator[int64]{}
testNewFilterNoFilter[int64](t, agg)
testNewFilter[int64](t, agg)
})
t.Run("float64", func(t *testing.T) {
agg := &testStableAggregator[float64]{}
testNewFilterNoFilter[float64](t, agg)
testNewFilter[float64](t, agg)
})
}

func testDataPoint[N int64 | float64](attr attribute.Set) metricdata.DataPoint[N] {
return metricdata.DataPoint[N]{
Attributes: attr,
Value: 1,
}
}

func testFilterAggregate[N int64 | float64](t *testing.T) {
testCases := []struct {
name string
inputAttr []attribute.Set
output []metricdata.DataPoint[N]
}{
{
name: "Will filter all out",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Float64("lifeUniverseEverything", 42.0),
),
},
output: []metricdata.DataPoint[N]{
testDataPoint[N](*attribute.EmptySet()),
},
},
{
name: "Will keep appropriate attributes",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Int("power-level", 9001),
attribute.Float64("lifeUniverseEverything", 42.0),
),
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Int("power-level", 9001),
),
},
output: []metricdata.DataPoint[N]{
// A real Aggregator will combine these, the testAggregator doesn't for list stability.
testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))),
testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))),
},
},
{
name: "Will combine Aggregations",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
),
attribute.NewSet(
attribute.Float64("lifeUniverseEverything", 42.0),
),
},
output: []metricdata.DataPoint[N]{
// A real Aggregator will combine these, the testAggregator doesn't for list stability.
testDataPoint[N](*attribute.EmptySet()),
testDataPoint[N](*attribute.EmptySet()),
},
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
for _, set := range tt.inputAttr {
f.Aggregate(1, set)
}
out := f.Aggregation().(metricdata.Gauge[N])
assert.Equal(t, tt.output, out.DataPoints)
})
}
}

func TestFilterAggregate(t *testing.T) {
t.Run("int64", func(t *testing.T) {
testFilterAggregate[int64](t)
})
t.Run("float64", func(t *testing.T) {
testFilterAggregate[float64](t)
})
}

func testFilterConcurrent[N int64 | float64](t *testing.T) {
f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
wg := &sync.WaitGroup{}
wg.Add(2)

go func() {
f.Aggregate(1, attribute.NewSet(
attribute.String("foo", "bar"),
))
wg.Done()
}()

go func() {
f.Aggregate(1, attribute.NewSet(
attribute.Int("power-level", 9001),
))
wg.Done()
}()

wg.Wait()
}

func TestFilterConcurrent(t *testing.T) {
t.Run("int64", func(t *testing.T) {
testFilterConcurrent[int64](t)
})
t.Run("float64", func(t *testing.T) {
testFilterConcurrent[float64](t)
})
}
4 changes: 0 additions & 4 deletions sdk/metric/internal/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
var now = time.Now

// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
timestamp time.Time
Expand Down
Loading

0 comments on commit e868287

Please sign in to comment.