Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric label enrich #1480

Closed
wants to merge 15 commits into from
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Metric SDK adds `Enricher` API for applying baggage attributes as metric labels in request context. (#1480)

## [0.16.0] - 2020-01-13

### Added
Expand Down
6 changes: 6 additions & 0 deletions baggage/baggage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func Set(ctx context.Context) label.Set {
return label.NewSet(values...)
}

// ForEach allows visiting the baggage values in a context without
// copying a slice.
func ForEach(ctx context.Context, f func(kv label.KeyValue) bool) {
baggage.MapFromContext(ctx).Foreach(f)
}

// Value returns the value related to key in the baggage of ctx. If no
// value is set, the returned label.Value will be an uninitialized zero-value
// with type INVALID.
Expand Down
21 changes: 21 additions & 0 deletions baggage/baggage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/internal/baggage"
"go.opentelemetry.io/otel/label"
)
Expand Down Expand Up @@ -84,3 +86,22 @@ func TestBaggage(t *testing.T) {
t.Fatal("WithoutBaggage failed to clear baggage")
}
}

func TestForEach(t *testing.T) {
ctx := ContextWithValues(
context.Background(),
label.String("A", "B"),
label.String("C", "D"),
)

out := map[string]string{}
ForEach(ctx, func(kv label.KeyValue) bool {
out[string(kv.Key)] = kv.Value.Emit()
return true
})

require.EqualValues(t, map[string]string{
"A": "B",
"C": "D",
}, out)
}
25 changes: 25 additions & 0 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,31 @@ type Checkpointer interface {
FinishCollection() error
}

// Enricher supports extracting baggage attributes and applying them
// as metric event labels and also permits erasing metric labels at
// runtime. When configured with an Accumulator, the Enricher is
// applied to all synchronous instruments.
//
// This receives the context and the event labels and
// returns the effective KeyValue slice. If this returns a
// nil KeyValue slice or a non-nil error, the caller SHOULD
// use the original KeyValue slice.
//
// This SHOULD NOT modify the input label slice.
//
// Note: This interface does not include the *metric.Descriptor
// because it creates significant complexity and/or cost to enrich
// RecordBatch() events.
//
// Note: This interface is called with input labels before they are
// sorted and de-duplicated as described in
// label.NewSetWithSortableFiltered(). The enricher has control over
// whether label values should override the input making use of the
// last-value semantic detailed for metric event labels in general.
// Appending baggage values means they override the call-site labels,
// prepending baggage means call-site labels override baggage labels.
type Enricher func(context.Context, []label.KeyValue) ([]label.KeyValue, error)

// Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument. Sum-only
// instruments commonly use a simple Sum aggregator, but for the
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newFixture(b *testing.B) *benchFixture {
AggregatorSelector: processortest.AggregatorSelector(),
}

bf.accumulator = sdk.NewAccumulator(bf, nil)
bf.accumulator = sdk.NewAccumulator(bf, nil, nil)
bf.meter = metric.WrapMeterImpl(bf.accumulator, "benchmarks")
return bf
}
Expand Down
15 changes: 15 additions & 0 deletions sdk/metric/controller/basic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type Config struct {
// created by the Controller.
Resource *resource.Resource

// Enricher is an optional function that can be provided to apply baggage attributes
// as metric labels.
Enricher export.Enricher

// CollectPeriod is the interval between calls to Collect a
// checkpoint.
//
Expand Down Expand Up @@ -75,6 +79,17 @@ func (o resourceOption) Apply(config *Config) {
config.Resource = o.Resource
}

// WithEnricher sets the Enricher configuration option of a Config
func WithEnricher(e export.Enricher) Option {
return enricherOption(e)
}

type enricherOption export.Enricher

func (e enricherOption) Apply(config *Config) {
config.Enricher = export.Enricher(e)
}

// WithCollectPeriod sets the CollectPeriod configuration option of a Config.
func WithCollectPeriod(period time.Duration) Option {
return collectPeriodOption(period)
Expand Down
1 change: 1 addition & 0 deletions sdk/metric/controller/basic/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func New(checkpointer export.Checkpointer, opts ...Option) *Controller {
impl := sdk.NewAccumulator(
checkpointer,
c.Resource,
c.Enricher,
)
return &Controller{
provider: registry.NewMeterProvider(impl),
Expand Down
40 changes: 40 additions & 0 deletions sdk/metric/controller/basic/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
Expand Down Expand Up @@ -228,3 +229,42 @@ func TestPushExportError(t *testing.T) {
})
}
}

func TestEnricher(t *testing.T) {
exporter := newExporter()
checkpointer := newCheckpointer()
enricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) {
baggage := baggage.Set(ctx)
kvs = append(baggage.ToSlice(), kvs...)
return kvs, nil
}

p := controller.New(
checkpointer,
controller.WithPusher(exporter),
controller.WithCollectPeriod(time.Second),
controller.WithEnricher(enricher),
)

meter := p.MeterProvider().Meter("name")

mock := controllertest.NewMockClock()
p.SetClock(mock)

counter := metric.Must(meter).NewInt64Counter("counter.sum")

err := p.Start(context.Background())
require.NoError(t, err)

ctx := baggage.ContextWithValues(context.Background(), label.String("A", "B"))
counter.Add(ctx, 1)

require.EqualValues(t, map[string]float64{}, exporter.Values())

mock.Add(time.Second)
runtime.Gosched()

require.EqualValues(t, map[string]float64{
"counter.sum/A=B/": 1,
}, exporter.Values())
}
Loading