diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go new file mode 100644 index 00000000000..2407d016e90 --- /dev/null +++ b/sdk/metric/internal/filter.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "sync" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// filter is an aggregator that applies attribute filter when Aggregating. filters +// do not have any backing memory, and must be constructed with a backing Aggregator. +type filter[N int64 | float64] struct { + filter func(attribute.Set) attribute.Set + aggregator Aggregator[N] + + sync.Mutex + seen map[attribute.Set]attribute.Set +} + +// NewFilter wraps an Aggregator with an attribute filtering function. +func NewFilter[N int64 | float64](agg Aggregator[N], fn func(attribute.Set) attribute.Set) Aggregator[N] { + if fn == nil { + return agg + } + return &filter[N]{ + filter: fn, + aggregator: agg, + seen: map[attribute.Set]attribute.Set{}, + } +} + +// Aggregate records the measurement, scoped by attr, and aggregates it +// into an aggregation. +func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { + // TODO (#3006): drop stale attributes from seen. + f.Lock() + defer f.Unlock() + fAttr, ok := f.seen[attr] + if !ok { + fAttr = f.filter(attr) + f.seen[attr] = fAttr + } + f.aggregator.Aggregate(measurement, fAttr) +} + +// Aggregation returns an Aggregation, for all the aggregated +// measurements made and ends an aggregation cycle. +func (f *filter[N]) Aggregation() metricdata.Aggregation { + return f.aggregator.Aggregation() +} diff --git a/sdk/metric/internal/filter_test.go b/sdk/metric/internal/filter_test.go new file mode 100644 index 00000000000..8ce2747a375 --- /dev/null +++ b/sdk/metric/internal/filter_test.go @@ -0,0 +1,202 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// This is an aggregator that has a stable output, used for testing. It does not +// follow any spec prescribed aggregation. +type testStableAggregator[N int64 | float64] struct { + sync.Mutex + values []metricdata.DataPoint[N] +} + +// Aggregate records the measurement, scoped by attr, and aggregates it +// into an aggregation. +func (a *testStableAggregator[N]) Aggregate(measurement N, attr attribute.Set) { + a.Lock() + defer a.Unlock() + + a.values = append(a.values, metricdata.DataPoint[N]{ + Attributes: attr, + Value: measurement, + }) +} + +// Aggregation returns an Aggregation, for all the aggregated +// measurements made and ends an aggregation cycle. +func (a *testStableAggregator[N]) Aggregation() metricdata.Aggregation { + return metricdata.Gauge[N]{ + DataPoints: a.values, + } +} + +func testNewFilterNoFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) { + filter := NewFilter(agg, nil) + assert.Equal(t, agg, filter) +} + +func testNewFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) { + f := NewFilter(agg, testAttributeFilter) + require.IsType(t, &filter[N]{}, f) + filt := f.(*filter[N]) + assert.Equal(t, agg, filt.aggregator) +} + +func testAttributeFilter(input attribute.Set) attribute.Set { + out, _ := input.Filter(func(kv attribute.KeyValue) bool { + return kv.Key == "power-level" + }) + return out +} + +func TestNewFilter(t *testing.T) { + t.Run("int64", func(t *testing.T) { + agg := &testStableAggregator[int64]{} + testNewFilterNoFilter[int64](t, agg) + testNewFilter[int64](t, agg) + }) + t.Run("float64", func(t *testing.T) { + agg := &testStableAggregator[float64]{} + testNewFilterNoFilter[float64](t, agg) + testNewFilter[float64](t, agg) + }) +} + +func testDataPoint[N int64 | float64](attr attribute.Set) metricdata.DataPoint[N] { + return metricdata.DataPoint[N]{ + Attributes: attr, + Value: 1, + } +} + +func testFilterAggregate[N int64 | float64](t *testing.T) { + testCases := []struct { + name string + inputAttr []attribute.Set + output []metricdata.DataPoint[N] + }{ + { + name: "Will filter all out", + inputAttr: []attribute.Set{ + attribute.NewSet( + attribute.String("foo", "bar"), + attribute.Float64("lifeUniverseEverything", 42.0), + ), + }, + output: []metricdata.DataPoint[N]{ + testDataPoint[N](*attribute.EmptySet()), + }, + }, + { + name: "Will keep appropriate attributes", + inputAttr: []attribute.Set{ + attribute.NewSet( + attribute.String("foo", "bar"), + attribute.Int("power-level", 9001), + attribute.Float64("lifeUniverseEverything", 42.0), + ), + attribute.NewSet( + attribute.String("foo", "bar"), + attribute.Int("power-level", 9001), + ), + }, + output: []metricdata.DataPoint[N]{ + // A real Aggregator will combine these, the testAggregator doesn't for list stability. + testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))), + testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))), + }, + }, + { + name: "Will combine Aggregations", + inputAttr: []attribute.Set{ + attribute.NewSet( + attribute.String("foo", "bar"), + ), + attribute.NewSet( + attribute.Float64("lifeUniverseEverything", 42.0), + ), + }, + output: []metricdata.DataPoint[N]{ + // A real Aggregator will combine these, the testAggregator doesn't for list stability. + testDataPoint[N](*attribute.EmptySet()), + testDataPoint[N](*attribute.EmptySet()), + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter) + for _, set := range tt.inputAttr { + f.Aggregate(1, set) + } + out := f.Aggregation().(metricdata.Gauge[N]) + assert.Equal(t, tt.output, out.DataPoints) + }) + } +} + +func TestFilterAggregate(t *testing.T) { + t.Run("int64", func(t *testing.T) { + testFilterAggregate[int64](t) + }) + t.Run("float64", func(t *testing.T) { + testFilterAggregate[float64](t) + }) +} + +func testFilterConcurrent[N int64 | float64](t *testing.T) { + f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter) + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + f.Aggregate(1, attribute.NewSet( + attribute.String("foo", "bar"), + )) + wg.Done() + }() + + go func() { + f.Aggregate(1, attribute.NewSet( + attribute.Int("power-level", 9001), + )) + wg.Done() + }() + + wg.Wait() +} + +func TestFilterConcurrent(t *testing.T) { + t.Run("int64", func(t *testing.T) { + testFilterConcurrent[int64](t) + }) + t.Run("float64", func(t *testing.T) { + testFilterConcurrent[float64](t) + }) +}