Skip to content

Commit

Permalink
move viewCache back to meter
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Dec 6, 2023
1 parent afc87d4 commit 6923e80
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 105 deletions.
33 changes: 19 additions & 14 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ var ErrInstrumentName = errors.New("invalid instrument name")
type meter struct {
embedded.Meter

scope instrumentation.Scope
pipes pipelines
scope instrumentation.Scope
pipes pipelines
viewCache *cache[string, instID]
}

func newMeter(s instrumentation.Scope, p pipelines) *meter {
// viewCache ensures instrument conflicts, including number conflicts, this
// meter is asked to create are logged to the user.
var viewCache cache[string, instID]
return &meter{
scope: s,
pipes: p,
scope: s,
pipes: p,
viewCache: &viewCache,
}
}

Expand Down Expand Up @@ -116,7 +121,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
for _, pipe := range p.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), m.viewCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,7 +157,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
for _, pipe := range p.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), m.viewCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,7 +193,7 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
for _, pipe := range p.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), m.viewCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -268,7 +273,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
for _, pipe := range p.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), m.viewCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -304,7 +309,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
for _, pipe := range p.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), m.viewCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -340,7 +345,7 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs
for _, pipe := range p.pipes {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), m.viewCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -562,7 +567,7 @@ func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]ag
Kind: kind,
Scope: p.scope,
}
return p.pipes.Int64Aggregators(inst)
return p.pipes.Int64Aggregators(inst, p.meter.viewCache)
}

func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramConfig) ([]aggregate.Measure[int64], error) {
Expand All @@ -579,7 +584,7 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
measures, err := p.pipes.Int64HistogramAggregators(inst, boundaries)
measures, err := p.pipes.Int64HistogramAggregators(inst, boundaries, p.meter.viewCache)
return measures, errors.Join(aggError, err)
}

Expand All @@ -606,7 +611,7 @@ func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]
Kind: kind,
Scope: p.scope,
}
return p.pipes.Float64Aggregators(inst)
return p.pipes.Float64Aggregators(inst, p.meter.viewCache)
}

func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64HistogramConfig) ([]aggregate.Measure[float64], error) {
Expand All @@ -623,7 +628,7 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
measures, err := p.pipes.Float64HistogramAggregators(inst, boundaries)
measures, err := p.pipes.Float64HistogramAggregators(inst, boundaries, p.meter.viewCache)
return measures, errors.Join(aggError, err)
}

Expand Down
53 changes: 21 additions & 32 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type instrumentSync struct {
compAgg aggregate.ComputeAggregation
}

func newPipeline(res *resource.Resource, reader Reader, views []View, vc *cache[string, instID]) *pipeline {
func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
if res == nil {
res = resource.Empty()
}
Expand All @@ -58,8 +58,8 @@ func newPipeline(res *resource.Resource, reader Reader, views []View, vc *cache[
views: views,
// aggregations is lazy allocated when needed.
}
p.int64Inserter = newInserter[int64](p, vc)
p.float64Inserter = newInserter[float64](p, vc)
p.int64Inserter = newInserter[int64](p)
p.float64Inserter = newInserter[float64](p)
return p
}

Expand Down Expand Up @@ -194,23 +194,12 @@ type inserter[N int64 | float64] struct {
// for the same aggregate function input the same instance is returned.
aggregators *cache[instID, aggVal[N]]

// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, instID]

pipeline *pipeline
}

func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
if vc == nil {
vc = &cache[string, instID]{}
}
func newInserter[N int64 | float64](p *pipeline) *inserter[N] {
return &inserter[N]{
aggregators: &cache[instID, aggVal[N]]{},
views: vc,
pipeline: p,
}
}
Expand All @@ -237,7 +226,7 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *ins
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) (measures[N], error) {
func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation, vc *cache[string, instID]) (measures[N], error) {
var (
matched bool
measures []aggregate.Measure[N]
Expand All @@ -251,7 +240,7 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
continue
}
matched = true
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation, vc)
if err != nil {
errs.append(err)
}
Expand All @@ -276,7 +265,7 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation, vc)
if err != nil {
errs.append(err)
}
Expand Down Expand Up @@ -335,7 +324,7 @@ func (p *pipeline) readerDefaultAggregation(kind InstrumentKind) Aggregation {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation, vc *cache[string, instID]) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) {
case nil:
// The aggregation was not overridden with a view. Use the aggregation
Expand All @@ -356,7 +345,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
id := i.instID(kind, stream)
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
i.logConflict(id, vc)

// If there are requests for the same instrument with different name
// casing, the first-seen needs to be returned. Use a normalize ID for the
Expand Down Expand Up @@ -391,11 +380,11 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// logConflict validates if an instrument with the same case-insensitive name
// as id has already been created. If that instrument conflicts with id, a
// warning is logged.
func (i *inserter[N]) logConflict(id instID) {
func (i *inserter[N]) logConflict(id instID, vc *cache[string, instID]) {
// The API specification defines names as case-insensitive. If there is a
// different casing of a name it needs to be a conflict.
name := id.normalize().Name
existing := i.views.Lookup(name, func() instID { return id })
existing := vc.Lookup(name, func() instID { return id })
if id == existing {
return
}
Expand Down Expand Up @@ -553,10 +542,10 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
// measurement.
type pipelines []*pipeline

func newPipelines(res *resource.Resource, readers []Reader, views []View, vc *cache[string, instID]) pipelines {
func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for _, r := range readers {
p := newPipeline(res, r, views, vc)
p := newPipeline(res, r, views)
r.register(p)
pipes = append(pipes, p)
}
Expand Down Expand Up @@ -591,12 +580,12 @@ func (u unregisterFuncs) Unregister() error {

// Int64Aggregators returns the Aggregators that must be updated by the instrument
// defined by key.
func (p pipelines) Int64Aggregators(id Instrument) ([]aggregate.Measure[int64], error) {
func (p pipelines) Int64Aggregators(id Instrument, vc *cache[string, instID]) ([]aggregate.Measure[int64], error) {
var measures []aggregate.Measure[int64]

errs := &multierror{}
for _, pipe := range p {
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.int64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), vc)
if err != nil {
errs.append(err)
}
Expand All @@ -607,12 +596,12 @@ func (p pipelines) Int64Aggregators(id Instrument) ([]aggregate.Measure[int64],

// Float64Aggregators returns the Aggregators that must be updated by the instrument
// defined by key.
func (p pipelines) Float64Aggregators(id Instrument) ([]aggregate.Measure[float64], error) {
func (p pipelines) Float64Aggregators(id Instrument, vc *cache[string, instID]) ([]aggregate.Measure[float64], error) {
var measures []aggregate.Measure[float64]

errs := &multierror{}
for _, pipe := range p {
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind))
in, err := pipe.float64Inserter.Instrument(id, pipe.readerDefaultAggregation(id.Kind), vc)
if err != nil {
errs.append(err)
}
Expand All @@ -624,7 +613,7 @@ func (p pipelines) Float64Aggregators(id Instrument) ([]aggregate.Measure[float6
// Int64HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
// defined by key. If boundaries were provided on instrument instantiation, those take precedence
// over boundaries provided by the reader.
func (p pipelines) Int64HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[int64], error) {
func (p pipelines) Int64HistogramAggregators(id Instrument, boundaries []float64, vc *cache[string, instID]) ([]aggregate.Measure[int64], error) {
var measures []aggregate.Measure[int64]

errs := &multierror{}
Expand All @@ -634,7 +623,7 @@ func (p pipelines) Int64HistogramAggregators(id Instrument, boundaries []float64
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := pipe.int64Inserter.Instrument(id, agg)
in, err := pipe.int64Inserter.Instrument(id, agg, vc)
if err != nil {
errs.append(err)
}
Expand All @@ -646,7 +635,7 @@ func (p pipelines) Int64HistogramAggregators(id Instrument, boundaries []float64
// Float64HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
// defined by key. If boundaries were provided on instrument instantiation, those take precedence
// over boundaries provided by the reader.
func (p pipelines) Float64HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[float64], error) {
func (p pipelines) Float64HistogramAggregators(id Instrument, boundaries []float64, vc *cache[string, instID]) ([]aggregate.Measure[float64], error) {
var measures []aggregate.Measure[float64]

errs := &multierror{}
Expand All @@ -656,7 +645,7 @@ func (p pipelines) Float64HistogramAggregators(id Instrument, boundaries []float
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := pipe.float64Inserter.Instrument(id, agg)
in, err := pipe.float64Inserter.Instrument(id, agg, vc)
if err != nil {
errs.append(err)
}
Expand Down
Loading

0 comments on commit 6923e80

Please sign in to comment.