From 43ca900fded5df0c8e4f7d43667a1c8fe0dd9ed6 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 23 Sep 2020 15:41:57 +0200 Subject: [PATCH] Add recursive split to httpjson (#21214) --- x-pack/filebeat/input/httpjson/requester.go | 31 ++++--- .../filebeat/input/httpjson/requester_test.go | 86 +++++++++++++++++++ 2 files changed, 107 insertions(+), 10 deletions(-) create mode 100644 x-pack/filebeat/input/httpjson/requester_test.go diff --git a/x-pack/filebeat/input/httpjson/requester.go b/x-pack/filebeat/input/httpjson/requester.go index 579e5e267564..b5f58179aa0b 100644 --- a/x-pack/filebeat/input/httpjson/requester.go +++ b/x-pack/filebeat/input/httpjson/requester.go @@ -12,6 +12,7 @@ import ( "io" "io/ioutil" "net/http" + "strings" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/common" @@ -214,7 +215,7 @@ func (r *requester) processEventArray(publisher stateless.Publisher, events []in for _, t := range events { switch v := t.(type) { case map[string]interface{}: - for _, e := range r.splitEvent(v) { + for _, e := range splitEvent(r.splitEventsBy, v) { last = e d, err := json.Marshal(e) if err != nil { @@ -229,15 +230,23 @@ func (r *requester) processEventArray(publisher stateless.Publisher, events []in return last, nil } -func (r *requester) splitEvent(event map[string]interface{}) []map[string]interface{} { +func splitEvent(splitKey string, event map[string]interface{}) []map[string]interface{} { m := common.MapStr(event) - hasSplitKey, _ := m.HasKey(r.splitEventsBy) - if r.splitEventsBy == "" || !hasSplitKey { + // NOTE: this notation is only used internally, not meant to be documented + // and will be removed in the next release + keys := strings.SplitN(splitKey, "..", 2) + if len(keys) < 2 { + // we append an empty key to force the recursive call + keys = append(keys, "") + } + + hasSplitKey, _ := m.HasKey(keys[0]) + if keys[0] == "" || !hasSplitKey { return []map[string]interface{}{event} } - splitOnIfc, _ := m.GetValue(r.splitEventsBy) + splitOnIfc, _ := m.GetValue(keys[0]) splitOn, ok := splitOnIfc.([]interface{}) // if not an array or is empty, we do nothing if !ok || len(splitOn) == 0 { @@ -252,12 +261,14 @@ func (r *requester) splitEvent(event map[string]interface{}) []map[string]interf return []map[string]interface{}{event} } - mm := m.Clone() - if _, err := mm.Put(r.splitEventsBy, s); err != nil { - return []map[string]interface{}{event} + // call splitEvent recursively for each part + for _, nestedSplit := range splitEvent(keys[1], s) { + mm := m.Clone() + if _, err := mm.Put(keys[0], nestedSplit); err != nil { + return []map[string]interface{}{event} + } + events = append(events, mm) } - - events = append(events, mm) } return events diff --git a/x-pack/filebeat/input/httpjson/requester_test.go b/x-pack/filebeat/input/httpjson/requester_test.go new file mode 100644 index 000000000000..31e65a57c739 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/requester_test.go @@ -0,0 +1,86 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSplitEventsBy(t *testing.T) { + event := map[string]interface{}{ + "this": "is kept", + "alerts": []interface{}{ + map[string]interface{}{ + "this_is": "also kept", + "entities": []interface{}{ + map[string]interface{}{ + "something": "something", + }, + map[string]interface{}{ + "else": "else", + }, + }, + }, + map[string]interface{}{ + "this_is": "also kept 2", + "entities": []interface{}{ + map[string]interface{}{ + "something": "something 2", + }, + map[string]interface{}{ + "else": "else 2", + }, + }, + }, + }, + } + + expectedEvents := []map[string]interface{}{ + { + "this": "is kept", + "alerts": map[string]interface{}{ + "this_is": "also kept", + "entities": map[string]interface{}{ + "something": "something", + }, + }, + }, + { + "this": "is kept", + "alerts": map[string]interface{}{ + "this_is": "also kept", + "entities": map[string]interface{}{ + "else": "else", + }, + }, + }, + { + "this": "is kept", + "alerts": map[string]interface{}{ + "this_is": "also kept 2", + "entities": map[string]interface{}{ + "something": "something 2", + }, + }, + }, + { + "this": "is kept", + "alerts": map[string]interface{}{ + "this_is": "also kept 2", + "entities": map[string]interface{}{ + "else": "else 2", + }, + }, + }, + } + + const key = "alerts..entities" + + got := splitEvent(key, event) + + assert.Equal(t, expectedEvents, got) +}