From 258bb2d46ced564ea7525fc1209db70684c06353 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Thu, 11 Aug 2022 14:10:27 -0500 Subject: [PATCH] Adds a pipelineRegistry to manage creating aggregators. (#3044) * Adds a pipelineRegistry to manage creating aggregators. * Made pipeline generic * Add aggregation filter to the registry. Co-authored-by: Chester Cheung --- sdk/metric/manual_reader.go | 4 +- sdk/metric/periodic_reader.go | 3 + sdk/metric/pipeline.go | 202 +++++++++- sdk/metric/pipeline_registry_test.go | 576 +++++++++++++++++++++++++++ 4 files changed, 781 insertions(+), 4 deletions(-) create mode 100644 sdk/metric/pipeline_registry_test.go diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 61873d2ddea..ec985332188 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -39,8 +39,8 @@ type manualReader struct { aggregationSelector AggregationSelector } -// Compile time check the manualReader implements Reader. -var _ Reader = &manualReader{} +// Compile time check the manualReader implements Reader and is comparable. +var _ = map[Reader]struct{}{&manualReader{}: {}} // NewManualReader returns a Reader which is directly called to collect metrics. func NewManualReader(opts ...ManualReaderOption) Reader { diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3e2db5ea50b..9c66ac5d552 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -149,6 +149,9 @@ type periodicReader struct { shutdownOnce sync.Once } +// Compile time check the periodicReader implements Reader and is comparable. +var _ = map[Reader]struct{}{&periodicReader{}: {}} + // newTicker allows testing override. var newTicker = time.NewTicker diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index f7a85a91bab..95bfe244a94 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -19,12 +19,17 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "errors" "fmt" + "strings" "sync" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -110,8 +115,8 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations)) for scope, instruments := range p.aggregations { metrics := make([]metricdata.Metrics, 0, len(instruments)) - for inst, aggregation := range instruments { - data := aggregation.Aggregation() + for inst, agg := range instruments { + data := agg.Aggregation() if data != nil { metrics = append(metrics, metricdata.Metrics{ Name: inst.name, @@ -134,3 +139,196 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err ScopeMetrics: sm, }, nil } + +// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve +// new Aggregators from a pipelineRegistry. +type pipelineRegistry[N int64 | float64] struct { + views map[Reader][]view.View + pipelines map[Reader]*pipeline +} + +func newPipelineRegistries(views map[Reader][]view.View) (*pipelineRegistry[int64], *pipelineRegistry[float64]) { + pipelines := map[Reader]*pipeline{} + for rdr := range views { + pipe := &pipeline{} + rdr.register(pipe) + pipelines[rdr] = pipe + } + return &pipelineRegistry[int64]{ + views: views, + pipelines: pipelines, + }, &pipelineRegistry[float64]{ + views: views, + pipelines: pipelines, + } +} + +// createAggregators will create all backing aggregators for an instrument. +// It will return an error if an instrument is registered more than once. +// Note: There may be returned aggregators with an error. +func (reg *pipelineRegistry[N]) createAggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + var aggs []internal.Aggregator[N] + + errs := &multierror{} + for rdr, views := range reg.views { + pipe := reg.pipelines[rdr] + rdrAggs, err := createAggregators[N](rdr, views, inst) + if err != nil { + errs.append(err) + } + for inst, agg := range rdrAggs { + err := pipe.addAggregator(inst.scope, inst.name, inst.description, instUnit, agg) + if err != nil { + errs.append(err) + } + aggs = append(aggs, agg) + } + } + return aggs, errs.errorOrNil() +} + +// TODO (#3053) Only register callbacks if any instrument matches in a view. +func (reg *pipelineRegistry[N]) registerCallback(fn func(context.Context)) { + for _, pipe := range reg.pipelines { + pipe.addCallback(fn) + } +} + +type multierror struct { + wrapped error + errors []string +} + +func (m *multierror) errorOrNil() error { + if len(m.errors) == 0 { + return nil + } + return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; ")) +} + +func (m *multierror) append(err error) { + m.errors = append(m.errors, err.Error()) +} + +// instrumentID is used to identify multiple instruments being mapped to the same aggregator. +// e.g. using an exact match view with a name=* view. +// You can't use a view.Instrument here because not all Aggregators are comparable. +type instrumentID struct { + scope instrumentation.Scope + name string + description string +} + +var errCreatingAggregators = errors.New("could not create all aggregators") + +func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentID]internal.Aggregator[N], error) { + aggs := map[instrumentID]internal.Aggregator[N]{} + errs := &multierror{ + wrapped: errCreatingAggregators, + } + for _, v := range views { + inst, match := v.TransformInstrument(inst) + + ident := instrumentID{ + scope: inst.Scope, + name: inst.Name, + description: inst.Description, + } + + if _, ok := aggs[ident]; ok || !match { + continue + } + + if inst.Aggregation == nil { + inst.Aggregation = rdr.aggregation(inst.Kind) + } else if _, ok := inst.Aggregation.(aggregation.Default); ok { + inst.Aggregation = rdr.aggregation(inst.Kind) + } + + if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) + errs.append(err) + continue + } + + agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind), isMonotonic(inst.Kind)) + if agg != nil { + // TODO (#3011): If filtering is done at the instrument level add here. + // This is where the aggregator and the view are both in scope. + aggs[ident] = agg + } + } + return aggs, errs.errorOrNil() +} + +func isMonotonic(kind view.InstrumentKind) bool { + switch kind { + case view.AsyncCounter, view.SyncCounter, view.SyncHistogram: + return true + } + return false +} + +// createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator. +// TODO (#3011): If filterting is done by the Aggregator it should be passed here. +func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] { + switch agg := agg.(type) { + case aggregation.Drop: + return nil + case aggregation.LastValue: + return internal.NewLastValue[N]() + case aggregation.Sum: + if temporality == metricdata.CumulativeTemporality { + return internal.NewCumulativeSum[N](monotonic) + } + return internal.NewDeltaSum[N](monotonic) + case aggregation.ExplicitBucketHistogram: + if temporality == metricdata.CumulativeTemporality { + return internal.NewCumulativeHistogram[N](agg) + } + return internal.NewDeltaHistogram[N](agg) + } + return nil +} + +// TODO: review need for aggregation check after https://github.com/open-telemetry/opentelemetry-specification/issues/2710 +var errIncompatibleAggregation = errors.New("incompatible aggregation") +var errUnknownAggregation = errors.New("unrecognized aggregation") + +// is aggregatorCompatible checks if the aggregation can be used by the instrument. +// Current compatibility: +// +// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | +// |----------------------|------|-----------|-----|-----------|-----------------------| +// | Sync Counter | X | | X | X | X | +// | Sync UpDown Counter | X | | X | | | +// | Sync Histogram | X | | X | X | X | +// | Async Counter | X | | X | | | +// | Async UpDown Counter | X | | X | | | +// | Async Gauge | X | X | | | |. +func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error { + switch agg.(type) { + case aggregation.ExplicitBucketHistogram: + if kind == view.SyncCounter || kind == view.SyncHistogram { + return nil + } + return errIncompatibleAggregation + case aggregation.Sum: + switch kind { + case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter: + return nil + default: + return errIncompatibleAggregation + } + case aggregation.LastValue: + if kind == view.AsyncGauge { + return nil + } + return errIncompatibleAggregation + case aggregation.Drop: + return nil + default: + // This is used passed checking for default, it should be an error at this point. + return fmt.Errorf("%w: %v", errUnknownAggregation, agg) + } +} diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go new file mode 100644 index 00000000000..875c469ba1b --- /dev/null +++ b/sdk/metric/pipeline_registry_test.go @@ -0,0 +1,576 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/internal" + "go.opentelemetry.io/otel/sdk/metric/view" +) + +type invalidAggregation struct { + aggregation.Aggregation +} + +func (invalidAggregation) Copy() aggregation.Aggregation { + return invalidAggregation{} +} +func (invalidAggregation) Err() error { + return nil +} + +func testCreateAggregators[N int64 | float64](t *testing.T) { + changeAggView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithSetAggregation(aggregation.ExplicitBucketHistogram{}), + ) + renameView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithRename("bar"), + ) + defaultAggView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithSetAggregation(aggregation.Default{}), + ) + invalidAggView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithSetAggregation(invalidAggregation{}), + ) + + instruments := []view.Instrument{ + {Name: "foo", Kind: view.InstrumentKind(0)}, //Unknown kind + {Name: "foo", Kind: view.SyncCounter}, + {Name: "foo", Kind: view.SyncUpDownCounter}, + {Name: "foo", Kind: view.SyncHistogram}, + {Name: "foo", Kind: view.AsyncCounter}, + {Name: "foo", Kind: view.AsyncUpDownCounter}, + {Name: "foo", Kind: view.AsyncGauge}, + } + + testcases := []struct { + name string + reader Reader + views []view.View + inst view.Instrument + wantKind internal.Aggregator[N] //Aggregators should match len and types + wantLen int + wantErr error + }{ + { + name: "drop should return 0 aggregators", + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), + views: []view.View{{}}, + inst: instruments[view.SyncCounter], + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.SyncUpDownCounter], + wantKind: internal.NewDeltaSum[N](false), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.SyncHistogram], + wantKind: internal.NewDeltaHistogram[N](aggregation.ExplicitBucketHistogram{}), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.AsyncCounter], + wantKind: internal.NewDeltaSum[N](true), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.AsyncUpDownCounter], + wantKind: internal.NewDeltaSum[N](false), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.AsyncGauge], + wantKind: internal.NewLastValue[N](), + wantLen: 1, + }, + { + name: "default agg should use reader", + reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), + views: []view.View{defaultAggView}, + inst: instruments[view.SyncCounter], + wantKind: internal.NewDeltaSum[N](true), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.SyncUpDownCounter], + wantKind: internal.NewCumulativeSum[N](false), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.SyncHistogram], + wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.AsyncCounter], + wantKind: internal.NewCumulativeSum[N](true), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.AsyncUpDownCounter], + wantKind: internal.NewCumulativeSum[N](false), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.AsyncGauge], + wantKind: internal.NewLastValue[N](), + wantLen: 1, + }, + { + name: "reader should set default agg", + reader: NewManualReader(), + views: []view.View{{}}, + inst: instruments[view.SyncCounter], + wantKind: internal.NewCumulativeSum[N](true), + wantLen: 1, + }, + { + name: "view should overwrite reader", + reader: NewManualReader(), + views: []view.View{changeAggView}, + inst: instruments[view.SyncCounter], + wantKind: internal.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), + wantLen: 1, + }, + { + name: "multiple views should create multiple aggregators", + reader: NewManualReader(), + views: []view.View{{}, renameView}, + inst: instruments[view.SyncCounter], + wantKind: internal.NewCumulativeSum[N](true), + wantLen: 2, + }, + { + name: "reader with invalid aggregation should error", + reader: NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + views: []view.View{{}}, + inst: instruments[view.SyncCounter], + wantErr: errCreatingAggregators, + }, + { + name: "view with invalid aggregation should error", + reader: NewManualReader(), + views: []view.View{invalidAggView}, + inst: instruments[view.SyncCounter], + wantErr: errCreatingAggregators, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + got, err := createAggregators[N](tt.reader, tt.views, tt.inst) + assert.ErrorIs(t, err, tt.wantErr) + require.Len(t, got, tt.wantLen) + for _, agg := range got { + assert.IsType(t, tt.wantKind, agg) + } + }) + } +} + +func testInvalidInstrumentShouldPanic[N int64 | float64]() { + reader := NewManualReader() + views := []view.View{{}} + inst := view.Instrument{ + Name: "foo", + Kind: view.InstrumentKind(255), + } + _, _ = createAggregators[N](reader, views, inst) +} + +func TestInvalidInstrumentShouldPanic(t *testing.T) { + assert.Panics(t, testInvalidInstrumentShouldPanic[int64]) + assert.Panics(t, testInvalidInstrumentShouldPanic[float64]) +} + +func TestCreateAggregators(t *testing.T) { + t.Run("Int64", testCreateAggregators[int64]) + t.Run("Float64", testCreateAggregators[float64]) +} + +func TestPipelineRegistryCreateAggregators(t *testing.T) { + renameView, _ := view.New( + view.MatchInstrumentName("foo"), + view.WithRename("bar"), + ) + testRdr := NewManualReader() + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + + testCases := []struct { + name string + views map[Reader][]view.View + inst view.Instrument + wantCount int + }{ + { + name: "No views have no aggregators", + inst: view.Instrument{Name: "foo"}, + }, + { + name: "1 reader 1 view gets 1 aggregator", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testRdr: { + {}, + }, + }, + wantCount: 1, + }, + { + name: "1 reader 2 views gets 2 aggregator", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testRdr: { + {}, + renameView, + }, + }, + wantCount: 2, + }, + { + name: "2 readers 1 view each gets 2 aggregators", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testRdr: { + {}, + }, + testRdrHistogram: { + {}, + }, + }, + wantCount: 2, + }, + { + name: "2 reader 2 views each gets 4 aggregators", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testRdr: { + {}, + renameView, + }, + testRdrHistogram: { + {}, + renameView, + }, + }, + wantCount: 4, + }, + { + name: "An instrument is duplicated in two views share the same aggregator", + inst: view.Instrument{Name: "foo"}, + views: map[Reader][]view.View{ + testRdr: { + {}, + {}, + }, + }, + wantCount: 1, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + intReg, _ := newPipelineRegistries(tt.views) + testPipelineRegistryCreateAggregators(t, intReg, tt.wantCount) + _, floatReg := newPipelineRegistries(tt.views) + testPipelineRegistryCreateAggregators(t, floatReg, tt.wantCount) + }) + } +} + +func testPipelineRegistryCreateAggregators[N int64 | float64](t *testing.T, reg *pipelineRegistry[N], wantCount int) { + inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} + + aggs, err := reg.createAggregators(inst, unit.Dimensionless) + assert.NoError(t, err) + + require.Len(t, aggs, wantCount) +} + +func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik view.InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + + views := map[Reader][]view.View{ + testRdrHistogram: { + {}, + }, + } + intReg, floatReg := newPipelineRegistries(views) + inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} + + intAggs, err := intReg.createAggregators(inst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, intAggs, 0) + + floatAggs, err := floatReg.createAggregators(inst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, floatAggs, 0) +} + +func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { + renameView, _ := view.New( + view.MatchInstrumentName("bar"), + view.WithRename("foo"), + ) + views := map[Reader][]view.View{ + NewManualReader(): { + {}, + renameView, + }, + } + + fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} + barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} + + intReg, floatReg := newPipelineRegistries(views) + + intAggs, err := intReg.createAggregators(fooInst, unit.Dimensionless) + assert.NoError(t, err) + assert.Len(t, intAggs, 1) + + // The Rename view should error, because it creates a foo instrument. + intAggs, err = intReg.createAggregators(barInst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, intAggs, 2) + + // Creating a float foo instrument should error because there is an int foo instrument. + floatAggs, err := floatReg.createAggregators(fooInst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, floatAggs, 1) + + fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} + + _, err = floatReg.createAggregators(fooInst, unit.Dimensionless) + assert.NoError(t, err) + + floatAggs, err = floatReg.createAggregators(barInst, unit.Dimensionless) + assert.Error(t, err) + assert.Len(t, floatAggs, 2) +} + +func TestIsAggregatorCompatible(t *testing.T) { + var undefinedInstrument view.InstrumentKind + + testCases := []struct { + name string + kind view.InstrumentKind + agg aggregation.Aggregation + want error + }{ + { + name: "SyncCounter and Drop", + kind: view.SyncCounter, + agg: aggregation.Drop{}, + }, + { + name: "SyncCounter and LastValue", + kind: view.SyncCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, + }, + { + name: "SyncCounter and Sum", + kind: view.SyncCounter, + agg: aggregation.Sum{}, + }, + { + name: "SyncCounter and ExplicitBucketHistogram", + kind: view.SyncCounter, + agg: aggregation.ExplicitBucketHistogram{}, + }, + { + name: "SyncUpDownCounter and Drop", + kind: view.SyncUpDownCounter, + agg: aggregation.Drop{}, + }, + { + name: "SyncUpDownCounter and LastValue", + kind: view.SyncUpDownCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, + }, + { + name: "SyncUpDownCounter and Sum", + kind: view.SyncUpDownCounter, + agg: aggregation.Sum{}, + }, + { + name: "SyncUpDownCounter and ExplicitBucketHistogram", + kind: view.SyncUpDownCounter, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, + }, + { + name: "SyncHistogram and Drop", + kind: view.SyncHistogram, + agg: aggregation.Drop{}, + }, + { + name: "SyncHistogram and LastValue", + kind: view.SyncHistogram, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, + }, + { + name: "SyncHistogram and Sum", + kind: view.SyncHistogram, + agg: aggregation.Sum{}, + }, + { + name: "SyncHistogram and ExplicitBucketHistogram", + kind: view.SyncHistogram, + agg: aggregation.ExplicitBucketHistogram{}, + }, + { + name: "AsyncCounter and Drop", + kind: view.AsyncCounter, + agg: aggregation.Drop{}, + }, + { + name: "AsyncCounter and LastValue", + kind: view.AsyncCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, + }, + { + name: "AsyncCounter and Sum", + kind: view.AsyncCounter, + agg: aggregation.Sum{}, + }, + { + name: "AsyncCounter and ExplicitBucketHistogram", + kind: view.AsyncCounter, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, + }, + { + name: "AsyncUpDownCounter and Drop", + kind: view.AsyncUpDownCounter, + agg: aggregation.Drop{}, + }, + { + name: "AsyncUpDownCounter and LastValue", + kind: view.AsyncUpDownCounter, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, + }, + { + name: "AsyncUpDownCounter and Sum", + kind: view.AsyncUpDownCounter, + agg: aggregation.Sum{}, + }, + { + name: "AsyncUpDownCounter and ExplicitBucketHistogram", + kind: view.AsyncUpDownCounter, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, + }, + { + name: "AsyncGauge and Drop", + kind: view.AsyncGauge, + agg: aggregation.Drop{}, + }, + { + name: "AsyncGauge and aggregation.LastValue{}", + kind: view.AsyncGauge, + agg: aggregation.LastValue{}, + }, + { + name: "AsyncGauge and Sum", + kind: view.AsyncGauge, + agg: aggregation.Sum{}, + want: errIncompatibleAggregation, + }, + { + name: "AsyncGauge and ExplicitBucketHistogram", + kind: view.AsyncGauge, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, + }, + { + name: "Default aggregation should error", + kind: view.SyncCounter, + agg: aggregation.Default{}, + want: errUnknownAggregation, + }, + { + name: "unknown kind with Sum should error", + kind: undefinedInstrument, + agg: aggregation.Sum{}, + want: errIncompatibleAggregation, + }, + { + name: "unknown kind with LastValue should error", + kind: undefinedInstrument, + agg: aggregation.LastValue{}, + want: errIncompatibleAggregation, + }, + { + name: "unknown kind with Histogram should error", + kind: undefinedInstrument, + agg: aggregation.ExplicitBucketHistogram{}, + want: errIncompatibleAggregation, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + err := isAggregatorCompatible(tt.kind, tt.agg) + assert.ErrorIs(t, err, tt.want) + }) + } +}