From 9915c30c77455c5e8516cfa539f1e3cc5c1a14b0 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 19 Jul 2023 18:03:22 +0000 Subject: [PATCH] prototype for the advice api --- metric/instrument.go | 24 +++++++++++++++++++ metric/syncfloat64.go | 10 ++++++-- metric/syncint64.go | 10 ++++++-- sdk/metric/instrument.go | 9 ++++++++ sdk/metric/meter.go | 50 +++++++++++++++++++++------------------- sdk/metric/meter_test.go | 26 +++++++++++++++++++++ sdk/metric/pipeline.go | 11 ++++++--- 7 files changed, 109 insertions(+), 31 deletions(-) diff --git a/metric/instrument.go b/metric/instrument.go index 0033c1e12d5..32ce0e7b2cc 100644 --- a/metric/instrument.go +++ b/metric/instrument.go @@ -39,6 +39,13 @@ type InstrumentOption interface { Float64ObservableGaugeOption } +// HistogramOption applies options to histogram instruments. +// This assumes we only want to allow setting explicit bucket boundaries for histograms instruments. +type HistogramOption interface { + Int64HistogramOption + Float64HistogramOption +} + type descOpt string func (o descOpt) applyFloat64Counter(c Float64CounterConfig) Float64CounterConfig { @@ -169,6 +176,23 @@ func (o unitOpt) applyInt64ObservableGauge(c Int64ObservableGaugeConfig) Int64Ob // WithUnit sets the instrument unit. func WithUnit(u string) InstrumentOption { return unitOpt(u) } +// WithExplicitBucketBoundaries sets the instrument explicit bucket boundaries. +// Note: We could call this something closer to the spec (e.g. WithAdvice(advice)), but I used +// this to keep the prototype simpler. +func WithExplicitBucketBoundaries(bounds []float64) HistogramOption { return bucketOpt(bounds) } + +type bucketOpt []float64 + +func (o bucketOpt) applyFloat64Histogram(c Float64HistogramConfig) Float64HistogramConfig { + c.explicitBucketBoundaries = o + return c +} + +func (o bucketOpt) applyInt64Histogram(c Int64HistogramConfig) Int64HistogramConfig { + c.explicitBucketBoundaries = o + return c +} + // AddOption applies options to an addition measurement. See // [MeasurementOption] for other options that can be used as an AddOption. type AddOption interface { diff --git a/metric/syncfloat64.go b/metric/syncfloat64.go index f0b063721d8..0a4825ae6a7 100644 --- a/metric/syncfloat64.go +++ b/metric/syncfloat64.go @@ -147,8 +147,9 @@ type Float64Histogram interface { // Float64HistogramConfig contains options for synchronous counter instruments // that record int64 values. type Float64HistogramConfig struct { - description string - unit string + description string + unit string + explicitBucketBoundaries []float64 } // NewFloat64HistogramConfig returns a new [Float64HistogramConfig] with all @@ -171,6 +172,11 @@ func (c Float64HistogramConfig) Unit() string { return c.unit } +// ExplicitBucketBoundaries returns the configured explicit bucket boundaries. +func (c Float64HistogramConfig) ExplicitBucketBoundaries() []float64 { + return c.explicitBucketBoundaries +} + // Float64HistogramOption applies options to a [Float64HistogramConfig]. See // [InstrumentOption] for other options that can be used as a // Float64HistogramOption. diff --git a/metric/syncint64.go b/metric/syncint64.go index 6f508eb66d4..56667d32fc0 100644 --- a/metric/syncint64.go +++ b/metric/syncint64.go @@ -147,8 +147,9 @@ type Int64Histogram interface { // Int64HistogramConfig contains options for synchronous counter instruments // that record int64 values. type Int64HistogramConfig struct { - description string - unit string + description string + unit string + explicitBucketBoundaries []float64 } // NewInt64HistogramConfig returns a new [Int64HistogramConfig] with all opts @@ -171,6 +172,11 @@ func (c Int64HistogramConfig) Unit() string { return c.unit } +// ExplicitBucketBoundaries returns the configured explicit bucket boundaries. +func (c Int64HistogramConfig) ExplicitBucketBoundaries() []float64 { + return c.explicitBucketBoundaries +} + // Int64HistogramOption applies options to a [Int64HistogramConfig]. See // [InstrumentOption] for other options that can be used as an // Int64HistogramOption. diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index eff2f179a51..256e6f5b57b 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -80,6 +80,11 @@ type Instrument struct { // Scope identifies the instrumentation that created the instrument. Scope instrumentation.Scope + // bucketBoundaries are the configured boundaries for histogram instruments. + // Make this non-exported because I don't think we want users setting this + // in a view. + bucketBoundaries []float64 + // Ensure forward compatibility if non-comparable fields need to be added. nonComparable // nolint: unused } @@ -91,6 +96,8 @@ func (i Instrument) empty() bool { i.Kind == zeroInstrumentKind && i.Unit == "" && i.Scope == zeroScope + // This is only used for views, so no need to take bucket boundaries + // into account here. } // matches returns whether all the non-zero-value fields of i match the @@ -102,6 +109,8 @@ func (i Instrument) matches(other Instrument) bool { i.matchesKind(other) && i.matchesUnit(other) && i.matchesScope(other) + // Note: Don't require matching the bucket boundaries, as that isn't something + // that users can set with a view. } // matchesName returns true if the Name of i is "" or it equals the Name of diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index caed7387c0a..ee9eeeee080 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -69,7 +69,7 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) cfg := metric.NewInt64CounterConfig(options...) const kind = InstrumentKindCounter p := int64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil) if err != nil { return i, err } @@ -84,7 +84,7 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou cfg := metric.NewInt64UpDownCounterConfig(options...) const kind = InstrumentKindUpDownCounter p := int64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil) if err != nil { return i, err } @@ -99,7 +99,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti cfg := metric.NewInt64HistogramConfig(options...) const kind = InstrumentKindHistogram p := int64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), cfg.ExplicitBucketBoundaries()) if err != nil { return i, err } @@ -162,7 +162,7 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti cfg := metric.NewFloat64CounterConfig(options...) const kind = InstrumentKindCounter p := float64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil) if err != nil { return i, err } @@ -177,7 +177,7 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow cfg := metric.NewFloat64UpDownCounterConfig(options...) const kind = InstrumentKindUpDownCounter p := float64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), nil) if err != nil { return i, err } @@ -192,7 +192,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram cfg := metric.NewFloat64HistogramConfig(options...) const kind = InstrumentKindHistogram p := float64InstProvider{m} - i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit(), cfg.ExplicitBucketBoundaries()) if err != nil { return i, err } @@ -445,47 +445,49 @@ func (noopRegister) Unregister() error { // int64InstProvider provides int64 OpenTelemetry instruments. type int64InstProvider struct{ *meter } -func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { +func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string, bounds []float64) ([]aggregate.Measure[int64], error) { inst := Instrument{ - Name: name, - Description: desc, - Unit: u, - Kind: kind, - Scope: p.scope, + Name: name, + Description: desc, + Unit: u, + Kind: kind, + Scope: p.scope, + bucketBoundaries: bounds, } return p.int64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. -func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u) +func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string, bounds []float64) (*int64Inst, error) { + aggs, err := p.aggs(kind, name, desc, u, bounds) return &int64Inst{measures: aggs}, err } // float64InstProvider provides float64 OpenTelemetry instruments. type float64InstProvider struct{ *meter } -func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { +func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string, bounds []float64) ([]aggregate.Measure[float64], error) { inst := Instrument{ - Name: name, - Description: desc, - Unit: u, - Kind: kind, - Scope: p.scope, + Name: name, + Description: desc, + Unit: u, + Kind: kind, + Scope: p.scope, + bucketBoundaries: bounds, } return p.float64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. -func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u) +func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string, bounds []float64) (*float64Inst, error) { + aggs, err := p.aggs(kind, name, desc, u, bounds) return &float64Inst{measures: aggs}, err } type int64ObservProvider struct{ *meter } func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) { - aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u) + aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u, nil) return newInt64Observable(p.meter, kind, name, desc, u, aggs), err } @@ -518,7 +520,7 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { type float64ObservProvider struct{ *meter } func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) { - aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u) + aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u, nil) return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err } diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 8657ccc7135..6d284079522 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -397,6 +397,32 @@ func TestMeterCreatesInstruments(t *testing.T) { }, }, }, + { + name: "SyncInt64Histogram with bounds through advice", + fn: func(t *testing.T, m metric.Meter) { + gauge, err := m.Int64Histogram("histogram", metric.WithExplicitBucketBoundaries([]float64{0, 1, 2, 3})) + assert.NoError(t, err) + + gauge.Record(context.Background(), 7) + }, + want: metricdata.Metrics{ + Name: "histogram", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.Set{}, + Count: 1, + Bounds: []float64{0, 1, 2, 3}, + BucketCounts: []uint64{0, 0, 0, 0, 1}, + Min: metricdata.NewExtrema[int64](7), + Max: metricdata.NewExtrema[int64](7), + Sum: 7, + }, + }, + }, + }, + }, { name: "SyncFloat64Count", fn: func(t *testing.T, m metric.Meter) { diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index ad52aedfe68..4ade817037e 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -247,7 +247,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error } matched = true - in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream) + in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, inst.bucketBoundaries, stream) if err != nil { errs.append(err) } @@ -272,7 +272,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error Description: inst.Description, Unit: inst.Unit, } - in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream) + in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, inst.bucketBoundaries, stream) if err != nil { errs.append(err) } @@ -306,11 +306,16 @@ type aggVal[N int64 | float64] struct { // // If the instrument defines an unknown or incompatible aggregation, an error // is returned. -func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) { +func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, bucketBoundaries []float64, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) { switch stream.Aggregation.(type) { case nil, aggregation.Default: // Undefined, nil, means to use the default from the reader. stream.Aggregation = i.pipeline.reader.aggregation(kind) + // override bucket boundaries with the instrument-level default boundaries + if a, ok := stream.Aggregation.(aggregation.ExplicitBucketHistogram); ok && len(bucketBoundaries) > 0 { + a.Boundaries = bucketBoundaries + stream.Aggregation = a + } } if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {