Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype for the advice api #4341

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type InstrumentOption interface {
Float64ObservableGaugeOption
}

// HistogramOption applies options to histogram instruments.
// This assumes we only want to allow setting explicit bucket boundaries for histograms instruments.
type HistogramOption interface {
Int64HistogramOption
Float64HistogramOption
}

type descOpt string

func (o descOpt) applyFloat64Counter(c Float64CounterConfig) Float64CounterConfig {
Expand Down Expand Up @@ -169,6 +176,23 @@ func (o unitOpt) applyInt64ObservableGauge(c Int64ObservableGaugeConfig) Int64Ob
// WithUnit sets the instrument unit.
func WithUnit(u string) InstrumentOption { return unitOpt(u) }

// WithExplicitBucketBoundaries sets the instrument explicit bucket boundaries.
// Note: We could call this something closer to the spec (e.g. WithAdvice(advice)), but I used
// this to keep the prototype simpler.
func WithExplicitBucketBoundaries(bounds []float64) HistogramOption { return bucketOpt(bounds) }

type bucketOpt []float64

func (o bucketOpt) applyFloat64Histogram(c Float64HistogramConfig) Float64HistogramConfig {
c.explicitBucketBoundaries = o
return c
}

func (o bucketOpt) applyInt64Histogram(c Int64HistogramConfig) Int64HistogramConfig {
c.explicitBucketBoundaries = o
return c
}

// AddOption applies options to an addition measurement. See
// [MeasurementOption] for other options that can be used as an AddOption.
type AddOption interface {
Expand Down
10 changes: 8 additions & 2 deletions metric/syncfloat64.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ type Float64Histogram interface {
// Float64HistogramConfig contains options for synchronous counter instruments
// that record int64 values.
type Float64HistogramConfig struct {
description string
unit string
description string
unit string
explicitBucketBoundaries []float64
}

// NewFloat64HistogramConfig returns a new [Float64HistogramConfig] with all
Expand All @@ -171,6 +172,11 @@ func (c Float64HistogramConfig) Unit() string {
return c.unit
}

// ExplicitBucketBoundaries returns the configured explicit bucket boundaries.
func (c Float64HistogramConfig) ExplicitBucketBoundaries() []float64 {
return c.explicitBucketBoundaries
}

// Float64HistogramOption applies options to a [Float64HistogramConfig]. See
// [InstrumentOption] for other options that can be used as a
// Float64HistogramOption.
Expand Down
10 changes: 8 additions & 2 deletions metric/syncint64.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ type Int64Histogram interface {
// Int64HistogramConfig contains options for synchronous counter instruments
// that record int64 values.
type Int64HistogramConfig struct {
description string
unit string
description string
unit string
explicitBucketBoundaries []float64
}

// NewInt64HistogramConfig returns a new [Int64HistogramConfig] with all opts
Expand All @@ -171,6 +172,11 @@ func (c Int64HistogramConfig) Unit() string {
return c.unit
}

// ExplicitBucketBoundaries returns the configured explicit bucket boundaries.
func (c Int64HistogramConfig) ExplicitBucketBoundaries() []float64 {
return c.explicitBucketBoundaries
}

// Int64HistogramOption applies options to a [Int64HistogramConfig]. See
// [InstrumentOption] for other options that can be used as an
// Int64HistogramOption.
Expand Down
9 changes: 9 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ type Instrument struct {
// Scope identifies the instrumentation that created the instrument.
Scope instrumentation.Scope

// bucketBoundaries are the configured boundaries for histogram instruments.
// Make this non-exported because I don't think we want users setting this
// in a view.
bucketBoundaries []float64

// Ensure forward compatibility if non-comparable fields need to be added.
nonComparable // nolint: unused
}
Expand All @@ -91,6 +96,8 @@ func (i Instrument) empty() bool {
i.Kind == zeroInstrumentKind &&
i.Unit == "" &&
i.Scope == zeroScope
// This is only used for views, so no need to take bucket boundaries
// into account here.
}

// matches returns whether all the non-zero-value fields of i match the
Expand All @@ -102,6 +109,8 @@ func (i Instrument) matches(other Instrument) bool {
i.matchesKind(other) &&
i.matchesUnit(other) &&
i.matchesScope(other)
// Note: Don't require matching the bucket boundaries, as that isn't something
// that users can set with a view.
}

// matchesName returns true if the Name of i is "" or it equals the Name of
Expand Down
50 changes: 26 additions & 24 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
cfg := metric.NewInt64CounterConfig(options...)
const kind = InstrumentKindCounter
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -84,7 +84,7 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
cfg := metric.NewInt64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -99,7 +99,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), cfg.ExplicitBucketBoundaries())
if err != nil {
return i, err
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
cfg := metric.NewFloat64CounterConfig(options...)
const kind = InstrumentKindCounter
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -177,7 +177,7 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
cfg := metric.NewFloat64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil)
if err != nil {
return i, err
}
Expand All @@ -192,7 +192,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), cfg.ExplicitBucketBoundaries())
if err != nil {
return i, err
}
Expand Down Expand Up @@ -445,47 +445,49 @@ func (noopRegister) Unregister() error {
// int64InstProvider provides int64 OpenTelemetry instruments.
type int64InstProvider struct{ *meter }

func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string, bounds []float64) ([]aggregate.Measure[int64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
bucketBoundaries: bounds,
}
return p.int64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string, bounds []float64) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u, bounds)
return &int64Inst{measures: aggs}, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct{ *meter }

func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string, bounds []float64) ([]aggregate.Measure[float64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
bucketBoundaries: bounds,
}
return p.float64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string, bounds []float64) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u, bounds)
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u, nil)
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
}

Expand Down Expand Up @@ -518,7 +520,7 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
type float64ObservProvider struct{ *meter }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u, nil)
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
}

Expand Down
26 changes: 26 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,32 @@ func TestMeterCreatesInstruments(t *testing.T) {
},
},
},
{
name: "SyncInt64Histogram with bounds through advice",
fn: func(t *testing.T, m metric.Meter) {
gauge, err := m.Int64Histogram("histogram", metric.WithExplicitBucketBoundaries([]float64{0, 1, 2, 3}))
assert.NoError(t, err)

gauge.Record(context.Background(), 7)
},
want: metricdata.Metrics{
Name: "histogram",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.Set{},
Count: 1,
Bounds: []float64{0, 1, 2, 3},
BucketCounts: []uint64{0, 0, 0, 0, 1},
Min: metricdata.NewExtrema[int64](7),
Max: metricdata.NewExtrema[int64](7),
Sum: 7,
},
},
},
},
},
{
name: "SyncFloat64Count",
fn: func(t *testing.T, m metric.Meter) {
Expand Down
11 changes: 8 additions & 3 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
}
matched = true

in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, inst.bucketBoundaries, stream)
if err != nil {
errs.append(err)
}
Expand All @@ -272,7 +272,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, inst.bucketBoundaries, stream)
if err != nil {
errs.append(err)
}
Expand Down Expand Up @@ -306,11 +306,16 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, bucketBoundaries []float64, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) {
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
// override bucket boundaries with the instrument-level default boundaries
if a, ok := stream.Aggregation.(aggregation.ExplicitBucketHistogram); ok && len(bucketBoundaries) > 0 {
a.Boundaries = bucketBoundaries
stream.Aggregation = a
}
}

if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
Expand Down