Skip to content

Commit

Permalink
Add selector of exemplar reservoir providers to metric.Stream configu…
Browse files Browse the repository at this point in the history
…ration
  • Loading branch information
dashpole committed Oct 14, 2024
1 parent ba4a2ab commit b295fcc
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 13 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)
- Add `ExemplarReservoirProviderSelector` and `DefaultExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric`, which defines the exemplar reservoir to use based on the aggregation of the metric. (#5861)
- Add `ExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric.Stream` to allow using views to configure the exemplar reservoir to use for a metric. (#5861)
- Add `ReservoirProvider`, `HistogramReservoirProvider` and `FixedSizeReservoirProvider` to `go.opentelemetry.io/otel/sdk/metric/exemplar` to make it convenient to use providers of Reservoirs. (#5861)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand All @@ -21,7 +24,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `TraceBasedFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747, #5862)
- Add `WithExportBufferSize` option to log batch processor.(#5877)

### Changed

Expand Down
42 changes: 32 additions & 10 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,49 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"runtime"
"slices"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// ExemplarReservoirProviderSelector selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] to use
// based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
provider := DefaultExemplarReservoirProviderSelector(agg)
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
}

// DefaultExemplarReservoirProviderSelector returns the default
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] for the
// provided [Aggregation].
//
// For explicit bucket histograms with more than 1 bucket, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.HistogramReservoirProvider].
// For exponential histograms, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.FixedSizeReservoirProvider]
// with a size of min(20, max_buckets).
// For all other aggregations, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.FixedSizeReservoirProvider]
// with a size equal to the number of CPUs.
//
// Exemplar default reservoirs MAY change in a minor version bump. No
// guarantees are made on the shape or statistical properties of returned
// exemplars.
func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
return exemplar.HistogramReservoirProvider(a.Boundaries)
}

var n int
Expand All @@ -50,7 +74,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) f
}
}

return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
return exemplar.FixedSizeReservoirProvider(n)
}
7 changes: 7 additions & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func() Reservoir {
return NewFixedSizeReservoir(k)
}
}

// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or less measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
Expand Down
12 changes: 10 additions & 2 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// HistogramReservoirProvider is a provider of [HistogramReservoir].
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func() Reservoir {
return NewHistogramReservoir(cp)
}
}

// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are define by bounds.
//
// The passed bounds will be sorted by this function.
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/exemplar/reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ type Reservoir interface {
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}

// ReservoirProvider creates new [Reservoir]s.
type ReservoirProvider func() Reservoir
6 changes: 6 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ type Stream struct {
// Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
// provide an allow-list of attribute keys here.
AttributeFilter attribute.Filter
// ExemplarReservoirProvider selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based
// on the [Aggregation].
//
// If unspecified, [DefaultExemplarReservoirProviderSelector] is used.
ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector
}

// instID are the identifying properties of a instrument.
Expand Down

0 comments on commit b295fcc

Please sign in to comment.