diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0bd52d630236..9eb86f593acd 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -188,31 +188,23 @@ func newInserter[N int64 | float64](p *pipeline) *inserter[N] { // Instrument inserts instrument inst with instUnit returning the Aggregators // that need to be updated with measurments for that instrument. func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + var matched bool seen := map[instrumentID]struct{}{} var aggs []internal.Aggregator[N] errs := &multierror{wrapped: errCreatingAggregators} for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) + if !match { + continue + } + matched = true id := instrumentID{ scope: inst.Scope, name: inst.Name, description: inst.Description, } - - if _, ok := seen[id]; ok || !match { - continue - } - - if inst.Aggregation == nil { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } else if _, ok := inst.Aggregation.(aggregation.Default); ok { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } - - if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { - err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) - errs.append(err) + if _, ok := seen[id]; ok { continue } @@ -233,16 +225,40 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in errs.append(err) } } - // TODO(#3224): handle when no views match. Default should be reader - // aggregation returned. + + if !matched { // Apply implicit default view if no explicit matched. + a, err := i.aggregator(inst) + if err != nil { + errs.append(err) + } + if a != nil { + aggs = append(aggs, a) + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a) + if err != nil { + errs.append(err) + } + } + } + return aggs, errs.errorOrNil() } // aggregator returns the Aggregator for an instrument configuration. If the // instrument defines an unknown aggregation, an error is returned. func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { - // TODO (#3011): If filtering is done by the Aggregator it should be passed - // here. + switch inst.Aggregation.(type) { + case nil, aggregation.Default: + // Undefined, nil, means to use the default from the reader. + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) + } + + if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + return nil, fmt.Errorf( + "creating aggregator with instrumentKind: %d, aggregation %v: %w", + inst.Kind, inst.Aggregation, err, + ) + } + var ( temporality = i.pipeline.reader.temporality(inst.Kind) monotonic bool