Skip to content

Commit

Permalink
pr review
Browse files Browse the repository at this point in the history
Signed-off-by: odubajDT <[email protected]>
  • Loading branch information
odubajDT committed Sep 4, 2024
1 parent b4cbc17 commit 54dfc96
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .chloggen/feat_16224_aggregate_label_value.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ change_type: enhancement
component: transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Support aggregating metrics based on their attribute values and substituing the values with a new value."
note: "Support aggregating metrics based on their attribute values and substituting the values with a new value."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [16224]
Expand Down
10 changes: 4 additions & 6 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ Examples:

`aggregate_on_attributes(function, Optional[attributes])`

The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys to aggregate upon.
The `aggregate_on_attributes` function aggregates all datapoints in the metric based on the supplied attributes. `function` is a case-sensitive string that represents the aggregation function and `attributes` is an optional list of attribute keys of type string to aggregate upon.

`aggregate_on_attributes` function removes all attributes that are present in datapoints except the ones that are specified in the `attributes` parameter. If `attributes` parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list).

Expand Down Expand Up @@ -420,9 +420,7 @@ To aggregate only using a specified set of attributes, you can use `keep_matchin

`aggregate_on_attribute_value(function, attribute, values, newValue)`

The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` with one of the values present in the `values` parameter. `function` is a case-sensitive string that represents the aggregation function.

Firstly, `attribute` values with one of the values present in `values` are substituted by `newValue` for all datapoints. Afterwards all datapoints are aggregated depending on the attributes.
The `aggregate_on_attribute_value` function aggregates all datapoints in the metric containing the attribute `attribute` (type string) with one of the values present in the `values` parameter (list of strings) into a single datapoint where the attribute has the value `newValue` (type string). `function` is a case-sensitive string that represents the aggregation function.

The following metric types can be aggregated:

Expand All @@ -444,7 +442,7 @@ Supported aggregation functions are:

Examples:

- `aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage`
- `aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"`

The `aggregate_on_attribute_value` function can also be used in conjunction with
[keep_matching_keys](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/ottlfuncs#keep_matching_keys) or
Expand All @@ -455,7 +453,7 @@ For example, to remove attribute keys matching a regex and aggregate the metrics
```yaml
statements:
- delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage"
- aggregate_on_attribute_value(sum, attr1, [val1, val2], new_val) where name == "system.memory.usage"
- aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"
```

To aggregate only using a specified set of attributes, you can use `keep_matching_keys`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)
Expand Down Expand Up @@ -49,6 +50,27 @@ func Test_aggregateOnAttributeValues(t *testing.T) {
input2.Attributes().PutStr("test", "test2")
},
},
{
name: "empty values",
input: getTestSumMetricMultipleAggregateOnAttributeValue(),
t: aggregateutil.Sum,
values: []string{},
attribute: "test",
newValue: "test_new",
want: func(metrics pmetric.MetricSlice) {
sumMetric := metrics.AppendEmpty()
sumMetric.SetEmptySum()
sumMetric.SetName("sum_metric")

input := sumMetric.Sum().DataPoints().AppendEmpty()
input.SetDoubleValue(100)
input.Attributes().PutStr("test", "test1")

input2 := sumMetric.Sum().DataPoints().AppendEmpty()
input2.SetDoubleValue(50)
input2.Attributes().PutStr("test", "test2")
},
},
{
name: "non-existing attribute",
input: getTestSumMetricMultipleAggregateOnAttributeValue(),
Expand Down Expand Up @@ -96,6 +118,49 @@ func Test_aggregateOnAttributeValues(t *testing.T) {
input2.Attributes().PutStr("test3", "test3")
},
},
{
name: "duplicated values",
input: getTestSumMetricMultipleAggregateOnAttributeValue(),
t: aggregateutil.Sum,
values: []string{
"test1",
"test1",
"test2",
},
attribute: "test",
newValue: "test_new",
want: func(metrics pmetric.MetricSlice) {
sumMetric := metrics.AppendEmpty()
sumMetric.SetEmptySum()
sumMetric.SetName("sum_metric")
input := sumMetric.Sum().DataPoints().AppendEmpty()
input.SetDoubleValue(150)
input.Attributes().PutStr("test", "test_new")
},
},
{
name: "2 datapoints aggregated, one left unaggregated",
input: getTestSumMetricMultipleAggregateOnAttributeValueOdd(),
t: aggregateutil.Sum,
values: []string{
"test1",
},
attribute: "test",
newValue: "test_new",
want: func(metrics pmetric.MetricSlice) {
sumMetric := metrics.AppendEmpty()
sumMetric.SetEmptySum()
sumMetric.SetName("sum_metric")

input := sumMetric.Sum().DataPoints().AppendEmpty()
input.SetDoubleValue(150)
input.Attributes().PutStr("test", "test_new")

input3 := sumMetric.Sum().DataPoints().AppendEmpty()
input3.SetDoubleValue(30)
input3.Attributes().PutStr("test", "test2")
},
},
{
name: "sum sum",
input: getTestSumMetricMultipleAggregateOnAttributeValue(),
Expand Down Expand Up @@ -447,6 +512,21 @@ func Test_aggregateOnAttributeValues(t *testing.T) {
}
}

func Test_createAggregateOnAttributeValueFunction(t *testing.T) {
// invalid input arguments
_, e := createAggregateOnAttributeValueFunction(ottl.FunctionContext{}, nil)
require.Contains(t, e.Error(), "AggregateOnAttributeValueFactory args must be of type *AggregateOnAttributeValueArguments")

// invalid aggregation function
_, e = createAggregateOnAttributeValueFunction(ottl.FunctionContext{}, &aggregateOnAttributeValueArguments{
AggregationFunction: "invalid",
Attribute: "attr",
Values: []string{"val"},
NewValue: "newVal",
})
require.Contains(t, e.Error(), "invalid aggregation function")
}

func getTestSumMetricMultipleAggregateOnAttributeValue() pmetric.Metric {
metricInput := pmetric.NewMetric()
metricInput.SetEmptySum()
Expand Down

0 comments on commit 54dfc96

Please sign in to comment.