From 738abfebd5008150315d258650c85799062542f4 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Tue, 30 Jul 2024 08:32:12 +0200 Subject: [PATCH 01/11] [processor/transform] introduce aggregate_on_attribute_value function for metrics Signed-off-by: odubajDT --- .../feat_16224_aggregate_label_value.yaml | 27 + processor/transformprocessor/README.md | 29 + ...unc_agregate_on_attribute_value_metrics.go | 70 +++ ...gregate_on_attribute_value_metrics_test.go | 530 ++++++++++++++++++ .../internal/metrics/functions.go | 1 + .../internal/metrics/functions_test.go | 1 + .../internal/metrics/processor_test.go | 48 +- 7 files changed, 705 insertions(+), 1 deletion(-) create mode 100644 .chloggen/feat_16224_aggregate_label_value.yaml create mode 100644 processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go create mode 100644 processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go diff --git a/.chloggen/feat_16224_aggregate_label_value.yaml b/.chloggen/feat_16224_aggregate_label_value.yaml new file mode 100644 index 000000000000..bb4e6bba0460 --- /dev/null +++ b/.chloggen/feat_16224_aggregate_label_value.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Support aggregating metrics based on their attribute values and substituing the values with a new value." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [16224] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 8189ef104896..f0f473c46ad0 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -220,6 +220,7 @@ In addition to OTTL functions, the processor defines its own functions to help w - [copy_metric](#copy_metric) - [scale_metric](#scale_metric) - [aggregate_on_attributes](#aggregate_on_attributes) +- [aggregate_on_attribute_value](#aggregate_on_attribute_value) ### convert_sum_to_gauge @@ -415,6 +416,34 @@ statements: To aggregate only using a specified set of attributes, you can use `keep_matching_keys`. +### aggregate_on_attribute_value + +`aggregate_on_attribute_value(function, label, valueSet, newValue)` + +The `aggregate_on_attribute_value` function aggreates all metrics having the label `label` set with one of the values present in `valueSet` using the aggregation function specified in `function`. 
Additionally it substitutes the values of `label` present in `valueSet` with a new value specified by `newValue`. + +The function supports the following data types: + +- sum +- gauge +- histogram +- exponential histogram + +Supported aggregation functions are: + +- sum +- max +- min +- mean +- median +- count + +**NOTE:** Only the `sum` agregation function is supported for histogram and exponential histogram datatypes. + +Examples: + +- `aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage` + ## Examples ### Perform transformation if field does not exist diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go new file mode 100644 index 000000000000..58d4777eb3c2 --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" +) + +type aggregateOnAttributeValueArguments struct { + Type string + Attribute string + Values []string + NewValue string +} + +func newAggregateOnAttributeValueFactory() ottl.Factory[ottlmetric.TransformContext] { + return ottl.NewFactory("aggregate_on_attribute_value", &aggregateOnAttributeValueArguments{}, createAggregateOnAttributeValueFunction) +} + +func createAggregateOnAttributeValueFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + args, ok := oArgs.(*aggregateOnAttributeValueArguments) + + if !ok { + return nil, fmt.Errorf("AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments") + } + + t, err := aggregateutil.ConvertToAggregationFunction(args.Type) + if err != nil { + return nil, fmt.Errorf("aggregation function invalid: %s", err.Error()) + } + + return AggregateOnAttributeValue(t, args.Attribute, args.Values, args.NewValue) +} + +func AggregateOnAttributeValue(aggregationType aggregateutil.AggregationType, attribute string, values []string, newValue string) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + return func(_ context.Context, tCtx ottlmetric.TransformContext) (any, error) { + metric := tCtx.GetMetric() + + aggregateutil.RangeDataPointAttributes(metric, func(attrs pcommon.Map) bool { + val, ok := attrs.Get(attribute) + if !ok { + return true + } + + for _, v := range values { + if val.Str() == v { + val.SetStr(newValue) + } + } + return true + }) + ag := aggregateutil.AggGroups{} + newMetric := pmetric.NewMetric() + aggregateutil.CopyMetricDetails(metric, newMetric) + aggregateutil.GroupDataPoints(metric, &ag) + aggregateutil.MergeDataPoints(newMetric, aggregationType, ag) + newMetric.MoveTo(metric) + + return nil, nil + }, nil +} diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go 
b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go new file mode 100644 index 000000000000..7fb6cdb2ba6b --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go @@ -0,0 +1,530 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" +) + +func Test_aggregateOnAttributeValues(t *testing.T) { + tests := []struct { + name string + input pmetric.Metric + t aggregateutil.AggregationType + attribute string + values []string + newValue string + want func(pmetric.MetricSlice) + }{ + { + name: "non-existing value", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test44", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + }, + }, + { + name: "non-existing attribute", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + }, + attribute: "testyy", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + }, + }, + { + name: "sum sum", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum mean", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Mean, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(75) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum max", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Max, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + 
sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum min", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Min, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(50) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum count", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Count, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(2) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum median even", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(75) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "sum median odd", + input: getTestSumMetricMultipleAggregateOnAttributeValueOdd(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(50) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge sum", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(17) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge mean", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Mean, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(8) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge count", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Count, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + 
input.SetIntValue(2) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge median even", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(8) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge median odd", + input: getTestGaugeMetricMultipleAggregateOnAttributeValueOdd(), + t: aggregateutil.Median, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(5) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge min", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Min, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(5) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "gauge max", + input: getTestGaugeMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Max, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "histogram", + input: getTestHistogramMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyHistogram() + metricInput.SetName("histogram_metric") + metricInput.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.Histogram().DataPoints().AppendEmpty() + input.SetCount(10) + input.SetSum(25) + + input.BucketCounts().Append(4, 6) + input.ExplicitBounds().Append(1) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "exponential histogram", + input: getTestExponentialHistogramMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + metricInput := metrics.AppendEmpty() + metricInput.SetEmptyExponentialHistogram() + metricInput.SetName("exponential_histogram_metric") + metricInput.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input.SetScale(1) + input.SetCount(10) + input.SetSum(25) + input.Attributes().PutStr("test", "test_new") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) { + evaluate, err := AggregateOnAttributeValue(tt.t, tt.attribute, tt.values, tt.newValue) + assert.NoError(t, err) + + _, err = evaluate(nil, ottlmetric.NewTransformContext(tt.input, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())) + require.Nil(t, err) + + actualMetrics := pmetric.NewMetricSlice() + tt.input.CopyTo(actualMetrics.AppendEmpty()) + + if tt.want != nil { + expected := pmetric.NewMetricSlice() + tt.want(expected) + assert.Equal(t, expected, actualMetrics) + } + }) + } +} + +func getTestSumMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + + return metricInput +} + +func getTestSumMetricMultipleAggregateOnAttributeValueOdd() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test1") + + input3 := metricInput.Sum().DataPoints().AppendEmpty() + input3.SetDoubleValue(30) + input3.Attributes().PutStr("test", "test2") + + return metricInput +} + +func getTestGaugeMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Gauge().DataPoints().AppendEmpty() + input2.SetIntValue(5) + input2.Attributes().PutStr("test", "test2") + + return metricInput +} + +func getTestGaugeMetricMultipleAggregateOnAttributeValueOdd() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyGauge() + metricInput.SetName("gauge_metric") + + input := metricInput.Gauge().DataPoints().AppendEmpty() + input.SetIntValue(12) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Gauge().DataPoints().AppendEmpty() + input2.SetIntValue(5) + input2.Attributes().PutStr("test", "test2") + + input3 := metricInput.Gauge().DataPoints().AppendEmpty() + input3.SetIntValue(3) + input3.Attributes().PutStr("test", "test1") + + return metricInput +} + +func getTestHistogramMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyHistogram() + metricInput.SetName("histogram_metric") + metricInput.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.Histogram().DataPoints().AppendEmpty() + input.SetCount(5) + input.SetSum(12.34) + + input.BucketCounts().Append(2, 3) + input.ExplicitBounds().Append(1) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Histogram().DataPoints().AppendEmpty() + input2.SetCount(5) + input2.SetSum(12.66) + + input2.BucketCounts().Append(2, 3) + input2.ExplicitBounds().Append(1) + input2.Attributes().PutStr("test", "test2") + return metricInput +} + +func 
getTestExponentialHistogramMetricMultipleAggregateOnAttributeValue() pmetric.Metric { + metricInput := pmetric.NewMetric() + metricInput.SetEmptyExponentialHistogram() + metricInput.SetName("exponential_histogram_metric") + metricInput.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + input := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input.SetScale(1) + input.SetCount(5) + input.SetSum(12.34) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.ExponentialHistogram().DataPoints().AppendEmpty() + input2.SetScale(1) + input2.SetCount(5) + input2.SetSum(12.66) + input2.Attributes().PutStr("test", "test2") + return metricInput +} diff --git a/processor/transformprocessor/internal/metrics/functions.go b/processor/transformprocessor/internal/metrics/functions.go index e9e58b08a212..e9a4462f0a5e 100644 --- a/processor/transformprocessor/internal/metrics/functions.go +++ b/processor/transformprocessor/internal/metrics/functions.go @@ -51,6 +51,7 @@ func MetricFunctions() map[string]ottl.Factory[ottlmetric.TransformContext] { newCopyMetricFactory(), newScaleMetricFactory(), newAggregateOnAttributesFactory(), + newAggregateOnAttributeValueFactory(), ) if UseConvertBetweenSumAndGaugeMetricContext.IsEnabled() { diff --git a/processor/transformprocessor/internal/metrics/functions_test.go b/processor/transformprocessor/internal/metrics/functions_test.go index 98a08e18652f..a54c9274d1a0 100644 --- a/processor/transformprocessor/internal/metrics/functions_test.go +++ b/processor/transformprocessor/internal/metrics/functions_test.go @@ -62,6 +62,7 @@ func Test_MetricFunctions(t *testing.T) { expected["convert_sum_to_gauge"] = newConvertSumToGaugeFactory() expected["convert_gauge_to_sum"] = newConvertGaugeToSumFactory() expected["aggregate_on_attributes"] = newAggregateOnAttributesFactory() + expected["aggregate_on_attribute_value"] = newAggregateOnAttributeValueFactory() expected["extract_sum_metric"] = newExtractSumMetricFactory() expected["extract_count_metric"] = newExtractCountMetricFactory() expected["copy_metric"] = newCopyMetricFactory() diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 81140cd6babf..a12afd031378 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -219,7 +219,6 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) { statements: []string{`aggregate_on_attributes("sum", ["attr1", "attr2"]) where name == "operationA"`}, want: func(td pmetric.Metrics) { m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) - dataPoints := pmetric.NewNumberDataPointSlice() dataPoint1 := dataPoints.AppendEmpty() dataPoint1.SetStartTimestamp(StartTimestamp) @@ -245,6 +244,20 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) { dataPoint1.Attributes().PutStr("flags", "A|B|C") dataPoint1.Attributes().PutStr("total.string", "123456789") + dataPoints.CopyTo(m.Sum().DataPoints()) + }, + }, + { + statements: []string{`aggregate_on_attribute_value("sum", "attr1", ["test1", "test2"], "test") where name == "operationE"`}, + want: func(td pmetric.Metrics) { + m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4) + + dataPoints := pmetric.NewNumberDataPointSlice() + dataPoint1 := dataPoints.AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(4.7) + 
dataPoint1.Attributes().PutStr("attr1", "test") + dataPoints.CopyTo(m.Sum().DataPoints()) }, }, @@ -290,6 +303,8 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, { @@ -322,6 +337,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetDescription("test") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetDescription("test") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetDescription("test") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetDescription("test") }, }, { @@ -331,12 +347,14 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") }, }, { statements: []string{`set(metric.description, "Sum") where metric.type == METRIC_DATA_TYPE_SUM`}, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetDescription("Sum") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetDescription("Sum") }, }, { @@ -345,12 +363,14 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) }, }, { statements: []string{`set(metric.is_monotonic, true) where metric.is_monotonic == false`}, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().SetIsMonotonic(true) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().SetIsMonotonic(true) }, }, { @@ -394,6 +414,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "pass") }, }, { @@ -406,6 +427,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "pass") }, }, { @@ -527,6 +549,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") }, }, { @@ -731,6 +754,8 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, { @@ -758,6 +783,8 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, { @@ -811,6 +838,8 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, } @@ -879,6 +908,7 @@ func constructMetrics() pmetric.Metrics { fillMetricTwo(rm0ils0.Metrics().AppendEmpty()) 
fillMetricThree(rm0ils0.Metrics().AppendEmpty()) fillMetricFour(rm0ils0.Metrics().AppendEmpty()) + fillMetricFive(rm0ils0.Metrics().AppendEmpty()) return td } @@ -978,3 +1008,19 @@ func fillMetricFour(m pmetric.Metric) { quantileDataPoint1.SetQuantile(.95) quantileDataPoint1.SetValue(321) } + +func fillMetricFive(m pmetric.Metric) { + m.SetName("operationE") + m.SetDescription("operationE description") + m.SetUnit("operationE unit") + + dataPoint0 := m.SetEmptySum().DataPoints().AppendEmpty() + dataPoint0.SetStartTimestamp(StartTimestamp) + dataPoint0.SetDoubleValue(1.0) + dataPoint0.Attributes().PutStr("attr1", "test1") + + dataPoint1 := m.Sum().DataPoints().AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(3.7) + dataPoint1.Attributes().PutStr("attr1", "test2") +} From c0cf5188e84eddece1c8901e883f5ba520daeded Mon Sep 17 00:00:00 2001 From: odubajDT Date: Tue, 30 Jul 2024 09:11:19 +0200 Subject: [PATCH 02/11] polishing Signed-off-by: odubajDT --- processor/transformprocessor/README.md | 22 ++++++++-- ...unc_agregate_on_attribute_value_metrics.go | 12 +++--- ...gregate_on_attribute_value_metrics_test.go | 41 +++++++++++++++++++ .../internal/metrics/processor_test.go | 1 + 4 files changed, 67 insertions(+), 9 deletions(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index f0f473c46ad0..de7f213f70e1 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -418,11 +418,13 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin ### aggregate_on_attribute_value -`aggregate_on_attribute_value(function, label, valueSet, newValue)` +`aggregate_on_attribute_value(function, attribute, values, newValue)` -The `aggregate_on_attribute_value` function aggreates all metrics having the label `label` set with one of the values present in `valueSet` using the aggregation function specified in `function`. Additionally it substitutes the values of `label` present in `valueSet` with a new value specified by `newValue`. +The `aggregate_on_attribute_value` function aggreates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter using the aggregation function specified in `function` parameter. Additionally it substitutes the values of the specified `attribute` with a new value in `newValue` parameter. -The function supports the following data types: +The `aggregate_on_attribute_value` function fetches all datapoints in the metric, which have the attribute `attribute` present with one of the values speficied in `values` and substitues these values with `newValue`. Afterwards all datapoints are aggregated depending on the attributes. + +The following metric types can be aggregated: - sum - gauge @@ -444,6 +446,20 @@ Examples: - `aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage` +The `aggregate_on_attributes` function can also be used in conjunction with +[keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or +[delete_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#delete_matching_keys). 
+ +For example, to remove attribute keys matching a regex and aggregate the metrics on the remaining attributes, you can perform the following statement sequence: + +```yaml +statements: + - delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage" + - aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage" +``` + +To aggregate only using a specified set of attributes, you can use `keep_matching_keys`. + ## Examples ### Perform transformation if field does not exist diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go index 58d4777eb3c2..0c6b5fd12961 100644 --- a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics.go @@ -16,10 +16,10 @@ import ( ) type aggregateOnAttributeValueArguments struct { - Type string - Attribute string - Values []string - NewValue string + AggregationFunction string + Attribute string + Values []string + NewValue string } func newAggregateOnAttributeValueFactory() ottl.Factory[ottlmetric.TransformContext] { @@ -33,9 +33,9 @@ func createAggregateOnAttributeValueFunction(_ ottl.FunctionContext, oArgs ottl. return nil, fmt.Errorf("AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments") } - t, err := aggregateutil.ConvertToAggregationFunction(args.Type) + t, err := aggregateutil.ConvertToAggregationFunction(args.AggregationFunction) if err != nil { - return nil, fmt.Errorf("aggregation function invalid: %s", err.Error()) + return nil, fmt.Errorf("invalid aggregation function: '%s', valid options: %s", err.Error(), aggregateutil.GetSupportedAggregationFunctionsList()) } return AggregateOnAttributeValue(t, args.Attribute, args.Values, args.NewValue) diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go index 7fb6cdb2ba6b..68cdbc29e375 100644 --- a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go @@ -71,6 +71,30 @@ func Test_aggregateOnAttributeValues(t *testing.T) { input2.Attributes().PutStr("test", "test2") }, }, + { + name: "non-matching attribute", + input: getTestSumMetricMultipleAggregateOnAttributeValueAdditionalAttribute(), + t: aggregateutil.Sum, + values: []string{ + "test1", + }, + attribute: "testyy", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + input2.Attributes().PutStr("test3", "test3") + }, + }, { name: "sum sum", input: getTestSumMetricMultipleAggregateOnAttributeValue(), @@ -429,6 +453,23 @@ func getTestSumMetricMultipleAggregateOnAttributeValue() pmetric.Metric { return metricInput } +func getTestSumMetricMultipleAggregateOnAttributeValueAdditionalAttribute() pmetric.Metric { + metricInput := 
pmetric.NewMetric() + metricInput.SetEmptySum() + metricInput.SetName("sum_metric") + + input := metricInput.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := metricInput.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + input2.Attributes().PutStr("test3", "test3") + + return metricInput +} + func getTestSumMetricMultipleAggregateOnAttributeValueOdd() pmetric.Metric { metricInput := pmetric.NewMetric() metricInput.SetEmptySum() diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index a12afd031378..6087fcd70d74 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -219,6 +219,7 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) { statements: []string{`aggregate_on_attributes("sum", ["attr1", "attr2"]) where name == "operationA"`}, want: func(td pmetric.Metrics) { m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + dataPoints := pmetric.NewNumberDataPointSlice() dataPoint1 := dataPoints.AppendEmpty() dataPoint1.SetStartTimestamp(StartTimestamp) From 4a9677e299cd30f79b5f42561b1bba65c68f7cc4 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Tue, 30 Jul 2024 09:15:11 +0200 Subject: [PATCH 03/11] lint Signed-off-by: odubajDT --- processor/transformprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index de7f213f70e1..2747b4e0ebe8 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -422,7 +422,7 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin The `aggregate_on_attribute_value` function aggreates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter using the aggregation function specified in `function` parameter. Additionally it substitutes the values of the specified `attribute` with a new value in `newValue` parameter. -The `aggregate_on_attribute_value` function fetches all datapoints in the metric, which have the attribute `attribute` present with one of the values speficied in `values` and substitues these values with `newValue`. Afterwards all datapoints are aggregated depending on the attributes. +The `aggregate_on_attribute_value` function fetches all datapoints in the metric, which have the attribute `attribute` present with one of the values speficied in `values` and substitutes these values with `newValue`. Afterwards all datapoints are aggregated depending on the attributes. 
The following metric types can be aggregated: From 7a3a916b7f13e6e3f9b08c12a91c8439ab8bf101 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Fri, 2 Aug 2024 09:32:22 +0200 Subject: [PATCH 04/11] use pmetrictest for comparing metrics Signed-off-by: odubajDT --- ...c_agregate_on_attribute_value_metrics_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go index 68cdbc29e375..45af434f93b8 100644 --- a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go @@ -13,6 +13,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) func Test_aggregateOnAttributeValues(t *testing.T) { @@ -425,13 +426,22 @@ func Test_aggregateOnAttributeValues(t *testing.T) { _, err = evaluate(nil, ottlmetric.NewTransformContext(tt.input, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())) require.Nil(t, err) - actualMetrics := pmetric.NewMetricSlice() - tt.input.CopyTo(actualMetrics.AppendEmpty()) + actualMetric := pmetric.NewMetricSlice() + tt.input.CopyTo(actualMetric.AppendEmpty()) if tt.want != nil { expected := pmetric.NewMetricSlice() tt.want(expected) - assert.Equal(t, expected, actualMetrics) + + expectedMetrics := pmetric.NewMetrics() + sl := expectedMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + expected.CopyTo(sl) + + actualMetrics := pmetric.NewMetrics() + sl2 := actualMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + actualMetric.CopyTo(sl2) + + require.Nil(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreMetricDataPointsOrder())) } }) } From b54b8f1ffc532e734eaa04e6155862bc229d0e35 Mon Sep 17 00:00:00 2001 From: odubajDT <93584209+odubajDT@users.noreply.github.com> Date: Wed, 7 Aug 2024 09:38:28 +0200 Subject: [PATCH 05/11] Update processor/transformprocessor/README.md --- processor/transformprocessor/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 2747b4e0ebe8..2ea0d59dc273 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -422,8 +422,6 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin The `aggregate_on_attribute_value` function aggreates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter using the aggregation function specified in `function` parameter. Additionally it substitutes the values of the specified `attribute` with a new value in `newValue` parameter. -The `aggregate_on_attribute_value` function fetches all datapoints in the metric, which have the attribute `attribute` present with one of the values speficied in `values` and substitutes these values with `newValue`. Afterwards all datapoints are aggregated depending on the attributes. 
- The following metric types can be aggregated: - sum From 249c7bf96b0031acc5e2eaa752f0d853e7285d6b Mon Sep 17 00:00:00 2001 From: odubajDT Date: Wed, 7 Aug 2024 09:55:41 +0200 Subject: [PATCH 06/11] adapt readme Signed-off-by: odubajDT --- processor/transformprocessor/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 2ea0d59dc273..618fdaac5395 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -420,7 +420,9 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin `aggregate_on_attribute_value(function, attribute, values, newValue)` -The `aggregate_on_attribute_value` function aggreates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter using the aggregation function specified in `function` parameter. Additionally it substitutes the values of the specified `attribute` with a new value in `newValue` parameter. +The `aggregate_on_attribute_value` function aggreates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter. `function` is a case-sensitive string that represents the aggregation function. + +Firstly, `attribute` values with one of the values present in `values` are substituted by `newValue` for all datapoints. Afterwards all datapoints are aggregated depending on the attributes. The following metric types can be aggregated: From b83e4c75d34a744ad01fe6634e2f14316ef79ee1 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Wed, 7 Aug 2024 09:56:45 +0200 Subject: [PATCH 07/11] adapt readme Signed-off-by: odubajDT --- processor/transformprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 618fdaac5395..d1741e19e38c 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -446,7 +446,7 @@ Examples: - `aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage` -The `aggregate_on_attributes` function can also be used in conjunction with +The `aggregate_on_attribute_value` function can also be used in conjunction with [keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or [delete_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#delete_matching_keys). 
From b4cbc1791e7e3fdba4692d15dadefa9506057a31 Mon Sep 17 00:00:00 2001 From: odubajDT <93584209+odubajDT@users.noreply.github.com> Date: Mon, 26 Aug 2024 07:38:58 +0200 Subject: [PATCH 08/11] Update processor/transformprocessor/README.md --- processor/transformprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index d1741e19e38c..455395318ae3 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -420,7 +420,7 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin `aggregate_on_attribute_value(function, attribute, values, newValue)` -The `aggregate_on_attribute_value` function aggreates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter. `function` is a case-sensitive string that represents the aggregation function. +The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter. `function` is a case-sensitive string that represents the aggregation function. Firstly, `attribute` values with one of the values present in `values` are substituted by `newValue` for all datapoints. Afterwards all datapoints are aggregated depending on the attributes. From 54dfc96794a6a7dcdddeed7cebb2de8b7c27c5e3 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Wed, 4 Sep 2024 10:10:44 +0200 Subject: [PATCH 09/11] pr review Signed-off-by: odubajDT --- .../feat_16224_aggregate_label_value.yaml | 2 +- processor/transformprocessor/README.md | 10 +-- ...gregate_on_attribute_value_metrics_test.go | 80 +++++++++++++++++++ 3 files changed, 85 insertions(+), 7 deletions(-) diff --git a/.chloggen/feat_16224_aggregate_label_value.yaml b/.chloggen/feat_16224_aggregate_label_value.yaml index bb4e6bba0460..c6c69f8b42ed 100644 --- a/.chloggen/feat_16224_aggregate_label_value.yaml +++ b/.chloggen/feat_16224_aggregate_label_value.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: transformprocessor # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: "Support aggregating metrics based on their attribute values and substituing the values with a new value." +note: "Support aggregating metrics based on their attribute values and substituting the values with a new value." # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [16224] diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 455395318ae3..321cbcbf00cf 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -375,7 +375,7 @@ Examples: `aggregate_on_attributes(function, Optional[attributes])` -The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys to aggregate upon. +The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys of type string to aggregate upon. 
`aggregate_on_attributes` function removes all attributes that are present in datapoints except the ones that are specified in the `attributes` parameter. If `attributes` parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list). @@ -420,9 +420,7 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin `aggregate_on_attribute_value(function, attribute, values, newValue)` -The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter. `function` is a case-sensitive string that represents the aggregation function. - -Firstly, `attribute` values with one of the values present in `values` are substituted by `newValue` for all datapoints. Afterwards all datapoints are aggregated depending on the attributes. +The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` (type string) with one of the values present in the `values` parameter (list of strings) into a single datapoint where the attribute has the value `newValue` (type string). `function` is a case-sensitive string that represents the aggregation function. The following metric types can be aggregated: @@ -444,7 +442,7 @@ Supported aggregation functions are: Examples: -- `aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage` +- `aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"` The `aggregate_on_attribute_value` function can also be used in conjunction with [keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or @@ -455,7 +453,7 @@ For example, to remove attribute keys matching a regex and aggregate the metrics ```yaml statements: - delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage" - - aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage" + - aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage" ``` To aggregate only using a specified set of attributes, you can use `keep_matching_keys`. 
diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go index 45af434f93b8..2c8e2368ec50 100644 --- a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) @@ -49,6 +50,27 @@ func Test_aggregateOnAttributeValues(t *testing.T) { input2.Attributes().PutStr("test", "test2") }, }, + { + name: "empty values", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{}, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(100) + input.Attributes().PutStr("test", "test1") + + input2 := sumMetric.Sum().DataPoints().AppendEmpty() + input2.SetDoubleValue(50) + input2.Attributes().PutStr("test", "test2") + }, + }, { name: "non-existing attribute", input: getTestSumMetricMultipleAggregateOnAttributeValue(), @@ -96,6 +118,49 @@ func Test_aggregateOnAttributeValues(t *testing.T) { input2.Attributes().PutStr("test3", "test3") }, }, + { + name: "duplicated values", + input: getTestSumMetricMultipleAggregateOnAttributeValue(), + t: aggregateutil.Sum, + values: []string{ + "test1", + "test1", + "test2", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("test", "test_new") + }, + }, + { + name: "2 datapoints aggregated, one left unaggregated", + input: getTestSumMetricMultipleAggregateOnAttributeValueOdd(), + t: aggregateutil.Sum, + values: []string{ + "test1", + }, + attribute: "test", + newValue: "test_new", + want: func(metrics pmetric.MetricSlice) { + sumMetric := metrics.AppendEmpty() + sumMetric.SetEmptySum() + sumMetric.SetName("sum_metric") + + input := sumMetric.Sum().DataPoints().AppendEmpty() + input.SetDoubleValue(150) + input.Attributes().PutStr("test", "test_new") + + input3 := sumMetric.Sum().DataPoints().AppendEmpty() + input3.SetDoubleValue(30) + input3.Attributes().PutStr("test", "test2") + }, + }, { name: "sum sum", input: getTestSumMetricMultipleAggregateOnAttributeValue(), @@ -447,6 +512,21 @@ func Test_aggregateOnAttributeValues(t *testing.T) { } } +func Test_createAggregateOnAttributeValueFunction(t *testing.T) { + // invalid input arguments + _, e := createAggregateOnAttributeValueFunction(ottl.FunctionContext{}, nil) + require.Contains(t, e.Error(), "AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments") + + // invalid aggregation function + _, e = createAggregateOnAttributeValueFunction(ottl.FunctionContext{}, &aggregateOnAttributeValueArguments{ + 
AggregationFunction: "invalid", + Attribute: "attr", + Values: []string{"val"}, + NewValue: "newVal", + }) + require.Contains(t, e.Error(), "invalid aggregation function") +} + func getTestSumMetricMultipleAggregateOnAttributeValue() pmetric.Metric { metricInput := pmetric.NewMetric() metricInput.SetEmptySum() From b2af9876d5de5914a7bb55415702537d2608bd57 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Wed, 4 Sep 2024 11:39:07 +0200 Subject: [PATCH 10/11] fix lint Signed-off-by: odubajDT --- .../metrics/func_agregate_on_attribute_value_metrics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go index 2c8e2368ec50..0f84ee276d12 100644 --- a/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go +++ b/processor/transformprocessor/internal/metrics/func_agregate_on_attribute_value_metrics_test.go @@ -489,7 +489,7 @@ func Test_aggregateOnAttributeValues(t *testing.T) { assert.NoError(t, err) _, err = evaluate(nil, ottlmetric.NewTransformContext(tt.input, pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics())) - require.Nil(t, err) + require.NoError(t, err) actualMetric := pmetric.NewMetricSlice() tt.input.CopyTo(actualMetric.AppendEmpty()) @@ -506,7 +506,7 @@ func Test_aggregateOnAttributeValues(t *testing.T) { sl2 := actualMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() actualMetric.CopyTo(sl2) - require.Nil(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreMetricDataPointsOrder())) + require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreMetricDataPointsOrder())) } }) } From 410b43fd5d313c3db03ac0eb8cc7b9c275dc58de Mon Sep 17 00:00:00 2001 From: odubajDT Date: Fri, 6 Sep 2024 07:24:49 +0200 Subject: [PATCH 11/11] adapt readme Signed-off-by: odubajDT --- processor/transformprocessor/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 321cbcbf00cf..f8de372bc0ce 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -379,6 +379,8 @@ The `aggregate_on_attributes` function aggregates all datapoints in the metric b `aggregate_on_attributes` function removes all attributes that are present in datapoints except the ones that are specified in the `attributes` parameter. If `attributes` parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list). +**NOTE:** This function is supported only in `metric` context. + The following metric types can be aggregated: - sum @@ -422,6 +424,8 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` (type string) with one of the values present in the `values` parameter (list of strings) into a single datapoint where the attribute has the value `newValue` (type string). `function` is a case-sensitive string that represents the aggregation function. +**NOTE:** This function is supported only in `metric` context. 
+ The following metric types can be aggregated: - sum
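
As a usage illustration for the function introduced by this series, the following is a minimal configuration sketch (not part of the patches above) showing `aggregate_on_attribute_value` wired into the transform processor's `metric` statement context. The `system.memory.usage` metric and its `state` attribute values are placeholders chosen for the example; the statement syntax mirrors the form exercised in `processor_test.go` and the README examples added by this change.

```yaml
processors:
  transform:
    error_mode: ignore
    metric_statements:
      - context: metric
        statements:
          # Rewrite the "cached" and "buffered" state values to "other",
          # then sum the datapoints that now share identical attributes.
          - aggregate_on_attribute_value("sum", "state", ["cached", "buffered"], "other") where name == "system.memory.usage"

# Illustrative effect on the metric's datapoints (values are made up):
#   before: {state: "cached"}   value = 200
#           {state: "buffered"} value = 100
#           {state: "used"}     value = 300
#   after:  {state: "other"}    value = 300   (200 + 100, aggregated with "sum")
#           {state: "used"}     value = 300   (unchanged; not listed in `values`)
```

Per the notes added in the README, histogram and exponential histogram metrics only support the `"sum"` aggregation function, and the function is available only in the `metric` statement context.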