Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'expand_keys' option to JSON input/processor #22849

Merged
merged 11 commits into from
Dec 14, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for ephemeral containers in kubernetes autodiscover and `add_kubernetes_metadata`. {pull}22389[22389] {pull}22439[22439]
- Added support for wildcard fields and keyword fallback in beats setup commands. {pull}22521[22521]
- Fix polling node when it is not ready and monitor by hostname {pull}22666[22666]
- Add `expand_keys` option to `decode_json_fields` processor and `json` input, to recursively de-dot and expand JSON keys into hierarchical object structures {pull}22849[22849]
- Update k8s client and release k8s leader lock gracefully {pull}22919[22919]
- Improve equals check. {pull}22778[22778]
- Added "detect_mime_type" processor for detecting mime types {pull}22940[22940]
Expand Down
5 changes: 5 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds an "error.message" and "error.type: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
7 changes: 6 additions & 1 deletion filebeat/docs/inputs/input-common-harvester-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ level in the output document. The default is false.
values from the decoded JSON object overwrite the fields that {beatname_uc}
normally adds (type, source, offset, etc.) in case of conflicts.

*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively
de-dot keys in the decoded JSON, and expand them into a hierarchical object
structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
This setting should be enabled when the input is produced by an
https://github.com/elastic/ecs-logging[ECS logger].

*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds an
"error.message" and "error.type: json" key in case of JSON unmarshalling errors
or when a `message_key` is defined in the configuration but cannot be used.
Expand All @@ -206,4 +212,3 @@ Options that control how {beatname_uc} deals with log messages that span
multiple lines. See <<multiline-examples>> for more information about
configuring multiline options.


5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds an "error.message" and "error.type: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
114 changes: 114 additions & 0 deletions libbeat/common/jsontransform/expand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
)

// expandFields de-dots the keys in m by expanding them in-place into a
// nested object structure, merging objects as necessary. If there are any
// conflicts (i.e. a common prefix where one field is an object and another
// is a non-object, or two non-object values that expand to the same path),
// an error will be returned.
//
// Nested maps (map[string]interface{} or common.MapStr values) are expanded
// recursively before the dotted keys of the enclosing level are processed.
//
// Note that expandFields is destructive, and in the case of an error the
// map may be left in a semi-expanded state.
//
// NOTE(review): keys are deleted from and inserted into m while ranging over
// it. Go permits this, but entries added during iteration may or may not be
// visited by the loop.
func expandFields(m common.MapStr) error {
for k, v := range m {
// Expand nested objects first, regardless of whether k itself is dotted.
newMap, newIsMap := getMap(v)
if newIsMap {
if err := expandFields(newMap); err != nil {
return errors.Wrapf(err, "error expanding %q", k)
}
}
// Keys without a dot need no expansion at this level.
if dot := strings.IndexRune(k, '.'); dot < 0 {
continue
}

// Delete the dotted key.
delete(m, k)

// Put expands k, returning the original value if any.
//
// If v is a map then we will merge with an existing map if any,
// otherwise there must not be an existing value.
old, err := m.Put(k, v)
if err != nil {
// Put will return an error if we attempt to insert into a non-object value.
return fmt.Errorf("cannot expand %q: found conflicting key", k)
}
// Nothing was previously stored at the expanded path, so there is
// nothing to merge or conflict with.
if old == nil {
continue
}
// A non-object value cannot be merged with whatever already existed at
// the expanded path, so this is reported as a conflict.
if !newIsMap {
return fmt.Errorf("cannot expand %q: found existing (%T) value", k, old)
} else {
// The new value is an object; the previous value must be one too.
oldMap, oldIsMap := getMap(old)
if !oldIsMap {
return fmt.Errorf("cannot expand %q: found conflicting key", k)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to happen on type conflict only. I think we have similar cases in metricbeat. In that case we modify the key for old to be <k>.value. If the new object has a field named value we can drop old (because it is overwritten).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment is effectively the same as #22849 (comment) - or is this something else?

The intended behaviour is to recursively merge objects, returning an error if there are two matching keys which either both have scalar values, or with one having a scalar value and one having an object value. This is intentionally strict for the first implementation; we could later either relax by default, or add options to relax.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentionally strict for the first implementation; we could later either relax by default, or add options to relax.

I'm ok if we follow up with this one later on.

Yeah, the two comments belong together.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened #23135 to track this.

}
// Fold the entries of the previous object into the new one.
// NOTE(review): this relies on Put having stored v itself (not a copy)
// at the expanded path — confirm against common.MapStr.Put.
if err := mergeObjects(newMap, oldMap); err != nil {
axw marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrapf(err, "cannot expand %q", k)
}
}
}
return nil
}

// mergeObjects deep merges the elements of rhs into lhs.
//
// mergeObjects will recursively combine the entries of
// objects with the same key in each object. If there exist
// two entries with the same key in each object which
// are not both objects, then an error will result.
//
// lhs is modified in place. Values whose key exists only in rhs are
// assigned into lhs as-is (not copied). As the merge proceeds entry by
// entry, lhs may already have been partially updated when an error is
// returned.
func mergeObjects(lhs, rhs common.MapStr) error {
for k, rhsValue := range rhs {
lhsValue, ok := lhs[k]
if !ok {
// Key only present in rhs: adopt its value directly.
lhs[k] = rhsValue
continue
}
// The key exists on both sides, so both values must be objects in
// order to be merged; the error reports the offending value's type.
lhsMap, ok := getMap(lhsValue)
if !ok {
return fmt.Errorf("cannot merge %q: found (%T) value", k, lhsValue)
}
rhsMap, ok := getMap(rhsValue)
if !ok {
return fmt.Errorf("cannot merge %q: found (%T) value", k, rhsValue)
}
axw marked this conversation as resolved.
Show resolved Hide resolved
// Both sides are objects: merge them recursively, prefixing any error
// with the key at this level.
if err := mergeObjects(lhsMap, rhsMap); err != nil {
return errors.Wrapf(err, "cannot merge %q", k)
}
}
return nil
}

// getMap reports whether v is a JSON-style object, i.e. either a plain
// map[string]interface{} or a common.MapStr. When it is, the value is
// returned as a map[string]interface{} together with true; for any other
// dynamic type it returns (nil, false).
func getMap(v interface{}) (map[string]interface{}, bool) {
	if plain, isPlain := v.(map[string]interface{}); isPlain {
		return plain, true
	}
	if mapStr, isMapStr := v.(common.MapStr); isMapStr {
		return mapStr, true
	}
	return nil, false
}
133 changes: 133 additions & 0 deletions libbeat/common/jsontransform/expand_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

// TestExpand runs expandFields over a table of events. Cases with an empty
// Err must succeed and leave the event equal to Expected; cases with a
// non-empty Err must fail with an error message matching that regular
// expression.
func TestExpand(t *testing.T) {
	type testCase struct {
		Event    common.MapStr
		Expected common.MapStr
		Err      string
	}

	cases := []testCase{
		// A single dotted key becomes a nested object.
		{
			Event: common.MapStr{
				"hello.world": 15,
			},
			Expected: common.MapStr{
				"hello": common.MapStr{
					"world": 15,
				},
			},
		},
		// A key without dots is left untouched.
		{
			Event: common.MapStr{
				"test": 15,
			},
			Expected: common.MapStr{
				"test": 15,
			},
		},
		// Dotted keys sharing a prefix are merged under one object.
		{
			Event: common.MapStr{
				"test":           15,
				"hello.there":    1,
				"hello.world.ok": "test",
				"elastic.for":    "search",
			},
			Expected: common.MapStr{
				"test": 15,
				"hello": common.MapStr{
					"there": 1,
					"world": common.MapStr{
						"ok": "test",
					},
				},
				"elastic": common.MapStr{
					"for": "search",
				},
			},
		},
		// Dotted keys are merged into an already existing object.
		{
			Event: common.MapStr{
				"root": common.MapStr{
					"ok": 1,
				},
				"root.shared":        "yes",
				"root.one.two.three": 4,
			},
			Expected: common.MapStr{
				"root": common.MapStr{
					"ok":     1,
					"shared": "yes",
					"one":    common.MapStr{"two": common.MapStr{"three": 4}},
				},
			},
		},
		// Expanding through an existing scalar is a conflict.
		{
			Event: common.MapStr{
				"root": common.MapStr{
					"seven": 1,
				},
				"root.seven.eight": 2,
			},
			Err: `cannot expand .*`,
		},
		// Two scalars landing on the same path are a conflict.
		{
			Event: common.MapStr{
				"a.b": 1,
				"a": common.MapStr{
					"b": 2,
				},
			},
			Err: `cannot expand .*`,
		},
		// Two objects whose merge collides on a scalar are a conflict.
		{
			Event: common.MapStr{
				"a.b": common.MapStr{
					"c": common.MapStr{
						"d": 1,
					},
				},
				"a.b.c": common.MapStr{
					"d": 2,
				},
			},
			Err: `cannot expand .*`,
		},
	}

	for _, tc := range cases {
		err := expandFields(tc.Event)
		if tc.Err == "" {
			require.NoError(t, err)
			assert.Equal(t, tc.Expected, tc.Event)
			continue
		}
		require.Error(t, err)
		assert.Regexp(t, tc.Err, err.Error())
	}
}
9 changes: 8 additions & 1 deletion libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ import (
)

// WriteJSONKeys writes the json keys to the given event based on the expandKeys, overwriteKeys and addErrKey options.
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) {
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) {
logger := logp.NewLogger("jsonhelper")
if expandKeys {
if err := expandFields(keys); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrKey)
return
}
}
if !overwriteKeys {
// @timestamp and @metadata fields are root-level fields. We remove them so they
// don't become part of event.Fields.
Expand Down
42 changes: 41 additions & 1 deletion libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestWriteJSONKeys(t *testing.T) {

tests := map[string]struct {
keys map[string]interface{}
expandKeys bool
overwriteKeys bool
expectedMetadata common.MapStr
expectedTimestamp time.Time
Expand Down Expand Up @@ -117,6 +118,45 @@ func TestWriteJSONKeys(t *testing.T) {
"top_c": "COMPLETELY_NEW_c",
},
},
"expand_true": {
expandKeys: true,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": common.MapStr{
"inner_e": "COMPLETELY_NEW_e",
},
},
},
},
"expand_false": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
},
}

for name, test := range tests {
Expand All @@ -127,7 +167,7 @@ func TestWriteJSONKeys(t *testing.T) {
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.overwriteKeys, false)
WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
Expand Down
Loading