Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/attributesprocessor] Add a convert action #7930

Merged
merged 28 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions internal/coreinternal/attraction/attraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// Settings specifies the processor settings.
type Settings struct {
// Actions specifies the list of attributes to act on.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT, COERCE}.
// This is a required field.
Actions []ActionKeyValue `mapstructure:"actions"`
}
Expand Down Expand Up @@ -64,6 +64,11 @@ type ActionKeyValue struct {
// If the key has multiple values the values will be joined with `;` separator.
FromContext string `mapstructure:"from_context"`

// TargetType specifies the target type of an attribute to be coerced
// If the key doesn't exist, no action is performed.
// If the value cannot be coerced, the null value will be inserted.
TargetType string `mapstructure:"target_type"`
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved

// Action specifies the type of action to perform.
// The set of values are {INSERT, UPDATE, UPSERT, DELETE, HASH}.
// Both lower case and upper case are supported.
Expand All @@ -85,6 +90,8 @@ type ActionKeyValue struct {
// EXTRACT - Extracts values using a regular expression rule from the input
// 'key' to target keys specified in the 'rule'. If a target key
// already exists, it will be overridden.
// COERCE - coerces the type of an existing attribute, or sets it to the null
// value if the existing value cannot be coerced
// This is a required field.
Action Action `mapstructure:"action"`
}
Expand Down Expand Up @@ -134,12 +141,17 @@ const (
// 'key' to target keys specified in the 'rule'. If a target key already
// exists, it will be overridden.
EXTRACT Action = "extract"

// COERCE coerces the type of an existing attribute, or sets it to the null
// value if the existing value cannot be coerced
COERCE Action = "coerce"
)

type attributeAction struct {
Key string
FromAttribute string
FromContext string
TargetType string
// Compiled regex if provided
Regex *regexp.Regexp
// Attribute names extracted from the regexp's subexpressions.
Expand Down Expand Up @@ -190,7 +202,9 @@ func NewAttrProc(settings *Settings) (*AttrProc, error) {
}
if a.RegexPattern != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"pattern\" field. This must not be specified for %d-th action", a.Action, i)

}
if a.TargetType != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"target_type\" field. This must not be specified for %d-th action", a.Action, i)
}
// Convert the raw value from the configuration to the internal trace representation of the value.
if a.Value != nil {
Expand All @@ -207,13 +221,18 @@ func NewAttrProc(settings *Settings) (*AttrProc, error) {
if valueSourceCount > 0 || a.RegexPattern != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use value sources or \"pattern\" field. These must not be specified for %d-th action", a.Action, i)
}
if a.TargetType != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"target_type\" field. This must not be specified for %d-th action", a.Action, i)
}
case EXTRACT:
if valueSourceCount > 0 {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use a value source field. These must not be specified for %d-th action", a.Action, i)
}
if a.RegexPattern == "" {
return nil, fmt.Errorf("error creating AttrProc due to missing required field \"pattern\" for action \"%s\" at the %d-th action", a.Action, i)

}
if a.TargetType != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"target_type\" field. This must not be specified for %d-th action", a.Action, i)
}
re, err := regexp.Compile(a.RegexPattern)
if err != nil {
Expand All @@ -231,6 +250,20 @@ func NewAttrProc(settings *Settings) (*AttrProc, error) {
}
action.Regex = re
action.AttrNames = attrNames
case COERCE:
if valueSourceCount > 0 || a.RegexPattern != "" {
return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use value sources or \"pattern\" field. These must not be specified for %d-th action", a.Action, i)
}
switch a.TargetType {
case "string":
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
case "int":
case "double":
case "":
return nil, fmt.Errorf("error creating AttrProc due to missing required field \"target_type\" for action \"%s\" at the %d-th action", a.Action, i)
default:
return nil, fmt.Errorf("error creating AttrProc due to invalid value \"%s\" in field \"target_type\" for action \"%s\" at the %d-th action", a.TargetType, a.Action, i)
}
action.TargetType = a.TargetType
default:
return nil, fmt.Errorf("error creating AttrProc due to unsupported action %q at the %d-th actions", a.Action, i)
}
Expand Down Expand Up @@ -272,6 +305,8 @@ func (ap *AttrProc) Process(ctx context.Context, attrs pdata.AttributeMap) {
hashAttribute(action, attrs)
case EXTRACT:
extractAttributes(action, attrs)
case COERCE:
coerceAttribute(action, attrs)
}
}
}
Expand Down Expand Up @@ -306,6 +341,12 @@ func hashAttribute(action attributeAction, attrs pdata.AttributeMap) {
}
}

func coerceAttribute(action attributeAction, attrs pdata.AttributeMap) {
if value, exists := attrs.Get(action.Key); exists {
coerceValue(action.TargetType, value)
}
}

func extractAttributes(action attributeAction, attrs pdata.AttributeMap) {
value, found := attrs.Get(action.Key)

Expand Down
69 changes: 69 additions & 0 deletions internal/coreinternal/attraction/coercer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package attraction

import (
"regexp"
"strconv"

"go.opentelemetry.io/collector/model/pdata"
)

var num = regexp.MustCompile(`^(\d+)(?:\.\d+)?$`)
var dub = regexp.MustCompile(`^(\d+(?:\.\d+)?)$`)

func coerceValue(to string, v pdata.AttributeValue) {
switch to {
case "string":
switch v.Type().String() {
case "STRING":
default:
v.SetStringVal(v.AsString())
}
case "int":
switch v.Type().String() {
case "INT":
case "DOUBLE":
v.SetIntVal(int64(v.DoubleVal()))
case "BOOL":
if v.BoolVal() {
v.SetIntVal(1)
} else {
v.SetIntVal(0)
}
case "STRING":
s := v.StringVal()
n := num.FindStringSubmatch(s)
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
if n != nil {
intVal, _ := strconv.Atoi(n[1])
v.SetIntVal(int64(intVal))
} else {
v.SetIntVal(int64(0))
}
default:
v.SetIntVal(int64(0))
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
}
case "double":
switch v.Type().String() {
case "INT":
v.SetDoubleVal(float64(v.IntVal()))
case "DOUBLE":
case "BOOL":
if v.BoolVal() {
v.SetDoubleVal(1)
} else {
v.SetDoubleVal(0)
}
case "STRING":
s := v.StringVal()
n := dub.FindStringSubmatch(s)
if n != nil {
hughsimpson marked this conversation as resolved.
Show resolved Hide resolved
dubVal, _ := strconv.ParseFloat(n[1], 64)
v.SetDoubleVal(dubVal)
} else {
v.SetDoubleVal(0)
}
default:
v.SetDoubleVal(0)
}
default: // No-op
}
}
15 changes: 15 additions & 0 deletions processor/attributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ For the `extract` action,

```


For the `coerce` action,
- `key` is required
- `action: coerce` is required.
- `target_type` is required and must be one of int, double or string
```yaml
# Key specifies the attribute to act upon.
- key: <key>
action: coerce
target_type: <type>
```

The list of actions can be composed to create rich scenarios, such as
back filling attribute, copying values to a new key, redacting sensitive information.
The following is a sample configuration.
Expand All @@ -110,6 +122,9 @@ processors:
action: delete
- key: account_email
action: hash
- key: http.status_code
action: coerce
target_type: int

```

Expand Down
103 changes: 103 additions & 0 deletions processor/attributesprocessor/attributes_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,109 @@ func TestAttributes_Hash(t *testing.T) {
}
}

func TestAttributes_Coerce(t *testing.T) {
testCases := []testCase{
{
name: "int to int",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(1),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(1),
},
},
{
name: "true to int",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueBool(true),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(1),
},
},
{
name: "false to int",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueBool(false),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(0),
},
},
{
name: "String to int (good)",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("123"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(123),
},
},
{
name: "String to int (bad)",
inputAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueString("int-10"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.int": pdata.NewAttributeValueInt(0),
},
},
{
name: "String to double (int-ish)",
inputAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueString("123"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueDouble(123),
},
},
{
name: "String to double (double-ish)",
inputAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueString("123.6"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueDouble(123.6),
},
},
{
name: "String to double (bad)",
inputAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueString("int-10"),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.double": pdata.NewAttributeValueDouble(0),
},
},
{
name: "Double to string",
inputAttributes: map[string]pdata.AttributeValue{
"to.string": pdata.NewAttributeValueDouble(99.1),
},
expectedAttributes: map[string]pdata.AttributeValue{
"to.string": pdata.NewAttributeValueString("99.1"),
},
},
}

factory := NewFactory()
cfg := factory.CreateDefaultConfig()
oCfg := cfg.(*Config)
oCfg.Actions = []attraction.ActionKeyValue{
{Key: "to.int", Action: attraction.COERCE, TargetType: "int"},
{Key: "to.double", Action: attraction.COERCE, TargetType: "double"},
{Key: "to.string", Action: attraction.COERCE, TargetType: "string"},
}

tp, err := factory.CreateTracesProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)

for _, tt := range testCases {
runIndividualTestCase(t, tt, tp)
}
}

func BenchmarkAttributes_FilterSpansByName(b *testing.B) {
testCases := []testCase{
{
Expand Down
10 changes: 10 additions & 0 deletions processor/attributesprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,14 @@ func TestLoadingConfig(t *testing.T) {
},
})

p11 := cfg.Processors[config.NewComponentIDWithName(typeStr, "coerce")]
assert.Equal(t, p11, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "coerce")),
Settings: attraction.Settings{
Actions: []attraction.ActionKeyValue{
{Key: "http.status_code", Action: attraction.COERCE, TargetType: "int"},
},
},
})

}
8 changes: 8 additions & 0 deletions processor/attributesprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ func TestFactoryCreateTracesProcessor_InvalidActions(t *testing.T) {
ap, err := factory.CreateTracesProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
assert.Error(t, err)
assert.Nil(t, ap)
// Invalid target type
oCfg.Actions = []attraction.ActionKeyValue{
{Key: "http.status_code", TargetType: "array", Action: attraction.COERCE},
}
ap2, err2 := factory.CreateTracesProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
assert.Error(t, err2)
assert.Equal(t, "error creating \"attributes\" processor: error creating AttrProc due to invalid value \"array\" in field \"target_type\" for action \"coerce\" at the 0-th action of processor attributes", err2.Error())
assert.Nil(t, ap2)
}

func TestFactoryCreateTracesProcessor(t *testing.T) {
Expand Down
9 changes: 8 additions & 1 deletion processor/attributesprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ processors:
attributes/hash:
actions:
- key: user.email
action: hash
action: hash

# The following demonstrates coercing the type of existing attribute values.
attributes/coerce:
actions:
- key: http.status_code
action: coerce
target_type: int


# The following demonstrates excluding spans from this attributes processor.
Expand Down