diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 01e5a8f9b4b2..8dc4ebbd877f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Allow loading secrets that contain commas from the keystore {pull}31694{pull}. - Fix Windows service timeouts when the "TCP/IP NetBIOS Helper" service is disabled. {issue}31810[31810] {pull}31835[31835] +- Expand fields in `decode_json_fields` if target is set. {issue}31712[31712] {pull}32010[32010] *Auditbeat* diff --git a/libbeat/common/jsontransform/expand.go b/libbeat/common/jsontransform/expand.go index 5f5bc0cafcf1..c026343a3b04 100644 --- a/libbeat/common/jsontransform/expand.go +++ b/libbeat/common/jsontransform/expand.go @@ -21,11 +21,23 @@ import ( "fmt" "strings" - "github.com/pkg/errors" - + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) +// ExpandFields de-dots the keys in m by expanding them in-place into a +// nested object structure, merging objects as necessary. If there are any +// conflicts (i.e. a common prefix where one field is an object and another +// is a non-object), an error key is added to the event if add_error_key +// is enabled. +func ExpandFields(logger *logp.Logger, event *beat.Event, m mapstr.M, addErrorKey bool) { + if err := expandFields(m); err != nil { + logger.Errorf("JSON: failed to expand fields: %s", err) + event.SetErrorWithOption(createJSONError(err.Error()), addErrorKey) + } +} + // expandFields de-dots the keys in m by expanding them in-place into a // nested object structure, merging objects as necessary. If there are any // conflicts (i.e. a common prefix where one field is an object and another @@ -38,7 +50,7 @@ func expandFields(m mapstr.M) error { newMap, newIsMap := getMap(v) if newIsMap { if err := expandFields(newMap); err != nil { - return errors.Wrapf(err, "error expanding %q", k) + return fmt.Errorf("error expanding %q: %w", k, err) } } if dot := strings.IndexRune(k, '.'); dot < 0 { @@ -55,7 +67,7 @@ func expandFields(m mapstr.M) error { old, err := m.Put(k, v) if err != nil { // Put will return an error if we attempt to insert into a non-object value. - return fmt.Errorf("cannot expand %q: found conflicting key", k) + return fmt.Errorf("cannot expand %q: found conflicting key: %w", k, err) } if old == nil { continue @@ -68,7 +80,7 @@ func expandFields(m mapstr.M) error { return fmt.Errorf("cannot expand %q: found conflicting key", k) } if err := mergeObjects(newMap, oldMap); err != nil { - return errors.Wrapf(err, "cannot expand %q", k) + return fmt.Errorf("cannot expand %q: %w", k, err) } } } @@ -97,7 +109,7 @@ func mergeObjects(lhs, rhs mapstr.M) error { return fmt.Errorf("cannot merge %q: found (%T) value", k, rhsValue) } if err := mergeObjects(lhsMap, rhsMap); err != nil { - return errors.Wrapf(err, "cannot merge %q", k) + return fmt.Errorf("cannot merge %q: %w", k, err) } } return nil diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 4bbea0272132..0b0cb5df6de3 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -19,12 +19,11 @@ package actions import ( "encoding/json" + "errors" "fmt" "io" "strings" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common/jsontransform" @@ -84,7 +83,7 @@ func NewDecodeJSONFields(c *cfg.C) (processors.Processor, error) { err := c.Unpack(&config) if err != nil { logger.Warn("Error unpacking config for decode_json_fields") - return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) + return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %w", err) } f := &decodeJSONFields{ @@ -142,13 +141,21 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { if tmp, err := mapstr.M(dict).GetValue(key); err == nil { if v, ok := tmp.(string); ok { id = v - mapstr.M(dict).Delete(key) + _ = mapstr.M(dict).Delete(key) } } } } if target != "" { + if f.expandKeys { + switch t := output.(type) { + case map[string]interface{}: + jsontransform.ExpandFields(f.logger, event, t, f.addErrorKey) + default: + errs = append(errs, "failed to expand keys") + } + } _, err = event.PutValue(target, output) } else { switch t := output.(type) { @@ -200,14 +207,14 @@ func unmarshal(maxDepth int, text string, fields *interface{}, processArray bool var tmp interface{} err := unmarshal(maxDepth, str, &tmp, processArray) if err != nil { - return v, err == errProcessingSkipped + return v, errors.Is(err, errProcessingSkipped) } return tmp, true } // try to deep unmarshal fields - switch O := interface{}(*fields).(type) { + switch O := (*fields).(type) { case map[string]interface{}: for k, v := range O { if decoded, ok := tryUnmarshal(v); ok { @@ -242,11 +249,11 @@ func decodeJSON(text string, to *interface{}) error { return errors.New("multiple json elements found") } - if _, err := dec.Token(); err != nil && err != io.EOF { + if _, err := dec.Token(); err != nil && !errors.Is(err, io.EOF) { return err } - switch O := interface{}(*to).(type) { + switch O := (*to).(type) { case map[string]interface{}: jsontransform.TransformNumbers(O) } diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index bae215b7d49d..ba0e7fda347f 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -485,34 +485,60 @@ func TestExpandKeys(t *testing.T) { assert.Equal(t, expected, actual) } -func TestExpandKeysError(t *testing.T) { +func TestExpandKeysWithTarget(t *testing.T) { testConfig := conf.MustNewConfigFrom(map[string]interface{}{ - "fields": fields, - "expand_keys": true, - "add_error_key": true, - "target": "", + "fields": fields, + "expand_keys": true, + "target": "my_target", }) - input := mapstr.M{"msg": `{"a.b": "c", "a.b.c": "d"}`} + input := mapstr.M{"msg": `{"a.b": {"c": "c"}, "a.b.d": "d"}`} expected := mapstr.M{ - "msg": `{"a.b": "c", "a.b.c": "d"}`, - "error": mapstr.M{ - "message": "cannot expand ...", - "type": "json", + "msg": `{"a.b": {"c": "c"}, "a.b.d": "d"}`, + "my_target": map[string]interface{}{ + "a": mapstr.M{ + "b": map[string]interface{}{ + "c": "c", + "d": "d", + }, + }, }, } - actual := getActualValue(t, testConfig, input) - assert.Contains(t, actual, "error") - errorField := actual["error"].(mapstr.M) - assert.Contains(t, errorField, "message") - - // The order in which keys are processed is not defined, so the error - // message is not defined. Apart from that, the outcome is the same. - assert.Regexp(t, `cannot expand ".*": .*`, errorField["message"]) - errorField["message"] = "cannot expand ..." assert.Equal(t, expected, actual) } +func TestExpandKeysError(t *testing.T) { + for _, target := range []string{"", "my_target"} { + t.Run(fmt.Sprintf("target set to '%s'", target), func(t *testing.T) { + testConfig := conf.MustNewConfigFrom(map[string]interface{}{ + "fields": fields, + "expand_keys": true, + "add_error_key": true, + "target": "", + }) + input := mapstr.M{"msg": `{"a.b": "c", "a.b.c": "d"}`} + expected := mapstr.M{ + "msg": `{"a.b": "c", "a.b.c": "d"}`, + "error": mapstr.M{ + "message": "cannot expand ...", + "type": "json", + }, + } + + actual := getActualValue(t, testConfig, input) + assert.Contains(t, actual, "error") + errorField := actual["error"].(mapstr.M) + assert.Contains(t, errorField, "message") + + // The order in which keys are processed is not defined, so the error + // message is not defined. Apart from that, the outcome is the same. + assert.Regexp(t, `cannot expand ".*": .*`, errorField["message"]) + errorField["message"] = "cannot expand ..." + assert.Equal(t, expected, actual) + }) + } +} + func TestOverwriteMetadata(t *testing.T) { testConfig := conf.MustNewConfigFrom(map[string]interface{}{ "fields": fields,