Skip to content

Commit

Permalink
Cherry-pick #15370 to 7.x: [Filebeat] Add basic json support into s3 …
Browse files Browse the repository at this point in the history
…input (#15477)

* [Filebeat] Add basic json support into s3 input (#15370)

* Add basic support for json format logs with message_key
* Change to use expand_event_list_from_field

(cherry picked from commit 8962224)

* update changelog
  • Loading branch information
kaiyan-sheng authored Jan 13, 2020
1 parent b642e0e commit 90afa71
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `index` option to all inputs to directly set a per-input index value. {pull}14010[14010]
- Add new fileset googlecloud/audit for ingesting Google Cloud Audit logs. {pull}15200[15200]
- Add dashboards to the CEF module (ported from the Logstash ArcSight module). {pull}14342[14342]
- Add expand_event_list_from_field support in s3 input for reading json format AWS logs. {issue}15357[15357] {pull}15370[15370]
- Add azure-eventhub input which will use the azure eventhub go sdk. {issue}14092[14092] {pull}14882[14882]
- Expose more metrics of harvesters (e.g. `read_offset`, `start_time`). {pull}13395[13395]
- Integrate the azure-eventhub with filebeat azure module (replace the kafka input). {pull}15480[15480]
Expand Down
36 changes: 34 additions & 2 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ stopped and the sqs message will be returned back to the queue.
{beatname_lc}.inputs:
- type: s3
queue_url: https://sqs.ap-southeast-1.amazonaws.com/1234/test-s3-queue
access_key_id: my-access-key
secret_access_key: my-secret-access-key
credential_profile_name: elastic-beats
expand_event_list_from_field: Records
----

The `s3` input supports the following configuration options plus the
Expand All @@ -52,6 +52,38 @@ if it took too long to read the s3 log, this sqs message will not be reprocessed
The default visibility timeout for a message is 300 seconds. The minimum
is 0 seconds. The maximum is 12 hours.

[float]
==== `expand_event_list_from_field`

If the fileset using this input expects to receive multiple messages bundled
under a specific field, then the config option `expand_event_list_from_field`
can be set to the name of that field. This setting splits the messages found
under the given field into separate events. For example, CloudTrail logs are
in JSON format and the individual events are found under the JSON object "Records":

["source","json"]
----
{
"Records": [
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:51:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
...
},
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:52:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
...
}
]
}
----

[float]
==== `aws credentials`

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type config struct {
QueueURL string `config:"queue_url" validate:"nonzero,required"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

func defaultConfig() config {
Expand Down
104 changes: 91 additions & 13 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func init() {
}
}

// Input is a input for s3
// s3Input is a input for s3
type s3Input struct {
outlet channel.Outleter // Output of received s3 logs.
config config
Expand Down Expand Up @@ -376,26 +376,38 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
continue
}

// Decode JSON documents when expand_event_list_from_field is given in config
if p.config.ExpandEventListFromField != "" {
decoder := json.NewDecoder(reader)
err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context)
if err != nil {
err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key)
s3Context.Fail(err)
return err
}
return nil
}

// handle s3 objects that are not json content-type
offset := 0
for {
log, err := reader.ReadString('\n')
if log == "" {
break
}

if err != nil {
if err == io.EOF {
// create event for last line
offset += len([]byte(log))
event := createEvent(log, offset, s3Info, objectHash, s3Context)
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
s3Context.Fail(err)
return err
}
return nil
if err == io.EOF {
// create event for last line
offset += len([]byte(log))
event := createEvent(log, offset, s3Info, objectHash, s3Context)
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
s3Context.Fail(err)
return err
}
return nil
} else if err != nil {
return errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
}

Expand All @@ -414,6 +426,72 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
return nil
}

// decodeJSONWithKey reads a stream of JSON documents from decoder and, for each
// document, emits one event per element of the array found under the configured
// expand_event_list_from_field key.
//
// It returns nil when the stream is exhausted or when a document fails to
// decode (the remainder of the file is skipped with a warning, matching the
// original best-effort behavior). It returns a non-nil error when the
// configured key is missing from a document or an event cannot be forwarded.
func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Context *s3Context) error {
	offset := 0
	for {
		var jsonFields map[string][]interface{}
		err := decoder.Decode(&jsonFields)
		if jsonFields == nil {
			// Nothing was decoded: clean end of stream (or empty document).
			return nil
		}
		if err != nil && err != io.EOF {
			// Decode failed mid-stream; skip the rest of this log file.
			p.logger.Warnf("Decode json failed for '%s', skipping this file", s3Info.key)
			return nil
		}

		// Look up the list of events under the configured field.
		textValues, ok := jsonFields[p.config.ExpandEventListFromField]
		if !ok {
			// BUG FIX: the original wrapped a nil error here, which makes
			// errors.Wrapf return nil and the missing key go unreported.
			keyErr := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
			p.logger.Error(keyErr)
			return keyErr
		}

		for _, v := range textValues {
			if convErr := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Context); convErr != nil {
				// Use Wrapf's own formatting; passing a pre-built string as
				// the format would misbehave if the key contained '%'.
				convErr = errors.Wrapf(convErr, "convertJSONToEvent failed for %v", s3Info.key)
				p.logger.Error(convErr)
				return convErr
			}
		}

		if err == io.EOF {
			// Last document of the stream has been processed. The original
			// duplicated the emit logic in a separate EOF branch without
			// returning, which risked emitting the same events twice.
			return nil
		}
	}
}

// convertJSONToEvent marshals a single decoded JSON value back into a string,
// advances the running byte offset by its length, builds an event for it, and
// forwards the event. It returns a non-nil error when marshaling or forwarding
// fails. Note that offset is passed by value, so the caller's offset is not
// advanced by this call.
func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Context *s3Context) error {
	vJSON, err := json.Marshal(jsonFields)
	if err != nil {
		// BUG FIX: the marshal error was previously ignored and then
		// silently overwritten by the forwardEvent result.
		err = errors.Wrapf(err, "json.Marshal failed for %v", s3Info.key)
		p.logger.Error(err)
		return err
	}

	log := string(vJSON)
	offset += len(log)
	event := createEvent(log, offset, s3Info, objectHash, s3Context)

	if err := p.forwardEvent(event); err != nil {
		// Use Wrapf's format arguments directly instead of pre-formatting
		// with fmt.Sprintf, which would treat '%' in the key as a verb.
		err = errors.Wrapf(err, "forwardEvent failed for %s", s3Info.key)
		p.logger.Error(err)
		return err
	}
	return nil
}

func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufio.Reader, error) {
s3GetObjectInput := &s3.GetObjectInput{
Bucket: awssdk.String(s3Info.name),
Expand Down

0 comments on commit 90afa71

Please sign in to comment.