Skip to content

Commit

Permalink
Add log_group_name_prefix config option for aws-cloudwatch input (#26187
Browse files Browse the repository at this point in the history
) (#26527)

(cherry picked from commit 1c9a488)
  • Loading branch information
kaiyan-sheng authored Jun 28, 2021
1 parent bedc535 commit c6ab0c1
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update PanOS module to parse HIP Match logs. {issue}24350[24350] {pull}25686[25686]
- Support MongoDB 4.4 in filebeat's MongoDB module. {issue}20501[20501] {pull}24774[24774]
- Enhance GCP module to populate orchestrator.* fields for GKE / K8S logs {pull}25368[25368]
- Add log_group_name_prefix config into aws-cloudwatch input. {pull}26187[26187]
- Move Filebeat azure module to GA. {pull}26114[26114] {pull}26168[26168]
- Make `filestream` input GA. {pull}26127[26127]
- http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,60 @@

# List of S3 object metadata keys to include in events.
#include_s3_metadata: []

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
#enabled: false

# AWS Credentials
# If access_key_id and secret_access_key are configured, then use them to make api calls.
# If not, aws-cloudwatch input will load default AWS config or load with given profile name.
#access_key_id: '${AWS_ACCESS_KEY_ID:""}'
#secret_access_key: '${AWS_SECRET_ACCESS_KEY:""}'
#session_token: '${AWS_SESSION_TOKEN:"”}'
#credential_profile_name: test-aws-s3-input

# ARN of the log group to collect logs from
#log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*"

# Name of the log group to collect logs from.
# Note: region_name is required when log_group_name is given.
#log_group_name: test

# The prefix for a group of log group names.
# Note: `region_name` is required when `log_group_name_prefix` is given.
# `log_group_name` and `log_group_name_prefix` cannot be given at the same time.
#log_group_name_prefix: /aws/

# Region that the specified log group or log group prefix belongs to.
#region_name: us-east-1

# A list of strings of log streams names that Filebeat collect log events from.
#log_streams:
# - log_stream_name

# A string to filter the results to include only log events from log streams
# that have names starting with this prefix.
#log_stream_prefix: test

# `start_position` allows user to specify if this input should read log files
# from the `beginning` or from the `end`.
# `beginning`: reads from the beginning of the log group (default).
# `end`: read only new messages from current time minus `scan_frequency` going forward.
#start_position: beginning

# This config parameter sets how often Filebeat checks for new log events from the
# specified log group. Default `scan_frequency` is 1 minute, which means Filebeat
# will sleep for 1 minute before querying for new logs again.
#scan_frequency: 1m

# The maximum duration of AWS API can take. If it exceeds the timeout, AWS API
# will be interrupted.
# The default AWS API timeout for a message is 120 seconds.
# The minimum is 0 seconds.
#api_timeout: 120s

# This is used to sleep between AWS `FilterLogEvents` API calls inside the same
# collection period.
#api_sleep: 200ms
11 changes: 9 additions & 2 deletions x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ ARN of the log group to collect logs from.

[float]
==== `log_group_name`
Name of the log group to collect logs from. Note: region_name is required when
Name of the log group to collect logs from. Note: `region_name` is required when
log_group_name is given.

[float]
==== `log_group_name_prefix`
The prefix for a group of log group names. Note: `region_name` is required when
log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix`
cannot be given at the same time.

[float]
==== `region_name`
Region that the specified log group belongs to.
Region that the specified log group or log group prefix belongs to.

[float]
==== `log_streams`
Expand Down Expand Up @@ -109,6 +115,7 @@ Please see <<aws-credentials-config,AWS credentials options>> for more details.
=== AWS Permissions
Specific AWS permissions are required for IAM user to access aws-cloudwatch:
----
cloudwatchlogs:DescribeLogGroups
logs:FilterLogEvents
----

Expand Down
57 changes: 57 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3094,6 +3094,63 @@ filebeat.inputs:
# List of S3 object metadata keys to include in events.
#include_s3_metadata: []

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
#enabled: false

# AWS Credentials
# If access_key_id and secret_access_key are configured, then use them to make api calls.
# If not, aws-cloudwatch input will load default AWS config or load with given profile name.
#access_key_id: '${AWS_ACCESS_KEY_ID:""}'
#secret_access_key: '${AWS_SECRET_ACCESS_KEY:""}'
#session_token: '${AWS_SESSION_TOKEN:"”}'
#credential_profile_name: test-aws-s3-input

# ARN of the log group to collect logs from
#log_group_arn: "arn:aws:logs:us-east-1:428152502467:log-group:test:*"

# Name of the log group to collect logs from.
# Note: region_name is required when log_group_name is given.
#log_group_name: test

# The prefix for a group of log group names.
# Note: `region_name` is required when `log_group_name_prefix` is given.
# `log_group_name` and `log_group_name_prefix` cannot be given at the same time.
#log_group_name_prefix: /aws/

# Region that the specified log group or log group prefix belongs to.
#region_name: us-east-1

# A list of strings of log streams names that Filebeat collect log events from.
#log_streams:
# - log_stream_name

# A string to filter the results to include only log events from log streams
# that have names starting with this prefix.
#log_stream_prefix: test

# `start_position` allows user to specify if this input should read log files
# from the `beginning` or from the `end`.
# `beginning`: reads from the beginning of the log group (default).
# `end`: read only new messages from current time minus `scan_frequency` going forward.
#start_position: beginning

# This config parameter sets how often Filebeat checks for new log events from the
# specified log group. Default `scan_frequency` is 1 minute, which means Filebeat
# will sleep for 1 minute before querying for new logs again.
#scan_frequency: 1m

# The maximum duration of AWS API can take. If it exceeds the timeout, AWS API
# will be interrupted.
# The default AWS API timeout for a message is 120 seconds.
# The minimum is 0 seconds.
#api_timeout: 120s

# This is used to sleep between AWS `FilterLogEvents` API calls inside the same
# collection period.
#api_sleep: 200ms

# =========================== Filebeat autodiscover ============================

# Autodiscover allows you to detect changes in the system and spawn new modules
Expand Down
15 changes: 10 additions & 5 deletions x-pack/filebeat/input/awscloudwatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type config struct {
harvester.ForwarderConfig `config:",inline"`
LogGroupARN string `config:"log_group_arn"`
LogGroupName string `config:"log_group_name"`
LogGroupNamePrefix string `config:"log_group_name_prefix"`
RegionName string `config:"region_name"`
LogStreams []string `config:"log_streams"`
LogStreamPrefix string `config:"log_stream_prefix"`
Expand Down Expand Up @@ -44,13 +45,17 @@ func (c *config) Validate() error {
"either 'beginning' or 'end'")
}

if c.LogGroupARN == "" && c.LogGroupName == "" {
return errors.New("log_group_arn and log_group_name config parameter" +
"cannot be both empty")
if c.LogGroupARN == "" && c.LogGroupName == "" && c.LogGroupNamePrefix == "" {
return errors.New("log_group_arn, log_group_name and log_group_name_prefix config parameter" +
"cannot all be empty")
}

if c.LogGroupName != "" && c.RegionName == "" {
return errors.New("region_name is required when log_group_name " +
if c.LogGroupName != "" && c.LogGroupNamePrefix != "" {
return errors.New("log_group_name and log_group_name_prefix cannot be given at the same time")
}

if (c.LogGroupName != "" || c.LogGroupNamePrefix != "") && c.RegionName == "" {
return errors.New("region_name is required when log_group_name or log_group_name_prefix " +
"config parameter is given")
}
return nil
Expand Down
62 changes: 53 additions & 9 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,33 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con

// Run runs the input
func (in *awsCloudWatchInput) Run() {
in.workerOnce.Do(func() {
in.workerWg.Add(1)
go func() {
in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName)
defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName)
defer in.workerWg.Done()
in.run()
}()
})
cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AwsConfig.Endpoint, "cloudwatchlogs", in.config.RegionName, in.awsConfig)
svc := cloudwatchlogs.New(cwConfig)

var logGroupNames []string
var err error
if in.config.LogGroupNamePrefix != "" {
logGroupNames, err = in.getLogGroupNames(svc)
if err != nil {
in.logger.Error("getLogGroupNames failed: ", err)
return
}
} else {
logGroupNames = []string{in.config.LogGroupName}
}

for _, logGroup := range logGroupNames {
in.config.LogGroupName = logGroup
in.workerOnce.Do(func() {
in.workerWg.Add(1)
go func() {
in.logger.Infof("aws-cloudwatch input worker for log group: '%v' has started", in.config.LogGroupName)
defer in.logger.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", in.config.LogGroupName)
defer in.workerWg.Done()
in.run()
}()
})
}
}

func (in *awsCloudWatchInput) run() {
Expand Down Expand Up @@ -176,6 +194,32 @@ func parseARN(logGroupARN string) (string, string, error) {
return "", "", errors.Errorf("cannot get log group name from log group ARN: %s", logGroupARN)
}

// getLogGroupNames uses DescribeLogGroups API to retrieve all log group names
func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI) ([]string, error) {
// construct DescribeLogGroupsInput
filterLogEventsInput := &cloudwatchlogs.DescribeLogGroupsInput{
LogGroupNamePrefix: awssdk.String(in.config.LogGroupNamePrefix),
}

// make API request
req := svc.DescribeLogGroupsRequest(filterLogEventsInput)
p := cloudwatchlogs.NewDescribeLogGroupsPaginator(req)
var logGroupNames []string
for p.Next(context.TODO()) {
page := p.CurrentPage()
in.logger.Debugf("Collecting #%v log group names", len(page.LogGroups))
for _, lg := range page.LogGroups {
logGroupNames = append(logGroupNames, *lg.LogGroupName)
}
}

if err := p.Err(); err != nil {
in.logger.Error("failed DescribeLogGroupsRequest: ", err)
return logGroupNames, err
}
return logGroupNames, nil
}

// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI) error {
ctx, cancelFn := context.WithTimeout(in.inputCtx, in.config.APITimeout)
Expand Down

0 comments on commit c6ab0c1

Please sign in to comment.