Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics Enricher API #1271

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
468688c
add metrics processor
hstan Oct 20, 2020
fc0b575
remove unused method
hstan Oct 20, 2020
4e641ac
move interface and add comments
hstan Oct 21, 2020
fe2c130
add comments for MetricsProcessor interface
hstan Oct 21, 2020
dbe97fe
Merge branch 'master' into add-metrics-processor
hstan Oct 21, 2020
b297e94
add changelog
hstan Oct 28, 2020
ee0f8fd
Merge branch 'master' of github.com:open-telemetry/opentelemetry-go i…
hstan Oct 28, 2020
21c833f
make the option api accept variadic signature for metrics processors
hstan Oct 29, 2020
7b11574
split changelog into two separate lines
hstan Oct 29, 2020
015599a
update `MetricsProcessor` documentation
hstan Oct 30, 2020
adf34f1
Merge branch 'master' of github.com:hstan/opentelemetry-go into add-m…
hstan Nov 11, 2020
e6b051e
use a function type instead of interface and rename it to a more mean…
hstan Nov 11, 2020
afa378d
update changelog
hstan Nov 11, 2020
8ea1616
add metrics labels enricher to pull controller
hstan Nov 11, 2020
f73d433
add test for pull with metrics labels enricher
hstan Nov 11, 2020
208a049
add test for push with metrics labels enricher
hstan Nov 11, 2020
4f91f17
sort import order
hstan Nov 11, 2020
4739df6
update changelog
hstan Nov 11, 2020
85faa2e
handle error returned from the enricher function
hstan Nov 11, 2020
b94b328
Merge branch 'master' of github.com:open-telemetry/opentelemetry-go i…
hstan Nov 13, 2020
b3caafa
clean up
hstan Dec 14, 2020
a22e258
Merge branch 'master' of github.com:open-telemetry/opentelemetry-go i…
hstan Dec 14, 2020
6dab82e
fix tests
hstan Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type Config struct {
// Resource describes all the metric records processed by the
// Accumulator.
Resource *resource.Resource

// MetricProcessors are executed each time a metric is recorded
// by the Accumulator's sync instrument implementation
MetricsProcessors []MetricsProcessor
}

// Option is the interface that applies the value to a configuration option.
Expand All @@ -43,3 +47,13 @@ type resourceOption struct {
func (o resourceOption) Apply(config *Config) {
config.Resource = o.Resource
}

func WithMetricsProcessors(processors []MetricsProcessor) Option {
hstan marked this conversation as resolved.
Show resolved Hide resolved
return metricsProcessorsOption(processors)
}

type metricsProcessorsOption []MetricsProcessor

func (p metricsProcessorsOption) Apply(config *Config) {
config.MetricsProcessors = p
hstan marked this conversation as resolved.
Show resolved Hide resolved
}
13 changes: 13 additions & 0 deletions sdk/metric/controller/push/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package push
import (
"time"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand All @@ -33,6 +34,8 @@ type Config struct {
// integrate, and export) can last before it is canceled. Defaults to
// the controller push period.
Timeout time.Duration

MetricsProcessors []metric.MetricsProcessor
}

// Option is the interface that applies the value to a configuration option.
Expand Down Expand Up @@ -73,3 +76,13 @@ type timeoutOption time.Duration
func (o timeoutOption) Apply(config *Config) {
config.Timeout = time.Duration(o)
}

func WithMetricsProcessor(p metric.MetricsProcessor) Option {
hstan marked this conversation as resolved.
Show resolved Hide resolved
return metricsProcessorOption{p}
}

type metricsProcessorOption struct{ metric.MetricsProcessor }

func (m metricsProcessorOption) Apply(config *Config) {
config.MetricsProcessors = append(config.MetricsProcessors, m.MetricsProcessor)
}
1 change: 1 addition & 0 deletions sdk/metric/controller/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt
impl := sdk.NewAccumulator(
hstan marked this conversation as resolved.
Show resolved Hide resolved
checkpointer,
sdk.WithResource(c.Resource),
sdk.WithMetricsProcessors(c.MetricsProcessors),
)
return &Controller{
provider: registry.NewMeterProvider(impl),
Expand Down
36 changes: 29 additions & 7 deletions sdk/metric/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type (

// resource is applied to all records in this Accumulator.
resource *resource.Resource

// metricsProcessors are applied to all records in this Accumulator
metricsProcessors []MetricsProcessor
}

syncInstrument struct {
Expand Down Expand Up @@ -123,8 +126,9 @@ type (
}

instrument struct {
meter *Accumulator
descriptor otel.Descriptor
meter *Accumulator
descriptor otel.Descriptor
metricsProcessors []MetricsProcessor
}

asyncInstrument struct {
Expand All @@ -139,6 +143,15 @@ type (
labels *label.Set
observed export.Aggregator
}

// MetricsProcessor can be provided as an config option when creating
// a new push controller
hstan marked this conversation as resolved.
Show resolved Hide resolved
MetricsProcessor interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetricsProcessor does not seem like an appropriate name for this interface. It is overly general, uses the word "metrics" as a noun instead of an adjective (something that is avoided in OpenTelemetry), and duplicates the name of the other metric.Processor from the export package, which is is also a field of the Accumulator.

Can we use a more descriptive name of the function this interface will serve and possibly match broader naming standards?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be an interface? It seems like a declared type of func would be sufficient here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right

// OnMetricRecorded is execute everytime a metric is recorded by
// the sync instrument implementation of an Accumulator, it generally
// provides ability to correlate the context with the metrics
OnMetricRecorded(context.Context, *[]label.KeyValue)
Copy link

@chenzhihao chenzhihao Oct 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this method be too specific?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing the added indirection here of a reference type for the second parameter means that argument will be mutated. This doesn't seem idiomatic of Go and it will likely add an additional allocation of the slice pointer and pose concurrency issues.

Is there any reason this could not accept a slice and return a slice?

Additionally, how are errors expected to be handled here?

}
)

var (
Expand All @@ -154,6 +167,10 @@ func (inst *instrument) Descriptor() api.Descriptor {
return inst.descriptor
}

func (inst *instrument) getMetricsProcessors() []MetricsProcessor {
return inst.metricsProcessors
}

func (a *asyncInstrument) Implementation() interface{} {
return a
}
Expand Down Expand Up @@ -291,6 +308,9 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) api.BoundSyncImpl {
}

func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs []label.KeyValue) {
for _, processor := range s.getMetricsProcessors() {
processor.OnMetricRecorded(ctx, &kvs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the processors are only invoked for synchronous instruments, do they not have any value for async instruments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aneurysm9 TBH we haven't started using the async instrument yet, but when implementing this feature I did looked it. Firstly I didn't find a proper place to add this hook then when I explore the code I reckon it is unnecessary to do that since for async instrument the client can provide a callback which already gives ability to achive the same purpose.
e.g. https://github.com/open-telemetry/opentelemetry-go/blob/master/example/basic/main.go#L71 you can re-compose the metric labels using the context and commonLabels in the callback function. (Please correct me if I'm wrong)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to use this interface for asynchronous instruments, but the applications will be different. There's no distributed context available for this Enricher API, but you still might want to use this API to filter labels before they enter the Accumulator. Enricher is not a good name for a filter, so it feels like we still haven't found the right name for this concept.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
h := s.acquireHandle(kvs, nil)
defer h.Unbind()
h.RecordOne(ctx, number)
Expand All @@ -312,18 +332,20 @@ func NewAccumulator(processor export.Processor, opts ...Option) *Accumulator {
}

return &Accumulator{
processor: processor,
asyncInstruments: internal.NewAsyncInstrumentState(),
resource: c.Resource,
asyncInstruments: internal.NewAsyncInstrumentState(),
processor: processor,
resource: c.Resource,
metricsProcessors: c.MetricsProcessors,
}
}

// NewSyncInstrument implements api.MetricImpl.
func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
return &syncInstrument{
instrument: instrument{
descriptor: descriptor,
meter: m,
descriptor: descriptor,
meter: m,
metricsProcessors: m.metricsProcessors,
},
}, nil
}
Expand Down