Skip to content

Commit

Permalink
Rename feature of the metrics transform processor (metrics and labels) (
Browse files Browse the repository at this point in the history
#376)

* metric processor rename metrics and labels new branch

* remove branches

* removed validations

* lint: remove unused variables related to validation

* updated errors and error testing, optimized code based on comments

* add metrics transform component into components file

* have this built with the main collector

* metric processor rename metrics and labels new branch

* remove branches

* removed validations

* lint: remove unused variables related to validation

* updated errors and error testing, optimized code based on comments

* add metrics transform component into components file

merge components.go

* have this built with the main collector

merge go.mod go.sum

* go.mod resolved bad merge

* go.mod resolved bad merge

* go.sum after testing
  • Loading branch information
JingboWangGoogle authored and wyTrivail committed Jul 13, 2020
1 parent a2f2220 commit da08d58
Show file tree
Hide file tree
Showing 18 changed files with 2,806 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
Expand Down Expand Up @@ -120,6 +121,7 @@ func components() (config.Factories, error) {
processors := []component.ProcessorFactoryBase{
&k8sprocessor.Factory{},
resourcedetectionprocessor.NewFactory(),
&metricstransformprocessor.Factory{},
}
for _, pr := range factories.Processors {
processors = append(processors, pr)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerlegacyreceiver v0.0.0
Expand Down Expand Up @@ -112,4 +113,6 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sp

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor => ./processor/resourcedetectionprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor => ./processor/metricstransformprocessor/

replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
1 change: 1 addition & 0 deletions processor/metricstransformprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
99 changes: 99 additions & 0 deletions processor/metricstransformprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Metrics Transform Processor **(UNDER DEVELOPMENT - NOT READY FOR USE)**
Supported pipeline types: metrics
- This ONLY supports renames/aggregations **within individual metrics**. It does not do any aggregation across batches, so it is not suitable for aggregating metrics from multiple sources (e.g. multiple nodes or clients). At this point, it is only for aggregating metrics from a single source that groups its metrics for a particular time period into a single batch (e.g. host metrics from the VM the collector is running on).
- Rename Collisions will result in a no operation on the metrics data
- e.g. If want to rename a metric or label to `new_name` while there is already a metric or label called `new_name`, this operation will not take any effect. There will also be an error logged

## Description
The metrics transform processor can be used to rename metrics, labels, or label values. It can also be used to perform aggregations on metrics across labels or label values.

## Capabilities
- Rename metrics (e.g. rename `cpu/usage` to `cpu/usage_time`)
- Rename labels (e.g. rename `cpu` to `core`)
- Rename label values (e.g. rename `done` to `complete`)
- Aggregate across label sets (e.g. only want the label `usage`, but don’t care about the labels `core`, and `cpu`)
- Aggregation_type: sum, average, max
- Aggregate across label values (e.g. want `memory{slab}`, but don’t care about `memory{slab_reclaimable}` & `memory{slab_unreclaimable}`)
- Aggregation_type: sum, average, max

## Configuration
```yaml
# transforms is a list of transformations with each element transforming a metric selected by metric name
transforms:
# name is used to match with the metric to operate on. This implementation doesn’t utilize the filtermetric’s MatchProperties struct because it doesn’t match well with what I need at this phase. All is needed for this processor at this stage is a single name string that can be used to match with selected metrics. The list of metric names and the match type in the filtermetric’s MatchProperties struct are unnecessary. Also, based on the issue about improving filtering configuration, it seems like this struct is subject to be slightly modified.
- metric_name: <current_metric_name>

# action specifies if the operations are performed on the current copy of the metric or on a newly created metric that will be inserted
action: {update, insert}

# new_name is used to rename metrics (e.g. rename cpu/usage to cpu/usage_time) if action is insert, new_name is required
new_name: <new_metric_name_inserted>

# operations contain a list of operations that will be performed on the selected metrics. Each operation block is a key-value pair, where the key can be any arbitrary string set by the users for readability, and the value is a struct with fields required for operations. The action field is important for the processor to identify exactly which operation to perform
operations:

# update_label action can be used to update the name of a label or the values of this label (e.g. rename label `cpu` to `core`)
- action: update_label
label: <current_label1>
new_label: <new_label>
value_actions:
- value: <current_label_value>
new_value: <new_label_value>

# aggregate_labels action aggregates metrics across labels (e.g. only want the label `usage`, but don’t care about the labels `core`, and `cpu`)
- action: aggregate_labels
# label_set contains a list of labels that will remain after the aggregation. The excluded labels will be aggregated by the way specified by aggregation_type.
label_set: [labels...]
aggregation_type: {sum, average, max}

# aggregate_label_values action aggregates labels across label values (e.g. want memory{slab}, but don’t care about memory{slab_reclaimable} & memory{slab_unreclaimable})
- action: aggregate_label_values
label: <label>
# aggregated_values contains a list of label values that will be aggregated by the way specified by aggregation_type into new_value. The excluded label values will remain.
aggregated_values: [values...]
new_value: <new_value>
aggregation_type: {sum, average, max}
```
## Examples
### Insert New Metric
```yaml
# create host.cpu.utilization from host.cpu.usage
metric_name: host/cpu/usage
action: insert
new_name: host/cpu/utilization
operations:
...
```
### Rename Labels
```yaml
# rename the label cpu to core
operations:
- action: update_label
label: cpu
new_label: core
```
### Aggregate Labels
```yaml
# aggregate away everything but `state` using summation
...
operations:
-action: aggregate_labels
label_set: [ state ]
aggregation_type: sum
```
### Aggregate Label Values
```yaml
# combine slab_reclaimable & slab_unreclaimable by summation
...
operations:
-action: aggregate_label_values
label: state
aggregated_values: [ slab_reclaimable, slab_unreclaimable ]
new_value: slab
aggregation_type: sum
```
134 changes: 134 additions & 0 deletions processor/metricstransformprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metricstransformprocessor

import "go.opentelemetry.io/collector/config/configmodels"

const (
// MetricNameFieldName is the mapstructure field name for MetricName field
MetricNameFieldName = "metric_name"

// ActionFieldName is the mapstructure field name for Action field
ActionFieldName = "action"

// NewNameFieldName is the mapstructure field name for NewName field
NewNameFieldName = "new_name"

// LabelFieldName is the mapstructure field name for Label field
LabelFieldName = "label"

// NewLabelFieldName is the mapstructure field name for NewLabel field
NewLabelFieldName = "new_label"
)

// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// Transform specifies a list of transforms on metrics with each transform focusing on one metric.
Transforms []Transform `mapstructure:"transforms"`
}

// Transform defines the transformation applied to the specific metric
type Transform struct {
// MetricName is used to select the metric to operate on.
// REQUIRED
MetricName string `mapstructure:"metric_name"`

// Action specifies the action performed on the matched metric.
// REQUIRED
Action ConfigAction `mapstructure:"action"`

// NewName specifies the name of the new metric when inserting or updating.
// REQUIRED only if Action is INSERT.
NewName string `mapstructure:"new_name"`

// Operations contains a list of operations that will be performed on the selected metric.
Operations []Operation `mapstructure:"operations"`
}

// Operation defines the specific operation performed on the selected metrics.
type Operation struct {
// Action specifies the action performed for this operation.
// REQUIRED
Action OperationAction `mapstructure:"action"`

// Label identifies the exact label to operate on.
Label string `mapstructure:"label"`

// NewLabel determines the name to rename the identified label to.
NewLabel string `mapstructure:"new_label"`

// LabelSet is a list of labels to keep. All other labels are aggregated based on the AggregationType.
LabelSet []string `mapstructure:"label_set"`

// AggregationType specifies how to aggregate.
AggregationType AggregationType `mapstructure:"aggregation_type"`

// AggregatedValues is a list of label values to aggregate away.
AggregatedValues []string `mapstructure:"aggregated_values"`

// NewValue indicates what is the value called when the AggregatedValues are aggregated into one.
NewValue string `mapstructure:"new_value"`

// ValueActions is a list of renaming actions for label values.
ValueActions []ValueAction `mapstructure:"value_actions"`
}

// ValueAction renames label values.
type ValueAction struct {
// Value specifies the current label value.
Value string `mapstructure:"value"`

// NewValue specifies the label value to rename to.
NewValue string `mapstructure:"new_value"`
}

// ConfigAction is the enum to capture the two types of actions to perform on a metric.
type ConfigAction string

// OperationAction is the enum to capture the thress types of actions to perform for an operation.
type OperationAction string

// AggregationType os the enum to capture the three types of aggregation for the aggregation operation.
type AggregationType string

const (
// Insert adds a new metric to the batch with a new name.
Insert ConfigAction = "insert"

// Update updates an existing metric.
Update ConfigAction = "update"

// UpdateLabel applies name changes to label and/or label values.
UpdateLabel OperationAction = "update_label"

// AggregateLabels aggregates away all labels other than the ones in Operation.LabelSet
// by the method indicated by Operation.AggregationType.
AggregateLabels OperationAction = "aggregate_labels"

// AggregateLabelValues aggregates away the values in Operation.AggregatedValues
// by the method indicated by Operation.AggregationType.
AggregateLabelValues OperationAction = "aggregate_label_values"

// Average indicates taking the average of the aggregated data.
Average AggregationType = "average"

// Max indicates taking the max of the aggregated data.
Max AggregationType = "max"

// Sum indicates taking the sum of the aggregated data.
Sum AggregationType = "sum"
)
102 changes: 102 additions & 0 deletions processor/metricstransformprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metricstransformprocessor

import (
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
)

var (
testDataOperations = []Operation{
{
Action: UpdateLabel,
Label: "label",
NewLabel: "new_label",
ValueActions: []ValueAction{
{
Value: "current_label_value",
NewValue: "new_label_value",
},
},
},
{
Action: AggregateLabels,
LabelSet: []string{
"label1",
"label2",
},
AggregationType: Sum,
},
{
Action: AggregateLabelValues,
Label: "label",
AggregatedValues: []string{
"value1",
"value2",
},
NewValue: "new_value",
AggregationType: Sum,
},
}

tests = []struct {
filterName string
expCfg *Config
}{
{
filterName: "metricstransform",
expCfg: &Config{
ProcessorSettings: configmodels.ProcessorSettings{
NameVal: "metricstransform",
TypeVal: typeStr,
},
Transforms: []Transform{
{
MetricName: "old_name",
Action: Update,
NewName: "new_name",
Operations: testDataOperations,
},
},
},
},
}
)

// TestLoadingFullConfig tests loading testdata/config_full.yaml.
func TestLoadingFullConfig(t *testing.T) {
factories, err := config.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factories.Processors[configmodels.Type(typeStr)] = factory
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config_full.yaml"), factories)

assert.NoError(t, err)
require.NotNil(t, config)

for _, test := range tests {
t.Run(test.filterName, func(t *testing.T) {
cfg := config.Processors[test.filterName]
assert.Equal(t, test.expCfg, cfg)
})
}
}
17 changes: 17 additions & 0 deletions processor/metricstransformprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package metricstransformprocessor implements a processor for transforming metrics including renaming metric name, labels and label values.
// This processor can also aggregate metrics based on labels or label values.
package metricstransformprocessor
Loading

0 comments on commit da08d58

Please sign in to comment.