Skip to content

Commit

Permalink
Adds a pipelineRegistry to manage creating aggregators. (#3044)
Browse files Browse the repository at this point in the history
* Adds a pipelineRegistry to manage creating aggregators.

* Made pipeline generic
* Add aggregation filter to the registry.

Co-authored-by: Chester Cheung <[email protected]>
  • Loading branch information
MadVikingGod and hanyuancheung authored Aug 11, 2022
1 parent a8d1f4a commit 258bb2d
Show file tree
Hide file tree
Showing 4 changed files with 781 additions and 4 deletions.
4 changes: 2 additions & 2 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type manualReader struct {
aggregationSelector AggregationSelector
}

// Compile time check the manualReader implements Reader.
var _ Reader = &manualReader{}
// Compile time check the manualReader implements Reader and is comparable.
var _ = map[Reader]struct{}{&manualReader{}: {}}

// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ type periodicReader struct {
shutdownOnce sync.Once
}

// Compile time check the periodicReader implements Reader and is comparable.
var _ = map[Reader]struct{}{&periodicReader{}: {}}

// newTicker allows testing override.
var newTicker = time.NewTicker

Expand Down
202 changes: 200 additions & 2 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"strings"
"sync"

"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -110,8 +115,8 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
for scope, instruments := range p.aggregations {
metrics := make([]metricdata.Metrics, 0, len(instruments))
for inst, aggregation := range instruments {
data := aggregation.Aggregation()
for inst, agg := range instruments {
data := agg.Aggregation()
if data != nil {
metrics = append(metrics, metricdata.Metrics{
Name: inst.name,
Expand All @@ -134,3 +139,196 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
ScopeMetrics: sm,
}, nil
}

// pipelineRegistry manages creating pipelines, and aggregators. Meters retrieve
// new Aggregators from a pipelineRegistry.
type pipelineRegistry[N int64 | float64] struct {
views map[Reader][]view.View
pipelines map[Reader]*pipeline
}

func newPipelineRegistries(views map[Reader][]view.View) (*pipelineRegistry[int64], *pipelineRegistry[float64]) {
pipelines := map[Reader]*pipeline{}
for rdr := range views {
pipe := &pipeline{}
rdr.register(pipe)
pipelines[rdr] = pipe
}
return &pipelineRegistry[int64]{
views: views,
pipelines: pipelines,
}, &pipelineRegistry[float64]{
views: views,
pipelines: pipelines,
}
}

// createAggregators will create all backing aggregators for an instrument.
// It will return an error if an instrument is registered more than once.
// Note: There may be returned aggregators with an error.
func (reg *pipelineRegistry[N]) createAggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) {
var aggs []internal.Aggregator[N]

errs := &multierror{}
for rdr, views := range reg.views {
pipe := reg.pipelines[rdr]
rdrAggs, err := createAggregators[N](rdr, views, inst)
if err != nil {
errs.append(err)
}
for inst, agg := range rdrAggs {
err := pipe.addAggregator(inst.scope, inst.name, inst.description, instUnit, agg)
if err != nil {
errs.append(err)
}
aggs = append(aggs, agg)
}
}
return aggs, errs.errorOrNil()
}

// TODO (#3053) Only register callbacks if any instrument matches in a view.
func (reg *pipelineRegistry[N]) registerCallback(fn func(context.Context)) {
for _, pipe := range reg.pipelines {
pipe.addCallback(fn)
}
}

type multierror struct {
wrapped error
errors []string
}

func (m *multierror) errorOrNil() error {
if len(m.errors) == 0 {
return nil
}
return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
}

func (m *multierror) append(err error) {
m.errors = append(m.errors, err.Error())
}

// instrumentID is used to identify multiple instruments being mapped to the same aggregator.
// e.g. using an exact match view with a name=* view.
// You can't use a view.Instrument here because not all Aggregators are comparable.
type instrumentID struct {
scope instrumentation.Scope
name string
description string
}

var errCreatingAggregators = errors.New("could not create all aggregators")

func createAggregators[N int64 | float64](rdr Reader, views []view.View, inst view.Instrument) (map[instrumentID]internal.Aggregator[N], error) {
aggs := map[instrumentID]internal.Aggregator[N]{}
errs := &multierror{
wrapped: errCreatingAggregators,
}
for _, v := range views {
inst, match := v.TransformInstrument(inst)

ident := instrumentID{
scope: inst.Scope,
name: inst.Name,
description: inst.Description,
}

if _, ok := aggs[ident]; ok || !match {
continue
}

if inst.Aggregation == nil {
inst.Aggregation = rdr.aggregation(inst.Kind)
} else if _, ok := inst.Aggregation.(aggregation.Default); ok {
inst.Aggregation = rdr.aggregation(inst.Kind)
}

if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil {
err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err)
errs.append(err)
continue
}

agg := createAggregator[N](inst.Aggregation, rdr.temporality(inst.Kind), isMonotonic(inst.Kind))
if agg != nil {
// TODO (#3011): If filtering is done at the instrument level add here.
// This is where the aggregator and the view are both in scope.
aggs[ident] = agg
}
}
return aggs, errs.errorOrNil()
}

func isMonotonic(kind view.InstrumentKind) bool {
switch kind {
case view.AsyncCounter, view.SyncCounter, view.SyncHistogram:
return true
}
return false
}

// createAggregator takes the config (Aggregation and Temporality) and produces a memory backed Aggregator.
// TODO (#3011): If filterting is done by the Aggregator it should be passed here.
func createAggregator[N int64 | float64](agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) internal.Aggregator[N] {
switch agg := agg.(type) {
case aggregation.Drop:
return nil
case aggregation.LastValue:
return internal.NewLastValue[N]()
case aggregation.Sum:
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeSum[N](monotonic)
}
return internal.NewDeltaSum[N](monotonic)
case aggregation.ExplicitBucketHistogram:
if temporality == metricdata.CumulativeTemporality {
return internal.NewCumulativeHistogram[N](agg)
}
return internal.NewDeltaHistogram[N](agg)
}
return nil
}

// TODO: review need for aggregation check after https://github.com/open-telemetry/opentelemetry-specification/issues/2710
var errIncompatibleAggregation = errors.New("incompatible aggregation")
var errUnknownAggregation = errors.New("unrecognized aggregation")

// is aggregatorCompatible checks if the aggregation can be used by the instrument.
// Current compatibility:
//
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
// |----------------------|------|-----------|-----|-----------|-----------------------|
// | Sync Counter | X | | X | X | X |
// | Sync UpDown Counter | X | | X | | |
// | Sync Histogram | X | | X | X | X |
// | Async Counter | X | | X | | |
// | Async UpDown Counter | X | | X | | |
// | Async Gauge | X | X | | | |.
func isAggregatorCompatible(kind view.InstrumentKind, agg aggregation.Aggregation) error {
switch agg.(type) {
case aggregation.ExplicitBucketHistogram:
if kind == view.SyncCounter || kind == view.SyncHistogram {
return nil
}
return errIncompatibleAggregation
case aggregation.Sum:
switch kind {
case view.AsyncCounter, view.AsyncUpDownCounter, view.SyncCounter, view.SyncHistogram, view.SyncUpDownCounter:
return nil
default:
return errIncompatibleAggregation
}
case aggregation.LastValue:
if kind == view.AsyncGauge {
return nil
}
return errIncompatibleAggregation
case aggregation.Drop:
return nil
default:
// This is used passed checking for default, it should be an error at this point.
return fmt.Errorf("%w: %v", errUnknownAggregation, agg)
}
}
Loading

0 comments on commit 258bb2d

Please sign in to comment.