[AWS S3 input] Add support in s3 input for JSON with array of objects (elastic#35475)

Co-authored-by: Andrew Kroh <[email protected]>
kaiyan-sheng and andrewkroh authored Jun 8, 2023
1 parent 0672e11 commit b4572d4
Showing 5 changed files with 51 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff]
 - Move repeated Windows event channel not found errors in winlog input to debug level. {issue}35314[35314] {pull}35317[35317]
 - Fix crash when processing forwarded logs missing a message. {issue}34705[34705] {pull}34865[34865]
 - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433]
+- Add support in s3 input for JSON with array of objects. {pull}35475[35475]
 - RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360]
 - [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581]
 - [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suit. {pull}35605[35605]
21 changes: 19 additions & 2 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -120,8 +120,8 @@ characters. This only applies to non-JSON logs. See <<_encoding_3>>.
 ==== `expand_event_list_from_field`

 If the fileset using this input expects to receive multiple messages bundled
-under a specific field then the config option `expand_event_list_from_field`
-value can be assigned the name of the field. This setting will be able to split
+under a specific field or an array of objects then the config option `expand_event_list_from_field`
+value can be assigned the name of the field or `.[]`. This setting will be able to split
 the messages under the group value into separate events. For example, CloudTrail
 logs are in JSON format and events are found under the JSON object "Records".

@@ -145,6 +145,23 @@ logs are in JSON format and events are found under the JSON object "Records".
 }
 ----

+Or when `expand_event_list_from_field` is set to `.[]`, an array of objects will be split
+into separate events.
+
+["source","json"]
+----
+[
+  {
+    "id":"1234",
+    "message":"success"
+  },
+  {
+    "id":"5678",
+    "message":"failure"
+  }
+]
+----
+
 Note: When `expand_event_list_from_field` parameter is given in the config,
 aws-s3 input will assume the logs are in JSON format and decode them as JSON.
 Content type will not be checked. If a file has "application/json" content-type,
18 changes: 11 additions & 7 deletions x-pack/filebeat/input/awss3/s3_objects.go
@@ -234,14 +234,18 @@ func (p *s3ObjectProcessor) readJSON(r io.Reader) error {
 }

 func (p *s3ObjectProcessor) splitEventList(key string, raw json.RawMessage, offset int64, objHash string) error {
-	var jsonObject map[string]json.RawMessage
-	if err := json.Unmarshal(raw, &jsonObject); err != nil {
-		return err
-	}
+	// .[] signifies the root object is an array, and it should be split.
+	if key != ".[]" {
+		var jsonObject map[string]json.RawMessage
+		if err := json.Unmarshal(raw, &jsonObject); err != nil {
+			return err
+		}

-	raw, found := jsonObject[key]
-	if !found {
-		return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
+		var found bool
+		raw, found = jsonObject[key]
+		if !found {
+			return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key)
+		}
 	}

 	dec := json.NewDecoder(bytes.NewReader(raw))
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
@@ -113,6 +113,16 @@ func TestS3ObjectProcessor(t *testing.T) {
 		testProcessS3ObjectError(t, "testdata/events-array.json", "application/json", 0, sel)
 	})

+	t.Run("split array with expand_event_list_from_field equals .[]", func(t *testing.T) {
+		sel := fileSelectorConfig{ReaderConfig: readerConfig{ExpandEventListFromField: ".[]"}}
+		testProcessS3Object(t, "testdata/array.json", "application/json", 2, sel)
+	})
+
+	t.Run("split array without expand_event_list_from_field", func(t *testing.T) {
+		sel := fileSelectorConfig{ReaderConfig: readerConfig{ExpandEventListFromField: ""}}
+		testProcessS3Object(t, "testdata/array.json", "application/json", 1, sel)
+	})
+
 	t.Run("events have a unique repeatable _id", func(t *testing.T) {
 		// Hash of bucket ARN, object key, object versionId, and log offset.
 		events := testProcessS3Object(t, "testdata/log.txt", "text/plain", 2)
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/awss3/testdata/array.json
@@ -0,0 +1,10 @@
+[
+  {
+    "time": "2021-05-25 18:20:58 UTC",
+    "msg": "hello"
+  },
+  {
+    "time": "2021-05-26 22:21:40 UTC",
+    "msg": "world"
+  }
+]
