Skip to content

Commit

Permalink
Add a dimensionality-reducing metric Processor (open-telemetry#1057)
Browse files Browse the repository at this point in the history
* Add a dimensionality-reducing metric processor

* Precommit

* Add package docs

* Remove dead code
  • Loading branch information
jmacd authored and evantorrie committed Sep 10, 2020
1 parent 9a5c9cb commit 8d84153
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 0 deletions.
54 changes: 54 additions & 0 deletions sdk/metric/processor/reducer/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package reducer implements a metrics Processor component that applies
a `label.Filter` to each processed `export.Accumulation` to remove
labels before passing the result to another Processor. This Processor
can be used to reduce inherent dimensionality in the data, as a way to
control the cost of collecting high cardinality metric data.
For example, to compose a push controller with a reducer and a basic
metric processor:
type someFilter struct{
// configuration for this filter
// ...
}
func (someFilter) LabelFilterFor(_ *metric.Descriptor) label.Filter {
return func(label kv.KeyValue) bool {
// return true to keep this label, false to drop this label
// ...
}
}
func setupMetrics(exporter export.Exporter) (stop func()) {
basicProcessor := basic.New(
simple.NewWithExactDistribution(),
exporter,
)
reducerProcessor := reducer.New(someFilter{...}, basicProcessor)
pusher := push.New(
reducerProcessor,
exporter,
pushOpts...,
)
pusher.Start()
global.SetMeterProvider(pusher.Provider())
return pusher.Stop
*/
package reducer // import "go.opentelemetry.io/otel/sdk/metric/processor/reducer"
67 changes: 67 additions & 0 deletions sdk/metric/processor/reducer/reducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reducer // import "go.opentelemetry.io/otel/sdk/metric/processor/reducer"

import (
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
)

type (
// Processor implements "dimensionality reduction" by
// filtering keys from export label sets.
Processor struct {
export.Checkpointer
filterSelector LabelFilterSelector
}

// LabelFilterSelector is the interface used to configure a
// specific Filter to an instrument.
LabelFilterSelector interface {
LabelFilterFor(*metric.Descriptor) label.Filter
}
)

var _ export.Processor = &Processor{}
var _ export.Checkpointer = &Processor{}

// New returns a dimensionality-reducing Processor that passes data to
// the next stage in an export pipeline.
func New(filterSelector LabelFilterSelector, ckpter export.Checkpointer) *Processor {
return &Processor{
Checkpointer: ckpter,
filterSelector: filterSelector,
}
}

// Process implements export.Processor.
func (p *Processor) Process(accum export.Accumulation) error {
// Note: the removed labels are returned and ignored here.
// Conceivably these inputs could be useful to a sampler.
reduced, _ := accum.Labels().Filter(
p.filterSelector.LabelFilterFor(
accum.Descriptor(),
),
)
return p.Checkpointer.Process(
export.NewAccumulation(
accum.Descriptor(),
&reduced,
accum.Resource(),
accum.Aggregator(),
),
)
}
118 changes: 118 additions & 0 deletions sdk/metric/processor/reducer/reducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reducer_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest"
"go.opentelemetry.io/otel/sdk/metric/processor/reducer"
"go.opentelemetry.io/otel/sdk/resource"
)

var (
kvs1 = []kv.KeyValue{
kv.Int("A", 1),
kv.Int("B", 2),
kv.Int("C", 3),
}
kvs2 = []kv.KeyValue{
kv.Int("A", 1),
kv.Int("B", 0),
kv.Int("C", 3),
}
)

type testFilter struct{}

func (testFilter) LabelFilterFor(_ *metric.Descriptor) label.Filter {
return func(label kv.KeyValue) bool {
return label.Key == "A" || label.Key == "C"
}
}

func generateData(impl metric.MeterImpl) {
ctx := context.Background()
meter := metric.WrapMeterImpl(impl, "testing")

counter := metric.Must(meter).NewFloat64Counter("counter.sum")

_ = metric.Must(meter).NewInt64SumObserver("observer.sum",
func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(10, kvs1...)
result.Observe(10, kvs2...)
},
)

counter.Add(ctx, 100, kvs1...)
counter.Add(ctx, 100, kvs2...)
}

func TestFilterProcessor(t *testing.T) {
testProc := processorTest.NewProcessor(
processorTest.AggregatorSelector(),
label.DefaultEncoder(),
)
accum := metricsdk.NewAccumulator(
reducer.New(testFilter{}, processorTest.Checkpointer(testProc)),
metricsdk.WithResource(
resource.New(kv.String("R", "V")),
),
)
generateData(accum)

accum.Collect(context.Background())

require.EqualValues(t, map[string]float64{
"counter.sum/A=1,C=3/R=V": 200,
"observer.sum/A=1,C=3/R=V": 20,
}, testProc.Values())
}

// Test a filter with the ../basic Processor.
func TestFilterBasicProcessor(t *testing.T) {
basicProc := basic.New(processorTest.AggregatorSelector(), export.CumulativeExporter)
accum := metricsdk.NewAccumulator(
reducer.New(testFilter{}, basicProc),
metricsdk.WithResource(
resource.New(kv.String("R", "V")),
),
)
exporter := processorTest.NewExporter(basicProc, label.DefaultEncoder())

generateData(accum)

basicProc.StartCollection()
accum.Collect(context.Background())
if err := basicProc.FinishCollection(); err != nil {
t.Error(err)
}

require.NoError(t, exporter.Export(context.Background(), basicProc.CheckpointSet()))

require.EqualValues(t, map[string]float64{
"counter.sum/A=1,C=3/R=V": 200,
"observer.sum/A=1,C=3/R=V": 20,
}, exporter.Values())
}

0 comments on commit 8d84153

Please sign in to comment.