From d9003cf7d4ffb1dd6555cf7b3016767e1f0cafa0 Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:07:19 -0800 Subject: [PATCH 01/13] Add enricher interface --- sdk/export/metric/metric.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 525fe8f3a63..2293c3f1c30 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -125,6 +125,26 @@ type Checkpointer interface { FinishCollection() error } +// Enricher supports extracting baggage attributes and applying them +// as metric event labels and also permits erasing metric labels at +// runtime. When configured with an Accumulator, the Enricher is +// applied to all synchronous instruments. +// +// This receives the context and the event labels and +// returns the effective KeyValue slice. If this returns a +// nil KeyValue slice or a non-nil error, the caller SHOULD +// use the original KeyValue slice. +// +// This SHOULD NOT modify the input label slice. +// +// This is permitted to add or remove labels and are applied +// to all synchronous instrument events. +// +// Note: This interface does not include the *metric.Descriptor +// because it creates significant complexity and/or cost to enrich +// RecordBatch() events. +type Enricher func(context.Context, []label.KeyValue) ([]label.KeyValue, error) + // Aggregator implements a specific aggregation behavior, e.g., a // behavior to track a sequence of updates to an instrument. Sum-only // instruments commonly use a simple Sum aggregator, but for the From 5d670fad7c545996fe544ee2b679e9e07ebc658d Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:07:38 -0800 Subject: [PATCH 02/13] Factor expectOutput in correct_test.go --- sdk/metric/correct_test.go | 42 ++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 5ccba31af82..57c7ead32db 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -98,11 +98,20 @@ func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessPro accum := metricsdk.NewAccumulator( processor, testResource, + nil, ) meter := metric.WrapMeterImpl(accum, "test") return meter, accum, processor } +func expectOutput(t *testing.T, processor *correctnessProcessor, expect map[string]float64) { + out := processortest.NewOutput(label.DefaultEncoder()) + for _, rec := range processor.accumulations { + require.NoError(t, out.AddAccumulation(rec)) + } + require.EqualValues(t, expect, out.Map()) +} + func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error { ci.accumulations = append(ci.accumulations, accumulation) return nil @@ -351,12 +360,9 @@ func TestObserverCollection(t *testing.T) { collected := sdk.Collect(ctx) require.Equal(t, collected, len(processor.accumulations)) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } mult := float64(mult) - require.EqualValues(t, map[string]float64{ + + expectOutput(t, processor, map[string]float64{ "float.valueobserver.lastvalue/A=B/R=V": -mult, "float.valueobserver.lastvalue/C=D/R=V": -mult, "int.valueobserver.lastvalue//R=V": mult, @@ -371,7 +377,7 @@ func TestObserverCollection(t *testing.T) { "float.updownsumobserver.sum/C=D/R=V": mult, "int.updownsumobserver.sum//R=V": -mult, "int.updownsumobserver.sum/A=B/R=V": mult, - }, out.Map()) + }) } } @@ -456,11 +462,7 @@ func TestObserverBatch(t *testing.T) { require.Equal(t, collected, len(processor.accumulations)) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } - require.EqualValues(t, map[string]float64{ + expectOutput(t, processor, map[string]float64{ "float.sumobserver.sum//R=V": 1.1, "float.sumobserver.sum/A=B/R=V": 1000, "int.sumobserver.sum//R=V": 10, @@ -475,7 +477,7 @@ func TestObserverBatch(t *testing.T) { "float.valueobserver.lastvalue/C=D/R=V": -1, "int.valueobserver.lastvalue//R=V": 1, "int.valueobserver.lastvalue/A=B/R=V": 1, - }, out.Map()) + }) } func TestRecordBatch(t *testing.T) { @@ -501,16 +503,12 @@ func TestRecordBatch(t *testing.T) { sdk.Collect(ctx) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } - require.EqualValues(t, map[string]float64{ + expectOutput(t, processor, map[string]float64{ "int64.sum/A=B,C=D/R=V": 1, "float64.sum/A=B,C=D/R=V": 2, "int64.exact/A=B,C=D/R=V": 3, "float64.exact/A=B,C=D/R=V": 4, - }, out.Map()) + }) } // TestRecordPersistence ensures that a direct-called instrument that @@ -583,12 +581,8 @@ func TestSyncInAsync(t *testing.T) { sdk.Collect(ctx) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } - require.EqualValues(t, map[string]float64{ + expectOutput(t, processor, map[string]float64{ "counter.sum//R=V": 100, "observer.lastvalue//R=V": 10, - }, out.Map()) + }) } From 383ffc76b0805455b1bc772a73585ec887828d7f Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:07:48 -0800 Subject: [PATCH 03/13] Add new arg to Accumulator --- sdk/metric/benchmark_test.go | 2 +- sdk/metric/controller/pull/pull.go | 1 + sdk/metric/controller/push/push.go | 1 + sdk/metric/processor/basic/basic_test.go | 2 +- .../processor/processortest/test_test.go | 1 + sdk/metric/processor/reducer/reducer_test.go | 2 + sdk/metric/sdk.go | 48 +++++++++++++++++-- sdk/metric/stress_test.go | 2 +- 8 files changed, 52 insertions(+), 7 deletions(-) diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index bb32bb34cdb..b117bfdb8bc 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -42,7 +42,7 @@ func newFixture(b *testing.B) *benchFixture { AggregatorSelector: processortest.AggregatorSelector(), } - bf.accumulator = sdk.NewAccumulator(bf, nil) + bf.accumulator = sdk.NewAccumulator(bf, nil, nil) bf.meter = metric.WrapMeterImpl(bf.accumulator, "benchmarks") return bf } diff --git a/sdk/metric/controller/pull/pull.go b/sdk/metric/controller/pull/pull.go index 68ea34b351c..cb4bcb2f258 100644 --- a/sdk/metric/controller/pull/pull.go +++ b/sdk/metric/controller/pull/pull.go @@ -61,6 +61,7 @@ func New(checkpointer export.Checkpointer, options ...Option) *Controller { accum := sdk.NewAccumulator( checkpointer, config.Resource, + nil, ) return &Controller{ accumulator: accum, diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 4350de32de5..824e42e3304 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -62,6 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt impl := sdk.NewAccumulator( checkpointer, c.Resource, + nil, ) return &Controller{ provider: registry.NewMeterProvider(impl), diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 016de9677bd..ec4f79b9c53 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -474,7 +474,7 @@ func TestSumObserverEndToEnd(t *testing.T) { processorTest.AggregatorSelector(), eselector, ) - accum := sdk.NewAccumulator(proc, resource.Empty()) + accum := sdk.NewAccumulator(proc, resource.Empty(), nil) meter := metric.WrapMeterImpl(accum, "testing") var calls int64 diff --git a/sdk/metric/processor/processortest/test_test.go b/sdk/metric/processor/processortest/test_test.go index c4541d2656a..dc19da9a561 100644 --- a/sdk/metric/processor/processortest/test_test.go +++ b/sdk/metric/processor/processortest/test_test.go @@ -33,6 +33,7 @@ func generateTestData(proc export.Processor) { accum := metricsdk.NewAccumulator( proc, resource.NewWithAttributes(label.String("R", "V")), + nil, ) meter := metric.WrapMeterImpl(accum, "testing") diff --git a/sdk/metric/processor/reducer/reducer_test.go b/sdk/metric/processor/reducer/reducer_test.go index 138f3825dde..5aaa2edc111 100644 --- a/sdk/metric/processor/reducer/reducer_test.go +++ b/sdk/metric/processor/reducer/reducer_test.go @@ -76,6 +76,7 @@ func TestFilterProcessor(t *testing.T) { accum := metricsdk.NewAccumulator( reducer.New(testFilter{}, processorTest.Checkpointer(testProc)), resource.NewWithAttributes(label.String("R", "V")), + nil, ) generateData(accum) @@ -93,6 +94,7 @@ func TestFilterBasicProcessor(t *testing.T) { accum := metricsdk.NewAccumulator( reducer.New(testFilter{}, basicProc), resource.NewWithAttributes(label.String("R", "V")), + nil, ) exporter := processorTest.NewExporter(basicProc, label.DefaultEncoder()) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 64bfa9b1dd4..9bce81fd098 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -67,6 +67,9 @@ type ( // resource is applied to all records in this Accumulator. resource *resource.Resource + + // enricher (optional) extracts labels from baggage. + enricher export.Enricher } syncInstrument struct { @@ -123,7 +126,7 @@ type ( } instrument struct { - meter *Accumulator + meter *Accumulator // TODO rename 'accumulator' descriptor metric.Descriptor } @@ -287,9 +290,12 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) metric.BoundSyncImpl { } func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs []label.KeyValue) { + // Introduce labels from baggage. + kvs = s.meter.enrich(ctx, kvs) + h := s.acquireHandle(kvs, nil) defer h.Unbind() - h.RecordOne(ctx, num) + h.recordEvent(ctx, num) } // NewAccumulator constructs a new Accumulator for the given @@ -301,11 +307,12 @@ func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs [ // processor will call Collect() when it receives a request to scrape // current metric values. A push-based processor should configure its // own periodic collection. -func NewAccumulator(processor export.Processor, resource *resource.Resource) *Accumulator { +func NewAccumulator(processor export.Processor, resource *resource.Resource, enricher export.Enricher) *Accumulator { return &Accumulator{ processor: processor, asyncInstruments: internal.NewAsyncInstrumentState(), resource: resource, + enricher: enricher, } } @@ -352,6 +359,20 @@ func (m *Accumulator) Collect(ctx context.Context) int { return checkpointed } +func (m *Accumulator) enrich(ctx context.Context, kvs []label.KeyValue) []label.KeyValue { + if m.enricher == nil { + return kvs + } + out, err := m.enricher(ctx, kvs) + + // Return the enricher result if it is non-nil and no error. + if err == nil && out != nil { + return out + } + otel.Handle(err) + return kvs +} + func (m *Accumulator) collectSyncInstruments() int { checkpointed := 0 @@ -474,6 +495,9 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { // RecordBatch enters a batch of metric events. func (m *Accumulator) RecordBatch(ctx context.Context, kvs []label.KeyValue, measurements ...metric.Measurement) { + // Introduce labels from baggage. + kvs = m.enrich(ctx, kvs) + // Labels will be computed the first time acquireHandle is // called. Subsequent calls to acquireHandle will re-use the // previously computed value instead of recomputing the @@ -492,12 +516,27 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []label.KeyValue, mea } defer h.Unbind() - h.RecordOne(ctx, meas.Number()) + h.recordEvent(ctx, meas.Number()) } } // RecordOne implements metric.SyncImpl. func (r *record) RecordOne(ctx context.Context, num number.Number) { + if r.inst.meter.enricher == nil { + r.recordEvent(ctx, num) + return + } + + // Note: When there is an enricher, the bound instrument loses + // performance when labels are introduced. The ToSlice() below + // could be stored in the instrument when enricher != nil, to + // avoid this cost. + + // Call the unbound instrument path when the enricher is in use. + r.inst.RecordOne(ctx, num, r.labels.ToSlice()) +} + +func (r *record) recordEvent(ctx context.Context, num number.Number) { if r.current == nil { // The instrument is disabled according to the AggregatorSelector. return @@ -506,6 +545,7 @@ func (r *record) RecordOne(ctx context.Context, num number.Number) { otel.Handle(err) return } + if err := r.current.Update(ctx, num, &r.inst.descriptor); err != nil { otel.Handle(err) return diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 4878435bde3..e27e6d89403 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -294,7 +294,7 @@ func stressTest(t *testing.T, impl testImpl) { } cc := concurrency() - sdk := NewAccumulator(fixture, nil) + sdk := NewAccumulator(fixture, nil, nil) meter := metric.WrapMeterImpl(sdk, "stress_test") fixture.wg.Add(cc + 1) From abe1454f6e973af52c6cbaff8cb13f4d298aba61 Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:43:56 -0800 Subject: [PATCH 04/13] Comment --- sdk/export/metric/metric.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 2293c3f1c30..010fb742712 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -143,6 +143,14 @@ type Checkpointer interface { // Note: This interface does not include the *metric.Descriptor // because it creates significant complexity and/or cost to enrich // RecordBatch() events. +// +// Note: This interface is called with input labels before they are +// sorted and de-duplicated as described in +// label.NewSetWithSortableFiltered(). The enricher has control over +// whether label values should override the input making use of the +// last-value semantic detailed for metric event labels in general. +// Appending baggage values means they override the call-site labels, +// prepending baggage means call-site labels override baggage labels. type Enricher func(context.Context, []label.KeyValue) ([]label.KeyValue, error) // Aggregator implements a specific aggregation behavior, e.g., a From 3b643a2f91cb72bbfad7d70679df28c9eb84ce53 Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:48:34 -0800 Subject: [PATCH 05/13] Add baggage ForEach to avoid copies, and a test --- baggage/baggage.go | 6 ++++++ baggage/baggage_test.go | 21 +++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/baggage/baggage.go b/baggage/baggage.go index 66b8416f1f3..e20ceebc600 100644 --- a/baggage/baggage.go +++ b/baggage/baggage.go @@ -36,6 +36,12 @@ func Set(ctx context.Context) label.Set { return label.NewSet(values...) } +// ForEach allows visiting the baggage values in a context without +// copying a slice. +func ForEach(ctx context.Context, f func(kv label.KeyValue) bool) { + baggage.MapFromContext(ctx).Foreach(f) +} + // Value returns the value related to key in the baggage of ctx. If no // value is set, the returned label.Value will be an uninitialized zero-value // with type INVALID. diff --git a/baggage/baggage_test.go b/baggage/baggage_test.go index f87c3b2b64d..dbb5bdf5c00 100644 --- a/baggage/baggage_test.go +++ b/baggage/baggage_test.go @@ -18,6 +18,8 @@ import ( "context" "testing" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/internal/baggage" "go.opentelemetry.io/otel/label" ) @@ -84,3 +86,22 @@ func TestBaggage(t *testing.T) { t.Fatal("WithoutBaggage failed to clear baggage") } } + +func TestForEach(t *testing.T) { + ctx := ContextWithValues( + context.Background(), + label.String("A", "B"), + label.String("C", "D"), + ) + + out := map[string]string{} + ForEach(ctx, func(kv label.KeyValue) bool { + out[string(kv.Key)] = kv.Value.Emit() + return true + }) + + require.EqualValues(t, map[string]string{ + "A": "B", + "C": "D", + }, out) +} From 6ad22650ada84651267c422a3b4ce62f8b51c5fb Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:48:39 -0800 Subject: [PATCH 06/13] More test --- sdk/metric/correct_test.go | 131 +++++++++++++++++++++++++++++++++++-- 1 file changed, 126 insertions(+), 5 deletions(-) diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 57c7ead32db..ad39a36fc88 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" @@ -104,12 +105,13 @@ func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessPro return meter, accum, processor } -func expectOutput(t *testing.T, processor *correctnessProcessor, expect map[string]float64) { +func expectAndResetOutput(t *testing.T, processor *correctnessProcessor, expect map[string]float64) { out := processortest.NewOutput(label.DefaultEncoder()) for _, rec := range processor.accumulations { require.NoError(t, out.AddAccumulation(rec)) } require.EqualValues(t, expect, out.Map()) + processor.accumulations = nil } func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error { @@ -362,7 +364,7 @@ func TestObserverCollection(t *testing.T) { mult := float64(mult) - expectOutput(t, processor, map[string]float64{ + expectAndResetOutput(t, processor, map[string]float64{ "float.valueobserver.lastvalue/A=B/R=V": -mult, "float.valueobserver.lastvalue/C=D/R=V": -mult, "int.valueobserver.lastvalue//R=V": mult, @@ -462,7 +464,7 @@ func TestObserverBatch(t *testing.T) { require.Equal(t, collected, len(processor.accumulations)) - expectOutput(t, processor, map[string]float64{ + expectAndResetOutput(t, processor, map[string]float64{ "float.sumobserver.sum//R=V": 1.1, "float.sumobserver.sum/A=B/R=V": 1000, "int.sumobserver.sum//R=V": 10, @@ -503,7 +505,7 @@ func TestRecordBatch(t *testing.T) { sdk.Collect(ctx) - expectOutput(t, processor, map[string]float64{ + expectAndResetOutput(t, processor, map[string]float64{ "int64.sum/A=B,C=D/R=V": 1, "float64.sum/A=B,C=D/R=V": 2, "int64.exact/A=B,C=D/R=V": 3, @@ -581,8 +583,127 @@ func TestSyncInAsync(t *testing.T) { sdk.Collect(ctx) - expectOutput(t, processor, map[string]float64{ + expectAndResetOutput(t, processor, map[string]float64{ "counter.sum//R=V": 100, "observer.lastvalue//R=V": 10, }) } + +func TestEnricher(t *testing.T) { + enrich := func(context.Context, []label.KeyValue) ([]label.KeyValue, error) { + return nil, nil + } + + testHandler.Reset() + processor := &correctnessProcessor{ + t: t, + testSelector: &testSelector{selector: processortest.AggregatorSelector()}, + } + accum := metricsdk.NewAccumulator( + processor, + testResource, + func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) { + return enrich(ctx, kvs) + }, + ) + + meter := metric.WrapMeterImpl(accum, "test") + + bg := context.Background() + ctx := baggage.ContextWithValues( + bg, + label.String("Corr1", "Val1"), + label.String("Corr2", "Val2"), + ) + + counter := Must(meter).NewInt64Counter("name.sum") + recorder := Must(meter).NewFloat64ValueRecorder("name.lastvalue") + + counter.Add(ctx, 1) + recorder.Record(ctx, 10, label.String("E", "F")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum//R=V": 1, + "name.lastvalue/E=F/R=V": 10, + }) + + // This enriches with all baggage keys + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + baggage.ForEach(ctx, func(kv label.KeyValue) bool { + input = append(input, kv) + return true + }) + return input, nil + } + + counter.Add(ctx, 1) + recorder.Record(ctx, 10, label.String("E", "F")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Corr1=Val1,Corr2=Val2/R=V": 1, + "name.lastvalue/Corr1=Val1,Corr2=Val2,E=F/R=V": 10, + }) + + // This enriches by erasing all labels + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + return []label.KeyValue{}, nil + } + + counter.Add(ctx, 1, label.String("Y", "Z")) + recorder.Record(ctx, 10, label.String("E", "F")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum//R=V": 1, + "name.lastvalue//R=V": 10, + }) + + // This enriches by including the first input and all baggage labels. + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + var output []label.KeyValue + if len(input) > 0 { + output = append(output, input[0]) + } + baggage.ForEach(ctx, func(kv label.KeyValue) bool { + output = append(output, kv) + return true + }) + return output, nil + } + + counter.Add(ctx, 1, label.String("Y", "Z"), label.String("X", "Y")) + recorder.Record(ctx, 10, label.String("E", "F"), label.String("G", "H")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Corr1=Val1,Corr2=Val2,Y=Z/R=V": 1, + "name.lastvalue/Corr1=Val1,Corr2=Val2,E=F/R=V": 10, + }) + + // This enriches by APPENDING a duplicate label. + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + return append(input, label.String("Extra", "Baggage")), nil + } + + counter.Add(ctx, 1, label.String("Extra", "Call-site")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Extra=Baggage/R=V": 1, + }) + + // This enriches by APPENDING a duplicate label. + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + return append([]label.KeyValue{label.String("Extra", "Baggage")}, input...), nil + } + + counter.Add(ctx, 1, label.String("Extra", "Call-site")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Extra=Call-site/R=V": 1, + }) + +} From 2194730986380bcb4247c8254f2794c63ccb90b5 Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:54:35 -0800 Subject: [PATCH 07/13] Update PR number --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7062c744a0..8a3e46ebac7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360) - `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369) +- Metric SDK adds `Enricher` API for applying baggage attributes as metric labels in request context. (#1421) ### Changed From 38738f2982e8542d1d6b82653e561b948e4d66a4 Mon Sep 17 00:00:00 2001 From: Josh MacDonald Date: Wed, 23 Dec 2020 00:59:39 -0800 Subject: [PATCH 08/13] Remove one sentence --- sdk/export/metric/metric.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 010fb742712..528e71f84c4 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -137,9 +137,6 @@ type Checkpointer interface { // // This SHOULD NOT modify the input label slice. // -// This is permitted to add or remove labels and are applied -// to all synchronous instrument events. -// // Note: This interface does not include the *metric.Descriptor // because it creates significant complexity and/or cost to enrich // RecordBatch() events. From a219865a6361d77e54654d3b5f8fefa8cbc1d33f Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 20 Jan 2021 15:18:01 +1100 Subject: [PATCH 09/13] Add enricher option to the controller --- sdk/metric/controller/basic/config.go | 12 ++++++++++++ sdk/metric/controller/basic/controller.go | 1 + 2 files changed, 13 insertions(+) diff --git a/sdk/metric/controller/basic/config.go b/sdk/metric/controller/basic/config.go index 4ccd3a68c5f..a665ce58683 100644 --- a/sdk/metric/controller/basic/config.go +++ b/sdk/metric/controller/basic/config.go @@ -27,6 +27,8 @@ type Config struct { // created by the Controller. Resource *resource.Resource + Enricher export.Enricher + // CollectPeriod is the interval between calls to Collect a // checkpoint. // @@ -75,6 +77,16 @@ func (o resourceOption) Apply(config *Config) { config.Resource = o.Resource } +func WithEnricher(e export.Enricher) Option { + return enricherOption(e) +} + +type enricherOption export.Enricher + +func (e enricherOption) Apply(config *Config) { + config.Enricher = export.Enricher(e) +} + // WithCollectPeriod sets the CollectPeriod configuration option of a Config. func WithCollectPeriod(period time.Duration) Option { return collectPeriodOption(period) diff --git a/sdk/metric/controller/basic/controller.go b/sdk/metric/controller/basic/controller.go index 5227d9ba40e..d8416ef7dd3 100644 --- a/sdk/metric/controller/basic/controller.go +++ b/sdk/metric/controller/basic/controller.go @@ -89,6 +89,7 @@ func New(checkpointer export.Checkpointer, opts ...Option) *Controller { impl := sdk.NewAccumulator( checkpointer, c.Resource, + c.Enricher, ) return &Controller{ provider: registry.NewMeterProvider(impl), From 8fcd759c8bc807dacb04ea786eec138a60b83443 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Thu, 21 Jan 2021 11:35:20 +1100 Subject: [PATCH 10/13] add comments --- sdk/metric/controller/basic/config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/metric/controller/basic/config.go b/sdk/metric/controller/basic/config.go index a665ce58683..e735f906604 100644 --- a/sdk/metric/controller/basic/config.go +++ b/sdk/metric/controller/basic/config.go @@ -27,6 +27,8 @@ type Config struct { // created by the Controller. Resource *resource.Resource + // Enricher is an optional function that can be provided to apply baggage attributes + // as metric labels. Enricher export.Enricher // CollectPeriod is the interval between calls to Collect a @@ -77,6 +79,7 @@ func (o resourceOption) Apply(config *Config) { config.Resource = o.Resource } +// WithEnricher sets the Enricher configuration option of a Config func WithEnricher(e export.Enricher) Option { return enricherOption(e) } From f7d21d3a450feac937d9946d4efc6167160afe30 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Thu, 21 Jan 2021 11:54:53 +1100 Subject: [PATCH 11/13] add a pusher test for enricher --- sdk/metric/controller/basic/push_test.go | 39 ++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sdk/metric/controller/basic/push_test.go b/sdk/metric/controller/basic/push_test.go index 4add5f3f523..c0bc0404a92 100644 --- a/sdk/metric/controller/basic/push_test.go +++ b/sdk/metric/controller/basic/push_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" export "go.opentelemetry.io/otel/sdk/export/metric" @@ -228,3 +229,41 @@ func TestPushExportError(t *testing.T) { }) } } + +func TestEnricher(t *testing.T) { + exporter := newExporter() + checkpointer := newCheckpointer() + enricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) { + baggage := baggage.Set(ctx) + kvs = append(baggage.ToSlice(), kvs...) + return kvs, nil + } + + p := controller.New( + checkpointer, + controller.WithPusher(exporter), + controller.WithCollectPeriod(time.Second), + controller.WithEnricher(enricher), + ) + + meter := p.MeterProvider().Meter("name") + + mock := controllertest.NewMockClock() + p.SetClock(mock) + + counter := metric.Must(meter).NewInt64Counter("counter.sum") + + p.Start(context.Background()) + + ctx := baggage.ContextWithValues(context.Background(), label.String("A", "B")) + counter.Add(ctx, 1) + + require.EqualValues(t, map[string]float64{}, exporter.Values()) + + mock.Add(time.Second) + runtime.Gosched() + + require.EqualValues(t, map[string]float64{ + "counter.sum/A=B/": 1, + }, exporter.Values()) +} From 1a182f6369752baabbdaafa68d3051e4d6215de3 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Thu, 21 Jan 2021 14:43:33 +1100 Subject: [PATCH 12/13] update PR number in changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ba9c714488..db65a092abe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added -- Metric SDK adds `Enricher` API for applying baggage attributes as metric labels in request context. (#XXX) //TODO: add PR number +- Metric SDK adds `Enricher` API for applying baggage attributes as metric labels in request context. (#1480) ## [0.16.0] - 2020-01-13 From 31686a00d30dd5db00f69505ddc3d1fdb79156d6 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Thu, 21 Jan 2021 14:54:11 +1100 Subject: [PATCH 13/13] check err for function call --- sdk/metric/controller/basic/push_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/metric/controller/basic/push_test.go b/sdk/metric/controller/basic/push_test.go index c0bc0404a92..29cf562e2ef 100644 --- a/sdk/metric/controller/basic/push_test.go +++ b/sdk/metric/controller/basic/push_test.go @@ -253,7 +253,8 @@ func TestEnricher(t *testing.T) { counter := metric.Must(meter).NewInt64Counter("counter.sum") - p.Start(context.Background()) + err := p.Start(context.Background()) + require.NoError(t, err) ctx := baggage.ContextWithValues(context.Background(), label.String("A", "B")) counter.Add(ctx, 1)