Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a pipelineRegistry to manage creating aggregators. #3044

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 194 additions & 2 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"strings"
"sync"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -110,8 +116,8 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
for scope, instruments := range p.aggregations {
metrics := make([]metricdata.Metrics, 0, len(instruments))
for inst, aggregation := range instruments {
data := aggregation.Aggregation()
for inst, agg := range instruments {
data := agg.Aggregation()
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
if data != nil {
metrics = append(metrics, metricdata.Metrics{
Name: inst.name,
Expand All @@ -134,3 +140,189 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
ScopeMetrics: sm,
}, nil
}

// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve
// new Aggregators from a pipelineRegistry.
type pipelineRegistry[N int64 | float64] struct {
views map[Reader][]view.View
pipelines map[Reader]*pipeline
}

func newPipelineRegistries(views map[Reader][]view.View) (*pipelineRegistry[int64], *pipelineRegistry[float64]) {
pipelines := map[Reader]*pipeline{}
for rdr := range views {
pipe := &pipeline{}
rdr.register(pipe)
pipelines[rdr] = pipe
}
return &pipelineRegistry[int64]{
views: views,
pipelines: pipelines,
}, &pipelineRegistry[float64]{
views: views,
pipelines: pipelines,
}
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}

// createAggregators will create all backing aggregators for an instrument.
// It will return an error if an instrument is registered more than once.
// Note: There may be returned aggregators with an error.
func (reg *pipelineRegistry[N]) createAggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]

errs := &multierror{}
for rdr, views := range reg.views {
pipe := reg.pipelines[rdr]
rdrAggs, err := createAggregators[N](rdr, views, inst)
if err != nil {
errs.append(err)
}
for inst, agg := range rdrAggs {
err := pipe.addAggregator(inst.scope, inst.name, inst.description, instUnit, agg)
if err != nil {
errs.append(err)
}
aggs = append(aggs, agg)
}
}
return aggs, errs.errorOrNil()
}

// TODO (#3053) Only register callbacks if any instrument matches in a view.
func (reg *pipelineRegistry[N]) registerCallback(fn func(context.Context)) {
for _, pipe := range reg.pipelines {
pipe.addCallback(fn)
}
}

type multierror struct {
errors []string
}

func (m *multierror) errorOrNil() error {
if len(m.errors) == 0 {
return nil
}
return fmt.Errorf(strings.Join(m.errors, "; "))
}
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved

func (m *multierror) append(err error) {
m.errors = append(m.errors, err.Error())
}

// instrumentIdentifier is used to identify multiple instruments being mapped to the same aggregator.
// e.g. using an exact match view with a name=* view.
// You can't use a view.Instrument here because not all Aggregators are hashable.
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
type instrumentIdentifier struct {
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
scope instrumentation.Scope
name string
description string
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
}
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentIdentifier]internal.Aggregator[N], error) {
aggs := map[instrumentIdentifier]internal.Aggregator[N]{}
errs := &multierror{}
for _, v := range views {
inst, match := v.TransformInstrument(inst)

ident := instrumentIdentifier{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}

if _, ok := aggs[ident]; ok || !match {
continue
}

if inst.Aggregation == nil {
inst.Aggregation = rdr.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = rdr.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
global.Error(err, "creating aggregator", "instrumentKind", inst.Kind, "aggregation", inst.Aggregation)
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
errs.append(err)
continue
}

agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind), isMonotonic(inst.Kind))
if agg != nil {
// TODO (#3011): If filtering is done at the instrument level add here.
// This is where the aggregator and the view are both in scope.
aggs[ident] = agg
}
}
return aggs, errs.errorOrNil()
}

func isMonotonic(kind view.InstrumentKind) bool {
switch kind {
case view.AsyncCounter, view.SyncCounter, view.SyncHistogram:
return true
}
return false
}

// createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator.
// TODO (#3011): If filterting is done by the Aggregator it should be passed here.
func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] {
switch agg := agg.(type) {
case aggregation.Drop:
return nil
case aggregation.LastValue:
return internal.NewLastValue[N]()
case aggregation.Sum:
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeSum[N](monotonic)
}
return internal.NewDeltaSum[N](monotonic)
case aggregation.ExplicitBucketHistogram:
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeHistogram[N](agg)
}
return internal.NewDeltaHistogram[N](agg)
}
return nil
}

var errIncompatibleAggregation = errors.New("incompatible aggregation")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I filed this tracking issue. open-telemetry/opentelemetry-specification#2710


// is aggregatorCompatible checks if the aggregation can be used by the instrument.
// Current compatibility:
//
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
// |----------------------|------|-----------|-----|-----------|-----------------------|
// | Sync Counter | X | | X | X | X |
// | Sync UpDown Counter | X | | X | | |
// | Sync Histogram | X | | X | X | X |
// | Async Counter | X | | X | | |
// | Async UpDown Counter | X | | X | | |
// | Async Gauge | X | X | | | |.
func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error {
switch agg.(type) {
case aggregation.ExplicitBucketHistogram:
if kind == view.SyncCounter || kind == view.SyncHistogram {
return nil
}
return errIncompatibleAggregation
case aggregation.Sum:
switch kind {
case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter:
return nil
default:
return errIncompatibleAggregation
}
case aggregation.LastValue:
if kind == view.AsyncGauge {
return nil
}
return errIncompatibleAggregation
case aggregation.Drop:
return nil
default:
// This is used passed checking for default, it should be an error at this point.
return errIncompatibleAggregation
MadVikingGod marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading