From cec9e73a3268da40748ff6ee4a6e2ed3ad57003e Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 6 Jan 2023 12:12:14 -0800 Subject: [PATCH 1/2] Have multi-inst callback return an error --- CHANGELOG.md | 1 + example/prometheus/main.go | 3 +- metric/example_test.go | 6 +- metric/internal/global/meter.go | 3 +- metric/internal/global/meter_test.go | 13 +++-- metric/internal/global/meter_types_test.go | 4 +- metric/meter.go | 2 +- sdk/metric/meter_test.go | 64 +++++++++++++++------- sdk/metric/pipeline.go | 4 +- sdk/metric/pipeline_test.go | 6 +- 10 files changed, 68 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2252e56d8c3..061f272c89a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `InstrumentKindAsyncGauge` is renamed to `InstrumentKindObservableGauge` - Update the `RegisterCallback` method of the `Meter` in the `go.opentelemetry.io/otel/sdk/metric` package to accept the added `Callback` type instead of an inline function type definition. The underlying type of a `Callback` is the same `func(context.Context)` that the method used to accept. (#3564) +- The callback function registered with a `Meter` from the `go.opentelemetry.io/otel/metric` package is required to return an error now. (#TBD) ### Deprecated diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 6bc9563d77e..68be2f8d6cb 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -68,9 +68,10 @@ func main() { if err != nil { log.Fatal(err) } - _, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error { n := -10. + rand.Float64()*(90.) // [-10, 100) gauge.Observe(ctx, n, attrs...) + return nil }) if err != nil { log.Fatal(err) diff --git a/metric/example_test.go b/metric/example_test.go index 08741f77d06..c6beeca7fc3 100644 --- a/metric/example_test.go +++ b/metric/example_test.go @@ -62,13 +62,14 @@ func ExampleMeter_asynchronous_single() { } _, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage}, - func(ctx context.Context) { + func(ctx context.Context) error { // instrument.WithCallbackFunc(func(ctx context.Context) { //Do Work to get the real memoryUsage // mem := GatherMemory(ctx) mem := 75000 memoryUsage.Observe(ctx, int64(mem)) + return nil }) if err != nil { fmt.Println("Failed to register callback") @@ -90,7 +91,7 @@ func ExampleMeter_asynchronous_multiple() { heapAlloc, gcCount, }, - func(ctx context.Context) { + func(ctx context.Context) error { memStats := &runtime.MemStats{} // This call does work runtime.ReadMemStats(memStats) @@ -100,6 +101,7 @@ func ExampleMeter_asynchronous_multiple() { // This function synchronously records the pauses computeGCPauses(ctx, gcPause, memStats.PauseNs[:]) + return nil }, ) diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 8d71aa050cf..bdde4e87ea0 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -16,7 +16,6 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global" import ( "container/list" - "context" "sync" "sync/atomic" @@ -323,7 +322,7 @@ func unwrapInstruments(instruments []instrument.Asynchronous) []instrument.Async type registration struct { instruments []instrument.Asynchronous - function func(context.Context) + function metric.Callback unreg func() error unregMu sync.Mutex diff --git a/metric/internal/global/meter_test.go b/metric/internal/global/meter_test.go index 903ed340a02..86d6f3e0ade 100644 --- a/metric/internal/global/meter_test.go +++ b/metric/internal/global/meter_test.go @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) { _, _ = mtr.Int64Counter(name) _, _ = mtr.Int64UpDownCounter(name) _, _ = mtr.Int64Histogram(name) - _, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {}) + _, _ = mtr.RegisterCallback(nil, func(ctx context.Context) error { return nil }) if !once { wg.Done() once = true @@ -88,7 +88,7 @@ func TestMeterRace(t *testing.T) { func TestUnregisterRace(t *testing.T) { mtr := &meter{} - reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {}) + reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) error { return nil }) require.NoError(t, err) wg := &sync.WaitGroup{} @@ -130,8 +130,9 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun _, err = m.Int64ObservableGauge("test_Async_Gauge") assert.NoError(t, err) - _, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) error { afcounter.Observe(ctx, 3) + return nil }) require.NoError(t, err) @@ -324,8 +325,9 @@ func TestRegistrationDelegation(t *testing.T) { require.NoError(t, err) var called0 bool - reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) error { called0 = true + return nil }) require.NoError(t, err) require.Equal(t, 1, mImpl.registry.Len(), "callback not registered") @@ -334,8 +336,9 @@ func TestRegistrationDelegation(t *testing.T) { assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered") var called1 bool - reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) error { called1 = true + return nil }) require.NoError(t, err) require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered") diff --git a/metric/internal/global/meter_types_test.go b/metric/internal/global/meter_types_test.go index 7a2680ee45a..31903995a83 100644 --- a/metric/internal/global/meter_types_test.go +++ b/metric/internal/global/meter_types_test.go @@ -52,7 +52,7 @@ type testMeter struct { siUDCount int siHist int - callbacks []func(context.Context) + callbacks []metric.Callback } func (m *testMeter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) { @@ -145,6 +145,6 @@ func (m *testMeter) collect() { // Unregister. continue } - f(ctx) + _ = f(ctx) } } diff --git a/metric/meter.go b/metric/meter.go index 7a042113835..35ffe0de384 100644 --- a/metric/meter.go +++ b/metric/meter.go @@ -120,7 +120,7 @@ type Meter interface { // the same attributes as another Callback will report. // // The function needs to be concurrent safe. -type Callback func(context.Context) +type Callback func(context.Context) error // Registration is an token representing the unique registration of a callback // for a set of instruments with a Meter. diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 9568ee3311a..b394707b0aa 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -95,6 +95,8 @@ func TestMeterInstrumentConcurrency(t *testing.T) { wg.Wait() } +var emptyCallback metric.Callback = func(ctx context.Context) error { return nil } + // A Meter Should be able register Callbacks Concurrently. func TestMeterCallbackCreationConcurrency(t *testing.T) { wg := &sync.WaitGroup{} @@ -103,11 +105,11 @@ func TestMeterCallbackCreationConcurrency(t *testing.T) { m := NewMeterProvider().Meter("callback-concurrency") go func() { - _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, emptyCallback) wg.Done() }() go func() { - _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, emptyCallback) wg.Done() }() wg.Wait() @@ -115,7 +117,7 @@ func TestMeterCallbackCreationConcurrency(t *testing.T) { func TestNoopCallbackUnregisterConcurrency(t *testing.T) { m := NewMeterProvider().Meter("noop-unregister-concurrency") - reg, err := m.RegisterCallback(nil, func(ctx context.Context) {}) + reg, err := m.RegisterCallback(nil, emptyCallback) require.NoError(t, err) wg := &sync.WaitGroup{} @@ -143,11 +145,11 @@ func TestCallbackUnregisterConcurrency(t *testing.T) { require.NoError(t, err) i := []instrument.Asynchronous{actr} - regCtr, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + regCtr, err := meter.RegisterCallback(i, emptyCallback) require.NoError(t, err) i = []instrument.Asynchronous{ag} - regG, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + regG, err := meter.RegisterCallback(i, emptyCallback) require.NoError(t, err) wg := &sync.WaitGroup{} @@ -183,8 +185,9 @@ func TestMeterCreatesInstruments(t *testing.T) { } ctr, err := m.Int64ObservableCounter("aint", instrument.WithInt64Callback(cback)) assert.NoError(t, err) - _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 3) + return nil }) assert.NoError(t, err) @@ -212,8 +215,9 @@ func TestMeterCreatesInstruments(t *testing.T) { } ctr, err := m.Int64ObservableUpDownCounter("aint", instrument.WithInt64Callback(cback)) assert.NoError(t, err) - _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 11) + return nil }) assert.NoError(t, err) @@ -241,8 +245,9 @@ func TestMeterCreatesInstruments(t *testing.T) { } gauge, err := m.Int64ObservableGauge("agauge", instrument.WithInt64Callback(cback)) assert.NoError(t, err) - _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error { gauge.Observe(ctx, 11) + return nil }) assert.NoError(t, err) @@ -268,8 +273,9 @@ func TestMeterCreatesInstruments(t *testing.T) { } ctr, err := m.Float64ObservableCounter("afloat", instrument.WithFloat64Callback(cback)) assert.NoError(t, err) - _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 3) + return nil }) assert.NoError(t, err) @@ -297,8 +303,9 @@ func TestMeterCreatesInstruments(t *testing.T) { } ctr, err := m.Float64ObservableUpDownCounter("afloat", instrument.WithFloat64Callback(cback)) assert.NoError(t, err) - _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 11) + return nil }) assert.NoError(t, err) @@ -326,8 +333,9 @@ func TestMeterCreatesInstruments(t *testing.T) { } gauge, err := m.Float64ObservableGauge("agauge", instrument.WithFloat64Callback(cback)) assert.NoError(t, err) - _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error { gauge.Observe(ctx, 11) + return nil }) assert.NoError(t, err) @@ -501,16 +509,18 @@ func TestMetersProvideScope(t *testing.T) { m1 := mp.Meter("scope1") ctr1, err := m1.Float64ObservableCounter("ctr1") assert.NoError(t, err) - _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) error { ctr1.Observe(ctx, 5) + return nil }) assert.NoError(t, err) m2 := mp.Meter("scope2") ctr2, err := m2.Int64ObservableCounter("ctr2") assert.NoError(t, err) - _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) error { ctr2.Observe(ctx, 7) + return nil }) assert.NoError(t, err) @@ -594,7 +604,10 @@ func TestUnregisterUnregisters(t *testing.T) { floag64Counter, floag64UpDownCounter, floag64Gauge, - }, func(context.Context) { called = true }) + }, func(context.Context) error { + called = true + return nil + }) require.NoError(t, err) ctx := context.Background() @@ -644,7 +657,10 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { floag64Counter, floag64UpDownCounter, floag64Gauge, - }, func(context.Context) { called = true }) + }, func(context.Context) error { + called = true + return nil + }) require.NoError(t, err) data, err := r.Collect(context.Background()) @@ -669,9 +685,10 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil }) return err }, @@ -696,9 +713,10 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil }) return err }, @@ -723,9 +741,10 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil }) return err }, @@ -748,9 +767,10 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil }) return err }, @@ -775,9 +795,10 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil }) return err }, @@ -802,9 +823,10 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + return nil }) return err }, diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index e305eb8706c..32b9125340b 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -147,7 +147,9 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err for e := p.multiCallbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) f := e.Value.(metric.Callback) - f(ctx) + if err := f(ctx); err != nil { + errs.append(err) + } if err := ctx.Err(); err != nil { // This means the context expired before we finished running callbacks. return metricdata.ResourceMetrics{}, err diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 7fea4c8aba2..7b9f89585dc 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -54,7 +54,7 @@ func TestEmptyPipeline(t *testing.T) { }) require.NotPanics(t, func() { - pipe.addMultiCallback(func(ctx context.Context) {}) + pipe.addMultiCallback(emptyCallback) }) output, err = pipe.produce(context.Background()) @@ -78,7 +78,7 @@ func TestNewPipeline(t *testing.T) { }) require.NotPanics(t, func() { - pipe.addMultiCallback(func(ctx context.Context) {}) + pipe.addMultiCallback(emptyCallback) }) output, err = pipe.produce(context.Background()) @@ -121,7 +121,7 @@ func TestPipelineConcurrency(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - pipe.addMultiCallback(func(ctx context.Context) {}) + pipe.addMultiCallback(emptyCallback) }() } wg.Wait() From 1428de3edaf41f0229b9358e4f6ed28db22362b7 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 6 Jan 2023 12:14:02 -0800 Subject: [PATCH 2/2] Update PR number in changelog entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 061f272c89a..9b4dd46e850 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,7 +75,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `InstrumentKindAsyncGauge` is renamed to `InstrumentKindObservableGauge` - Update the `RegisterCallback` method of the `Meter` in the `go.opentelemetry.io/otel/sdk/metric` package to accept the added `Callback` type instead of an inline function type definition. The underlying type of a `Callback` is the same `func(context.Context)` that the method used to accept. (#3564) -- The callback function registered with a `Meter` from the `go.opentelemetry.io/otel/metric` package is required to return an error now. (#TBD) +- The callback function registered with a `Meter` from the `go.opentelemetry.io/otel/metric` package is required to return an error now. (#3576) ### Deprecated