From 16a9b2b5efe746e1dac6e553a61497748adaeb69 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 17 Jan 2024 11:48:08 -0700 Subject: [PATCH] [pkg/ottl] Add flatten function (#30455) **Description:** Adds a `flatten` function that allows flattening maps. I went with an editor instead of a converter, but I'm open to debate. Using an editor means that a user can do `flatten(body) where IsMap(body)` instead of `set(body, Flatten(body)) where IsMap(body)`. When using ParseJSON you have to do: ``` - merge_maps(cache, ParseJSON(body), "upsert") - flatten(cache) ``` instead of `merge_maps(cache, Flatten(ParseJSON(body)), "upsert")`. Ultimately I went with an editor for similar reasons that `merge_maps` is an editor: chaining too many functions together is messy and updating maps is very fast with pdata. The function supports 2 optional parameters, `prefix` and `depth`. Use `prefix` to add a "namespace" to the values that are being flattened. Use `depth` to prevent trying to flatten maps that are too deep. See the function doc for examples. **Link to tracking Issue:** Closes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29283 **Testing:** Added new unit and e2e tests. Please scrutinize. **Documentation:** Added function doc. 
--------- Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- .chloggen/ottl-flatten-function.yaml | 27 ++++ pkg/ottl/e2e/e2e_test.go | 68 ++++++++- pkg/ottl/ottlfuncs/README.md | 121 ++++++++++++--- pkg/ottl/ottlfuncs/func_flatten.go | 81 ++++++++++ pkg/ottl/ottlfuncs/func_flatten_test.go | 189 ++++++++++++++++++++++++ pkg/ottl/ottlfuncs/functions.go | 1 + 6 files changed, 469 insertions(+), 18 deletions(-) create mode 100755 .chloggen/ottl-flatten-function.yaml create mode 100644 pkg/ottl/ottlfuncs/func_flatten.go create mode 100644 pkg/ottl/ottlfuncs/func_flatten_test.go diff --git a/.chloggen/ottl-flatten-function.yaml b/.chloggen/ottl-flatten-function.yaml new file mode 100755 index 000000000000..59732a9a4de1 --- /dev/null +++ b/.chloggen/ottl-flatten-function.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `flatten` function for flattening maps + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30455] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. 
'[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/ottl/e2e/e2e_test.go b/pkg/ottl/e2e/e2e_test.go index f7075bdd4d47..57bbedb708fe 100644 --- a/pkg/ottl/e2e/e2e_test.go +++ b/pkg/ottl/e2e/e2e_test.go @@ -48,6 +48,57 @@ func Test_e2e_editors(t *testing.T) { tCtx.GetLogRecord().Attributes().Remove("http.url") }, }, + { + statement: `flatten(attributes)`, + want: func(tCtx ottllog.TransformContext) { + tCtx.GetLogRecord().Attributes().Remove("foo") + tCtx.GetLogRecord().Attributes().PutStr("foo.bar", "pass") + tCtx.GetLogRecord().Attributes().PutStr("foo.flags", "pass") + tCtx.GetLogRecord().Attributes().PutStr("foo.slice.0", "val") + tCtx.GetLogRecord().Attributes().PutStr("foo.nested.test", "pass") + }, + }, + { + statement: `flatten(attributes, "test")`, + want: func(tCtx ottllog.TransformContext) { + m := pcommon.NewMap() + m.PutStr("test.http.method", "get") + m.PutStr("test.http.path", "/health") + m.PutStr("test.http.url", "http://localhost/health") + m.PutStr("test.flags", "A|B|C") + m.PutStr("test.total.string", "123456789") + m.PutStr("test.foo.bar", "pass") + m.PutStr("test.foo.flags", "pass") + m.PutStr("test.foo.bar", "pass") + m.PutStr("test.foo.flags", "pass") + m.PutStr("test.foo.slice.0", "val") + m.PutStr("test.foo.nested.test", "pass") + m.CopyTo(tCtx.GetLogRecord().Attributes()) + }, + }, + { + statement: `flatten(attributes, depth=0)`, + want: func(tCtx ottllog.TransformContext) {}, + }, + { + statement: `flatten(attributes, depth=1)`, + want: func(tCtx ottllog.TransformContext) { + m := pcommon.NewMap() + m.PutStr("http.method", "get") + m.PutStr("http.path", "/health") + m.PutStr("http.url", "http://localhost/health") + m.PutStr("flags", "A|B|C") + m.PutStr("total.string", "123456789") + m.PutStr("foo.bar", "pass") + m.PutStr("foo.flags", "pass") + m.PutStr("foo.bar", "pass") + m.PutStr("foo.flags", 
"pass") + m.PutStr("foo.slice.0", "val") + m2 := m.PutEmptyMap("foo.nested") + m2.PutStr("test", "pass") + m.CopyTo(tCtx.GetLogRecord().Attributes()) + }, + }, { statement: `keep_keys(attributes, ["flags", "total.string"])`, want: func(tCtx ottllog.TransformContext) { @@ -75,6 +126,11 @@ func Test_e2e_editors(t *testing.T) { statement: `merge_maps(attributes, attributes["foo"], "insert")`, want: func(tCtx ottllog.TransformContext) { tCtx.GetLogRecord().Attributes().PutStr("bar", "pass") + s := tCtx.GetLogRecord().Attributes().PutEmptySlice("slice") + v := s.AppendEmpty() + v.SetStr("val") + m2 := tCtx.GetLogRecord().Attributes().PutEmptyMap("nested") + m2.PutStr("test", "pass") }, }, { @@ -88,6 +144,11 @@ func Test_e2e_editors(t *testing.T) { want: func(tCtx ottllog.TransformContext) { tCtx.GetLogRecord().Attributes().PutStr("bar", "pass") tCtx.GetLogRecord().Attributes().PutStr("flags", "pass") + s := tCtx.GetLogRecord().Attributes().PutEmptySlice("slice") + v := s.AppendEmpty() + v.SetStr("val") + m2 := tCtx.GetLogRecord().Attributes().PutEmptyMap("nested") + m2.PutStr("test", "pass") }, }, { @@ -330,7 +391,7 @@ func Test_e2e_converters(t *testing.T) { { statement: `set(attributes["test"], Len(attributes["foo"]))`, want: func(tCtx ottllog.TransformContext) { - tCtx.GetLogRecord().Attributes().PutInt("test", 2) + tCtx.GetLogRecord().Attributes().PutInt("test", 4) }, }, { @@ -588,6 +649,11 @@ func constructLogTransformContext() ottllog.TransformContext { m := logRecord.Attributes().PutEmptyMap("foo") m.PutStr("bar", "pass") m.PutStr("flags", "pass") + s := m.PutEmptySlice("slice") + v := s.AppendEmpty() + v.SetStr("val") + m2 := m.PutEmptyMap("nested") + m2.PutStr("test", "pass") return ottllog.NewTransformContext(logRecord, scope, resource) } diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index b22aba970849..f4d6923e96f9 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -47,6 +47,7 @@ Available Editors: - 
[delete_key](#delete_key) - [delete_matching_keys](#delete_matching_keys) +- [flatten](#flatten) - [keep_keys](#keep_keys) - [limit](#limit) - [merge_maps](#merge_maps) @@ -61,9 +62,9 @@ Available Editors: `delete_key(target, key)` -The `delete_key` function removes a key from a `pdata.Map` +The `delete_key` function removes a key from a `pcommon.Map` -`target` is a path expression to a `pdata.Map` type field. `key` is a string that is a key in the map. +`target` is a path expression to a `pcommon.Map` type field. `key` is a string that is a key in the map. The key will be deleted from the map. @@ -78,9 +79,9 @@ Examples: `delete_matching_keys(target, pattern)` -The `delete_matching_keys` function removes all keys from a `pdata.Map` that match a regex pattern. +The `delete_matching_keys` function removes all keys from a `pcommon.Map` that match a regex pattern. -`target` is a path expression to a `pdata.Map` type field. `pattern` is a regex string. +`target` is a path expression to a `pcommon.Map` type field. `pattern` is a regex string. All keys that match the pattern will be deleted from the map. @@ -91,13 +92,99 @@ Examples: - `delete_matching_keys(resource.attributes, "(?i).*password.*")` +### flatten + +`flatten(target, Optional[prefix], Optional[depth])` + +The `flatten` function flattens a `pcommon.Map` by moving items from nested maps to the root. + +`target` is a path expression to a `pcommon.Map` type field. `prefix` is an optional string. `depth` is an optional non-negative int. + +For example, the following map + +```json +{ + "name": "test", + "address": { + "street": "first", + "house": 1234 + }, + "occupants": ["user 1", "user 2"] +} +``` + +is converted to + +```json +{ + "name": "test", + "address.street": "first", + "address.house": 1234, + "occupants.0": "user 1", + "occupants.1": "user 2" +} +``` + +If `prefix` is supplied, it will be prepended to the new keys. This can help you namespace the changes. 
For example, if a `prefix` of `app` were configured in the example above, the result would be + +```json +{ + "app.name": "test", + "app.address.street": "first", + "app.address.house": 1234, + "app.occupants.0": "user 1", + "app.occupants.1": "user 2" +} +``` + +If `depth` is supplied, the function will only flatten nested maps up to that depth. For example, with a `depth` of `2`, the following map + +```json +{ + "0": { + "1": { + "2": { + "3": { + "4": "value" + } + } + } + } +} +``` + +is converted to + +```json +{ + "0.1.2": { + "3": { + "4": "value" + } + } +} +``` + +A `depth` of `0` means that no flattening will occur. + +Examples: + +- `flatten(attributes)` + + +- `flatten(cache, "k8s", 4)` + + +- `flatten(body, depth=2)` + + ### keep_keys `keep_keys(target, keys[])` -The `keep_keys` function removes all keys from the `pdata.Map` that do not match one of the supplied keys. +The `keep_keys` function removes all keys from the `pcommon.Map` that do not match one of the supplied keys. -`target` is a path expression to a `pdata.Map` type field. `keys` is a slice of one or more strings. +`target` is a path expression to a `pcommon.Map` type field. `keys` is a slice of one or more strings. The map will be changed to only contain the keys specified by the list of strings. @@ -112,9 +199,9 @@ Examples: `limit(target, limit, priority_keys[])` -The `limit` function reduces the number of elements in a `pdata.Map` to be no greater than the limit. +The `limit` function reduces the number of elements in a `pcommon.Map` to be no greater than the limit. -`target` is a path expression to a `pdata.Map` type field. `limit` is a non-negative integer. +`target` is a path expression to a `pcommon.Map` type field. `limit` is a non-negative integer. `priority_keys` is a list of strings of attribute keys that won't be dropped during limiting. The number of priority keys must be less than the supplied `limit`. 
@@ -137,7 +224,7 @@ Examples: The `merge_maps` function merges the source map into the target map using the supplied strategy to handle conflicts. -`target` is a `pdata.Map` type field. `source` is a `pdata.Map` type field. `strategy` is a string that must be one of `insert`, `update`, or `upsert`. +`target` is a `pcommon.Map` type field. `source` is a `pcommon.Map` type field. `strategy` is a string that must be one of `insert`, `update`, or `upsert`. If strategy is: @@ -159,11 +246,11 @@ Examples: ### replace_all_matches -`replace_all_matches(target, pattern, replacement, function)` +`replace_all_matches(target, pattern, replacement, Optional[function])` The `replace_all_matches` function replaces any matching string value with the replacement string. -`target` is a path expression to a `pdata.Map` type field. `pattern` is a string following [filepath.Match syntax](https://pkg.go.dev/path/filepath#Match). `replacement` is either a path expression to a string telemetry field or a literal string. `function` is an optional argument that can take in any Converter that accepts a (`replacement`) string and returns a string. An example is a hash function that replaces any matching string with the hash value of `replacement`. +`target` is a path expression to a `pcommon.Map` type field. `pattern` is a string following [filepath.Match syntax](https://pkg.go.dev/path/filepath#Match). `replacement` is either a path expression to a string telemetry field or a literal string. `function` is an optional argument that can take in any Converter that accepts a (`replacement`) string and returns a string. An example is a hash function that replaces any matching string with the hash value of `replacement`. Each string value in `target` that matches `pattern` will get replaced with `replacement`. Non-string values are ignored. 
@@ -177,11 +264,11 @@ Examples: ### replace_all_patterns -`replace_all_patterns(target, mode, regex, replacement, function)` +`replace_all_patterns(target, mode, regex, replacement, Optional[function])` The `replace_all_patterns` function replaces any segments in a string value or key that match the regex pattern with the replacement string. -`target` is a path expression to a `pdata.Map` type field. `regex` is a regex string indicating a segment to replace. `replacement` is either a path expression to a string telemetry field or a literal string. +`target` is a path expression to a `pcommon.Map` type field. `regex` is a regex string indicating a segment to replace. `replacement` is either a path expression to a string telemetry field or a literal string. `mode` determines whether the match and replace will occur on the map's value or key. Valid values are `key` and `value`. @@ -208,7 +295,7 @@ If using OTTL outside of collector configuration, `$` should not be escaped and ### replace_match -`replace_match(target, pattern, replacement, function)` +`replace_match(target, pattern, replacement, Optional[function])` The `replace_match` function allows replacing entire strings if they match a glob pattern. @@ -228,7 +315,7 @@ Examples: ### replace_pattern -`replace_pattern(target, regex, replacement, function)` +`replace_pattern(target, regex, replacement, Optional[function])` The `replace_pattern` function allows replacing all string sections that match a regex pattern with a new value. @@ -281,9 +368,9 @@ Examples: `truncate_all(target, limit)` -The `truncate_all` function truncates all string values in a `pdata.Map` so that none are longer than the limit. +The `truncate_all` function truncates all string values in a `pcommon.Map` so that none are longer than the limit. -`target` is a path expression to a `pdata.Map` type field. `limit` is a non-negative integer. +`target` is a path expression to a `pcommon.Map` type field. `limit` is a non-negative integer. 
The map will be mutated such that the number of characters in all string values is less than or equal to the limit. Non-string values are ignored. diff --git a/pkg/ottl/ottlfuncs/func_flatten.go b/pkg/ottl/ottlfuncs/func_flatten.go new file mode 100644 index 000000000000..44a97b8095bc --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_flatten.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" + +import ( + "context" + "fmt" + "math" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +type FlattenArguments[K any] struct { + Target ottl.PMapGetter[K] + Prefix ottl.Optional[string] + Depth ottl.Optional[int64] +} + +func NewFlattenFactory[K any]() ottl.Factory[K] { + return ottl.NewFactory("flatten", &FlattenArguments[K]{}, createFlattenFunction[K]) +} + +func createFlattenFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[K], error) { + args, ok := oArgs.(*FlattenArguments[K]) + + if !ok { + return nil, fmt.Errorf("FlattenFactory args must be of type *FlattenArguments[K]") + } + + return flatten(args.Target, args.Prefix, args.Depth) +} + +func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.Optional[int64]) (ottl.ExprFunc[K], error) { + depth := int64(math.MaxInt64) + if !d.IsEmpty() { + depth = d.Get() + if depth < 0 { + return nil, fmt.Errorf("invalid depth for flatten function, %d cannot be negative", depth) + } + } + + var prefix string + if !p.IsEmpty() { + prefix = p.Get() + } + + return func(ctx context.Context, tCtx K) (any, error) { + m, err := target.Get(ctx, tCtx) + if err != nil { + return nil, err + } + + result := pcommon.NewMap() + flattenHelper(m, result, prefix, 0, depth) + result.CopyTo(m) + + return nil, nil + }, nil +} + +func flattenHelper(m pcommon.Map, result 
pcommon.Map, prefix string, currentDepth, maxDepth int64) { + if len(prefix) > 0 { + prefix += "." + } + m.Range(func(k string, v pcommon.Value) bool { + switch { + case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth: + flattenHelper(v.Map(), result, prefix+k, currentDepth+1, maxDepth) + case v.Type() == pcommon.ValueTypeSlice: + for i := 0; i < v.Slice().Len(); i++ { + v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", prefix+k, i))) + } + default: + v.CopyTo(result.PutEmpty(prefix + k)) + } + return true + }) +} diff --git a/pkg/ottl/ottlfuncs/func_flatten_test.go b/pkg/ottl/ottlfuncs/func_flatten_test.go new file mode 100644 index 000000000000..4ac1dd9ab2bd --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_flatten_test.go @@ -0,0 +1,189 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlfuncs + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +func Test_flatten(t *testing.T) { + tests := []struct { + name string + target map[string]any + prefix ottl.Optional[string] + depth ottl.Optional[int64] + expected map[string]any + }{ + { + name: "simple", + target: map[string]any{ + "name": "test", + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "name": "test", + }, + }, + { + name: "nested map", + target: map[string]any{ + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "address.street": "first", + "address.house": int64(1234), + }, + }, + { + name: "nested slice", + target: map[string]any{ + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "occupants.0": "user 1", + 
"occupants.1": "user 2", + }, + }, + { + name: "combination", + target: map[string]any{ + "name": "test", + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "name": "test", + "address.street": "first", + "address.house": int64(1234), + "occupants.0": "user 1", + "occupants.1": "user 2", + }, + }, + { + name: "deep nesting", + target: map[string]any{ + "1": map[string]any{ + "2": map[string]any{ + "3": map[string]any{ + "4": "5", + }, + }, + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "1.2.3.4": "5", + }, + }, + { + name: "use prefix", + target: map[string]any{ + "name": "test", + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.NewTestingOptional[string]("app"), + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "app.name": "test", + "app.address.street": "first", + "app.address.house": int64(1234), + "app.occupants.0": "user 1", + "app.occupants.1": "user 2", + }, + }, + { + name: "max depth", + target: map[string]any{ + "0": map[string]any{ + "1": map[string]any{ + "2": map[string]any{ + "3": "value", + }, + }, + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.NewTestingOptional[int64](2), + expected: map[string]any{ + "0.1.2": map[string]any{ + "3": "value", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + m := pcommon.NewMap() + err := m.FromRaw(tt.target) + assert.NoError(t, err) + target := ottl.StandardPMapGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return m, nil + }, + } + + exprFunc, err := flatten[any](target, tt.prefix, tt.depth) + assert.NoError(t, err) + _, err = exprFunc(nil, nil) + assert.NoError(t, err) + + 
assert.Equal(t, tt.expected, m.AsRaw()) + }) + } +} +func Test_flatten_bad_target(t *testing.T) { + target := &ottl.StandardPMapGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return 1, nil + }, + } + exprFunc, err := flatten[any](target, ottl.Optional[string]{}, ottl.Optional[int64]{}) + assert.NoError(t, err) + _, err = exprFunc(nil, nil) + assert.Error(t, err) +} + +func Test_flatten_bad_depth(t *testing.T) { + target := &ottl.StandardPMapGetter[any]{ + Getter: func(ctx context.Context, tCtx any) (any, error) { + return pcommon.NewMap(), nil + }, + } + _, err := flatten[any](target, ottl.Optional[string]{}, ottl.NewTestingOptional[int64](-1)) + assert.Error(t, err) +} diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index dd794e856054..3e498549a58c 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -12,6 +12,7 @@ func StandardFuncs[K any]() map[string]ottl.Factory[K] { // Editors NewDeleteKeyFactory[K](), NewDeleteMatchingKeysFactory[K](), + NewFlattenFactory[K](), NewKeepKeysFactory[K](), NewLimitFactory[K](), NewMergeMapsFactory[K](),