diff --git a/.chloggen/aggregate_labels_empty.yaml b/.chloggen/aggregate_labels_empty.yaml new file mode 100644 index 000000000000..a98c8ed4f3c1 --- /dev/null +++ b/.chloggen/aggregate_labels_empty.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: metricstransform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The previously removed functionality of aggregating against an empty label set is restored. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34430] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/coreinternal/aggregateutil/aggregate.go b/internal/coreinternal/aggregateutil/aggregate.go index dcfe176be121..c7bc4613b716 100644 --- a/internal/coreinternal/aggregateutil/aggregate.go +++ b/internal/coreinternal/aggregateutil/aggregate.go @@ -33,9 +33,21 @@ func CopyMetricDetails(from, to pmetric.Metric) { } func FilterAttrs(metric pmetric.Metric, filterAttrKeys []string) { - if len(filterAttrKeys) == 0 { + // filterAttrKeys being nil means the filter is to be skipped. + if filterAttrKeys == nil { return } + // filterAttrKeys being empty means it is explicitly expected to filter + // against an empty label set, which is functionally the same as removing + // all attributes. + if len(filterAttrKeys) == 0 { + RangeDataPointAttributes(metric, func(attrs pcommon.Map) bool { + attrs.Clear() + return true + }) + } + // filterAttrKeys having provided attributes means the filter continues + // as normal. RangeDataPointAttributes(metric, func(attrs pcommon.Map) bool { attrs.RemoveIf(func(k string, _ pcommon.Value) bool { return isNotPresent(k, filterAttrKeys) diff --git a/internal/coreinternal/aggregateutil/aggregate_test.go b/internal/coreinternal/aggregateutil/aggregate_test.go index a1e986b988e2..642e44996dd8 100644 --- a/internal/coreinternal/aggregateutil/aggregate_test.go +++ b/internal/coreinternal/aggregateutil/aggregate_test.go @@ -125,9 +125,7 @@ func Test_FilterAttributes(t *testing.T) { want: func() pmetric.Metric { m := pmetric.NewMetric() s := m.SetEmptySum() - d := s.DataPoints().AppendEmpty() - d.Attributes().PutStr("attr1", "val1") - d.Attributes().PutStr("attr2", "val2") + s.DataPoints().AppendEmpty() return m }, }, diff --git a/processor/metricstransformprocessor/metrics_testcase_builder_test.go b/processor/metricstransformprocessor/metrics_testcase_builder_test.go index dc2142b2ca76..55579b24b8b4 100644 --- a/processor/metricstransformprocessor/metrics_testcase_builder_test.go +++ b/processor/metricstransformprocessor/metrics_testcase_builder_test.go @@ -4,6 +4,8 @@ package metricstransformprocessor import ( + "fmt" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -54,7 +56,12 @@ func (b builder) addDoubleDatapoint(start, ts pcommon.Timestamp, val float64, at func (b builder) setAttrs(attrs pcommon.Map, attrValues []string) { if len(attrValues) != len(b.attrs) { - panic(attrValues) + panic( + fmt.Sprintf( + "not enough attributes, expected %d attributes but got %s", + len(b.attrs), + attrValues), + ) } for i, a := range b.attrs { attrs.PutStr(a, attrValues[i]) diff --git a/processor/metricstransformprocessor/metrics_transform_processor_otlp.go b/processor/metricstransformprocessor/metrics_transform_processor_otlp.go index ad2e3b42c6e8..1d1abac32830 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor_otlp.go +++ b/processor/metricstransformprocessor/metrics_transform_processor_otlp.go @@ -554,7 +554,7 @@ func transformMetric(metric pmetric.Metric, transform internalTransform) bool { updateLabelOp(metric, op, transform.MetricIncludeFilter) case aggregateLabels: if canChangeMetric { - attrs := []string{} + attrs := make([]string, 0, len(op.labelSetMap)) for k, v := range op.labelSetMap { if v { attrs = append(attrs, k) diff --git a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go index cc6e5ff742d7..5d5191e57664 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go +++ b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go @@ -667,6 +667,35 @@ var ( build(), }, }, + { + name: "metric_label_aggregation_empty_label_set", + transforms: []internalTransform{ + { + MetricIncludeFilter: internalFilterStrict{include: "metric1"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: aggregateLabels, + AggregationType: aggregateutil.Sum, + LabelSet: []string{}, + }, + labelSetMap: map[string]bool{}, + }, + }, + }, + }, + in: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1", "label2", "label3"). + addIntDatapoint(0, 1, 1, "a", "b", "c"). + build(), + }, + out: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1"). + addIntDatapoint(0, 1, 1). + build(), + }, + }, { name: "metric_label_aggregation_ignored_for_partial_metric_match", transforms: []internalTransform{