Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/attributesprocessor] Add a convert action #7930

Merged
merged 28 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### 💡 Enhancements 💡

- `attributesprocessor`: Add convert action (#7930)
- `attributesprocessor`: Add metric support (#8111)
- `prometheusremotewriteexporter`: Write-Ahead Log support enabled (#7304)
- `hostreceiver/filesystemscraper`: Add filesystem utilization (#8027)
Expand Down
45 changes: 42 additions & 3 deletions internal/coreinternal/attraction/attraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// Settings specifies the processor settings.
type Settings struct {
// Actions specifies the list of attributes to act on.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT, CONVERT}.
// This is a required field.
Actions []ActionKeyValue `mapstructure:"actions"`
}
Expand Down Expand Up @@ -64,6 +64,11 @@ type ActionKeyValue struct {
// If the key has multiple values the values will be joined with `;` separator.
FromContext string `mapstructure:"from_context"`

// ConvertedType specifies the target type of an attribute to be converted
// If the key doesn't exist, no action is performed.
// If the value cannot be converted, the original value will be left as-is
ConvertedType string `mapstructure:"converted_type"`

// Action specifies the type of action to perform.
// The set of values are {INSERT, UPDATE, UPSERT, DELETE, HASH}.
// Both lower case and upper case are supported.
Expand All @@ -85,6 +90,7 @@ type ActionKeyValue struct {
// EXTRACT - Extracts values using a regular expression rule from the input
// 'key' to target keys specified in the 'rule'. If a target key
// already exists, it will be overridden.
// CONVERT - converts the type of an existing attribute, if convertable
// This is a required field.
Action Action `mapstructure:"action"`
}
Expand Down Expand Up @@ -134,12 +140,16 @@ const (
// 'key' to target keys specified in the 'rule'. If a target key already
// exists, it will be overridden.
EXTRACT Action = "extract"

// CONVERT converts the type of an existing attribute, if convertable
CONVERT Action = "convert"
)

type attributeAction struct {
Key string
FromAttribute string
FromContext string
ConvertedType string
// Compiled regex if provided
Regex *regexp.Regexp
// Attribute names extracted from the regexp's subexpressions.
Expand Down Expand Up @@ -190,7 +200,9 @@ func NewAttrProc(settings *Settings) (*AttrProc, error) {
}
if a.RegexPattern != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"pattern\" field. This must not be specified for %d-th action", a.Action, i)

}
if a.ConvertedType != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"converted_type\" field. This must not be specified for %d-th action", a.Action, i)
}
// Convert the raw value from the configuration to the internal trace representation of the value.
if a.Value != nil {
Expand All @@ -207,13 +219,18 @@ func NewAttrProc(settings *Settings) (*AttrProc, error) {
if valueSourceCount > 0 || a.RegexPattern != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use value sources or \"pattern\" field. These must not be specified for %d-th action", a.Action, i)
}
if a.ConvertedType != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"converted_type\" field. This must not be specified for %d-th action", a.Action, i)
}
case EXTRACT:
if valueSourceCount > 0 {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use a value source field. These must not be specified for %d-th action", a.Action, i)
}
if a.RegexPattern == "" {
return nil, fmt.Errorf("error creating AttrProc due to missing required field \"pattern\" for action \"%s\" at the %d-th action", a.Action, i)

}
if a.ConvertedType != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"converted_type\" field. This must not be specified for %d-th action", a.Action, i)
}
re, err := regexp.Compile(a.RegexPattern)
if err != nil {
Expand All @@ -231,6 +248,20 @@ func NewAttrProc(settings *Settings) (*AttrProc, error) {
}
action.Regex = re
action.AttrNames = attrNames
case CONVERT:
if valueSourceCount > 0 || a.RegexPattern != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use value sources or \"pattern\" field. These must not be specified for %d-th action", a.Action, i)
}
switch a.ConvertedType {
case stringConversionTarget:
case intConversionTarget:
case doubleConversionTarget:
case "":
return nil, fmt.Errorf("error creating AttrProc due to missing required field \"converted_type\" for action \"%s\" at the %d-th action", a.Action, i)
default:
return nil, fmt.Errorf("error creating AttrProc due to invalid value \"%s\" in field \"converted_type\" for action \"%s\" at the %d-th action", a.ConvertedType, a.Action, i)
}
action.ConvertedType = a.ConvertedType
default:
return nil, fmt.Errorf("error creating AttrProc due to unsupported action %q at the %d-th actions", a.Action, i)
}
Expand Down Expand Up @@ -272,6 +303,8 @@ func (ap *AttrProc) Process(ctx context.Context, attrs pdata.AttributeMap) {
hashAttribute(action, attrs)
case EXTRACT:
extractAttributes(action, attrs)
case CONVERT:
convertAttribute(action, attrs)
}
}
}
Expand Down Expand Up @@ -306,6 +339,12 @@ func hashAttribute(action attributeAction, attrs pdata.AttributeMap) {
}
}

func convertAttribute(action attributeAction, attrs pdata.AttributeMap) {
if value, exists := attrs.Get(action.Key); exists {
convertValue(action.ConvertedType, value)
}
}

func extractAttributes(action attributeAction, attrs pdata.AttributeMap) {
value, found := attrs.Get(action.Key)

Expand Down
77 changes: 77 additions & 0 deletions internal/coreinternal/attraction/type_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package attraction // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"

import (
"strconv"

"go.opentelemetry.io/collector/model/pdata"
)

const (
stringConversionTarget = "string"
intConversionTarget = "int"
doubleConversionTarget = "double"
)

func convertValue(to string, v pdata.AttributeValue) {
switch to {
case stringConversionTarget:
switch v.Type() {
case pdata.AttributeValueTypeString:
default:
v.SetStringVal(v.AsString())
}
case intConversionTarget:
switch v.Type() {
case pdata.AttributeValueTypeInt:
case pdata.AttributeValueTypeDouble:
v.SetIntVal(int64(v.DoubleVal()))
case pdata.AttributeValueTypeBool:
if v.BoolVal() {
v.SetIntVal(1)
} else {
v.SetIntVal(0)
}
case pdata.AttributeValueTypeString:
s := v.StringVal()
n, err := strconv.ParseInt(s, 10, 64)
if err == nil {
v.SetIntVal(n)
} // else leave original value
default: // leave original value
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
}
case doubleConversionTarget:
switch v.Type() {
case pdata.AttributeValueTypeInt:
v.SetDoubleVal(float64(v.IntVal()))
case pdata.AttributeValueTypeDouble:
case pdata.AttributeValueTypeBool:
if v.BoolVal() {
v.SetDoubleVal(1)
} else {
v.SetDoubleVal(0)
}
case pdata.AttributeValueTypeString:
s := v.StringVal()
n, err := strconv.ParseFloat(s, 64)
if err == nil {
v.SetDoubleVal(n)
} // else leave original value
default: // leave original value
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
}
default: // No-op
}
}
16 changes: 16 additions & 0 deletions processor/attributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The supported actions are:
to target keys specified in the rule. If a target key already exists, it will
be overridden. Note: It behaves similar to the Span Processor `to_attributes`
setting with the existing attribute as the source.
- `convert`: Converts an existing attribute to a specified type.

For the actions `insert`, `update` and `upsert`,
- `key` is required
Expand Down Expand Up @@ -88,6 +89,18 @@ For the `extract` action,

```


For the `convert` action,
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
- `key` is required
- `action: convert` is required.
- `converted_type` is required and must be one of int, double or string
```yaml
# Key specifies the attribute to act upon.
- key: <key>
action: convert
converted_type: <type>
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
```

The list of actions can be composed to create rich scenarios, such as
back filling attribute, copying values to a new key, redacting sensitive information.
The following is a sample configuration.
Expand All @@ -110,6 +123,9 @@ processors:
action: delete
- key: account_email
action: hash
- key: http.status_code
action: convert
converted_type: int

```

Expand Down
76 changes: 76 additions & 0 deletions processor/attributesprocessor/attributes_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,82 @@ func TestLogAttributes_Hash(t *testing.T) {
}
}

func TestLogAttributes_Convert(t *testing.T) {
testCases := []logTestCase{
{
name: "int to int",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(1),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(1),
},
},
{
name: "false to int",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueBool(false),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(0),
},
},
{
name: "String to int (good)",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("123"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(123),
},
},
{
name: "String to int (bad)",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("int-10"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("int-10"),
},
},
{
name: "String to double",
inputAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueString("123.6"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueDouble(123.6),
},
},
{
name: "Double to string",
inputAttributes: map[string]pdata.AttributeValue{
"to.string": pdata.NewAttributeValueDouble(99.1),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.string": pdata.NewAttributeValueString("99.1"),
},
},
}

factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Actions = []attraction.ActionKeyValue{
{Key: "to.int", Action: attraction.CONVERT, ConvertedType: "int"},
{Key: "to.double", Action: attraction.CONVERT, ConvertedType: "double"},
{Key: "to.string", Action: attraction.CONVERT, ConvertedType: "string"},
}

tp, err := factory.CreateLogsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)

for _, tt := range testCases {
runIndividualLogTestCase(t, tt, tp)
}
}

func BenchmarkAttributes_FilterLogsByName(b *testing.B) {
testCases := []logTestCase{
{
Expand Down
57 changes: 57 additions & 0 deletions processor/attributesprocessor/attributes_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,63 @@ func TestMetricAttributes_Hash(t *testing.T) {
runIndividualMetricTestCase(t, tc, mp)
}
}
func TestMetricAttributes_Convert(t *testing.T) {
testCases := []metricTestCase{
{
name: "String to int (good)",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("123"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(123),
},
},
{
name: "String to int (bad)",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("int-10"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("int-10"),
},
},
{
name: "String to double",
inputAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueString("3.141e2"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueDouble(314.1),
},
},
{
name: "Double to string",
inputAttributes: map[string]pdata.AttributeValue{
"to.string": pdata.NewAttributeValueDouble(99.1),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.string": pdata.NewAttributeValueString("99.1"),
},
},
}

factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Actions = []attraction.ActionKeyValue{
{Key: "to.int", Action: attraction.CONVERT, ConvertedType: "int"},
{Key: "to.double", Action: attraction.CONVERT, ConvertedType: "double"},
{Key: "to.string", Action: attraction.CONVERT, ConvertedType: "string"},
}

tp, err := factory.CreateMetricsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)

for _, tt := range testCases {
runIndividualMetricTestCase(t, tt, tp)
}
}

func BenchmarkAttributes_FilterMetricsByName(b *testing.B) {
testCases := []metricTestCase{
Expand Down
Loading