From d097adfb9bd233774111b2407c13aabde42ce124 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Tue, 26 Jul 2022 15:40:20 +0000 Subject: [PATCH 1/8] Adds a pipelineRegistry to manage creating aggregators. --- sdk/metric/pipeline.go | 134 +++++++++++- sdk/metric/pipeline_registry_test.go | 292 +++++++++++++++++++++++++++ 2 files changed, 424 insertions(+), 2 deletions(-) create mode 100644 sdk/metric/pipeline_registry_test.go diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index f7a85a91bab..c8b2a11bf74 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -20,11 +20,15 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" "fmt" + "strings" "sync" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -110,8 +114,8 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations)) for scope, instruments := range p.aggregations { metrics := make([]metricdata.Metrics, 0, len(instruments)) - for inst, aggregation := range instruments { - data := aggregation.Aggregation() + for inst, agg := range instruments { + data := agg.Aggregation() if data != nil { metrics = append(metrics, metricdata.Metrics{ Name: inst.name, @@ -134,3 +138,129 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err ScopeMetrics: sm, }, nil } + +// pipelineRegistry manages creating pipelines, and aggregators. The meters can +// retrieve new aggregators from a registry. +type pipelineRegistry struct { + views map[Reader][]view.View + pipelines map[Reader]*pipeline +} + +func newPipelineRegistry(views map[Reader][]view.View) *pipelineRegistry { + reg := &pipelineRegistry{ + views: views, + pipelines: map[Reader]*pipeline{}, + } + for rdr := range reg.views { + pipe := &pipeline{} + rdr.register(pipe) + reg.pipelines[rdr] = pipe + } + return reg +} + +// createInt64Aggregators will create all backing aggregators for an instrument. +// It will return an error if an instrument is registered more than once. +// Note: There may be returned aggregators with an error. +func (reg *pipelineRegistry) createInt64Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[int64], error) { + var aggs []internal.Aggregator[int64] + + errs := &multierror{} + for rdr, views := range reg.views { + pipe := reg.pipelines[rdr] + rdrAggs := createAggregators[int64](rdr, views, inst) + for inst, agg := range rdrAggs { + err := pipe.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) + if err != nil { + errs.append(err) + } + aggs = append(aggs, agg) + } + } + return aggs, errs.errorOrNil() +} + +// createFloat64Aggregators will create all backing aggregators for an instrument. +// It will return an error if an instrument is registered more than once. +// Note: There may be returned aggregators with an error. +func (reg *pipelineRegistry) createFloat64Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[float64], error) { + var aggs []internal.Aggregator[float64] + + errs := &multierror{} + for rdr, views := range reg.views { + pipe := reg.pipelines[rdr] + rdrAggs := createAggregators[float64](rdr, views, inst) + for inst, agg := range rdrAggs { + err := pipe.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) + if err != nil { + errs.append(err) + } + aggs = append(aggs, agg) + } + } + return aggs, errs.errorOrNil() +} + +func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { + for _, pipe := range reg.pipelines { + pipe.addCallback(fn) + } +} + +type multierror struct { + errors []string +} + +func (m *multierror) errorOrNil() error { + if len(m.errors) == 0 { + return nil + } + return fmt.Errorf(strings.Join(m.errors, "; ")) +} +func (m *multierror) append(err error) { + m.errors = append(m.errors, err.Error()) +} + +func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) map[view.Instrument]internal.Aggregator[N] { + aggs := map[view.Instrument]internal.Aggregator[N]{} + for _, v := range views { + inst, match := v.TransformInstrument(inst) + if inst.Aggregation == nil { + inst.Aggregation = rdr.aggregation(inst.Kind) + } + if match { + if _, ok := aggs[inst]; ok { + continue + } + agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind)) + if agg != nil { + // TODO (#3011): If filtering is done at the instrument level add here. + // This is where the aggregator and the view are both in scope. + aggs[inst] = agg + } + } + } + return aggs +} + +// createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator. +// TODO (#3011): If filterting is done by the Aggregator it should be passed here. +func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality) internal.Aggregator[N] { + switch agg := agg.(type) { + case aggregation.Drop: + return nil + case aggregation.LastValue: + return internal.NewLastValue[N]() + case aggregation.Sum: + if temporality == metricdata.CumulativeTemporality { + return internal.NewCumulativeSum[N]() + } + return internal.NewDeltaSum[N]() + case aggregation.ExplicitBucketHistogram: + if temporality == metricdata.CumulativeTemporality { + return internal.NewCumulativeHistogram[N](agg) + } + return internal.NewDeltaHistogram[N](agg) + } + return nil +} diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go new file mode 100644 index 00000000000..3a2e4c74641 --- /dev/null +++ b/sdk/metric/pipeline_registry_test.go @@ -0,0 +1,292 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/internal" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" +) + +type testReader struct { + agg aggregation.Aggregation + temp metricdata.Temporality +} + +func (t testReader) register(producer) {} +func (t testReader) temporality(view.InstrumentKind) metricdata.Temporality { return t.temp } +func (t testReader) aggregation(view.InstrumentKind) aggregation.Aggregation { return t.agg } // nolint:revive // import-shadow for method scoped by type. +func (t testReader) Collect(context.Context) (metricdata.ResourceMetrics, error) { + return metricdata.ResourceMetrics{}, nil +} +func (t testReader) ForceFlush(context.Context) error { return nil } +func (t testReader) Shutdown(context.Context) error { return nil } + +func testCreateAggregators[N int64 | float64](t *testing.T) { + changeAggView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithSetAggregation(aggregation.LastValue{}), + ) + renameView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithRename("bar"), + ) + testcases := []struct { + name string + reader Reader + views []view.View + inst view.Instrument + wantKind internal.Aggregator[N] //Aggregators should match len and types + wantLen int + }{ + { + name: "drop should return 0 aggregators", + reader: testReader{ + agg: aggregation.Drop{}, + }, + views: []view.View{{}}, + inst: view.Instrument{Name: "foo"}, + }, + { + name: "reader should set default agg", + reader: testReader{ + agg: aggregation.Sum{}, + temp: metricdata.DeltaTemporality, + }, + views: []view.View{{}}, + inst: view.Instrument{Name: "foo"}, + wantKind: internal.NewDeltaSum[N](), + wantLen: 1, + }, + { + name: "view should overwrite reader", + reader: testReader{ + agg: aggregation.Sum{}, + temp: metricdata.DeltaTemporality, + }, + views: []view.View{changeAggView}, + inst: view.Instrument{Name: "foo"}, + wantKind: internal.NewLastValue[N](), + wantLen: 1, + }, + { + name: "multiple views should create multiple aggregators", + reader: testReader{ + agg: aggregation.Sum{}, + temp: metricdata.DeltaTemporality, + }, + views: []view.View{{}, renameView}, + inst: view.Instrument{Name: "foo"}, + wantKind: internal.NewDeltaSum[N](), + wantLen: 2, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + got := createAggregators[N](tt.reader, tt.views, tt.inst) + require.Len(t, got, tt.wantLen) + for _, agg := range got { + assert.IsType(t, tt.wantKind, agg) + } + }) + } +} + +func TestCreateAggregators(t *testing.T) { + t.Run("Int64", testCreateAggregators[int64]) + t.Run("Float64", testCreateAggregators[float64]) +} + +func TestPipelineRegistryCreateAggregators(t *testing.T) { + renameView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithRename("bar"), + ) + testCases := []struct { + name string + views map[Reader][]view.View + inst view.Instrument + wantInt64Agg []internal.Aggregator[int64] // Should match len and type + wantFloat64Agg []internal.Aggregator[float64] // Should match len and type + }{ + { + name: "No views have no aggregators", + inst: view.Instrument{Name: "foo"}, + }, + { + name: "1 reader 1 view gets 1 aggregator", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testReader{agg: aggregation.LastValue{}}: { + {}, + }, + }, + wantInt64Agg: []internal.Aggregator[int64]{ + internal.NewLastValue[int64](), + }, + wantFloat64Agg: []internal.Aggregator[float64]{ + internal.NewLastValue[float64](), + }, + }, + { + name: "1 reader 2 views gets 2 aggregator", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testReader{agg: aggregation.LastValue{}}: { + {}, + renameView, + }, + }, + wantInt64Agg: []internal.Aggregator[int64]{ + internal.NewLastValue[int64](), + internal.NewLastValue[int64](), + }, + wantFloat64Agg: []internal.Aggregator[float64]{ + internal.NewLastValue[float64](), + internal.NewLastValue[float64](), + }, + }, + { + name: "2 readers 1 view each gets 2 aggregators", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testReader{agg: aggregation.LastValue{}}: { + {}, + }, + testReader{agg: aggregation.LastValue{}, temp: metricdata.CumulativeTemporality}: { + {}, + }, + }, + wantInt64Agg: []internal.Aggregator[int64]{ + internal.NewLastValue[int64](), + internal.NewLastValue[int64](), + }, + wantFloat64Agg: []internal.Aggregator[float64]{ + internal.NewLastValue[float64](), + internal.NewLastValue[float64](), + }, + }, + { + name: "2 reader 2 views each gets 4 aggregators", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testReader{agg: aggregation.LastValue{}}: { + {}, + renameView, + }, + testReader{agg: aggregation.LastValue{}, temp: metricdata.CumulativeTemporality}: { + {}, + renameView, + }, + }, + wantInt64Agg: []internal.Aggregator[int64]{ + internal.NewLastValue[int64](), + internal.NewLastValue[int64](), + internal.NewLastValue[int64](), + internal.NewLastValue[int64](), + }, + wantFloat64Agg: []internal.Aggregator[float64]{ + internal.NewLastValue[float64](), + internal.NewLastValue[float64](), + internal.NewLastValue[float64](), + internal.NewLastValue[float64](), + }, + }, + { + name: "An instrument is duplicated in two views share the same aggregator", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testReader{agg: aggregation.LastValue{}}: { + {}, + {}, + }, + }, + wantInt64Agg: []internal.Aggregator[int64]{ + internal.NewLastValue[int64](), + }, + wantFloat64Agg: []internal.Aggregator[float64]{ + internal.NewLastValue[float64](), + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + reg := newPipelineRegistry(tt.views) + + intAggs, err := reg.createInt64Aggregators(tt.inst, unit.Dimensionless) + assert.NoError(t, err) + + require.Len(t, intAggs, len(tt.wantInt64Agg)) + for i, agg := range intAggs { + assert.IsType(t, tt.wantInt64Agg[i], agg) + } + + reg = newPipelineRegistry(tt.views) + + floatAggs, err := reg.createFloat64Aggregators(tt.inst, unit.Dimensionless) + assert.NoError(t, err) + + require.Len(t, floatAggs, len(tt.wantFloat64Agg)) + for i, agg := range floatAggs { + assert.IsType(t, tt.wantFloat64Agg[i], agg) + } + }) + } +} + +func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { + renameView, _ := view.New( + view.MatchInstrumentName("bar"), + view.WithRename("foo"), + ) + views := map[Reader][]view.View{ + testReader{agg: aggregation.LastValue{}}: { + {}, + renameView, + }, + } + + fooInst := view.Instrument{Name: "foo"} + barInst := view.Instrument{Name: "bar"} + + reg := newPipelineRegistry(views) + + _, err := reg.createInt64Aggregators(fooInst, unit.Dimensionless) + assert.NoError(t, err) + + intAggs, err := reg.createInt64Aggregators(barInst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, intAggs, 2) + + reg = newPipelineRegistry(views) + _, err = reg.createFloat64Aggregators(fooInst, unit.Dimensionless) + assert.NoError(t, err) + + floatAggs, err := reg.createFloat64Aggregators(barInst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, floatAggs, 2) +} From 96106ce301b82b2b5bc51878812162d3e2f0993a Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Wed, 27 Jul 2022 13:48:32 +0000 Subject: [PATCH 2/8] Made pipeline generic --- sdk/metric/pipeline.go | 52 +++++-------- sdk/metric/pipeline_registry_test.go | 108 ++++++++++----------------- 2 files changed, 57 insertions(+), 103 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index c8b2a11bf74..b11b26c601b 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -141,55 +141,37 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // pipelineRegistry manages creating pipelines, and aggregators. The meters can // retrieve new aggregators from a registry. -type pipelineRegistry struct { +type pipelineRegistry[N int64 | float64] struct { views map[Reader][]view.View pipelines map[Reader]*pipeline } -func newPipelineRegistry(views map[Reader][]view.View) *pipelineRegistry { - reg := &pipelineRegistry{ - views: views, - pipelines: map[Reader]*pipeline{}, - } - for rdr := range reg.views { +func newPipelineRegistries(views map[Reader][]view.View) (*pipelineRegistry[int64], *pipelineRegistry[float64]) { + pipelines := map[Reader]*pipeline{} + for rdr := range views { pipe := &pipeline{} rdr.register(pipe) - reg.pipelines[rdr] = pipe - } - return reg -} - -// createInt64Aggregators will create all backing aggregators for an instrument. -// It will return an error if an instrument is registered more than once. -// Note: There may be returned aggregators with an error. -func (reg *pipelineRegistry) createInt64Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[int64], error) { - var aggs []internal.Aggregator[int64] - - errs := &multierror{} - for rdr, views := range reg.views { - pipe := reg.pipelines[rdr] - rdrAggs := createAggregators[int64](rdr, views, inst) - for inst, agg := range rdrAggs { - err := pipe.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) - if err != nil { - errs.append(err) - } - aggs = append(aggs, agg) + pipelines[rdr] = pipe + } + return &pipelineRegistry[int64]{ + views: views, + pipelines: pipelines, + }, &pipelineRegistry[float64]{ + views: views, + pipelines: pipelines, } - } - return aggs, errs.errorOrNil() } -// createFloat64Aggregators will create all backing aggregators for an instrument. +// createAggregators will create all backing aggregators for an instrument. // It will return an error if an instrument is registered more than once. // Note: There may be returned aggregators with an error. -func (reg *pipelineRegistry) createFloat64Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[float64], error) { - var aggs []internal.Aggregator[float64] +func (reg *pipelineRegistry[N]) createAggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + var aggs []internal.Aggregator[N] errs := &multierror{} for rdr, views := range reg.views { pipe := reg.pipelines[rdr] - rdrAggs := createAggregators[float64](rdr, views, inst) + rdrAggs := createAggregators[N](rdr, views, inst) for inst, agg := range rdrAggs { err := pipe.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) if err != nil { @@ -201,7 +183,7 @@ func (reg *pipelineRegistry) createFloat64Aggregators(inst view.Instrument, inst return aggs, errs.errorOrNil() } -func (reg *pipelineRegistry) registerCallback(fn func(context.Context)) { +func (reg *pipelineRegistry[N]) registerCallback(fn func(context.Context)) { for _, pipe := range reg.pipelines { pipe.addCallback(fn) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 3a2e4c74641..0b159cfe295 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -126,11 +126,10 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { view.WithRename("bar"), ) testCases := []struct { - name string - views map[Reader][]view.View - inst view.Instrument - wantInt64Agg []internal.Aggregator[int64] // Should match len and type - wantFloat64Agg []internal.Aggregator[float64] // Should match len and type + name string + views map[Reader][]view.View + inst view.Instrument + wantCount int }{ { name: "No views have no aggregators", @@ -144,12 +143,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { {}, }, }, - wantInt64Agg: []internal.Aggregator[int64]{ - internal.NewLastValue[int64](), - }, - wantFloat64Agg: []internal.Aggregator[float64]{ - internal.NewLastValue[float64](), - }, + wantCount: 1, }, { name: "1 reader 2 views gets 2 aggregator", @@ -160,14 +154,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { renameView, }, }, - wantInt64Agg: []internal.Aggregator[int64]{ - internal.NewLastValue[int64](), - internal.NewLastValue[int64](), - }, - wantFloat64Agg: []internal.Aggregator[float64]{ - internal.NewLastValue[float64](), - internal.NewLastValue[float64](), - }, + wantCount: 2, }, { name: "2 readers 1 view each gets 2 aggregators", @@ -180,14 +167,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { {}, }, }, - wantInt64Agg: []internal.Aggregator[int64]{ - internal.NewLastValue[int64](), - internal.NewLastValue[int64](), - }, - wantFloat64Agg: []internal.Aggregator[float64]{ - internal.NewLastValue[float64](), - internal.NewLastValue[float64](), - }, + wantCount: 2, }, { name: "2 reader 2 views each gets 4 aggregators", @@ -202,18 +182,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { renameView, }, }, - wantInt64Agg: []internal.Aggregator[int64]{ - internal.NewLastValue[int64](), - internal.NewLastValue[int64](), - internal.NewLastValue[int64](), - internal.NewLastValue[int64](), - }, - wantFloat64Agg: []internal.Aggregator[float64]{ - internal.NewLastValue[float64](), - internal.NewLastValue[float64](), - internal.NewLastValue[float64](), - internal.NewLastValue[float64](), - }, + wantCount: 4, }, { name: "An instrument is duplicated in two views share the same aggregator", @@ -224,37 +193,33 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { {}, }, }, - wantInt64Agg: []internal.Aggregator[int64]{ - internal.NewLastValue[int64](), - }, - wantFloat64Agg: []internal.Aggregator[float64]{ - internal.NewLastValue[float64](), - }, + wantCount: 1, }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - reg := newPipelineRegistry(tt.views) - - intAggs, err := reg.createInt64Aggregators(tt.inst, unit.Dimensionless) - assert.NoError(t, err) - - require.Len(t, intAggs, len(tt.wantInt64Agg)) - for i, agg := range intAggs { - assert.IsType(t, tt.wantInt64Agg[i], agg) - } + intReg, _ := newPipelineRegistries(tt.views) + testPipelineRegistryCreateAggregators(t, intReg, tt.wantCount) + _, floatReg := newPipelineRegistries(tt.views) + testPipelineRegistryCreateAggregators(t, floatReg, tt.wantCount) + }) + } +} - reg = newPipelineRegistry(tt.views) +func testPipelineRegistryCreateAggregators[N int64 | float64](t *testing.T, reg *pipelineRegistry[N], wantCount int) { + inst := view.Instrument{Name: "foo"} + want := make([]internal.Aggregator[N], wantCount) + for i := range want { + want[i] = internal.NewLastValue[N]() + } - floatAggs, err := reg.createFloat64Aggregators(tt.inst, unit.Dimensionless) - assert.NoError(t, err) + aggs, err := reg.createAggregators(inst, unit.Dimensionless) + assert.NoError(t, err) - require.Len(t, floatAggs, len(tt.wantFloat64Agg)) - for i, agg := range floatAggs { - assert.IsType(t, tt.wantFloat64Agg[i], agg) - } - }) + require.Len(t, aggs, wantCount) + for i, agg := range aggs { + assert.IsType(t, want[i], agg) } } @@ -273,20 +238,27 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { fooInst := view.Instrument{Name: "foo"} barInst := view.Instrument{Name: "bar"} - reg := newPipelineRegistry(views) + intReg, floatReg := newPipelineRegistries(views) - _, err := reg.createInt64Aggregators(fooInst, unit.Dimensionless) + _, err := intReg.createAggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) - intAggs, err := reg.createInt64Aggregators(barInst, unit.Dimensionless) + // The Rename view should error, because it creates a foo instrument. + intAggs, err := intReg.createAggregators(barInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 2) - reg = newPipelineRegistry(views) - _, err = reg.createFloat64Aggregators(fooInst, unit.Dimensionless) + // Creating a float foo instrument should error because there is an int foo instrument. + floatAggs, err := floatReg.createAggregators(fooInst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, floatAggs, 1) + + fooInst = view.Instrument{Name: "foo-float"} + + _, err = floatReg.createAggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) - floatAggs, err := reg.createFloat64Aggregators(barInst, unit.Dimensionless) + floatAggs, err = floatReg.createAggregators(barInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 2) } From b065a5950a765d43c878796d6297e1c174944ffe Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Mon, 1 Aug 2022 20:21:45 +0000 Subject: [PATCH 3/8] Add aggregation filter to the registry. --- sdk/metric/pipeline.go | 107 ++++++++-- sdk/metric/pipeline_registry_test.go | 287 +++++++++++++++++++++++---- sdk/metric/reader_test.go | 7 + 3 files changed, 340 insertions(+), 61 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index b11b26c601b..9de46d135c5 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -19,10 +19,12 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "errors" "fmt" "strings" "sync" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -139,8 +141,8 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err }, nil } -// pipelineRegistry manages creating pipelines, and aggregators. The meters can -// retrieve new aggregators from a registry. +// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve +// new Aggregators from a pipelineRegistry. type pipelineRegistry[N int64 | float64] struct { views map[Reader][]view.View pipelines map[Reader]*pipeline @@ -171,9 +173,12 @@ func (reg *pipelineRegistry[N]) createAggregators(inst view.Instrument, instUnit errs := &multierror{} for rdr, views := range reg.views { pipe := reg.pipelines[rdr] - rdrAggs := createAggregators[N](rdr, views, inst) + rdrAggs, err := createAggregators[N](rdr, views, inst) + if err != nil { + errs.append(err) + } for inst, agg := range rdrAggs { - err := pipe.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) + err := pipe.addAggregator(inst.scope, inst.name, inst.description, instUnit, agg) if err != nil { errs.append(err) } @@ -183,6 +188,7 @@ func (reg *pipelineRegistry[N]) createAggregators(inst view.Instrument, instUnit return aggs, errs.errorOrNil() } +// TODO (#3053) Only register callbacks if any instrument matches in a view. func (reg *pipelineRegistry[N]) registerCallback(fn func(context.Context)) { for _, pipe := range reg.pipelines { pipe.addCallback(fn) @@ -199,30 +205,57 @@ func (m *multierror) errorOrNil() error { } return fmt.Errorf(strings.Join(m.errors, "; ")) } + func (m *multierror) append(err error) { m.errors = append(m.errors, err.Error()) } -func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) map[view.Instrument]internal.Aggregator[N] { - aggs := map[view.Instrument]internal.Aggregator[N]{} +// instrumentIdentifier is used to identify multiple instruments being mapped to the same aggregator. +// e.g. using an exact match view with a name=* view. +// You can't use a view.Instrument here because not all Aggregators are hashable. +type instrumentIdentifier struct { + scope instrumentation.Scope + name string + description string +} + +func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentIdentifier]internal.Aggregator[N], error) { + aggs := map[instrumentIdentifier]internal.Aggregator[N]{} + errs := &multierror{} for _, v := range views { inst, match := v.TransformInstrument(inst) + + ident := instrumentIdentifier{ + scope: inst.Scope, + name: inst.Name, + description: inst.Description, + } + + if _, ok := aggs[ident]; ok || !match { + continue + } + if inst.Aggregation == nil { inst.Aggregation = rdr.aggregation(inst.Kind) + } else if _, ok := inst.Aggregation.(aggregation.Default); ok { + inst.Aggregation = rdr.aggregation(inst.Kind) } - if match { - if _, ok := aggs[inst]; ok { - continue - } - agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind)) - if agg != nil { - // TODO (#3011): If filtering is done at the instrument level add here. - // This is where the aggregator and the view are both in scope. - aggs[inst] = agg - } + + if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + global.Error(err, "creating aggregator", "instrumentKind", inst.Kind, "aggregation", inst.Aggregation) + errs.append(err) + continue } + + agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind)) + if agg != nil { + // TODO (#3011): If filtering is done at the instrument level add here. + // This is where the aggregator and the view are both in scope. + aggs[ident] = agg + } + } - return aggs + return aggs, errs.errorOrNil() } // createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator. @@ -246,3 +279,43 @@ func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporalit } return nil } + +var errIncompatibleAggregation error = errors.New("incompatible aggregation") + +// is aggregatorCompatible checks if the aggregation can be used by the instrument. +// Current compatibility: +// +// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | +// |----------------------|------|-----------|-----|-----------|-----------------------| +// | Sync Counter | X | | X | X | X | +// | Sync UpDown Counter | X | | X | | | +// | Sync Histogram | X | | X | X | X | +// | Async Counter | X | | X | | | +// | Async UpDown Counter | X | | X | | | +// | Async Gauge | X | X | | | | +func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error { + switch agg.(type) { + case aggregation.ExplicitBucketHistogram: + if kind == view.SyncCounter || kind == view.SyncHistogram { + return nil + } + return errIncompatibleAggregation + case aggregation.Sum: + switch kind { + case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter: + return nil + default: + return errIncompatibleAggregation + } + case aggregation.LastValue: + if kind == view.AsyncGauge { + return nil + } + return errIncompatibleAggregation + case aggregation.Drop: + return nil + default: + // This is used passed checking for default, it should be an error at this point. + return errIncompatibleAggregation + } +} diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 0b159cfe295..1ca2100400e 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -48,48 +48,53 @@ func (t testReader) Shutdown(context.Context) error { return nil } func testCreateAggregators[N int64 | float64](t *testing.T) { changeAggView, _ := view.New( view.MatchInstrumentName("foo"), - view.WithSetAggregation(aggregation.LastValue{}), + view.WithSetAggregation(aggregation.ExplicitBucketHistogram{}), ) renameView, _ := view.New( view.MatchInstrumentName("foo"), view.WithRename("bar"), ) + defaultAggView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithSetAggregation(aggregation.Default{}), + ) + + inst := view.Instrument{ + Name: "foo", + Kind: view.SyncCounter, + } + testcases := []struct { name string reader Reader views []view.View - inst view.Instrument wantKind internal.Aggregator[N] //Aggregators should match len and types wantLen int }{ { - name: "drop should return 0 aggregators", - reader: testReader{ - agg: aggregation.Drop{}, - }, - views: []view.View{{}}, - inst: view.Instrument{Name: "foo"}, + name: "drop should return 0 aggregators", + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), + views: []view.View{{}}, }, { - name: "reader should set default agg", - reader: testReader{ - agg: aggregation.Sum{}, - temp: metricdata.DeltaTemporality, - }, - views: []view.View{{}}, - inst: view.Instrument{Name: "foo"}, + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, wantKind: internal.NewDeltaSum[N](), wantLen: 1, }, { - name: "view should overwrite reader", - reader: testReader{ - agg: aggregation.Sum{}, - temp: metricdata.DeltaTemporality, - }, + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + wantKind: internal.NewCumulativeSum[N](), + wantLen: 1, + }, + { + name: "view should overwrite reader", + reader: NewManualReader(), views: []view.View{changeAggView}, - inst: view.Instrument{Name: "foo"}, - wantKind: internal.NewLastValue[N](), + wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { @@ -99,14 +104,15 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { temp: metricdata.DeltaTemporality, }, views: []view.View{{}, renameView}, - inst: view.Instrument{Name: "foo"}, wantKind: internal.NewDeltaSum[N](), wantLen: 2, }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - got := createAggregators[N](tt.reader, tt.views, tt.inst) + + got, err := createAggregators[N](tt.reader, tt.views, inst) + assert.NoError(t, err) require.Len(t, got, tt.wantLen) for _, agg := range got { assert.IsType(t, tt.wantKind, agg) @@ -125,6 +131,9 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { view.MatchInstrumentName("foo"), view.WithRename("bar"), ) + testRdr := NewManualReader() + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + testCases := []struct { name string views map[Reader][]view.View @@ -139,7 +148,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { name: "1 reader 1 view gets 1 aggregator", inst: view.Instrument{Name: "foo"}, views: map[Reader][]view.View{ - testReader{agg: aggregation.LastValue{}}: { + testRdr: { {}, }, }, @@ -149,7 +158,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { name: "1 reader 2 views gets 2 aggregator", inst: view.Instrument{Name: "foo"}, views: map[Reader][]view.View{ - testReader{agg: aggregation.LastValue{}}: { + testRdr: { {}, renameView, }, @@ -160,10 +169,10 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { name: "2 readers 1 view each gets 2 aggregators", inst: view.Instrument{Name: "foo"}, views: map[Reader][]view.View{ - testReader{agg: aggregation.LastValue{}}: { + testRdr: { {}, }, - testReader{agg: aggregation.LastValue{}, temp: metricdata.CumulativeTemporality}: { + testRdrHistogram: { {}, }, }, @@ -173,11 +182,11 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { name: "2 reader 2 views each gets 4 aggregators", inst: view.Instrument{Name: "foo"}, views: map[Reader][]view.View{ - testReader{agg: aggregation.LastValue{}}: { + testRdr: { {}, renameView, }, - testReader{agg: aggregation.LastValue{}, temp: metricdata.CumulativeTemporality}: { + testRdrHistogram: { {}, renameView, }, @@ -188,7 +197,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { name: "An instrument is duplicated in two views share the same aggregator", inst: view.Instrument{Name: "foo"}, views: map[Reader][]view.View{ - testReader{agg: aggregation.LastValue{}}: { + testRdr: { {}, {}, }, @@ -208,19 +217,32 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { } func testPipelineRegistryCreateAggregators[N int64 | float64](t *testing.T, reg *pipelineRegistry[N], wantCount int) { - inst := view.Instrument{Name: "foo"} - want := make([]internal.Aggregator[N], wantCount) - for i := range want { - want[i] = internal.NewLastValue[N]() - } + inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} aggs, err := reg.createAggregators(inst, unit.Dimensionless) assert.NoError(t, err) require.Len(t, aggs, wantCount) - for i, agg := range aggs { - assert.IsType(t, want[i], agg) +} + +func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + + views := map[Reader][]view.View{ + testRdrHistogram: { + {}, + }, } + intReg, floatReg := newPipelineRegistries(views) + inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} + + intAggs, err := intReg.createAggregators(inst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, intAggs, 0) + + floatAggs, err := floatReg.createAggregators(inst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, floatAggs, 0) } func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { @@ -229,22 +251,23 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { view.WithRename("foo"), ) views := map[Reader][]view.View{ - testReader{agg: aggregation.LastValue{}}: { + testReader{agg: aggregation.Sum{}}: { {}, renameView, }, } - fooInst := view.Instrument{Name: "foo"} - barInst := view.Instrument{Name: "bar"} + fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} + barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} intReg, floatReg := newPipelineRegistries(views) - _, err := intReg.createAggregators(fooInst, unit.Dimensionless) + intAggs, err := intReg.createAggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) + assert.Len(t, intAggs, 1) // The Rename view should error, because it creates a foo instrument. - intAggs, err := intReg.createAggregators(barInst, unit.Dimensionless) + intAggs, err = intReg.createAggregators(barInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 2) @@ -253,7 +276,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Error(t, err) assert.Len(t, floatAggs, 1) - fooInst = view.Instrument{Name: "foo-float"} + fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} _, err = floatReg.createAggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) @@ -262,3 +285,179 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Error(t, err) assert.Len(t, floatAggs, 2) } + +func TestIsAggregatorCompatible(t *testing.T) { + var undefinedInstrument view.InstrumentKind + + testCases := []struct { + name string + kind view.InstrumentKind + agg aggregation.Aggregation + wantErr bool + }{ + { + name: "SyncCounter and Drop", + kind: view.SyncCounter, + agg: aggregation.Drop{}, + wantErr: false, + }, + { + name: "SyncCounter and LastValue", + kind: view.SyncCounter, + agg: aggregation.LastValue{}, + wantErr: true, + }, + { + name: "SyncCounter and Sum", + kind: view.SyncCounter, + agg: aggregation.Sum{}, + wantErr: false, + }, + { + name: "SyncCounter and ExplicitBucketHistogram", + kind: view.SyncCounter, + agg: aggregation.ExplicitBucketHistogram{}, + wantErr: false, + }, + { + name: "SyncUpDownCounter and Drop", + kind: view.SyncUpDownCounter, + agg: aggregation.Drop{}, + wantErr: false, + }, + { + name: "SyncUpDownCounter and LastValue", + kind: view.SyncUpDownCounter, + agg: aggregation.LastValue{}, + wantErr: true, + }, + { + name: "SyncUpDownCounter and Sum", + kind: view.SyncUpDownCounter, + agg: aggregation.Sum{}, + wantErr: false, + }, + { + name: "SyncUpDownCounter and ExplicitBucketHistogram", + kind: view.SyncUpDownCounter, + agg: aggregation.ExplicitBucketHistogram{}, + wantErr: true, + }, + { + name: "SyncHistogram and Drop", + kind: view.SyncHistogram, + agg: aggregation.Drop{}, + wantErr: false, + }, + { + name: "SyncHistogram and LastValue", + kind: view.SyncHistogram, + agg: aggregation.LastValue{}, + wantErr: true, + }, + { + name: "SyncHistogram and Sum", + kind: view.SyncHistogram, + agg: aggregation.Sum{}, + wantErr: false, + }, + { + name: "SyncHistogram and ExplicitBucketHistogram", + kind: view.SyncHistogram, + agg: aggregation.ExplicitBucketHistogram{}, + wantErr: false, + }, + { + name: "AsyncCounter and Drop", + kind: view.AsyncCounter, + agg: aggregation.Drop{}, + wantErr: false, + }, + { + name: "AsyncCounter and LastValue", + kind: view.AsyncCounter, + agg: aggregation.LastValue{}, + wantErr: true, + }, + { + name: "AsyncCounter and Sum", + kind: view.AsyncCounter, + agg: aggregation.Sum{}, + wantErr: false, + }, + { + name: "AsyncCounter and ExplicitBucketHistogram", + kind: view.AsyncCounter, + agg: aggregation.ExplicitBucketHistogram{}, + wantErr: true, + }, + { + name: "AsyncUpDownCounter and Drop", + kind: view.AsyncUpDownCounter, + agg: aggregation.Drop{}, + wantErr: false, + }, + { + name: "AsyncUpDownCounter and LastValue", + kind: view.AsyncUpDownCounter, + agg: aggregation.LastValue{}, + wantErr: true, + }, + { + name: "AsyncUpDownCounter and Sum", + kind: view.AsyncUpDownCounter, + agg: aggregation.Sum{}, + wantErr: false, + }, + { + name: "AsyncUpDownCounter and ExplicitBucketHistogram", + kind: view.AsyncUpDownCounter, + agg: aggregation.ExplicitBucketHistogram{}, + wantErr: true, + }, + { + name: "AsyncGauge and Drop", + kind: view.AsyncGauge, + agg: aggregation.Drop{}, + wantErr: false, + }, + { + name: "AsyncGauge and aggregation.LastValue{}", + kind: view.AsyncGauge, + agg: aggregation.LastValue{}, + wantErr: false, + }, + { + name: "AsyncGauge and Sum", + kind: view.AsyncGauge, + agg: aggregation.Sum{}, + wantErr: true, + }, + { + name: "AsyncGauge and ExplicitBucketHistogram", + kind: view.AsyncGauge, + agg: aggregation.ExplicitBucketHistogram{}, + wantErr: true, + }, + { + name: "Default aggregation should error", + kind: view.SyncCounter, + agg: aggregation.Default{}, + wantErr: true, + }, + { + name: "unknown kind should error", + kind: undefinedInstrument, + agg: aggregation.Sum{}, + wantErr: true, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + err := isAggregatorCompatible(tt.kind, tt.agg) + + assert.Equal(t, tt.wantErr, (err != nil)) + }) + } +} diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 3cc8c8c9264..2bf802ba7a0 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -216,3 +216,10 @@ func TestDefaultTemporalitySelector(t *testing.T) { assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik)) } } + +func TestReadersAreHashable(t *testing.T) { + var _ map[Reader]struct{} = map[Reader]struct{}{ + NewManualReader(): {}, + NewPeriodicReader(new(fnExporter)): {}, + } +} From 948395cc131d2c055b6510e9ff93fc56e2cee487 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Mon, 1 Aug 2022 21:28:30 +0000 Subject: [PATCH 4/8] Fix lint --- sdk/metric/pipeline.go | 5 ++--- sdk/metric/pipeline_registry_test.go | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 9de46d135c5..248d766db69 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -253,7 +253,6 @@ func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst vi // This is where the aggregator and the view are both in scope. aggs[ident] = agg } - } return aggs, errs.errorOrNil() } @@ -280,7 +279,7 @@ func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporalit return nil } -var errIncompatibleAggregation error = errors.New("incompatible aggregation") +var errIncompatibleAggregation = errors.New("incompatible aggregation") // is aggregatorCompatible checks if the aggregation can be used by the instrument. // Current compatibility: @@ -292,7 +291,7 @@ var errIncompatibleAggregation error = errors.New("incompatible aggregation") // | Sync Histogram | X | | X | X | X | // | Async Counter | X | | X | | | // | Async UpDown Counter | X | | X | | | -// | Async Gauge | X | X | | | | +// | Async Gauge | X | X | | | |. func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error { switch agg.(type) { case aggregation.ExplicitBucketHistogram: diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 1ca2100400e..c84a32164e1 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -110,7 +110,6 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - got, err := createAggregators[N](tt.reader, tt.views, inst) assert.NoError(t, err) require.Len(t, got, tt.wantLen) From f07b0180b1df00d9d7ffb322e8aecdba752a0a20 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Thu, 4 Aug 2022 15:28:38 +0000 Subject: [PATCH 5/8] add monotonic selection to aggregators --- sdk/metric/pipeline.go | 16 ++++++++++++---- sdk/metric/pipeline_registry_test.go | 6 +++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 248d766db69..2e692a3148a 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -247,7 +247,7 @@ func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst vi continue } - agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind)) + agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind), isMonotonic(inst.Kind)) if agg != nil { // TODO (#3011): If filtering is done at the instrument level add here. // This is where the aggregator and the view are both in scope. @@ -257,9 +257,17 @@ func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst vi return aggs, errs.errorOrNil() } +func isMonotonic(kind view.InstrumentKind) bool { + switch kind { + case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: + return true + } + return false +} + // createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator. // TODO (#3011): If filterting is done by the Aggregator it should be passed here. -func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality) internal.Aggregator[N] { +func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] { switch agg := agg.(type) { case aggregation.Drop: return nil @@ -267,9 +275,9 @@ func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporalit return internal.NewLastValue[N]() case aggregation.Sum: if temporality == metricdata.CumulativeTemporality { - return internal.NewCumulativeSum[N]() + return internal.NewCumulativeSum[N](monotonic) } - return internal.NewDeltaSum[N]() + return internal.NewDeltaSum[N](monotonic) case aggregation.ExplicitBucketHistogram: if temporality == metricdata.CumulativeTemporality { return internal.NewCumulativeHistogram[N](agg) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index c84a32164e1..973368bb144 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -80,14 +80,14 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { name: "default agg should use reader", reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), views: []view.View{defaultAggView}, - wantKind: internal.NewDeltaSum[N](), + wantKind: internal.NewDeltaSum[N](true), wantLen: 1, }, { name: "reader should set default agg", reader: NewManualReader(), views: []view.View{{}}, - wantKind: internal.NewCumulativeSum[N](), + wantKind: internal.NewCumulativeSum[N](true), wantLen: 1, }, { @@ -104,7 +104,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { temp: metricdata.DeltaTemporality, }, views: []view.View{{}, renameView}, - wantKind: internal.NewDeltaSum[N](), + wantKind: internal.NewDeltaSum[N](true), wantLen: 2, }, } From 15d7e84840e4a1a30f4f1583c118e642e85b3eec Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Fri, 5 Aug 2022 19:17:22 +0000 Subject: [PATCH 6/8] Addres PR feedback, expands testing --- sdk/metric/pipeline.go | 30 ++- sdk/metric/pipeline_registry_test.go | 383 +++++++++++++++++---------- sdk/metric/reader_test.go | 2 +- 3 files changed, 268 insertions(+), 147 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 2e692a3148a..95bfe244a94 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -24,7 +24,6 @@ import ( "strings" "sync" - "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -196,36 +195,41 @@ func (reg *pipelineRegistry[N]) registerCallback(fn func(context.Context)) { } type multierror struct { - errors []string + wrapped error + errors []string } func (m *multierror) errorOrNil() error { if len(m.errors) == 0 { return nil } - return fmt.Errorf(strings.Join(m.errors, "; ")) + return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; ")) } func (m *multierror) append(err error) { m.errors = append(m.errors, err.Error()) } -// instrumentIdentifier is used to identify multiple instruments being mapped to the same aggregator. +// instrumentID is used to identify multiple instruments being mapped to the same aggregator. // e.g. using an exact match view with a name=* view. -// You can't use a view.Instrument here because not all Aggregators are hashable. -type instrumentIdentifier struct { +// You can't use a view.Instrument here because not all Aggregators are comparable. +type instrumentID struct { scope instrumentation.Scope name string description string } -func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentIdentifier]internal.Aggregator[N], error) { - aggs := map[instrumentIdentifier]internal.Aggregator[N]{} - errs := &multierror{} +var errCreatingAggregators = errors.New("could not create all aggregators") + +func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentID]internal.Aggregator[N], error) { + aggs := map[instrumentID]internal.Aggregator[N]{} + errs := &multierror{ + wrapped: errCreatingAggregators, + } for _, v := range views { inst, match := v.TransformInstrument(inst) - ident := instrumentIdentifier{ + ident := instrumentID{ scope: inst.Scope, name: inst.Name, description: inst.Description, @@ -242,7 +246,7 @@ func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst vi } if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { - global.Error(err, "creating aggregator", "instrumentKind", inst.Kind, "aggregation", inst.Aggregation) + err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) errs.append(err) continue } @@ -287,7 +291,9 @@ func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporalit return nil } +// TODO: review need for aggregation check after https://github.com/open-telemetry/opentelemetry-specification/issues/2710 var errIncompatibleAggregation = errors.New("incompatible aggregation") +var errUnknownAggregation = errors.New("unrecognized aggregation") // is aggregatorCompatible checks if the aggregation can be used by the instrument. // Current compatibility: @@ -323,6 +329,6 @@ func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregatio return nil default: // This is used passed checking for default, it should be an error at this point. - return errIncompatibleAggregation + return fmt.Errorf("%w: %v", errUnknownAggregation, agg) } } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 973368bb144..28f2bc4db5c 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -18,7 +18,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -27,23 +26,19 @@ import ( "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal" - "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/view" ) -type testReader struct { - agg aggregation.Aggregation - temp metricdata.Temporality +type invalidAggregation struct { + aggregation.Aggregation } -func (t testReader) register(producer) {} -func (t testReader) temporality(view.InstrumentKind) metricdata.Temporality { return t.temp } -func (t testReader) aggregation(view.InstrumentKind) aggregation.Aggregation { return t.agg } // nolint:revive // import-shadow for method scoped by type. -func (t testReader) Collect(context.Context) (metricdata.ResourceMetrics, error) { - return metricdata.ResourceMetrics{}, nil +func (invalidAggregation) Copy() aggregation.Aggregation { + return invalidAggregation{} +} +func (invalidAggregation) Err() error { + return nil } -func (t testReader) ForceFlush(context.Context) error { return nil } -func (t testReader) Shutdown(context.Context) error { return nil } func testCreateAggregators[N int64 | float64](t *testing.T) { changeAggView, _ := view.New( @@ -58,28 +53,81 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { view.MatchInstrumentName("foo"), view.WithSetAggregation(aggregation.Default{}), ) + invalidAggView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithSetAggregation(invalidAggregation{}), + ) - inst := view.Instrument{ - Name: "foo", - Kind: view.SyncCounter, + instruments := []view.Instrument{ + {Name: "foo", Kind: view.InstrumentKind(0)}, //Unknown kind + {Name: "foo", Kind: view.SyncCounter}, + {Name: "foo", Kind: view.SyncUpDownCounter}, + {Name: "foo", Kind: view.SyncHistogram}, + {Name: "foo", Kind: view.AsyncCounter}, + {Name: "foo", Kind: view.AsyncUpDownCounter}, + {Name: "foo", Kind: view.AsyncGauge}, } testcases := []struct { name string reader Reader views []view.View + inst view.Instrument wantKind internal.Aggregator[N] //Aggregators should match len and types wantLen int + wantErr error }{ { name: "drop should return 0 aggregators", reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), views: []view.View{{}}, + inst: instruments[view.SyncCounter], + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.SyncUpDownCounter], + wantKind: internal.NewDeltaSum[N](false), + wantLen: 1, }, { name: "default agg should use reader", reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), views: []view.View{defaultAggView}, + inst: instruments[view.SyncHistogram], + wantKind: internal.NewDeltaHistogram[N](aggregation.ExplicitBucketHistogram{}), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.AsyncCounter], + wantKind: internal.NewDeltaSum[N](true), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.AsyncUpDownCounter], + wantKind: internal.NewDeltaSum[N](false), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.AsyncGauge], + wantKind: internal.NewLastValue[N](), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.SyncCounter], wantKind: internal.NewDeltaSum[N](true), wantLen: 1, }, @@ -87,6 +135,47 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { name: "reader should set default agg", reader: NewManualReader(), views: []view.View{{}}, + inst: instruments[view.SyncUpDownCounter], + wantKind: internal.NewCumulativeSum[N](false), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.SyncHistogram], + wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.AsyncCounter], + wantKind: internal.NewCumulativeSum[N](true), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.AsyncUpDownCounter], + wantKind: internal.NewCumulativeSum[N](false), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.AsyncGauge], + wantKind: internal.NewLastValue[N](), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeSum[N](true), wantLen: 1, }, @@ -94,32 +183,61 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { name: "view should overwrite reader", reader: NewManualReader(), views: []view.View{changeAggView}, + inst: instruments[view.SyncCounter], wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), wantLen: 1, }, { - name: "multiple views should create multiple aggregators", - reader: testReader{ - agg: aggregation.Sum{}, - temp: metricdata.DeltaTemporality, - }, + name: "multiple views should create multiple aggregators", + reader: NewManualReader(), views: []view.View{{}, renameView}, - wantKind: internal.NewDeltaSum[N](true), + inst: instruments[view.SyncCounter], + wantKind: internal.NewCumulativeSum[N](true), wantLen: 2, }, + { + name: "reader with invalid aggregation should error", + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + views: []view.View{{}}, + inst: instruments[view.SyncCounter], + wantErr: errCreatingAggregators, + }, + { + name: "view with invalid aggregation should error", + reader: NewManualReader(), + views: []view.View{invalidAggView}, + inst: instruments[view.SyncCounter], + wantErr: errCreatingAggregators, + }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - got, err := createAggregators[N](tt.reader, tt.views, inst) - assert.NoError(t, err) + got, err := createAggregators[N](tt.reader, tt.views, tt.inst) + assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) for _, agg := range got { assert.IsType(t, tt.wantKind, agg) } + }) } } +func testInvalidInstrumentShouldPanic[N int64 | float64]() { + reader := NewManualReader() + views := []view.View{{}} + inst := view.Instrument{ + Name: "foo", + Kind: view.InstrumentKind(255), + } + createAggregators[N](reader, views, inst) +} + +func TestInvalidInstrumentShouldPanic(t *testing.T) { + assert.Panics(t, testInvalidInstrumentShouldPanic[int64]) + assert.Panics(t, testInvalidInstrumentShouldPanic[float64]) +} + func TestCreateAggregators(t *testing.T) { t.Run("Int64", testCreateAggregators[int64]) t.Run("Float64", testCreateAggregators[float64]) @@ -250,7 +368,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { view.WithRename("foo"), ) views := map[Reader][]view.View{ - testReader{agg: aggregation.Sum{}}: { + NewManualReader(): { {}, renameView, }, @@ -289,174 +407,171 @@ func TestIsAggregatorCompatible(t *testing.T) { var undefinedInstrument view.InstrumentKind testCases := []struct { - name string - kind view.InstrumentKind - agg aggregation.Aggregation - wantErr bool + name string + kind view.InstrumentKind + agg aggregation.Aggregation + want error }{ { - name: "SyncCounter and Drop", - kind: view.SyncCounter, - agg: aggregation.Drop{}, - wantErr: false, + name: "SyncCounter and Drop", + kind: view.SyncCounter, + agg: aggregation.Drop{}, }, { - name: "SyncCounter and LastValue", - kind: view.SyncCounter, - agg: aggregation.LastValue{}, - wantErr: true, + name: "SyncCounter and LastValue", + kind: view.SyncCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, }, { - name: "SyncCounter and Sum", - kind: view.SyncCounter, - agg: aggregation.Sum{}, - wantErr: false, + name: "SyncCounter and Sum", + kind: view.SyncCounter, + agg: aggregation.Sum{}, }, { - name: "SyncCounter and ExplicitBucketHistogram", - kind: view.SyncCounter, - agg: aggregation.ExplicitBucketHistogram{}, - wantErr: false, + name: "SyncCounter and ExplicitBucketHistogram", + kind: view.SyncCounter, + agg: aggregation.ExplicitBucketHistogram{}, }, { - name: "SyncUpDownCounter and Drop", - kind: view.SyncUpDownCounter, - agg: aggregation.Drop{}, - wantErr: false, + name: "SyncUpDownCounter and Drop", + kind: view.SyncUpDownCounter, + agg: aggregation.Drop{}, }, { - name: "SyncUpDownCounter and LastValue", - kind: view.SyncUpDownCounter, - agg: aggregation.LastValue{}, - wantErr: true, + name: "SyncUpDownCounter and LastValue", + kind: view.SyncUpDownCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, }, { - name: "SyncUpDownCounter and Sum", - kind: view.SyncUpDownCounter, - agg: aggregation.Sum{}, - wantErr: false, + name: "SyncUpDownCounter and Sum", + kind: view.SyncUpDownCounter, + agg: aggregation.Sum{}, }, { - name: "SyncUpDownCounter and ExplicitBucketHistogram", - kind: view.SyncUpDownCounter, - agg: aggregation.ExplicitBucketHistogram{}, - wantErr: true, + name: "SyncUpDownCounter and ExplicitBucketHistogram", + kind: view.SyncUpDownCounter, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, }, { - name: "SyncHistogram and Drop", - kind: view.SyncHistogram, - agg: aggregation.Drop{}, - wantErr: false, + name: "SyncHistogram and Drop", + kind: view.SyncHistogram, + agg: aggregation.Drop{}, }, { - name: "SyncHistogram and LastValue", - kind: view.SyncHistogram, - agg: aggregation.LastValue{}, - wantErr: true, + name: "SyncHistogram and LastValue", + kind: view.SyncHistogram, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, }, { - name: "SyncHistogram and Sum", - kind: view.SyncHistogram, - agg: aggregation.Sum{}, - wantErr: false, + name: "SyncHistogram and Sum", + kind: view.SyncHistogram, + agg: aggregation.Sum{}, }, { - name: "SyncHistogram and ExplicitBucketHistogram", - kind: view.SyncHistogram, - agg: aggregation.ExplicitBucketHistogram{}, - wantErr: false, + name: "SyncHistogram and ExplicitBucketHistogram", + kind: view.SyncHistogram, + agg: aggregation.ExplicitBucketHistogram{}, }, { - name: "AsyncCounter and Drop", - kind: view.AsyncCounter, - agg: aggregation.Drop{}, - wantErr: false, + name: "AsyncCounter and Drop", + kind: view.AsyncCounter, + agg: aggregation.Drop{}, }, { - name: "AsyncCounter and LastValue", - kind: view.AsyncCounter, - agg: aggregation.LastValue{}, - wantErr: true, + name: "AsyncCounter and LastValue", + kind: view.AsyncCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, }, { - name: "AsyncCounter and Sum", - kind: view.AsyncCounter, - agg: aggregation.Sum{}, - wantErr: false, + name: "AsyncCounter and Sum", + kind: view.AsyncCounter, + agg: aggregation.Sum{}, }, { - name: "AsyncCounter and ExplicitBucketHistogram", - kind: view.AsyncCounter, - agg: aggregation.ExplicitBucketHistogram{}, - wantErr: true, + name: "AsyncCounter and ExplicitBucketHistogram", + kind: view.AsyncCounter, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, }, { - name: "AsyncUpDownCounter and Drop", - kind: view.AsyncUpDownCounter, - agg: aggregation.Drop{}, - wantErr: false, + name: "AsyncUpDownCounter and Drop", + kind: view.AsyncUpDownCounter, + agg: aggregation.Drop{}, }, { - name: "AsyncUpDownCounter and LastValue", - kind: view.AsyncUpDownCounter, - agg: aggregation.LastValue{}, - wantErr: true, + name: "AsyncUpDownCounter and LastValue", + kind: view.AsyncUpDownCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, }, { - name: "AsyncUpDownCounter and Sum", - kind: view.AsyncUpDownCounter, - agg: aggregation.Sum{}, - wantErr: false, + name: "AsyncUpDownCounter and Sum", + kind: view.AsyncUpDownCounter, + agg: aggregation.Sum{}, }, { - name: "AsyncUpDownCounter and ExplicitBucketHistogram", - kind: view.AsyncUpDownCounter, - agg: aggregation.ExplicitBucketHistogram{}, - wantErr: true, + name: "AsyncUpDownCounter and ExplicitBucketHistogram", + kind: view.AsyncUpDownCounter, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, }, { - name: "AsyncGauge and Drop", - kind: view.AsyncGauge, - agg: aggregation.Drop{}, - wantErr: false, + name: "AsyncGauge and Drop", + kind: view.AsyncGauge, + agg: aggregation.Drop{}, }, { - name: "AsyncGauge and aggregation.LastValue{}", - kind: view.AsyncGauge, - agg: aggregation.LastValue{}, - wantErr: false, + name: "AsyncGauge and aggregation.LastValue{}", + kind: view.AsyncGauge, + agg: aggregation.LastValue{}, }, { - name: "AsyncGauge and Sum", - kind: view.AsyncGauge, - agg: aggregation.Sum{}, - wantErr: true, + name: "AsyncGauge and Sum", + kind: view.AsyncGauge, + agg: aggregation.Sum{}, + want: errIncompatibleAggregation, }, { - name: "AsyncGauge and ExplicitBucketHistogram", - kind: view.AsyncGauge, - agg: aggregation.ExplicitBucketHistogram{}, - wantErr: true, + name: "AsyncGauge and ExplicitBucketHistogram", + kind: view.AsyncGauge, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, }, { - name: "Default aggregation should error", - kind: view.SyncCounter, - agg: aggregation.Default{}, - wantErr: true, + name: "Default aggregation should error", + kind: view.SyncCounter, + agg: aggregation.Default{}, + want: errUnknownAggregation, }, { - name: "unknown kind should error", - kind: undefinedInstrument, - agg: aggregation.Sum{}, - wantErr: true, + name: "unknown kind with Sum should error", + kind: undefinedInstrument, + agg: aggregation.Sum{}, + want: errIncompatibleAggregation, + }, + { + name: "unknown kind with LastValue should error", + kind: undefinedInstrument, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, + }, + { + name: "unknown kind with Histogram should error", + kind: undefinedInstrument, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, }, } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { err := isAggregatorCompatible(tt.kind, tt.agg) - - assert.Equal(t, tt.wantErr, (err != nil)) + assert.ErrorIs(t, err, tt.want) }) } } diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 2bf802ba7a0..eb98c486ff0 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -217,7 +217,7 @@ func TestDefaultTemporalitySelector(t *testing.T) { } } -func TestReadersAreHashable(t *testing.T) { +func TestReadersAreComparable(t *testing.T) { var _ map[Reader]struct{} = map[Reader]struct{}{ NewManualReader(): {}, NewPeriodicReader(new(fnExporter)): {}, From 46544ece8d4e83f72fa0a8671aa279ca940dd262 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Mon, 8 Aug 2022 14:03:36 +0000 Subject: [PATCH 7/8] Fix lint --- sdk/metric/pipeline_registry_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 28f2bc4db5c..875c469ba1b 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -218,7 +218,6 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { for _, agg := range got { assert.IsType(t, tt.wantKind, agg) } - }) } } @@ -230,7 +229,7 @@ func testInvalidInstrumentShouldPanic[N int64 | float64]() { Name: "foo", Kind: view.InstrumentKind(255), } - createAggregators[N](reader, views, inst) + _, _ = createAggregators[N](reader, views, inst) } func TestInvalidInstrumentShouldPanic(t *testing.T) { From 127dce7a82b7e4013ee8a166569066d9e855458a Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Thu, 11 Aug 2022 18:24:42 +0000 Subject: [PATCH 8/8] Move reader comparability test to var --- sdk/metric/manual_reader.go | 4 ++-- sdk/metric/periodic_reader.go | 3 +++ sdk/metric/reader_test.go | 7 ------- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 61873d2ddea..ec985332188 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -39,8 +39,8 @@ type manualReader struct { aggregationSelector AggregationSelector } -// Compile time check the manualReader implements Reader. -var _ Reader = &manualReader{} +// Compile time check the manualReader implements Reader and is comparable. +var _ = map[Reader]struct{}{&manualReader{}: {}} // NewManualReader returns a Reader which is directly called to collect metrics. func NewManualReader(opts ...ManualReaderOption) Reader { diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3e2db5ea50b..9c66ac5d552 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -149,6 +149,9 @@ type periodicReader struct { shutdownOnce sync.Once } +// Compile time check the periodicReader implements Reader and is comparable. +var _ = map[Reader]struct{}{&periodicReader{}: {}} + // newTicker allows testing override. var newTicker = time.NewTicker diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index eb98c486ff0..3cc8c8c9264 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -216,10 +216,3 @@ func TestDefaultTemporalitySelector(t *testing.T) { assert.Equal(t, metricdata.CumulativeTemporality, DefaultTemporalitySelector(ik)) } } - -func TestReadersAreComparable(t *testing.T) { - var _ map[Reader]struct{} = map[Reader]struct{}{ - NewManualReader(): {}, - NewPeriodicReader(new(fnExporter)): {}, - } -}