Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Metric Producer as a new interface, which returns scope metrics #3524

Merged
merged 15 commits into from
Dec 15, 2022
Next Next commit
add RegisterProducer method and metric.Producer interface
dashpole committed Dec 9, 2022

Verified

This commit was signed with the committer’s verified signature.
dashpole David Ashpole
commit ef675bfbe354bc274f1de6ac8f917676b90f068b
16 changes: 9 additions & 7 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
@@ -28,12 +28,13 @@ import (
)

type reader struct {
producer producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
producer sdkProducer
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)
@@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
return r.aggregationFunc(kind)
}

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) register(p sdkProducer) { r.producer = p }
func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) }
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
50 changes: 39 additions & 11 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
@@ -28,9 +28,12 @@ import (
// manualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
producer atomic.Value
sdkProducer atomic.Value
shutdownOnce sync.Once

mu sync.Mutex
externalProducers atomic.Value

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
}
@@ -41,22 +44,36 @@ var _ = map[Reader]struct{}{&manualReader{}: {}}
// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
r := &manualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
return r
}

// register stores the Producer which enables the caller to read
// metrics on demand.
func (mr *manualReader) register(p producer) {
// register stores the sdkProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *manualReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
func (mr *manualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
@@ -77,18 +94,18 @@ func (mr *manualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
mr.producer.Store(produceHolder{
mr.sdkProducer.Store(produceHolder{
produce: shutdownProducer{}.produce,
})
err = nil
})
return err
}

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
p := mr.producer.Load()
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
@@ -103,7 +120,18 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
return metricdata.ResourceMetrics{}, err
}

return ph.produce(ctx)
rm, err := ph.produce(ctx)
if err != nil {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
return metricdata.ResourceMetrics{}, err
}
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
dashpole marked this conversation as resolved.
Show resolved Hide resolved
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, nil
}

// manualReaderConfig contains configuration options for a ManualReader.
44 changes: 36 additions & 8 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
@@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
cancel: cancel,
done: make(chan struct{}),
}
r.externalProducers.Store([]Producer{})
dashpole marked this conversation as resolved.
Show resolved Hide resolved

go func() {
defer func() { close(r.done) }()
@@ -126,7 +127,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
// periodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
producer atomic.Value
sdkProducer atomic.Value

mu sync.Mutex
externalProducers atomic.Value

timeout time.Duration
exporter Exporter
@@ -166,14 +170,25 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
func (r *periodicReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer registers p as an external Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
@@ -195,12 +210,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.producer.Load())
return r.collect(ctx, r.sdkProducer.Load())
}

// collect unwraps p as a produceHolder and returns its produce results.
@@ -218,7 +234,19 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
}
return ph.produce(ctx)

rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, nil
}

// export exports metric data m using r's exporter.
@@ -259,7 +287,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
<-r.done

// Any future call to Collect will now return ErrReaderShutdown.
ph := r.producer.Swap(produceHolder{
ph := r.sdkProducer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
})

19 changes: 16 additions & 3 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
@@ -51,7 +51,12 @@ type Reader interface {
// register registers a Reader with a MeterProvider.
// The producer argument allows the Reader to signal the sdk to collect
// and send aggregated metric measurements.
register(producer)
register(sdkProducer)

// RegisterProducer registers a Reader with an external Producer.
// The Producer argument allows the Reader to signal the Producer to
// collect and send aggregated metric measurements.
RegisterProducer(Producer)

// temporality reports the Temporality for the instrument kind provided.
temporality(InstrumentKind) metricdata.Temporality
@@ -84,14 +89,22 @@ type Reader interface {
Shutdown(context.Context) error
}

// producer produces metrics for a Reader.
type producer interface {
// sdkProducer produces metrics for a Reader.
type sdkProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (metricdata.ResourceMetrics, error)
}

// Producer produces metrics for a Reader from an external source.
type Producer interface {
// Produce returns aggregated metrics from an external source.
//
// This method should be safe to call concurrently.
Produce(context.Context) ([]metricdata.ScopeMetrics, error)
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {