Skip to content

Commit

Permalink
prototype for the advice api
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jul 19, 2023
1 parent e08359f commit 9915c30
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 31 deletions.
24 changes: 24 additions & 0 deletions metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type InstrumentOption interface {
Float64ObservableGaugeOption
}

// HistogramOption applies options to histogram instruments.
// This assumes we only want to allow setting explicit bucket boundaries for histograms instruments.
type HistogramOption interface {
Int64HistogramOption
Float64HistogramOption
}

type descOpt string

func (o descOpt) applyFloat64Counter(c Float64CounterConfig) Float64CounterConfig {
Expand Down Expand Up @@ -169,6 +176,23 @@ func (o unitOpt) applyInt64ObservableGauge(c Int64ObservableGaugeConfig) Int64Ob
// WithUnit sets the instrument unit.
func WithUnit(u string) InstrumentOption { return unitOpt(u) }

// WithExplicitBucketBoundaries sets the instrument explicit bucket boundaries.
// Note: We could call this something closer to the spec (e.g. WithAdvice(advice)), but I used
// this to keep the prototype simpler.
func WithExplicitBucketBoundaries(bounds []float64) HistogramOption { return bucketOpt(bounds) }

type bucketOpt []float64

func (o bucketOpt) applyFloat64Histogram(c Float64HistogramConfig) Float64HistogramConfig {
c.explicitBucketBoundaries = o
return c
}

func (o bucketOpt) applyInt64Histogram(c Int64HistogramConfig) Int64HistogramConfig {
c.explicitBucketBoundaries = o
return c
}

// AddOption applies options to an addition measurement. See
// [MeasurementOption] for other options that can be used as an AddOption.
type AddOption interface {
Expand Down
10 changes: 8 additions & 2 deletions metric/syncfloat64.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ type Float64Histogram interface {
// Float64HistogramConfig contains options for synchronous counter instruments
// that record int64 values.
type Float64HistogramConfig struct {
description string
unit string
description string
unit string
explicitBucketBoundaries []float64
}

// NewFloat64HistogramConfig returns a new [Float64HistogramConfig] with all
Expand All @@ -171,6 +172,11 @@ func (c Float64HistogramConfig) Unit() string {
return c.unit
}

// ExplicitBucketBoundaries returns the configured explicit bucket boundaries.
func (c Float64HistogramConfig) ExplicitBucketBoundaries() []float64 {
return c.explicitBucketBoundaries
}

// Float64HistogramOption applies options to a [Float64HistogramConfig]. See
// [InstrumentOption] for other options that can be used as a
// Float64HistogramOption.
Expand Down
10 changes: 8 additions & 2 deletions metric/syncint64.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ type Int64Histogram interface {
// Int64HistogramConfig contains options for synchronous counter instruments
// that record int64 values.
type Int64HistogramConfig struct {
description string
unit string
description string
unit string
explicitBucketBoundaries []float64
}

// NewInt64HistogramConfig returns a new [Int64HistogramConfig] with all opts
Expand All @@ -171,6 +172,11 @@ func (c Int64HistogramConfig) Unit() string {
return c.unit
}

// ExplicitBucketBoundaries returns the configured explicit bucket boundaries.
func (c Int64HistogramConfig) ExplicitBucketBoundaries() []float64 {
return c.explicitBucketBoundaries
}

// Int64HistogramOption applies options to a [Int64HistogramConfig]. See
// [InstrumentOption] for other options that can be used as an
// Int64HistogramOption.
Expand Down
9 changes: 9 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ type Instrument struct {
// Scope identifies the instrumentation that created the instrument.
Scope instrumentation.Scope

// bucketBoundaries are the configured boundaries for histogram instruments.
// Make this non-exported because I don't think we want users setting this
// in a view.
bucketBoundaries []float64

// Ensure forward compatibility if non-comparable fields need to be added.
nonComparable // nolint: unused
}
Expand All @@ -91,6 +96,8 @@ func (i Instrument) empty() bool {
i.Kind == zeroInstrumentKind &&
i.Unit == "" &&
i.Scope == zeroScope
// This is only used for views, so no need to take bucket boundaries
// into account here.
}

// matches returns whether all the non-zero-value fields of i match the
Expand All @@ -102,6 +109,8 @@ func (i Instrument) matches(other Instrument) bool {
i.matchesKind(other) &&
i.matchesUnit(other) &&
i.matchesScope(other)
// Note: Don't require matching the bucket boundaries, as that isn't something
// that users can set with a view.
}

// matchesName returns true if the Name of i is "" or it equals the Name of
Expand Down
50 changes: 26 additions & 24 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
cfg := metric.NewInt64CounterConfig(options...)
const kind = InstrumentKindCounter
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -84,7 +84,7 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
cfg := metric.NewInt64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -99,7 +99,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), cfg.ExplicitBucketBoundaries())
if err != nil {
return i, err
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
cfg := metric.NewFloat64CounterConfig(options...)
const kind = InstrumentKindCounter
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -177,7 +177,7 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
cfg := metric.NewFloat64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -192,7 +192,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), cfg.ExplicitBucketBoundaries())
if err != nil {
return i, err
}
Expand Down Expand Up @@ -445,47 +445,49 @@ func (noopRegister) Unregister() error {
// int64InstProvider provides int64 OpenTelemetry instruments.
type int64InstProvider struct{ *meter }

func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string, bounds []float64) ([]aggregate.Measure[int64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
bucketBoundaries: bounds,
}
return p.int64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string, bounds []float64) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u, bounds)
return &int64Inst{measures: aggs}, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct{ *meter }

func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string, bounds []float64) ([]aggregate.Measure[float64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
bucketBoundaries: bounds,
}
return p.float64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string, bounds []float64) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u, bounds)
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u, nil)
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
}

Expand Down Expand Up @@ -518,7 +520,7 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
type float64ObservProvider struct{ *meter }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u, nil)
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
}

Expand Down
26 changes: 26 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,32 @@ func TestMeterCreatesInstruments(t *testing.T) {
},
},
},
{
name: "SyncInt64Histogram with bounds through advice",
fn: func(t *testing.T, m metric.Meter) {
gauge, err := m.Int64Histogram("histogram", metric.WithExplicitBucketBoundaries([]float64{0, 1, 2, 3}))
assert.NoError(t, err)

gauge.Record(context.Background(), 7)
},
want: metricdata.Metrics{
Name: "histogram",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.Set{},
Count: 1,
Bounds: []float64{0, 1, 2, 3},
BucketCounts: []uint64{0, 0, 0, 0, 1},
Min: metricdata.NewExtrema[int64](7),
Max: metricdata.NewExtrema[int64](7),
Sum: 7,
},
},
},
},
},
{
name: "SyncFloat64Count",
fn: func(t *testing.T, m metric.Meter) {
Expand Down
11 changes: 8 additions & 3 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
}
matched = true

in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, inst.bucketBoundaries, stream)
if err != nil {
errs.append(err)
}
Expand All @@ -272,7 +272,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, inst.bucketBoundaries, stream)
if err != nil {
errs.append(err)
}
Expand Down Expand Up @@ -306,11 +306,16 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, bucketBoundaries []float64, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
// override bucket boundaries with the instrument-level default boundaries
if a, ok := stream.Aggregation.(aggregation.ExplicitBucketHistogram); ok && len(bucketBoundaries) > 0 {
a.Boundaries = bucketBoundaries
stream.Aggregation = a
}
}

if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
Expand Down

0 comments on commit 9915c30

Please sign in to comment.