From 8d841539852a813ee69d09bc4da68d4f5efe2859 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 13 Aug 2020 15:47:17 -0700 Subject: [PATCH] Add a dimensionality-reducing metric Processor (#1057) * Add a dimensionality-reducing metric processor * Precommit * Add package docs * Remove dead code --- sdk/metric/processor/reducer/doc.go | 54 +++++++++ sdk/metric/processor/reducer/reducer.go | 67 +++++++++++ sdk/metric/processor/reducer/reducer_test.go | 118 +++++++++++++++++++ 3 files changed, 239 insertions(+) create mode 100644 sdk/metric/processor/reducer/doc.go create mode 100644 sdk/metric/processor/reducer/reducer.go create mode 100644 sdk/metric/processor/reducer/reducer_test.go diff --git a/sdk/metric/processor/reducer/doc.go b/sdk/metric/processor/reducer/doc.go new file mode 100644 index 000000000000..e7e36b196cf8 --- /dev/null +++ b/sdk/metric/processor/reducer/doc.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package reducer implements a metrics Processor component that applies +a `label.Filter` to each processed `export.Accumulation` to remove +labels before passing the result to another Processor. This Processor +can be used to reduce inherent dimensionality in the data, as a way to +control the cost of collecting high cardinality metric data. + +For example, to compose a push controller with a reducer and a basic +metric processor: + +type someFilter struct{ + // configuration for this filter + // ... +} + +func (someFilter) LabelFilterFor(_ *metric.Descriptor) label.Filter { + return func(label kv.KeyValue) bool { + // return true to keep this label, false to drop this label + // ... + } +} + +func setupMetrics(exporter export.Exporter) (stop func()) { + basicProcessor := basic.New( + simple.NewWithExactDistribution(), + exporter, + ) + + reducerProcessor := reducer.New(someFilter{...}, basicProcessor) + + pusher := push.New( + reducerProcessor, + exporter, + pushOpts..., + ) + pusher.Start() + global.SetMeterProvider(pusher.Provider()) + return pusher.Stop +*/ +package reducer // import "go.opentelemetry.io/otel/sdk/metric/processor/reducer" diff --git a/sdk/metric/processor/reducer/reducer.go b/sdk/metric/processor/reducer/reducer.go new file mode 100644 index 000000000000..0fde654d2f4d --- /dev/null +++ b/sdk/metric/processor/reducer/reducer.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reducer // import "go.opentelemetry.io/otel/sdk/metric/processor/reducer" + +import ( + "go.opentelemetry.io/otel/api/label" + "go.opentelemetry.io/otel/api/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" +) + +type ( + // Processor implements "dimensionality reduction" by + // filtering keys from export label sets. + Processor struct { + export.Checkpointer + filterSelector LabelFilterSelector + } + + // LabelFilterSelector is the interface used to configure a + // specific Filter to an instrument. + LabelFilterSelector interface { + LabelFilterFor(*metric.Descriptor) label.Filter + } +) + +var _ export.Processor = &Processor{} +var _ export.Checkpointer = &Processor{} + +// New returns a dimensionality-reducing Processor that passes data to +// the next stage in an export pipeline. +func New(filterSelector LabelFilterSelector, ckpter export.Checkpointer) *Processor { + return &Processor{ + Checkpointer: ckpter, + filterSelector: filterSelector, + } +} + +// Process implements export.Processor. +func (p *Processor) Process(accum export.Accumulation) error { + // Note: the removed labels are returned and ignored here. + // Conceivably these inputs could be useful to a sampler. + reduced, _ := accum.Labels().Filter( + p.filterSelector.LabelFilterFor( + accum.Descriptor(), + ), + ) + return p.Checkpointer.Process( + export.NewAccumulation( + accum.Descriptor(), + &reduced, + accum.Resource(), + accum.Aggregator(), + ), + ) +} diff --git a/sdk/metric/processor/reducer/reducer_test.go b/sdk/metric/processor/reducer/reducer_test.go new file mode 100644 index 000000000000..ba003fbd313b --- /dev/null +++ b/sdk/metric/processor/reducer/reducer_test.go @@ -0,0 +1,118 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reducer_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/label" + "go.opentelemetry.io/otel/api/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" + metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/processor/basic" + processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest" + "go.opentelemetry.io/otel/sdk/metric/processor/reducer" + "go.opentelemetry.io/otel/sdk/resource" +) + +var ( + kvs1 = []kv.KeyValue{ + kv.Int("A", 1), + kv.Int("B", 2), + kv.Int("C", 3), + } + kvs2 = []kv.KeyValue{ + kv.Int("A", 1), + kv.Int("B", 0), + kv.Int("C", 3), + } +) + +type testFilter struct{} + +func (testFilter) LabelFilterFor(_ *metric.Descriptor) label.Filter { + return func(label kv.KeyValue) bool { + return label.Key == "A" || label.Key == "C" + } +} + +func generateData(impl metric.MeterImpl) { + ctx := context.Background() + meter := metric.WrapMeterImpl(impl, "testing") + + counter := metric.Must(meter).NewFloat64Counter("counter.sum") + + _ = metric.Must(meter).NewInt64SumObserver("observer.sum", + func(_ context.Context, result metric.Int64ObserverResult) { + result.Observe(10, kvs1...) + result.Observe(10, kvs2...) + }, + ) + + counter.Add(ctx, 100, kvs1...) + counter.Add(ctx, 100, kvs2...) +} + +func TestFilterProcessor(t *testing.T) { + testProc := processorTest.NewProcessor( + processorTest.AggregatorSelector(), + label.DefaultEncoder(), + ) + accum := metricsdk.NewAccumulator( + reducer.New(testFilter{}, processorTest.Checkpointer(testProc)), + metricsdk.WithResource( + resource.New(kv.String("R", "V")), + ), + ) + generateData(accum) + + accum.Collect(context.Background()) + + require.EqualValues(t, map[string]float64{ + "counter.sum/A=1,C=3/R=V": 200, + "observer.sum/A=1,C=3/R=V": 20, + }, testProc.Values()) +} + +// Test a filter with the ../basic Processor. +func TestFilterBasicProcessor(t *testing.T) { + basicProc := basic.New(processorTest.AggregatorSelector(), export.CumulativeExporter) + accum := metricsdk.NewAccumulator( + reducer.New(testFilter{}, basicProc), + metricsdk.WithResource( + resource.New(kv.String("R", "V")), + ), + ) + exporter := processorTest.NewExporter(basicProc, label.DefaultEncoder()) + + generateData(accum) + + basicProc.StartCollection() + accum.Collect(context.Background()) + if err := basicProc.FinishCollection(); err != nil { + t.Error(err) + } + + require.NoError(t, exporter.Export(context.Background(), basicProc.CheckpointSet())) + + require.EqualValues(t, map[string]float64{ + "counter.sum/A=1,C=3/R=V": 200, + "observer.sum/A=1,C=3/R=V": 20, + }, exporter.Values()) +}