Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Add max_number_of_messages config parameter for S3 input #21993

Merged
merged 3 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Adding support for Microsoft 365 Defender (Microsoft Threat Protection) {pull}21446[21446]
- Adding support for FIPS in s3 input {pull}21446[21446]
- Add SSL option to checkpoint module {pull}19560[19560]
- Add max_number_of_messages config into s3 input. {pull}21993[21993]

*Heartbeat*

Expand Down
89 changes: 45 additions & 44 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,32 @@ The `s3` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[float]
==== `queue_url`

URL of the AWS SQS queue that messages will be received from. Required.

[float]
==== `fips_enabled`

Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.
==== `api_timeout`

[float]
==== `visibility_timeout`
The maximum duration of AWS API can take. If it exceeds the timeout, AWS API
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
will be interrupted.
The default AWS API timeout for a message is 120 seconds. The minimum
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
is 0 seconds. The maximum is half of the visibility timeout value.

The duration that the received messages are hidden from subsequent
retrieve requests after being retrieved by a ReceiveMessage request.
This value needs to be a lot bigger than {beatname_uc} collection frequency so
if it took too long to read the s3 log, this sqs message will not be reprocessed.
The default visibility timeout for a message is 300 seconds. The minimum
is 0 seconds. The maximum is 12 hours.
["source","json"]
----
{
"Records": [
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:51:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
},
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:52:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
}
]
}
----

[float]
==== `expand_event_list_from_field`
Expand Down Expand Up @@ -93,40 +101,33 @@ file_selectors:
- regex: '^AWSLogs/\d+/CloudTrail/'
expand_event_list_from_field: 'Records'
- regex: '^AWSLogs/\d+/CloudTrail-Digest'
```
----

[float]
==== `fips_enabled`

Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.

[float]
==== `api_timeout`
==== `max_number_of_messages`
The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned).
Valid values: 1 to 10. Default: 5.

The maximum duration of AWS API can take. If it exceeds the timeout, AWS API
will be interrupted.
The default AWS API timeout for a message is 120 seconds. The minimum
is 0 seconds. The maximum is half of the visibility timeout value.
[float]
==== `queue_url`

["source","json"]
----
{
"Records": [
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:51:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
...
},
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:52:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
...
}
]
}
```
----
URL of the AWS SQS queue that messages will be received from. Required.

[float]
==== `visibility_timeout`

The duration that the received messages are hidden from subsequent
retrieve requests after being retrieved by a ReceiveMessage request.
This value needs to be a lot bigger than {beatname_uc} collection frequency so
if it took too long to read the s3 log, this sqs message will not be reprocessed.
The default visibility timeout for a message is 300 seconds. The minimum
is 0 seconds. The maximum is 12 hours.

[float]
==== `aws credentials`
Expand Down
18 changes: 6 additions & 12 deletions x-pack/filebeat/input/s3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,11 @@ type s3Context struct {
errC chan error
}

var (
// The maximum number of messages to return. Amazon SQS never returns more messages
// than this value (however, fewer messages might be returned).
maxNumberOfMessage uint8 = 10

// The duration (in seconds) for which the call waits for a message to arrive
// in the queue before returning. If a message is available, the call returns
// sooner than WaitTimeSeconds. If no messages are available and the wait time
// expires, the call returns successfully with an empty list of messages.
waitTimeSecond uint8 = 10
)
// The duration (in seconds) for which the call waits for a message to arrive
// in the queue before returning. If a message is available, the call returns
// sooner than WaitTimeSeconds. If no messages are available and the wait time
// expires, the call returns successfully with an empty list of messages.
var waitTimeSecond uint8 = 10

func (c *s3Collector) run() {
defer c.logger.Info("s3 input worker has stopped.")
Expand Down Expand Up @@ -205,7 +199,7 @@ func (c *s3Collector) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeou
&sqs.ReceiveMessageInput{
QueueUrl: &c.config.QueueURL,
MessageAttributeNames: []string{"All"},
MaxNumberOfMessages: awssdk.Int64(int64(maxNumberOfMessage)),
MaxNumberOfMessages: awssdk.Int64(int64(c.config.MaxNumberOfMessages)),
VisibilityTimeout: &visibilityTimeout,
WaitTimeSeconds: awssdk.Int64(int64(waitTimeSecond)),
})
Expand Down
22 changes: 15 additions & 7 deletions x-pack/filebeat/input/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
)

type config struct {
APITimeout time.Duration `config:"api_timeout"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
FipsEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url" validate:"nonzero,required"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
FipsEnabled bool `config:"fips_enabled"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
APITimeout time.Duration `config:"api_timeout"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
}

// FileSelectorCfg defines type and configuration of FileSelectors
Expand All @@ -31,9 +32,10 @@ type FileSelectorCfg struct {

func defaultConfig() config {
return config{
VisibilityTimeout: 300 * time.Second,
APITimeout: 120 * time.Second,
FipsEnabled: false,
APITimeout: 120 * time.Second,
FipsEnabled: false,
MaxNumberOfMessages: 5,
VisibilityTimeout: 300 * time.Second,
}
}

Expand All @@ -42,16 +44,22 @@ func (c *config) Validate() error {
return fmt.Errorf("visibility timeout %v is not within the "+
"required range 0s to 12h", c.VisibilityTimeout)
}

kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
if c.APITimeout < 0 || c.APITimeout > c.VisibilityTimeout/2 {
return fmt.Errorf("api timeout %v needs to be larger than"+
" 0s and smaller than half of the visibility timeout", c.APITimeout)
}

kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
for i := range c.FileSelectors {
r, err := regexp.Compile(c.FileSelectors[i].RegexString)
if err != nil {
return err
}
c.FileSelectors[i].Regex = r
}

if c.MaxNumberOfMessages > 10 || c.MaxNumberOfMessages < 1 {
return fmt.Errorf(" max_number_of_messages %v needs to be between 1 and 10", c.MaxNumberOfMessages)
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (in *s3Input) createCollector(ctx v2.Context, pipeline beat.Pipeline) (*s3C
}

log.Debug("s3 service name = ", s3Servicename)

log.Debug("s3 input config max_number_of_messages = ", in.config.MaxNumberOfMessages)
return &s3Collector{
cancellation: ctxtool.FromCanceller(ctx.Cancelation),
logger: log,
Expand Down