diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index a924d879d00..3b65d70038e 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -28,7 +28,7 @@ import ( ) type reader struct { - producer producer + producer Producer temporalityFunc TemporalitySelector aggregationFunc AggregationSelector collectFunc func(context.Context) (metricdata.ResourceMetrics, error) @@ -42,7 +42,7 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n return r.aggregationFunc(kind) } -func (r *reader) register(p producer) { r.producer = p } +func (r *reader) RegisterProducer(p Producer) { r.producer = p } func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { return r.temporalityFunc(kind) } diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 0ebfadf33a3..c32032d0cf1 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -28,9 +28,12 @@ import ( // manualReader is a simple Reader that allows an application to // read metrics on demand. type manualReader struct { - producer atomic.Value + sdkProducer atomic.Value shutdownOnce sync.Once + mu sync.Mutex + externalProducers atomic.Value + temporalitySelector TemporalitySelector aggregationSelector AggregationSelector } @@ -41,19 +44,31 @@ var _ = map[Reader]struct{}{&manualReader{}: {}} // NewManualReader returns a Reader which is directly called to collect metrics. func NewManualReader(opts ...ManualReaderOption) Reader { cfg := newManualReaderConfig(opts) - return &manualReader{ + r := &manualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, } + r.externalProducers.Store([]Producer{}) + return r } -// register stores the Producer which enables the caller to read +// RegisterProducer stores the Producer which enables the caller to read // metrics on demand. -func (mr *manualReader) register(p producer) { - // Only register once. If producer is already set, do nothing. - if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { - msg := "did not register manual reader" - global.Error(errDuplicateRegister, msg) +func (mr *manualReader) RegisterProducer(p Producer) { + if _, ok := p.(*pipeline); ok { + // Only register once. If sdk Producer is already set, do nothing. + if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.Produce}) { + msg := "did not RegisterProducer manual reader" + global.Error(errDuplicateRegister, msg) + } + } else { + mr.mu.Lock() + defer mr.mu.Unlock() + currentProducers := mr.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + mr.externalProducers.Store(newProducers) } } @@ -77,18 +92,18 @@ func (mr *manualReader) Shutdown(context.Context) error { err := ErrReaderShutdown mr.shutdownOnce.Do(func() { // Any future call to Collect will now return ErrReaderShutdown. - mr.producer.Store(produceHolder{ - produce: shutdownProducer{}.produce, + mr.sdkProducer.Store(produceHolder{ + produce: shutdownProducer{}.Produce, }) err = nil }) return err } -// Collect gathers all metrics from the SDK, calling any callbacks necessary. -// Collect will return an error if called after shutdown. +// Collect gathers all metrics from the SDK and other Producers, calling any +// callbacks necessary. Collect will return an error if called after shutdown. func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - p := mr.producer.Load() + p := mr.sdkProducer.Load() if p == nil { return metricdata.ResourceMetrics{}, ErrReaderNotRegistered } @@ -99,11 +114,23 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics // this should never happen. In the unforeseen case that this does // happen, return an error instead of panicking so a users code does // not halt in the processes. - err := fmt.Errorf("manual reader: invalid producer: %T", p) + err := fmt.Errorf("manual reader: invalid Producer: %T", p) return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + for _, producer := range mr.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + // ignore resource from external metrics, but append scope metrics + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics.ScopeMetrics...) + } + return rm, nil } // manualReaderConfig contains configuration options for a ManualReader. diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 00ba1305595..87ee4a6c02b 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade cancel: cancel, done: make(chan struct{}), } + r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -126,7 +127,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade // periodicReader is a Reader that continuously collects and exports metric // data at a set interval. type periodicReader struct { - producer atomic.Value + sdkProducer atomic.Value + + mu sync.Mutex + externalProducers atomic.Value timeout time.Duration exporter Exporter @@ -165,12 +169,22 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) { } } -// register registers p as the producer of this reader. -func (r *periodicReader) register(p producer) { - // Only register once. If producer is already set, do nothing. - if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { - msg := "did not register periodic reader" - global.Error(errDuplicateRegister, msg) +// RegisterProducer registers p as the Producer of this reader. +func (r *periodicReader) RegisterProducer(p Producer) { + if _, ok := p.(*pipeline); ok { + // Only register once. If producer is already set, do nothing. + if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.Produce}) { + msg := "did not register periodic reader" + global.Error(errDuplicateRegister, msg) + } + } else { + r.mu.Lock() + defer r.mu.Unlock() + currentProducers := r.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + r.externalProducers.Store(newProducers) } } @@ -195,12 +209,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error { } // Collect gathers and returns all metric data related to the Reader from -// the SDK. The returned metric data is not exported to the configured -// exporter, it is left to the caller to handle that if desired. +// the SDK and other Producers. The returned metric data is not exported +// to the configured exporter, it is left to the caller to handle that if +// desired. // // An error is returned if this is called after Shutdown. func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - return r.collect(ctx, r.producer.Load()) + return r.collect(ctx, r.sdkProducer.Load()) } // collect unwraps p as a produceHolder and returns its produce results. @@ -218,7 +233,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata err := fmt.Errorf("periodic reader: invalid producer: %T", p) return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + for _, producer := range r.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + // ignore resource from external metrics, but append scope metrics + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics.ScopeMetrics...) + } + return rm, nil } // export exports metric data m using r's exporter. @@ -259,8 +287,8 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { <-r.done // Any future call to Collect will now return ErrReaderShutdown. - ph := r.producer.Swap(produceHolder{ - produce: shutdownProducer{}.produce, + ph := r.sdkProducer.Swap(produceHolder{ + produce: shutdownProducer{}.Produce, }) if ph != nil { // Reader was registered. diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index d48c1a7de8e..aced26b220f 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -114,7 +114,7 @@ func (ts *periodicReaderTestSuite) SetupTest() { } ts.ErrReader = NewPeriodicReader(e) - ts.ErrReader.register(testProducer{}) + ts.ErrReader.RegisterProducer(testSDKProducer()) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -193,7 +193,8 @@ func TestPeriodicReaderRun(t *testing.T) { } r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.RegisterProducer(testSDKProducer()) + r.RegisterProducer(testProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -221,7 +222,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.RegisterProducer(testSDKProducer()) + r.RegisterProducer(testProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -232,7 +234,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.RegisterProducer(testSDKProducer()) + r.RegisterProducer(testProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index bc6901e5775..874888b2e4b 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -63,7 +63,8 @@ func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline } // pipeline connects all of the instruments created by a meter provider to a Reader. -// This is the object that will be `Reader.register()` when a meter provider is created. +// This is the object that will be `Reader.RegisterProducer()` when a meter provider +// is created. // // As instruments are created the instrument should be checked if it exists in the // views of a the Reader, and if so each aggregator should be added to the pipeline. @@ -108,10 +109,10 @@ type callbackKey int // they would have different integer values. const produceKey callbackKey = 0 -// produce returns aggregated metrics from a single collection. +// Produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. -func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { +func (p *pipeline) Produce(ctx context.Context) (metricdata.ResourceMetrics, error) { p.Lock() defer p.Unlock() @@ -433,7 +434,7 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli reader: r, views: views, } - r.register(p) + r.RegisterProducer(p) pipes = append(pipes, p) } return pipes diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index fe702a2d1ab..2cbbe9e8d3c 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -43,7 +43,7 @@ func (testSumAggregator) Aggregation() metricdata.Aggregation { func TestEmptyPipeline(t *testing.T) { pipe := &pipeline{} - output, err := pipe.produce(context.Background()) + output, err := pipe.Produce(context.Background()) require.NoError(t, err) assert.Nil(t, output.Resource) assert.Len(t, output.ScopeMetrics, 0) @@ -57,7 +57,7 @@ func TestEmptyPipeline(t *testing.T) { pipe.addCallback(func(ctx context.Context) {}) }) - output, err = pipe.produce(context.Background()) + output, err = pipe.Produce(context.Background()) require.NoError(t, err) assert.Nil(t, output.Resource) require.Len(t, output.ScopeMetrics, 1) @@ -67,7 +67,7 @@ func TestEmptyPipeline(t *testing.T) { func TestNewPipeline(t *testing.T) { pipe := newPipeline(nil, nil, nil) - output, err := pipe.produce(context.Background()) + output, err := pipe.Produce(context.Background()) require.NoError(t, err) assert.Equal(t, resource.Empty(), output.Resource) assert.Len(t, output.ScopeMetrics, 0) @@ -81,7 +81,7 @@ func TestNewPipeline(t *testing.T) { pipe.addCallback(func(ctx context.Context) {}) }) - output, err = pipe.produce(context.Background()) + output, err = pipe.Produce(context.Background()) require.NoError(t, err) assert.Equal(t, resource.Empty(), output.Resource) require.Len(t, output.ScopeMetrics, 1) @@ -92,7 +92,7 @@ func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) pipe := newPipeline(res, nil, nil) - output, err := pipe.produce(context.Background()) + output, err := pipe.Produce(context.Background()) assert.NoError(t, err) assert.Equal(t, res, output.Resource) } @@ -107,7 +107,7 @@ func TestPipelineConcurrency(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, _ = pipe.produce(ctx) + _, _ = pipe.Produce(ctx) }() wg.Add(1) @@ -168,7 +168,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { a.Aggregate(1, *attribute.EmptySet()) } - out, err := test.pipe.produce(context.Background()) + out, err := test.pipe.Produce(context.Background()) require.NoError(t, err) require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline") sm := out.ScopeMetrics[0] diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index aa9d50ef666..fe3a3eedf7e 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -48,10 +48,10 @@ var ErrReaderShutdown = fmt.Errorf("reader is shutdown") // Pull-based exporters will typically implement Register // themselves, since they read on demand. type Reader interface { - // register registers a Reader with a MeterProvider. - // The producer argument allows the Reader to signal the sdk to collect - // and send aggregated metric measurements. - register(producer) + // RegisterProducer registers a Reader with a Producer. + // The Producer argument allows the Reader to signal the Producer to + // collect and send aggregated metric measurements. + RegisterProducer(Producer) // temporality reports the Temporality for the instrument kind provided. temporality(InstrumentKind) metricdata.Temporality @@ -84,15 +84,15 @@ type Reader interface { Shutdown(context.Context) error } -// producer produces metrics for a Reader. -type producer interface { - // produce returns aggregated metrics from a single collection. +// Producer produces metrics for a Reader. +type Producer interface { + // Produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. - produce(context.Context) (metricdata.ResourceMetrics, error) + Produce(context.Context) (metricdata.ResourceMetrics, error) } -// produceHolder is used as an atomic.Value to wrap the non-concrete producer +// produceHolder is used as an atomic.Value to wrap the non-concrete Producer // type. type produceHolder struct { produce func(context.Context) (metricdata.ResourceMetrics, error) @@ -101,8 +101,8 @@ type produceHolder struct { // shutdownProducer produces an ErrReaderShutdown error always. type shutdownProducer struct{} -// produce returns an ErrReaderShutdown error. -func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics, error) { +// Produce returns an ErrReaderShutdown error. +func (p shutdownProducer) Produce(context.Context) (metricdata.ResourceMetrics, error) { return metricdata.ResourceMetrics{}, ErrReaderShutdown } diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 28b249bd3e2..aebe4f4e13a 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -58,7 +58,8 @@ func (ts *readerTestSuite) TestErrorForNotRegistered() { } func (ts *readerTestSuite) TestProducer() { - ts.Reader.register(testProducer{}) + ts.Reader.RegisterProducer(testSDKProducer()) + ts.Reader.RegisterProducer(testProducer{}) m, err := ts.Reader.Collect(context.Background()) ts.NoError(err) ts.Equal(testMetrics, m) @@ -66,7 +67,7 @@ func (ts *readerTestSuite) TestProducer() { func (ts *readerTestSuite) TestCollectAfterShutdown() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.RegisterProducer(testSDKProducer()) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m, err := ts.Reader.Collect(ctx) @@ -76,41 +77,42 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { func (ts *readerTestSuite) TestShutdownTwice() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.RegisterProducer(testSDKProducer()) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } func (ts *readerTestSuite) TestMultipleForceFlush() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.RegisterProducer(testSDKProducer()) ts.Require().NoError(ts.Reader.ForceFlush(ctx)) ts.NoError(ts.Reader.ForceFlush(ctx)) } func (ts *readerTestSuite) TestMultipleRegister() { - p0 := testProducer{ - produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { - // Differentiate this producer from the second by returning an - // error. - return testMetrics, assert.AnError - }, + ctx, cancel := context.WithCancel(context.Background()) + p0 := &pipeline{ + callbacks: []func(_ context.Context){func(_ context.Context) { + // Differentiate this producer from the second by cancelling the context + // to generate an error. + cancel() + }}, } - p1 := testProducer{} + p1 := testSDKProducer() - ts.Reader.register(p0) + ts.Reader.RegisterProducer(p0) // This should be ignored. - ts.Reader.register(p1) + ts.Reader.RegisterProducer(p1) - _, err := ts.Reader.Collect(context.Background()) - ts.Equal(assert.AnError, err) + _, err := ts.Reader.Collect(ctx) + ts.Equal(context.Canceled, err) } func (ts *readerTestSuite) TestMethodConcurrency() { // Requires the race-detector (a default test option for the project). // All reader methods should be concurrent-safe. - ts.Reader.register(testProducer{}) + ts.Reader.RegisterProducer(testSDKProducer()) ctx := context.Background() var wg sync.WaitGroup @@ -141,7 +143,7 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ctx := context.Background() ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. - ts.Reader.register(testProducer{}) + ts.Reader.RegisterProducer(testSDKProducer()) m, err := ts.Reader.Collect(ctx) ts.ErrorIs(err, ErrReaderShutdown) @@ -170,11 +172,15 @@ var testMetrics = metricdata.ResourceMetrics{ }}, } +func testSDKProducer() Producer { + return &pipeline{resource: resource.NewSchemaless(attribute.String("test", "Reader"))} +} + type testProducer struct { produceFunc func(context.Context) (metricdata.ResourceMetrics, error) } -func (p testProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { +func (p testProducer) Produce(ctx context.Context) (metricdata.ResourceMetrics, error) { if p.produceFunc != nil { return p.produceFunc(ctx) } @@ -183,7 +189,8 @@ func (p testProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, func benchReaderCollectFunc(r Reader) func(*testing.B) { ctx := context.Background() - r.register(testProducer{}) + r.RegisterProducer(testSDKProducer()) + r.RegisterProducer(testProducer{}) // Store bechmark results in a closure to prevent the compiler from // inlining and skipping the function.