
[processor/attributesprocessor] Add metric support #8111

Merged (10 commits) on Mar 10, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

### 💡 Enhancements 💡

- `attributesprocessor`: Add metric support (#8111)
- `prometheusremotewriteexporter`: Write-Ahead Log support enabled (#7304)
- `hostreceiver/filesystemscraper`: Add filesystem utilization (#8027)
- `hostreceiver/pagingscraper`: Add paging.utilization (#6221)
1 change: 1 addition & 0 deletions go.mod
@@ -466,6 +466,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gopkg.in/zorkian/go-datadog-api.v2 v2.30.0 // indirect
gotest.tools/v3 v3.1.0 // indirect
k8s.io/api v0.23.4 // indirect
k8s.io/apimachinery v0.23.4 // indirect
k8s.io/client-go v0.23.4 // indirect
60 changes: 59 additions & 1 deletion go.sum

Large diffs are not rendered by default.

38 changes: 23 additions & 15 deletions internal/coreinternal/processor/filterconfig/config.go
@@ -24,32 +24,32 @@ import (
// by the processor, captured under the 'include' and the second, exclude, to
// define what is excluded from the processor.
type MatchConfig struct {
-// Include specifies the set of span/log properties that must be present in order
+// Include specifies the set of input data properties that must be present in order
// for this processor to apply to it.
-// Note: If `exclude` is specified, the span/log is compared against those
+// Note: If `exclude` is specified, the input data is compared against those
// properties after the `include` properties.
-// This is an optional field. If neither `include` and `exclude` are set, all span/logs
+// This is an optional field. If neither `include` and `exclude` are set, all input data
// are processed. If `include` is set and `exclude` isn't set, then all
-// span/logs matching the properties in this structure are processed.
+// input data matching the properties in this structure are processed.
Include *MatchProperties `mapstructure:"include"`

-// Exclude specifies when this processor will not be applied to the span/logs
+// Exclude specifies when this processor will not be applied to the input data
// which match the specified properties.
// Note: The `exclude` properties are checked after the `include` properties,
// if they exist, are checked.
// If `include` isn't specified, the `exclude` properties are checked against
-// all span/logs.
-// This is an optional field. If neither `include` and `exclude` are set, all span/logs
-// are processed. If `exclude` is set and `include` isn't set, then all
-// span/logs that do no match the properties in this structure are processed.
+// all input data.
+// This is an optional field. If neither `include` and `exclude` are set, all input data
+// is processed. If `exclude` is set and `include` isn't set, then all the
+// input data that does not match the properties in this structure are processed.
Exclude *MatchProperties `mapstructure:"exclude"`
}

-// MatchProperties specifies the set of properties in a span/log to match
-// against and if the span/log should be included or excluded from the
-// processor. At least one of services (spans only), span/log names or
+// MatchProperties specifies the set of properties in a span/log/metric to match
+// against and if the input data should be included or excluded from the
+// processor. At least one of services (spans only), names or
// attributes must be specified. It is supported to have all specified, but
-// this requires all of the properties to match for the inclusion/exclusion to
+// this requires all the properties to match for the inclusion/exclusion to
// occur.
// The following are examples of invalid configurations:
// attributes/bad1:
@@ -77,7 +77,10 @@ type MatchProperties struct {
// For logs, one of LogNames, Attributes, Resources or Libraries must be specified with a
// non-empty value for a valid configuration.

-// Services specify the list of of items to match service name against.
+// For metrics, one of MetricNames, Expressions, or ResourceAttributes must be specified with a
+// non-empty value for a valid configuration.
+
+// Services specify the list of items to match service name against.
// A match occurs if the span's service name matches at least one item in this list.
// This is an optional field.
Services []string `mapstructure:"services"`
@@ -92,14 +95,19 @@ type MatchProperties struct {
// Deprecated: the Name field is removed from the log data model.
LogNames []string `mapstructure:"log_names"`

+// MetricNames is a list of strings to match metric name against.
+// A match occurs if metric name matches at least one item in the list.
+// This field is optional.
+MetricNames []string `mapstructure:"metric_names"`

// Attributes specifies the list of attributes to match against.
// All of these attributes must match exactly for a match to occur.
// Only match_type=strict is allowed if "attributes" are specified.
// This is an optional field.
Attributes []Attribute `mapstructure:"attributes"`

// Resources specify the list of items to match the resources against.
-// A match occurs if the span's resources matches at least one item in this list.
+// A match occurs if the data's resources match at least one item in this list.
// This is an optional field.
Resources []Attribute `mapstructure:"resources"`

15 changes: 14 additions & 1 deletion internal/coreinternal/processor/filtermetric/config.go
@@ -56,7 +56,20 @@ type MatchProperties struct {
ResourceAttributes []filterconfig.Attribute `mapstructure:"resource_attributes"`
}

-// ChecksMetrics returns whether or not the check should iterate through all the metrics
+func CreateMatchPropertiesFromDefault(properties *filterconfig.MatchProperties) *MatchProperties {
+if properties == nil {
+return nil
+}
+
+return &MatchProperties{
+MatchType: MatchType(properties.Config.MatchType),
+RegexpConfig: properties.Config.RegexpConfig,
+MetricNames: properties.MetricNames,
+ResourceAttributes: properties.Resources,
+}
+}
+
+// ChecksMetrics returns whether the check should iterate through all the metrics
func (mp *MatchProperties) ChecksMetrics() bool {
if mp == nil {
return false
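
The new `CreateMatchPropertiesFromDefault` helper converts the shared `filterconfig.MatchProperties` (the type behind the processor's `include`/`exclude` blocks) into the metric-specific `MatchProperties` consumed by `NewMatcher`. A minimal sketch of how a processor factory might wire the two together is below; the `buildMetricMatchers` helper and its error handling are illustrative assumptions, not code from this PR.

```go
package attributesprocessor

import (
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filtermetric"
)

// buildMetricMatchers is a hypothetical helper (not part of this PR) showing
// how the shared include/exclude config could be turned into metric matchers.
func buildMetricMatchers(include, exclude *filterconfig.MatchProperties) (filtermetric.Matcher, filtermetric.Matcher, error) {
	// CreateMatchPropertiesFromDefault returns nil for a nil input, and
	// NewMatcher returns a nil Matcher for a nil config, so an omitted
	// include or exclude block simply disables that side of the filter.
	inc, err := filtermetric.NewMatcher(filtermetric.CreateMatchPropertiesFromDefault(include))
	if err != nil {
		return nil, nil, err
	}
	exc, err := filtermetric.NewMatcher(filtermetric.CreateMatchPropertiesFromDefault(exclude))
	if err != nil {
		return nil, nil, err
	}
	return inc, exc, nil
}
```
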
35 changes: 35 additions & 0 deletions internal/coreinternal/processor/filtermetric/filtermetric.go
@@ -16,6 +16,7 @@ package filtermetric // import "github.com/open-telemetry/opentelemetry-collecto

import (
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
)

type Matcher interface {
@@ -25,8 +26,42 @@ type Matcher interface {
// NewMatcher constructs a metric Matcher. If an 'expr' match type is specified,
// returns an expr matcher, otherwise a name matcher.
func NewMatcher(config *MatchProperties) (Matcher, error) {
if config == nil {
return nil, nil
}

if config.MatchType == Expr {
return newExprMatcher(config.Expressions)
}
return newNameMatcher(config)
}

// Filters have the ability to include and exclude metrics based on the metric's properties.
// The default is to not skip. If include is defined, the metric must match or it will be skipped.
// If include is not defined but exclude is, metric will be skipped if it matches exclude. Metric
// is included if neither specified.
func SkipMetric(include, exclude Matcher, metric pdata.Metric, logger *zap.Logger) bool {
if include != nil {
// A false (or an error) returned in this case means the metric should not be processed.
i, err := include.MatchMetric(metric)
if !i || err != nil {
logger.Debug("Skipping metric",
zap.String("metric_name", (metric.Name())),
zap.Error(err)) // zap.Error handles case where err is nil
return true
}
}

if exclude != nil {
// A true (or an error) returned in this case means the metric should not be processed.
e, err := exclude.MatchMetric(metric)
if e || err != nil {
logger.Debug("Skipping metric",
zap.String("metric_name", (metric.Name())),
zap.Error(err)) // zap.Error handles case where err is nil
return true
}
}

return false
}
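
`SkipMetric` mirrors the include/exclude semantics of the span and log variants: when an include matcher is set, the metric must match it to be processed, and a match on the exclude matcher (or any matcher error) causes the metric to be skipped. A hedged usage sketch follows; the `"strict"` match type string and the metric name are illustrative assumptions, not values taken from this PR.

```go
package attributesprocessor

import (
	"go.opentelemetry.io/collector/model/pdata"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filtermetric"
)

// shouldSkipExample is illustrative only; the match type and metric name
// below are assumptions chosen for the example.
func shouldSkipExample(metric pdata.Metric, logger *zap.Logger) (bool, error) {
	// Include only metrics whose name is exactly "http.server.duration".
	include, err := filtermetric.NewMatcher(&filtermetric.MatchProperties{
		MatchType:   filtermetric.MatchType("strict"),
		MetricNames: []string{"http.server.duration"},
	})
	if err != nil {
		return false, err
	}
	// Passing a nil exclude matcher means nothing is excluded.
	return filtermetric.SkipMetric(include, nil, metric, logger), nil
}
```
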
67 changes: 52 additions & 15 deletions processor/attributesprocessor/README.md
@@ -1,21 +1,21 @@
# Attributes Processor

-Supported pipeline types: traces, logs.
+Supported pipeline types: traces, logs, metrics.

-The attributes processor modifies attributes of a span or log. Please refer to
+The attributes processor modifies attributes of a span, log, or metric. Please refer to
[config.go](./config.go) for the config spec.

-This processor also supports the ability to filter and match spans/logs to determine
+This processor also supports the ability to filter and match input data to determine
if they should be [included or excluded](#includeexclude-filtering) for specified actions.

It takes a list of actions which are performed in order specified in the config.
The supported actions are:
-- `insert`: Inserts a new attribute in spans/logs where the key does not already exist.
-- `update`: Updates an attribute in spans/logs where the key does exist.
-- `upsert`: Performs insert or update. Inserts a new attribute in spans/logs where the
-key does not already exist and updates an attribute in spans/logs where the key
+- `insert`: Inserts a new attribute in input data where the key does not already exist.
+- `update`: Updates an attribute in input data where the key does exist.
+- `upsert`: Performs insert or update. Inserts a new attribute in input data where the
+key does not already exist and updates an attribute in input data where the key
does exist.
-- `delete`: Deletes an attribute from a span/log.
+- `delete`: Deletes an attribute from the input data.
- `hash`: Hashes (SHA1) an existing attribute value.
- `extract`: Extracts values using a regular expression rule from the input key
to target keys specified in the rule. If a target key already exists, it will
@@ -37,7 +37,7 @@ For the actions `insert`, `update` and `upsert`,
# Key specifies the attribute to act upon.
- key: <key>
action: {insert, update, upsert}
-# FromAttribute specifies the attribute from the span/log to use to populate
+# FromAttribute specifies the attribute from the input data to use to populate
# the value. If the attribute doesn't exist, no action is performed.
from_attribute: <other key>

@@ -116,16 +116,49 @@ processors:
Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the processor.

### Attributes Processor for Metrics vs. [Metric Transform Processor](../metricstransformprocessor)

Regarding metric support, these two processors have overlapping functionality. They can both do simple modifications
of metric attribute key-value pairs. As a general rule the attributes processor has more attribute related
functionality, while the metrics transform processor can do much more data manipulation. The attributes processor
is preferred when the only needed functionality is overlapping, as it natively uses the official OpenTelemetry
data model. However, if the metric transform processor is already in use or its extra functionality is necessary,
there's no need to migrate away from it.

Shared functionality
* Add attributes
* Update values of attributes

Attribute processor specific functionality
* delete
* hash
* extract

Metric transform processor specific functionality
* Rename metrics
* Delete data points
* Toggle data type
* Scale value
* Aggregate across label sets
* Aggregate across label values

## Include/Exclude Filtering

The [attribute processor](README.md) exposes
-an option to provide a set of properties of a span or log record to match against to determine
-if the span/log should be included or excluded from the processor. To configure
+an option to provide a set of properties of a span, log, or metric record to match against to determine
+if the input data should be included or excluded from the processor. To configure
this option, under `include` and/or `exclude` at least `match_type` and one of the following
is required:
-- For spans, one of `services`, `span_names`, `attributes`, `resources`, or `libraries` must be specified with a non-empty value for a valid configuration. The `log_names` field is invalid.
+- For spans, one of `services`, `span_names`, `attributes`, `resources`, or `libraries` must be specified
+with a non-empty value for a valid configuration. The `log_names`, `expressions`, `resource_attributes` and
+`metric_names` fields are invalid.
- For logs, one of `log_names`, `attributes`, `resources`, or `libraries` must be specified with a
-non-empty value for a valid configuration. The `span_names` and `services` fields are invalid.
+non-empty value for a valid configuration. The `span_names`, `metric_names`, `expressions`, `resource_attributes`,
+and `services` fields are invalid.
+- For metrics, one of `metric_names` or `resources` must be specified
+with a non-empty value for a valid configuration. The `span_names`, `log_names`, and
+`services` fields are invalid.


Note: If both `include` and `exclude` are specified, the `include` properties
are checked before the `exclude` properties.
@@ -154,11 +187,11 @@ attributes:
services: [<item1>, ..., <itemN>]

# resources specify an array of items to match the resources against.
-# A match occurs if the span/log resources matches at least one of the items.
+# A match occurs if the input data resources matches at least one of the items.
resources: [<item1>, ..., <itemN>]

# libraries specify an array of items to match the implementation library against.
-# A match occurs if the span/log implementation library matches at least one of the items.
+# A match occurs if the input data implementation library matches at least one of the items.
libraries: [<item1>, ..., <itemN>]

# The span name must match at least one of the items.
@@ -169,6 +202,10 @@ attributes:
# This is an optional field.
log_names: [<item1>, ..., <itemN>]

# The metric name must match at least one of the items.
# This is an optional field.
metric_names: [<item1>, ..., <itemN>]

# Attributes specifies the list of attributes to match against.
# All of these attributes must match exactly for a match to occur.
# This is an optional field.
100 changes: 100 additions & 0 deletions processor/attributesprocessor/attributes_metric.go
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package attributesprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"

import (
"context"

"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filtermetric"
)

type metricAttributesProcessor struct {
logger *zap.Logger
attrProc *attraction.AttrProc
include filtermetric.Matcher
exclude filtermetric.Matcher
}

// newMetricAttributesProcessor returns a processor that modifies attributes of a
// metric record. To construct the attributes processors, the use of the factory
// methods are required in order to validate the inputs.
func newMetricAttributesProcessor(logger *zap.Logger, attrProc *attraction.AttrProc, include, exclude filtermetric.Matcher) *metricAttributesProcessor {
return &metricAttributesProcessor{
logger: logger,
attrProc: attrProc,
include: include,
exclude: exclude,
}
}

func (a *metricAttributesProcessor) processMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) {
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rs := rms.At(i)
ilms := rs.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ils := ilms.At(j)
metrics := ils.Metrics()
for k := 0; k < metrics.Len(); k++ {
mr := metrics.At(k)
if filtermetric.SkipMetric(a.include, a.exclude, mr, a.logger) {
continue
}

a.processMetricAttributes(ctx, mr)
}
}
}
return md, nil
}

// Attributes are provided for each log and trace, but not at the metric level
// Need to process attributes for every data point within a metric.
func (a *metricAttributesProcessor) processMetricAttributes(ctx context.Context, m pdata.Metric) {

// This is a lot of repeated code, but since there is no single parent superclass
// between metric data types, we can't use polymorphism.
switch m.DataType() {
case pdata.MetricDataTypeGauge:
dps := m.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
a.attrProc.Process(ctx, dps.At(i).Attributes())
}
case pdata.MetricDataTypeSum:
dps := m.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
a.attrProc.Process(ctx, dps.At(i).Attributes())
}
case pdata.MetricDataTypeHistogram:
dps := m.Histogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
a.attrProc.Process(ctx, dps.At(i).Attributes())
}
case pdata.MetricDataTypeExponentialHistogram:
dps := m.ExponentialHistogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
a.attrProc.Process(ctx, dps.At(i).Attributes())
}
case pdata.MetricDataTypeSummary:
dps := m.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
a.attrProc.Process(ctx, dps.At(i).Attributes())
}
}
}
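
As the comment in `processMetricAttributes` notes, attributes live on individual data points rather than on the metric itself, so that is where the configured actions are applied. The sketch below builds a one-gauge payload (using the same `model/pdata` API imported by this file) whose data-point attributes would be visited by the processor; the metric and attribute names are made up for illustration.

```go
package attributesprocessor

import (
	"go.opentelemetry.io/collector/model/pdata"
)

// newExampleGaugeMetrics builds a one-metric payload whose data-point
// attributes are what processMetricAttributes hands to attrProc.Process.
// Metric and attribute names here are illustrative only.
func newExampleGaugeMetrics() pdata.Metrics {
	md := pdata.NewMetrics()
	ilm := md.ResourceMetrics().AppendEmpty().
		InstrumentationLibraryMetrics().AppendEmpty()

	m := ilm.Metrics().AppendEmpty()
	m.SetName("example.temperature")
	m.SetDataType(pdata.MetricDataTypeGauge)

	// Attributes sit on the data point, not on the metric, which is why
	// the processor iterates data points for every supported data type.
	dp := m.Gauge().DataPoints().AppendEmpty()
	dp.SetDoubleVal(21.5)
	dp.Attributes().InsertString("sensor.id", "sensor-1")
	return md
}
```
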