Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric SDK label Enricher interface #1421

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369)
- Documentation about the project's versioning policy. (#1388)
- `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418)
- Metric SDK adds `Enricher` API for applying baggage attributes as metric labels in request context. (#1421)

### Changed

Expand Down
6 changes: 6 additions & 0 deletions baggage/baggage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func Set(ctx context.Context) label.Set {
return label.NewSet(values...)
}

// ForEach allows visiting the baggage values in a context without
// copying a slice.
func ForEach(ctx context.Context, f func(kv label.KeyValue) bool) {
baggage.MapFromContext(ctx).Foreach(f)
}

// Value returns the value related to key in the baggage of ctx. If no
// value is set, the returned label.Value will be an uninitialized zero-value
// with type INVALID.
Expand Down
21 changes: 21 additions & 0 deletions baggage/baggage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/internal/baggage"
"go.opentelemetry.io/otel/label"
)
Expand Down Expand Up @@ -84,3 +86,22 @@ func TestBaggage(t *testing.T) {
t.Fatal("WithoutBaggage failed to clear baggage")
}
}

func TestForEach(t *testing.T) {
ctx := ContextWithValues(
context.Background(),
label.String("A", "B"),
label.String("C", "D"),
)

out := map[string]string{}
ForEach(ctx, func(kv label.KeyValue) bool {
out[string(kv.Key)] = kv.Value.Emit()
return true
})

require.EqualValues(t, map[string]string{
"A": "B",
"C": "D",
}, out)
}
25 changes: 25 additions & 0 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,31 @@ type Checkpointer interface {
FinishCollection() error
}

// Enricher supports extracting baggage attributes and applying them
// as metric event labels and also permits erasing metric labels at
// runtime. When configured with an Accumulator, the Enricher is
// applied to all synchronous instruments.
//
// This receives the context and the event labels and
// returns the effective KeyValue slice. If this returns a
// nil KeyValue slice or a non-nil error, the caller SHOULD
// use the original KeyValue slice.
//
// This SHOULD NOT modify the input label slice.
//
// Note: This interface does not include the *metric.Descriptor
// because it creates significant complexity and/or cost to enrich
// RecordBatch() events.
//
// Note: This interface is called with input labels before they are
// sorted and de-duplicated as described in
// label.NewSetWithSortableFiltered(). The enricher has control over
// whether label values should override the input making use of the
// last-value semantic detailed for metric event labels in general.
// Appending baggage values means they override the call-site labels,
// prepending baggage means call-site labels override baggage labels.
type Enricher func(context.Context, []label.KeyValue) ([]label.KeyValue, error)

// Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument. Sum-only
// instruments commonly use a simple Sum aggregator, but for the
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newFixture(b *testing.B) *benchFixture {
AggregatorSelector: processortest.AggregatorSelector(),
}

bf.accumulator = sdk.NewAccumulator(bf, nil)
bf.accumulator = sdk.NewAccumulator(bf, nil, nil)
bf.meter = metric.WrapMeterImpl(bf.accumulator, "benchmarks")
return bf
}
Expand Down
1 change: 1 addition & 0 deletions sdk/metric/controller/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func New(checkpointer export.Checkpointer, options ...Option) *Controller {
accum := sdk.NewAccumulator(
checkpointer,
config.Resource,
nil,
)
return &Controller{
accumulator: accum,
Expand Down
1 change: 1 addition & 0 deletions sdk/metric/controller/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt
impl := sdk.NewAccumulator(
checkpointer,
c.Resource,
nil,
)
return &Controller{
provider: registry.NewMeterProvider(impl),
Expand Down
163 changes: 139 additions & 24 deletions sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
Expand Down Expand Up @@ -98,11 +99,21 @@ func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessPro
accum := metricsdk.NewAccumulator(
processor,
testResource,
nil,
)
meter := metric.WrapMeterImpl(accum, "test")
return meter, accum, processor
}

func expectAndResetOutput(t *testing.T, processor *correctnessProcessor, expect map[string]float64) {
out := processortest.NewOutput(label.DefaultEncoder())
for _, rec := range processor.accumulations {
require.NoError(t, out.AddAccumulation(rec))
}
require.EqualValues(t, expect, out.Map())
processor.accumulations = nil
}

func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error {
ci.accumulations = append(ci.accumulations, accumulation)
return nil
Expand Down Expand Up @@ -351,12 +362,9 @@ func TestObserverCollection(t *testing.T) {
collected := sdk.Collect(ctx)
require.Equal(t, collected, len(processor.accumulations))

out := processortest.NewOutput(label.DefaultEncoder())
for _, rec := range processor.accumulations {
require.NoError(t, out.AddAccumulation(rec))
}
mult := float64(mult)
require.EqualValues(t, map[string]float64{

expectAndResetOutput(t, processor, map[string]float64{
"float.valueobserver.lastvalue/A=B/R=V": -mult,
"float.valueobserver.lastvalue/C=D/R=V": -mult,
"int.valueobserver.lastvalue//R=V": mult,
Expand All @@ -371,7 +379,7 @@ func TestObserverCollection(t *testing.T) {
"float.updownsumobserver.sum/C=D/R=V": mult,
"int.updownsumobserver.sum//R=V": -mult,
"int.updownsumobserver.sum/A=B/R=V": mult,
}, out.Map())
})
}
}

Expand Down Expand Up @@ -456,11 +464,7 @@ func TestObserverBatch(t *testing.T) {

require.Equal(t, collected, len(processor.accumulations))

out := processortest.NewOutput(label.DefaultEncoder())
for _, rec := range processor.accumulations {
require.NoError(t, out.AddAccumulation(rec))
}
require.EqualValues(t, map[string]float64{
expectAndResetOutput(t, processor, map[string]float64{
"float.sumobserver.sum//R=V": 1.1,
"float.sumobserver.sum/A=B/R=V": 1000,
"int.sumobserver.sum//R=V": 10,
Expand All @@ -475,7 +479,7 @@ func TestObserverBatch(t *testing.T) {
"float.valueobserver.lastvalue/C=D/R=V": -1,
"int.valueobserver.lastvalue//R=V": 1,
"int.valueobserver.lastvalue/A=B/R=V": 1,
}, out.Map())
})
}

func TestRecordBatch(t *testing.T) {
Expand All @@ -501,16 +505,12 @@ func TestRecordBatch(t *testing.T) {

sdk.Collect(ctx)

out := processortest.NewOutput(label.DefaultEncoder())
for _, rec := range processor.accumulations {
require.NoError(t, out.AddAccumulation(rec))
}
require.EqualValues(t, map[string]float64{
expectAndResetOutput(t, processor, map[string]float64{
"int64.sum/A=B,C=D/R=V": 1,
"float64.sum/A=B,C=D/R=V": 2,
"int64.exact/A=B,C=D/R=V": 3,
"float64.exact/A=B,C=D/R=V": 4,
}, out.Map())
})
}

// TestRecordPersistence ensures that a direct-called instrument that
Expand Down Expand Up @@ -583,12 +583,127 @@ func TestSyncInAsync(t *testing.T) {

sdk.Collect(ctx)

out := processortest.NewOutput(label.DefaultEncoder())
for _, rec := range processor.accumulations {
require.NoError(t, out.AddAccumulation(rec))
}
require.EqualValues(t, map[string]float64{
expectAndResetOutput(t, processor, map[string]float64{
"counter.sum//R=V": 100,
"observer.lastvalue//R=V": 10,
}, out.Map())
})
}

func TestEnricher(t *testing.T) {
enrich := func(context.Context, []label.KeyValue) ([]label.KeyValue, error) {
return nil, nil
}

testHandler.Reset()
processor := &correctnessProcessor{
t: t,
testSelector: &testSelector{selector: processortest.AggregatorSelector()},
}
accum := metricsdk.NewAccumulator(
processor,
testResource,
func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) {
return enrich(ctx, kvs)
},
)

meter := metric.WrapMeterImpl(accum, "test")

bg := context.Background()
ctx := baggage.ContextWithValues(
bg,
label.String("Corr1", "Val1"),
label.String("Corr2", "Val2"),
)

counter := Must(meter).NewInt64Counter("name.sum")
recorder := Must(meter).NewFloat64ValueRecorder("name.lastvalue")

counter.Add(ctx, 1)
recorder.Record(ctx, 10, label.String("E", "F"))

_ = accum.Collect(bg)
expectAndResetOutput(t, processor, map[string]float64{
"name.sum//R=V": 1,
"name.lastvalue/E=F/R=V": 10,
})

// This enriches with all baggage keys
enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) {
baggage.ForEach(ctx, func(kv label.KeyValue) bool {
input = append(input, kv)
return true
})
return input, nil
}

counter.Add(ctx, 1)
recorder.Record(ctx, 10, label.String("E", "F"))

_ = accum.Collect(bg)
expectAndResetOutput(t, processor, map[string]float64{
"name.sum/Corr1=Val1,Corr2=Val2/R=V": 1,
"name.lastvalue/Corr1=Val1,Corr2=Val2,E=F/R=V": 10,
})

// This enriches by erasing all labels
enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) {
return []label.KeyValue{}, nil
}

counter.Add(ctx, 1, label.String("Y", "Z"))
recorder.Record(ctx, 10, label.String("E", "F"))

_ = accum.Collect(bg)
expectAndResetOutput(t, processor, map[string]float64{
"name.sum//R=V": 1,
"name.lastvalue//R=V": 10,
})

// This enriches by including the first input and all baggage labels.
enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) {
var output []label.KeyValue
if len(input) > 0 {
output = append(output, input[0])
}
baggage.ForEach(ctx, func(kv label.KeyValue) bool {
output = append(output, kv)
return true
})
return output, nil
}

counter.Add(ctx, 1, label.String("Y", "Z"), label.String("X", "Y"))
recorder.Record(ctx, 10, label.String("E", "F"), label.String("G", "H"))

_ = accum.Collect(bg)
expectAndResetOutput(t, processor, map[string]float64{
"name.sum/Corr1=Val1,Corr2=Val2,Y=Z/R=V": 1,
"name.lastvalue/Corr1=Val1,Corr2=Val2,E=F/R=V": 10,
})

// This enriches by APPENDING a duplicate label.
enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) {
return append(input, label.String("Extra", "Baggage")), nil
}

counter.Add(ctx, 1, label.String("Extra", "Call-site"))

_ = accum.Collect(bg)
expectAndResetOutput(t, processor, map[string]float64{
"name.sum/Extra=Baggage/R=V": 1,
})

// This enriches by APPENDING a duplicate label.
enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) {
return append([]label.KeyValue{label.String("Extra", "Baggage")}, input...), nil
}

counter.Add(ctx, 1, label.String("Extra", "Call-site"))

_ = accum.Collect(bg)
expectAndResetOutput(t, processor, map[string]float64{
"name.sum/Extra=Call-site/R=V": 1,
})

}
2 changes: 1 addition & 1 deletion sdk/metric/processor/basic/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestSumObserverEndToEnd(t *testing.T) {
processorTest.AggregatorSelector(),
eselector,
)
accum := sdk.NewAccumulator(proc, resource.Empty())
accum := sdk.NewAccumulator(proc, resource.Empty(), nil)
meter := metric.WrapMeterImpl(accum, "testing")

var calls int64
Expand Down
1 change: 1 addition & 0 deletions sdk/metric/processor/processortest/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func generateTestData(proc export.Processor) {
accum := metricsdk.NewAccumulator(
proc,
resource.NewWithAttributes(label.String("R", "V")),
nil,
)
meter := metric.WrapMeterImpl(accum, "testing")

Expand Down
2 changes: 2 additions & 0 deletions sdk/metric/processor/reducer/reducer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestFilterProcessor(t *testing.T) {
accum := metricsdk.NewAccumulator(
reducer.New(testFilter{}, processorTest.Checkpointer(testProc)),
resource.NewWithAttributes(label.String("R", "V")),
nil,
)
generateData(accum)

Expand All @@ -93,6 +94,7 @@ func TestFilterBasicProcessor(t *testing.T) {
accum := metricsdk.NewAccumulator(
reducer.New(testFilter{}, basicProc),
resource.NewWithAttributes(label.String("R", "V")),
nil,
)
exporter := processorTest.NewExporter(basicProc, label.DefaultEncoder())

Expand Down
Loading