Skip to content

Commit

Permalink
Use default view if instrument does not match any pipeline view (#3237)
Browse files Browse the repository at this point in the history
* Use default view if inst matches no other

Fix #3224

* Test default view applied if no match

* Add changes to changelog

* Remove unneeded views len check in WithReader

* Do not return agg if adding err-ed

* Revert "Do not return agg if adding err-ed"

This reverts commit b56efb0.
  • Loading branch information
MrAlias authored Oct 7, 2022
1 parent 697d245 commit c5ebbc4
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235)
- Update histogram default bounds to match the requirements of the latest specification. (#3222)

### Fixed

- Use default view if instrument does not match any registered view of a reader. (#3224, #3237)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

### Changed
Expand Down
4 changes: 0 additions & 4 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ func WithReader(r Reader, views ...view.View) Option {
if cfg.readers == nil {
cfg.readers = make(map[Reader][]view.View)
}
if len(views) == 0 {
views = []view.View{{}}
}

cfg.readers[r] = views
return cfg
})
Expand Down
52 changes: 34 additions & 18 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,31 +188,23 @@ func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
// Instrument inserts instrument inst with instUnit returning the Aggregators
// that need to be updated with measurments for that instrument.
func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var matched bool
seen := map[instrumentID]struct{}{}
var aggs []internal.Aggregator[N]
errs := &multierror{wrapped: errCreatingAggregators}
for _, v := range i.pipeline.views {
inst, match := v.TransformInstrument(inst)
if !match {
continue
}
matched = true

id := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}

if _, ok := seen[id]; ok || !match {
continue
}

if inst.Aggregation == nil {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err)
errs.append(err)
if _, ok := seen[id]; ok {
continue
}

Expand All @@ -233,16 +225,40 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
errs.append(err)
}
}
// TODO(#3224): handle when no views match. Default should be reader
// aggregation returned.

if !matched { // Apply implicit default view if no explicit matched.
a, err := i.aggregator(inst)
if err != nil {
errs.append(err)
}
if a != nil {
aggs = append(aggs, a)
err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a)
if err != nil {
errs.append(err)
}
}
}

return aggs, errs.errorOrNil()
}

// aggregator returns the Aggregator for an instrument configuration. If the
// instrument defines an unknown aggregation, an error is returned.
func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) {
// TODO (#3011): If filtering is done by the Aggregator it should be passed
// here.
switch inst.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
return nil, fmt.Errorf(
"creating aggregator with instrumentKind: %d, aggregation %v: %w",
inst.Kind, inst.Aggregation, err,
)
}

var (
temporality = i.pipeline.reader.temporality(inst.Kind)
monotonic bool
Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -211,3 +214,61 @@ func TestPipelineConcurrency(t *testing.T) {
}
wg.Wait()
}

func TestDefaultViewImplicit(t *testing.T) {
t.Run("Int64", testDefaultViewImplicit[int64]())
t.Run("Float64", testDefaultViewImplicit[float64]())
}

func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
inst := view.Instrument{
Scope: instrumentation.Scope{Name: "testing/lib"},
Name: "requests",
Description: "count of requests received",
Kind: view.SyncCounter,
Aggregation: aggregation.Sum{},
}
return func(t *testing.T) {
reader := NewManualReader()
v, err := view.New(view.MatchInstrumentName("foo"), view.WithRename("bar"))
require.NoError(t, err)

tests := []struct {
name string
pipe *pipeline
}{
{
name: "NoView",
pipe: newPipeline(nil, reader, nil),
},
{
name: "NoMatchingView",
pipe: newPipeline(nil, reader, []view.View{v}),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
i := newInserter[N](test.pipe)
got, err := i.Instrument(inst, unit.Dimensionless)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")

out, err := test.pipe.produce(context.Background())
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]
require.Len(t, sm.Metrics, 1, "metrics not produced from default view")
metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: inst.Name,
Description: inst.Description,
Unit: unit.Dimensionless,
Data: metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
}

0 comments on commit c5ebbc4

Please sign in to comment.