diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 609865cddfa..da174ed612c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff] - Move repeated Windows event channel not found errors in winlog input to debug level. {issue}35314[35314] {pull}35317[35317] - Fix crash when processing forwarded logs missing a message. {issue}34705[34705] {pull}34865[34865] - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433] +- Add support in s3 input for JSON with array of objects. {pull}35475[35475] - RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360] - [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581] - [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suit. {pull}35605[35605] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index f99e6ad0133..87e22552482 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -120,8 +120,8 @@ characters. This only applies to non-JSON logs. See <<_encoding_3>>. ==== `expand_event_list_from_field` If the fileset using this input expects to receive multiple messages bundled -under a specific field then the config option `expand_event_list_from_field` -value can be assigned the name of the field. This setting will be able to split +under a specific field or an array of objects then the config option `expand_event_list_from_field` +value can be assigned the name of the field or `.[]`. This setting will be able to split the messages under the group value into separate events. For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records". @@ -145,6 +145,23 @@ logs are in JSON format and events are found under the JSON object "Records". } ---- +Or when `expand_event_list_from_field` is set to `.[]`, an array of objects will be split +into separate events. + +["source","json"] +---- +[ + { + "id":"1234", + "message":"success" + }, + { + "id":"5678", + "message":"failure" + } +] +---- + Note: When `expand_event_list_from_field` parameter is given in the config, aws-s3 input will assume the logs are in JSON format and decode them as JSON. Content type will not be checked. If a file has "application/json" content-type, diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 933a6c41a8e..eb9d9a7eddb 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -234,14 +234,18 @@ func (p *s3ObjectProcessor) readJSON(r io.Reader) error { } func (p *s3ObjectProcessor) splitEventList(key string, raw json.RawMessage, offset int64, objHash string) error { - var jsonObject map[string]json.RawMessage - if err := json.Unmarshal(raw, &jsonObject); err != nil { - return err - } + // .[] signifies the root object is an array, and it should be split. + if key != ".[]" { + var jsonObject map[string]json.RawMessage + if err := json.Unmarshal(raw, &jsonObject); err != nil { + return err + } - raw, found := jsonObject[key] - if !found { - return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key) + var found bool + raw, found = jsonObject[key] + if !found { + return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key) + } } dec := json.NewDecoder(bytes.NewReader(raw)) diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 61b6124cd9a..a1e7e4f0b81 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -113,6 +113,16 @@ func TestS3ObjectProcessor(t *testing.T) { testProcessS3ObjectError(t, "testdata/events-array.json", "application/json", 0, sel) }) + t.Run("split array with expand_event_list_from_field equals .[]", func(t *testing.T) { + sel := fileSelectorConfig{ReaderConfig: readerConfig{ExpandEventListFromField: ".[]"}} + testProcessS3Object(t, "testdata/array.json", "application/json", 2, sel) + }) + + t.Run("split array without expand_event_list_from_field", func(t *testing.T) { + sel := fileSelectorConfig{ReaderConfig: readerConfig{ExpandEventListFromField: ""}} + testProcessS3Object(t, "testdata/array.json", "application/json", 1, sel) + }) + t.Run("events have a unique repeatable _id", func(t *testing.T) { // Hash of bucket ARN, object key, object versionId, and log offset. events := testProcessS3Object(t, "testdata/log.txt", "text/plain", 2) diff --git a/x-pack/filebeat/input/awss3/testdata/array.json b/x-pack/filebeat/input/awss3/testdata/array.json new file mode 100644 index 00000000000..8d22df6aeb8 --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/array.json @@ -0,0 +1,10 @@ +[ + { + "time": "2021-05-25 18:20:58 UTC", + "msg": "hello" + }, + { + "time": "2021-05-26 22:21:40 UTC", + "msg": "world" + } +]