Skip to content

Commit

Permalink
Merge branch 'main' into opencensus_producer
Browse files Browse the repository at this point in the history
  • Loading branch information
hanyuancheung authored Dec 19, 2022
2 parents f4d3d26 + 4014204 commit da3df7c
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 79 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package.
This `Registration` can be used to unregister callbacks. (#3522)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)

### Removed

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)

### Added
### Changed

- Global error handler uses an atomic value instead of a mutex. (#3543)
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
- Add `NewMetricProducer` to `go.opentelemetry.io/otel/bridge/opencensus`, which can be used to pass OpenCensus metrics to an OpenTelemetry Reader. (#3541)

Expand Down
2 changes: 1 addition & 1 deletion example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
})
Expand Down
25 changes: 12 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package otel // import "go.opentelemetry.io/otel"
import (
"log"
"os"
"sync"
"sync/atomic"
"unsafe"
)

var (
Expand All @@ -34,28 +35,26 @@ var (
)

type delegator struct {
lock *sync.RWMutex
eh ErrorHandler
delegate unsafe.Pointer
}

func (d *delegator) Handle(err error) {
d.lock.RLock()
defer d.lock.RUnlock()
d.eh.Handle(err)
d.getDelegate().Handle(err)
}

func (d *delegator) getDelegate() ErrorHandler {
return *(*ErrorHandler)(atomic.LoadPointer(&d.delegate))
}

// setDelegate sets the ErrorHandler delegate.
func (d *delegator) setDelegate(eh ErrorHandler) {
d.lock.Lock()
defer d.lock.Unlock()
d.eh = eh
atomic.StorePointer(&d.delegate, unsafe.Pointer(&eh))
}

func defaultErrorHandler() *delegator {
return &delegator{
lock: &sync.RWMutex{},
eh: &errLogger{l: log.New(os.Stderr, "", log.LstdFlags)},
}
d := &delegator{}
d.setDelegate(&errLogger{l: log.New(os.Stderr, "", log.LstdFlags)})
return d
}

// errLogger logs errors if no delegate is set, otherwise they are delegated.
Expand Down
6 changes: 3 additions & 3 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type HandlerTestSuite struct {

func (s *HandlerTestSuite) SetupSuite() {
s.errCatcher = new(testErrCatcher)
s.origHandler = globalErrorHandler.eh
s.origHandler = globalErrorHandler.getDelegate()

globalErrorHandler.setDelegate(&errLogger{l: log.New(s.errCatcher, "", 0)})
}
Expand Down Expand Up @@ -111,12 +111,12 @@ func (s *HandlerTestSuite) TestAllowMultipleSets() {
secondary := &errLogger{l: log.New(notUsed, "", 0)}
SetErrorHandler(secondary)
s.Require().Same(GetErrorHandler(), globalErrorHandler, "set changed globalErrorHandler")
s.Require().Same(globalErrorHandler.eh, secondary, "new Handler not set")
s.Require().Same(globalErrorHandler.getDelegate(), secondary, "new Handler not set")

tertiary := &errLogger{l: log.New(notUsed, "", 0)}
SetErrorHandler(tertiary)
s.Require().Same(GetErrorHandler(), globalErrorHandler, "set changed globalErrorHandler")
s.Assert().Same(globalErrorHandler.eh, tertiary, "user Handler not overridden")
s.Assert().Same(globalErrorHandler.getDelegate(), tertiary, "user Handler not overridden")
}

func TestHandlerTestSuite(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ExampleMeter_asynchronous_single() {
panic(err)
}

err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
_, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
func(ctx context.Context) {
// instrument.WithCallbackFunc(func(ctx context.Context) {
//Do Work to get the real memoryUsage
Expand All @@ -86,7 +86,7 @@ func ExampleMeter_asynchronous_multiple() {
gcCount, _ := meter.AsyncInt64().Counter("gcCount")
gcPause, _ := meter.SyncFloat64().Histogram("gcPause")

err := meter.RegisterCallback([]instrument.Asynchronous{
_, err := meter.RegisterCallback([]instrument.Asynchronous{
heapAlloc,
gcCount,
},
Expand Down
63 changes: 49 additions & 14 deletions metric/internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package global // import "go.opentelemetry.io/otel/metric/internal/global"

import (
"container/list"
"context"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -109,7 +110,8 @@ type meter struct {

mtx sync.Mutex
instruments []delegatedInstrument
callbacks []delegatedCallback

registry list.List

delegate atomic.Value // metric.Meter
}
Expand All @@ -135,12 +137,14 @@ func (m *meter) setDelegate(provider metric.MeterProvider) {
inst.setDelegate(meter)
}

for _, callback := range m.callbacks {
callback.setDelegate(meter)
for e := m.registry.Front(); e != nil; e = e.Next() {
r := e.Value.(*registration)
r.setDelegate(meter)
m.registry.Remove(e)
}

m.instruments = nil
m.callbacks = nil
m.registry.Init()
}

// AsyncInt64 is the namespace for the Asynchronous Integer instruments.
Expand All @@ -167,20 +171,24 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
insts = unwrapInstruments(insts)
return del.RegisterCallback(insts, function)
return del.RegisterCallback(insts, f)
}

m.mtx.Lock()
defer m.mtx.Unlock()
m.callbacks = append(m.callbacks, delegatedCallback{
instruments: insts,
function: function,
})

return nil
reg := &registration{instruments: insts, function: f}
e := m.registry.PushBack(reg)
reg.unreg = func() error {
m.mtx.Lock()
_ = m.registry.Remove(e)
m.mtx.Unlock()
return nil
}
return reg, nil
}

type wrapped interface {
Expand Down Expand Up @@ -217,17 +225,44 @@ func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return (*sfInstProvider)(m)
}

type delegatedCallback struct {
type registration struct {
instruments []instrument.Asynchronous
function func(context.Context)

unreg func() error
unregMu sync.Mutex
}

func (c *delegatedCallback) setDelegate(m metric.Meter) {
func (c *registration) setDelegate(m metric.Meter) {
insts := unwrapInstruments(c.instruments)
err := m.RegisterCallback(insts, c.function)

c.unregMu.Lock()
defer c.unregMu.Unlock()

if c.unreg == nil {
// Unregister already called.
return
}

reg, err := m.RegisterCallback(insts, c.function)
if err != nil {
otel.Handle(err)
}

c.unreg = reg.Unregister
}

func (c *registration) Unregister() error {
c.unregMu.Lock()
defer c.unregMu.Unlock()
if c.unreg == nil {
// Unregister already called.
return nil
}

var err error
err, c.unreg = c.unreg(), nil
return err
}

type afInstProvider meter
Expand Down
84 changes: 81 additions & 3 deletions metric/internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) {
_, _ = mtr.SyncInt64().Counter(name)
_, _ = mtr.SyncInt64().UpDownCounter(name)
_, _ = mtr.SyncInt64().Histogram(name)
_ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
if !once {
wg.Done()
once = true
Expand All @@ -86,6 +86,35 @@ func TestMeterRace(t *testing.T) {
close(finish)
}

func TestUnregisterRace(t *testing.T) {
mtr := &meter{}
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {})
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
finish := make(chan struct{})
go func() {
for i, once := 0, false; ; i++ {
_ = reg.Unregister()
if !once {
wg.Done()
once = true
}
select {
case <-finish:
return
default:
}
}
}()
_ = reg.Unregister()

wg.Wait()
mtr.setDelegate(metric.NewNoopMeterProvider())
close(finish)
}

func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) {
afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
Expand All @@ -101,9 +130,10 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun
_, err = m.AsyncInt64().Gauge("test_Async_Gauge")
assert.NoError(t, err)

require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
afcounter.Observe(ctx, 3)
}))
})
require.NoError(t, err)

sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
Expand Down Expand Up @@ -257,3 +287,51 @@ func TestMeterDefersDelegations(t *testing.T) {
assert.IsType(t, &afCounter{}, actr)
assert.Equal(t, 1, mp.count)
}

func TestRegistrationDelegation(t *testing.T) {
// globalMeterProvider := otel.GetMeterProvider
globalMeterProvider := &meterProvider{}

m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
require.IsType(t, &meter{}, m)
mImpl := m.(*meter)

actr, err := m.AsyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)

var called0 bool
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
called0 = true
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "callback not registered")
// This means reg0 should not be delegated.
assert.NoError(t, reg0.Unregister())
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")

var called1 bool
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
called1 = true
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered")

mp := &testMeterProvider{}

// otel.SetMeterProvider(mp)
globalMeterProvider.setDelegate(mp)

testCollect(t, m) // This is a hacky way to emulate a read from an exporter
require.False(t, called0, "pre-delegation unregistered callback called")
require.True(t, called1, "callback not called")

called1 = false
assert.NoError(t, reg1.Unregister(), "unregister second callback")

testCollect(t, m) // This is a hacky way to emulate a read from an exporter
assert.False(t, called1, "unregistered callback called")

assert.NotPanics(t, func() {
assert.NoError(t, reg1.Unregister(), "duplicate unregister calls")
})
}
21 changes: 19 additions & 2 deletions metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,21 @@ func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider {
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
m.callbacks = append(m.callbacks, function)
func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) {
m.callbacks = append(m.callbacks, f)
return testReg{
f: func(idx int) func() {
return func() { m.callbacks[idx] = nil }
}(len(m.callbacks) - 1),
}, nil
}

type testReg struct {
f func()
}

func (r testReg) Unregister() error {
r.f()
return nil
}

Expand All @@ -85,6 +98,10 @@ func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider {
func (m *testMeter) collect() {
ctx := context.Background()
for _, f := range m.callbacks {
if f == nil {
// Unregister.
continue
}
f(ctx)
}
}
Expand Down
Loading

0 comments on commit da3df7c

Please sign in to comment.