Skip to content

Commit

Permalink
Redesign RegisterCallback API (#3584)
Browse files Browse the repository at this point in the history
* Update RegisterCallback and Callback declerations

RegisterCallback accepts variadic Asynchronous instruments instead of a
slice.

Callback accepts an observation result recorder to ensure instruments
that are observed by a callback.

* Update global, noop, SDK implementations
* Fix examples
* Add changes to changelog
* Test RegisterCallback for invalid observers
* Test callbacks from foreign sources not collected
* Support registering delegating instruments
  • Loading branch information
MrAlias authored Jan 19, 2023
1 parent e8c6e45 commit 69b18e6
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 114 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
Instead it uses the `net.sock.peer` attributes. (#3581)
- The parameters for the `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/metric` are changed.
The slice of `instrument.Asynchronous` parameter is now passed as a variadic argument. (#3587)
- The `Callback` in `go.opentelemetry.io/otel/metric` has the added `Observer` parameter added.
This new parameter is used by `Callback` implementations to observe values for asynchronous instruments instead of calling the `Observe` method of the instrument directly. (#3584)

### Fixed

- The `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/sdk/metric` only registers a callback for instruments created by that meter.
Trying to register a callback with instruments from a different meter will result in an error being returned. (#3584)

### Deprecated

Expand Down
5 changes: 3 additions & 2 deletions example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/sdk/metric"
)
Expand Down Expand Up @@ -68,9 +69,9 @@ func main() {
if err != nil {
log.Fatal(err)
}
_, err = meter.RegisterCallback(func(ctx context.Context) error {
_, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
o.ObserveFloat64(gauge, n, attrs...)
return nil
}, gauge)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func ExampleMeter_asynchronous_multiple() {
gcPause, _ := meter.Float64Histogram("gcPause")

_, err := meter.RegisterCallback(
func(ctx context.Context) error {
func(ctx context.Context, o metric.Observer) error {
memStats := &runtime.MemStats{}
// This call does work
runtime.ReadMemStats(memStats)

heapAlloc.Observe(ctx, int64(memStats.HeapAlloc))
gcCount.Observe(ctx, int64(memStats.NumGC))
o.ObserveInt64(heapAlloc, int64(memStats.HeapAlloc))
o.ObserveInt64(gcCount, int64(memStats.NumGC))

// This function synchronously records the pauses
computeGCPauses(ctx, gcPause, memStats.PauseNs[:])
Expand Down
47 changes: 41 additions & 6 deletions metric/internal/global/instruments.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"go.opentelemetry.io/otel/metric/instrument"
)

// unwrapper unwraps to return the underlying instrument implementation.
type unwrapper interface {
Unwrap() instrument.Asynchronous
}

type afCounter struct {
name string
opts []instrument.Float64ObserverOption
Expand All @@ -33,6 +38,9 @@ type afCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*afCounter)(nil)
var _ instrument.Float64ObservableCounter = (*afCounter)(nil)

func (i *afCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64ObservableCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -48,7 +56,7 @@ func (i *afCounter) Observe(ctx context.Context, x float64, attrs ...attribute.K
}
}

func (i *afCounter) unwrap() instrument.Asynchronous {
func (i *afCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Float64ObservableCounter)
}
Expand All @@ -64,6 +72,9 @@ type afUpDownCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*afUpDownCounter)(nil)
var _ instrument.Float64ObservableUpDownCounter = (*afUpDownCounter)(nil)

func (i *afUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64ObservableUpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -79,7 +90,7 @@ func (i *afUpDownCounter) Observe(ctx context.Context, x float64, attrs ...attri
}
}

func (i *afUpDownCounter) unwrap() instrument.Asynchronous {
func (i *afUpDownCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Float64ObservableUpDownCounter)
}
Expand All @@ -104,13 +115,16 @@ func (i *afGauge) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

var _ unwrapper = (*afGauge)(nil)
var _ instrument.Float64ObservableGauge = (*afGauge)(nil)

func (i *afGauge) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(instrument.Float64ObservableGauge).Observe(ctx, x, attrs...)
}
}

func (i *afGauge) unwrap() instrument.Asynchronous {
func (i *afGauge) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Float64ObservableGauge)
}
Expand All @@ -126,6 +140,9 @@ type aiCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*aiCounter)(nil)
var _ instrument.Int64ObservableCounter = (*aiCounter)(nil)

func (i *aiCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64ObservableCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -141,7 +158,7 @@ func (i *aiCounter) Observe(ctx context.Context, x int64, attrs ...attribute.Key
}
}

func (i *aiCounter) unwrap() instrument.Asynchronous {
func (i *aiCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Int64ObservableCounter)
}
Expand All @@ -157,6 +174,9 @@ type aiUpDownCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*aiUpDownCounter)(nil)
var _ instrument.Int64ObservableUpDownCounter = (*aiUpDownCounter)(nil)

func (i *aiUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64ObservableUpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -172,7 +192,7 @@ func (i *aiUpDownCounter) Observe(ctx context.Context, x int64, attrs ...attribu
}
}

func (i *aiUpDownCounter) unwrap() instrument.Asynchronous {
func (i *aiUpDownCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Int64ObservableUpDownCounter)
}
Expand All @@ -188,6 +208,9 @@ type aiGauge struct {
instrument.Asynchronous
}

var _ unwrapper = (*aiGauge)(nil)
var _ instrument.Int64ObservableGauge = (*aiGauge)(nil)

func (i *aiGauge) setDelegate(m metric.Meter) {
ctr, err := m.Int64ObservableGauge(i.name, i.opts...)
if err != nil {
Expand All @@ -203,7 +226,7 @@ func (i *aiGauge) Observe(ctx context.Context, x int64, attrs ...attribute.KeyVa
}
}

func (i *aiGauge) unwrap() instrument.Asynchronous {
func (i *aiGauge) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Int64ObservableGauge)
}
Expand All @@ -220,6 +243,8 @@ type sfCounter struct {
instrument.Synchronous
}

var _ instrument.Float64Counter = (*sfCounter)(nil)

func (i *sfCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64Counter(i.name, i.opts...)
if err != nil {
Expand All @@ -244,6 +269,8 @@ type sfUpDownCounter struct {
instrument.Synchronous
}

var _ instrument.Float64UpDownCounter = (*sfUpDownCounter)(nil)

func (i *sfUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64UpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -268,6 +295,8 @@ type sfHistogram struct {
instrument.Synchronous
}

var _ instrument.Float64Histogram = (*sfHistogram)(nil)

func (i *sfHistogram) setDelegate(m metric.Meter) {
ctr, err := m.Float64Histogram(i.name, i.opts...)
if err != nil {
Expand All @@ -292,6 +321,8 @@ type siCounter struct {
instrument.Synchronous
}

var _ instrument.Int64Counter = (*siCounter)(nil)

func (i *siCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64Counter(i.name, i.opts...)
if err != nil {
Expand All @@ -316,6 +347,8 @@ type siUpDownCounter struct {
instrument.Synchronous
}

var _ instrument.Int64UpDownCounter = (*siUpDownCounter)(nil)

func (i *siUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64UpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -340,6 +373,8 @@ type siHistogram struct {
instrument.Synchronous
}

var _ instrument.Int64Histogram = (*siHistogram)(nil)

func (i *siHistogram) setDelegate(m metric.Meter) {
ctr, err := m.Int64Histogram(i.name, i.opts...)
if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions metric/internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func TestMeterProviderRace(t *testing.T) {
close(finish)
}

var zeroCallback metric.Callback = func(ctx context.Context, or metric.Observer) error {
return nil
}

func TestMeterRace(t *testing.T) {
mtr := &meter{}

Expand All @@ -66,7 +70,7 @@ func TestMeterRace(t *testing.T) {
_, _ = mtr.Int64Counter(name)
_, _ = mtr.Int64UpDownCounter(name)
_, _ = mtr.Int64Histogram(name)
_, _ = mtr.RegisterCallback(func(ctx context.Context) error { return nil })
_, _ = mtr.RegisterCallback(zeroCallback)
if !once {
wg.Done()
once = true
Expand All @@ -86,7 +90,7 @@ func TestMeterRace(t *testing.T) {

func TestUnregisterRace(t *testing.T) {
mtr := &meter{}
reg, err := mtr.RegisterCallback(func(ctx context.Context) error { return nil })
reg, err := mtr.RegisterCallback(zeroCallback)
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -128,8 +132,8 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (instrument.Float
_, err = m.Int64ObservableGauge("test_Async_Gauge")
assert.NoError(t, err)

_, err = m.RegisterCallback(func(ctx context.Context) error {
afcounter.Observe(ctx, 3)
_, err = m.RegisterCallback(func(ctx context.Context, obs metric.Observer) error {
obs.ObserveFloat64(afcounter, 3)
return nil
}, afcounter)
require.NoError(t, err)
Expand Down Expand Up @@ -323,7 +327,7 @@ func TestRegistrationDelegation(t *testing.T) {
require.NoError(t, err)

var called0 bool
reg0, err := m.RegisterCallback(func(context.Context) error {
reg0, err := m.RegisterCallback(func(context.Context, metric.Observer) error {
called0 = true
return nil
}, actr)
Expand All @@ -334,7 +338,7 @@ func TestRegistrationDelegation(t *testing.T) {
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")

var called1 bool
reg1, err := m.RegisterCallback(func(context.Context) error {
reg1, err := m.RegisterCallback(func(context.Context, metric.Observer) error {
called1 = true
return nil
}, actr)
Expand Down
16 changes: 15 additions & 1 deletion metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
)
Expand Down Expand Up @@ -136,11 +137,24 @@ func (r testReg) Unregister() error {
// This enables async collection.
func (m *testMeter) collect() {
ctx := context.Background()
o := observationRecorder{ctx}
for _, f := range m.callbacks {
if f == nil {
// Unregister.
continue
}
_ = f(ctx)
_ = f(ctx, o)
}
}

type observationRecorder struct {
ctx context.Context
}

func (o observationRecorder) ObserveFloat64(i instrument.Float64Observer, value float64, attr ...attribute.KeyValue) {
i.Observe(o.ctx, value, attr...)
}

func (o observationRecorder) ObserveInt64(i instrument.Int64Observer, value int64, attr ...attribute.KeyValue) {
i.Observe(o.ctx, value, attr...)
}
14 changes: 12 additions & 2 deletions metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package metric // import "go.opentelemetry.io/otel/metric"
import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
)

Expand Down Expand Up @@ -106,7 +107,8 @@ type Meter interface {
}

// Callback is a function registered with a Meter that makes observations for
// the set of instruments it is registered with.
// the set of instruments it is registered with. The Observer parameter is used
// to record measurment observations for these instruments.
//
// The function needs to complete in a finite amount of time and the deadline
// of the passed context is expected to be honored.
Expand All @@ -116,7 +118,15 @@ type Meter interface {
// the same attributes as another Callback will report.
//
// The function needs to be concurrent safe.
type Callback func(context.Context) error
type Callback func(context.Context, Observer) error

// Observer records measurements for multiple instruments in a Callback.
type Observer interface {
// ObserveFloat64 records the float64 value with attributes for obsrv.
ObserveFloat64(obsrv instrument.Float64Observer, value float64, attributes ...attribute.KeyValue)
// ObserveInt64 records the int64 value with attributes for obsrv.
ObserveInt64(obsrv instrument.Int64Observer, value int64, attributes ...attribute.KeyValue)
}

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
Expand Down
Loading

0 comments on commit 69b18e6

Please sign in to comment.