From a8bbf8a0edba543ce182f83dc9cd21294276546e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 27 Sep 2022 12:11:13 -0700 Subject: [PATCH 1/6] Use default view if inst matches no other Fix #3224 --- sdk/metric/pipeline.go | 52 +++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0bd52d63023..9eb86f593ac 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -188,31 +188,23 @@ func newInserter[N int64 | float64](p *pipeline) *inserter[N] { // Instrument inserts instrument inst with instUnit returning the Aggregators // that need to be updated with measurments for that instrument. func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + var matched bool seen := map[instrumentID]struct{}{} var aggs []internal.Aggregator[N] errs := &multierror{wrapped: errCreatingAggregators} for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) + if !match { + continue + } + matched = true id := instrumentID{ scope: inst.Scope, name: inst.Name, description: inst.Description, } - - if _, ok := seen[id]; ok || !match { - continue - } - - if inst.Aggregation == nil { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } else if _, ok := inst.Aggregation.(aggregation.Default); ok { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } - - if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { - err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) - errs.append(err) + if _, ok := seen[id]; ok { continue } @@ -233,16 +225,40 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in errs.append(err) } } - // TODO(#3224): handle when no views match. Default should be reader - // aggregation returned. + + if !matched { // Apply implicit default view if no explicit matched. + a, err := i.aggregator(inst) + if err != nil { + errs.append(err) + } + if a != nil { + aggs = append(aggs, a) + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a) + if err != nil { + errs.append(err) + } + } + } + return aggs, errs.errorOrNil() } // aggregator returns the Aggregator for an instrument configuration. If the // instrument defines an unknown aggregation, an error is returned. func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { - // TODO (#3011): If filtering is done by the Aggregator it should be passed - // here. + switch inst.Aggregation.(type) { + case nil, aggregation.Default: + // Undefined, nil, means to use the default from the reader. + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) + } + + if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + return nil, fmt.Errorf( + "creating aggregator with instrumentKind: %d, aggregation %v: %w", + inst.Kind, inst.Aggregation, err, + ) + } + var ( temporality = i.pipeline.reader.temporality(inst.Kind) monotonic bool From 64f3165da5502785647f79da7c8b59d2102491cd Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 27 Sep 2022 12:53:32 -0700 Subject: [PATCH 2/6] Test default view applied if no match --- sdk/metric/pipeline_test.go | 61 +++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index ca83c9c3a9e..7a3321da0c1 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -25,7 +25,10 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -211,3 +214,61 @@ func TestPipelineConcurrency(t *testing.T) { } wg.Wait() } + +func TestDefaultViewImplicit(t *testing.T) { + t.Run("Int64", testDefaultViewImplicit[int64]()) + t.Run("Float64", testDefaultViewImplicit[float64]()) +} + +func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { + inst := view.Instrument{ + Scope: instrumentation.Scope{Name: "testing/lib"}, + Name: "requests", + Description: "count of requests received", + Kind: view.SyncCounter, + Aggregation: aggregation.Sum{}, + } + return func(t *testing.T) { + reader := NewManualReader() + v, err := view.New(view.MatchInstrumentName("foo"), view.WithRename("bar")) + require.NoError(t, err) + + tests := []struct { + name string + pipe *pipeline + }{ + { + name: "NoView", + pipe: newPipeline(nil, reader, nil), + }, + { + name: "NoMatchingView", + pipe: newPipeline(nil, reader, []view.View{v}), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + i := newInserter[N](test.pipe) + got, err := i.Instrument(inst, unit.Dimensionless) + require.NoError(t, err) + assert.Len(t, got, 1, "default view not applied") + + out, err := test.pipe.produce(context.Background()) + require.NoError(t, err) + require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline") + sm := out.ScopeMetrics[0] + require.Len(t, sm.Metrics, 1, "metrics not produced from default view") + metricdatatest.AssertEqual(t, metricdata.Metrics{ + Name: inst.Name, + Description: inst.Description, + Unit: unit.Dimensionless, + Data: metricdata.Sum[N]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, sm.Metrics[0], metricdatatest.IgnoreTimestamp()) + }) + } + } +} From b8a36587208008123cf0df879ba95b9fd5ae14fa Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 27 Sep 2022 12:55:05 -0700 Subject: [PATCH 3/6] Add changes to changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf26a4ff104..afe1c2c7dbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Upgrade `golang.org/x/sys/unix` from `v0.0.0-20210423185535-09eb48e85fd7` to `v0.0.0-20220919091848-fb04ddd9f9c8`. This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235) +### Fixed + +- Use default view if instrument does not match any registered view of a reader. (#3224, #3237) + ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 ### Changed From b42f829606fa9ae972970a91d75d2eaedfecc545 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 29 Sep 2022 08:24:55 -0700 Subject: [PATCH 4/6] Remove unneeded views len check in WithReader --- sdk/metric/config.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index a1253334505..5b7537f63ed 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -126,10 +126,6 @@ func WithReader(r Reader, views ...view.View) Option { if cfg.readers == nil { cfg.readers = make(map[Reader][]view.View) } - if len(views) == 0 { - views = []view.View{{}} - } - cfg.readers[r] = views return cfg }) From b56efb06a763e7da174f8bfd896d37e244754ccc Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 29 Sep 2022 08:28:20 -0700 Subject: [PATCH 5/6] Do not return agg if adding err-ed --- sdk/metric/pipeline.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 9eb86f593ac..0dbdccd6291 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -216,14 +216,17 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in if agg == nil { // Drop aggregator. continue } - // TODO (#3011): If filtering is done at the instrument level add here. - // This is where the aggregator and the view are both in scope. - aggs = append(aggs, agg) - seen[id] = struct{}{} err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) if err != nil { errs.append(err) + // Do not return the aggregator to be updated if the pipeline will + // never produce from it. + continue } + // TODO (#3011): If filtering is done at the instrument level add here. + // This is where the aggregator and the view are both in scope. + aggs = append(aggs, agg) + seen[id] = struct{}{} } if !matched { // Apply implicit default view if no explicit matched. @@ -232,10 +235,13 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in errs.append(err) } if a != nil { - aggs = append(aggs, a) err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a) if err != nil { + // Do not return the aggregator to be updated if the pipeline + // will never produce from it. errs.append(err) + } else { + aggs = append(aggs, a) } } } From a399cf15a8b6d1afea3f8710653a7b6e3e374838 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 30 Sep 2022 14:21:01 -0700 Subject: [PATCH 6/6] Revert "Do not return agg if adding err-ed" This reverts commit b56efb06a763e7da174f8bfd896d37e244754ccc. --- sdk/metric/pipeline.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0dbdccd6291..9eb86f593ac 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -216,17 +216,14 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in if agg == nil { // Drop aggregator. continue } - err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) - if err != nil { - errs.append(err) - // Do not return the aggregator to be updated if the pipeline will - // never produce from it. - continue - } // TODO (#3011): If filtering is done at the instrument level add here. // This is where the aggregator and the view are both in scope. aggs = append(aggs, agg) seen[id] = struct{}{} + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) + if err != nil { + errs.append(err) + } } if !matched { // Apply implicit default view if no explicit matched. @@ -235,13 +232,10 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in errs.append(err) } if a != nil { + aggs = append(aggs, a) err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a) if err != nil { - // Do not return the aggregator to be updated if the pipeline - // will never produce from it. errs.append(err) - } else { - aggs = append(aggs, a) } } }