Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC: Implement MetricProducer by making the SDK a Producer #3523

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

type reader struct {
producer producer
producer Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
Expand All @@ -42,7 +42,7 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
return r.aggregationFunc(kind)
}

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) RegisterProducer(p Producer) { r.producer = p }
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
Expand Down
57 changes: 42 additions & 15 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
// manualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
producer atomic.Value
sdkProducer atomic.Value
shutdownOnce sync.Once

mu sync.Mutex
externalProducers atomic.Value

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
}
Expand All @@ -41,19 +44,31 @@ var _ = map[Reader]struct{}{&manualReader{}: {}}
// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
r := &manualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
return r
}

// register stores the Producer which enables the caller to read
// RegisterProducer stores the Producer which enables the caller to read
// metrics on demand.
func (mr *manualReader) register(p producer) {
// Only register once. If producer is already set, do nothing.
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
global.Error(errDuplicateRegister, msg)
func (mr *manualReader) RegisterProducer(p Producer) {
if _, ok := p.(*pipeline); ok {
// Only register once. If sdk Producer is already set, do nothing.
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.Produce}) {
msg := "did not RegisterProducer manual reader"
global.Error(errDuplicateRegister, msg)
}
} else {
mr.mu.Lock()
defer mr.mu.Unlock()
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}
}

Expand All @@ -77,18 +92,18 @@ func (mr *manualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
mr.producer.Store(produceHolder{
produce: shutdownProducer{}.produce,
mr.sdkProducer.Store(produceHolder{
produce: shutdownProducer{}.Produce,
})
err = nil
})
return err
}

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
p := mr.producer.Load()
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
Expand All @@ -99,11 +114,23 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
// this should never happen. In the unforeseen case that this does
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
err := fmt.Errorf("manual reader: invalid Producer: %T", p)
return metricdata.ResourceMetrics{}, err
}

return ph.produce(ctx)
rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
// ignore resource from external metrics, but append scope metrics
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics.ScopeMetrics...)
}
return rm, nil
}

// manualReaderConfig contains configuration options for a ManualReader.
Expand Down
54 changes: 41 additions & 13 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
cancel: cancel,
done: make(chan struct{}),
}
r.externalProducers.Store([]Producer{})

go func() {
defer func() { close(r.done) }()
Expand All @@ -126,7 +127,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
// periodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
producer atomic.Value
sdkProducer atomic.Value

mu sync.Mutex
externalProducers atomic.Value

timeout time.Duration
exporter Exporter
Expand Down Expand Up @@ -165,12 +169,22 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
// Only register once. If producer is already set, do nothing.
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
// RegisterProducer registers p as the Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
if _, ok := p.(*pipeline); ok {
// Only register once. If producer is already set, do nothing.
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.Produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
}
} else {
r.mu.Lock()
defer r.mu.Unlock()
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}
}

Expand All @@ -195,12 +209,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.producer.Load())
return r.collect(ctx, r.sdkProducer.Load())
}

// collect unwraps p as a produceHolder and returns its produce results.
Expand All @@ -218,7 +233,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
}
return ph.produce(ctx)

rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
// ignore resource from external metrics, but append scope metrics
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics.ScopeMetrics...)
}
return rm, nil
}

// export exports metric data m using r's exporter.
Expand Down Expand Up @@ -259,8 +287,8 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
<-r.done

// Any future call to Collect will now return ErrReaderShutdown.
ph := r.producer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
ph := r.sdkProducer.Swap(produceHolder{
produce: shutdownProducer{}.Produce,
})

if ph != nil { // Reader was registered.
Expand Down
11 changes: 7 additions & 4 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (ts *periodicReaderTestSuite) SetupTest() {
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader.register(testProducer{})
ts.ErrReader.RegisterProducer(testSDKProducer())
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand Down Expand Up @@ -193,7 +193,8 @@ func TestPeriodicReaderRun(t *testing.T) {
}

r := NewPeriodicReader(exp)
r.register(testProducer{})
r.RegisterProducer(testSDKProducer())
r.RegisterProducer(testProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)

Expand Down Expand Up @@ -221,7 +222,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.RegisterProducer(testSDKProducer())
r.RegisterProducer(testProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

Expand All @@ -232,7 +234,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.RegisterProducer(testSDKProducer())
r.RegisterProducer(testProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})
Expand Down
9 changes: 5 additions & 4 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline
}

// pipeline connects all of the instruments created by a meter provider to a Reader.
// This is the object that will be `Reader.register()` when a meter provider is created.
// This is the object that will be `Reader.RegisterProducer()` when a meter provider
// is created.
//
// As instruments are created the instrument should be checked if it exists in the
// views of a the Reader, and if so each aggregator should be added to the pipeline.
Expand Down Expand Up @@ -108,10 +109,10 @@ type callbackKey int
// they would have different integer values.
const produceKey callbackKey = 0

// produce returns aggregated metrics from a single collection.
// Produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
func (p *pipeline) Produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
p.Lock()
defer p.Unlock()

Expand Down Expand Up @@ -433,7 +434,7 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli
reader: r,
views: views,
}
r.register(p)
r.RegisterProducer(p)
pipes = append(pipes, p)
}
return pipes
Expand Down
14 changes: 7 additions & 7 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (testSumAggregator) Aggregation() metricdata.Aggregation {
func TestEmptyPipeline(t *testing.T) {
pipe := &pipeline{}

output, err := pipe.produce(context.Background())
output, err := pipe.Produce(context.Background())
require.NoError(t, err)
assert.Nil(t, output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
Expand All @@ -57,7 +57,7 @@ func TestEmptyPipeline(t *testing.T) {
pipe.addCallback(func(ctx context.Context) {})
})

output, err = pipe.produce(context.Background())
output, err = pipe.Produce(context.Background())
require.NoError(t, err)
assert.Nil(t, output.Resource)
require.Len(t, output.ScopeMetrics, 1)
Expand All @@ -67,7 +67,7 @@ func TestEmptyPipeline(t *testing.T) {
func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil)

output, err := pipe.produce(context.Background())
output, err := pipe.Produce(context.Background())
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
Expand All @@ -81,7 +81,7 @@ func TestNewPipeline(t *testing.T) {
pipe.addCallback(func(ctx context.Context) {})
})

output, err = pipe.produce(context.Background())
output, err = pipe.Produce(context.Background())
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
require.Len(t, output.ScopeMetrics, 1)
Expand All @@ -92,7 +92,7 @@ func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil)

output, err := pipe.produce(context.Background())
output, err := pipe.Produce(context.Background())
assert.NoError(t, err)
assert.Equal(t, res, output.Resource)
}
Expand All @@ -107,7 +107,7 @@ func TestPipelineConcurrency(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = pipe.produce(ctx)
_, _ = pipe.Produce(ctx)
}()

wg.Add(1)
Expand Down Expand Up @@ -168,7 +168,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
a.Aggregate(1, *attribute.EmptySet())
}

out, err := test.pipe.produce(context.Background())
out, err := test.pipe.Produce(context.Background())
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]
Expand Down
Loading