From 979c68e52759d233b9541dd1475ad463d25b0efe Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 25 May 2023 09:59:00 +0930 Subject: [PATCH] address pr comments --- CHANGELOG-developer.next.asciidoc | 1 - CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-aws-s3.asciidoc | 6 +- x-pack/filebeat/input/awss3/config.go | 9 +- x-pack/filebeat/input/awss3/config_test.go | 61 +++++---- x-pack/filebeat/input/awss3/input.go | 18 ++- x-pack/filebeat/input/awss3/input_test.go | 123 ++++++++++++++---- 7 files changed, 150 insertions(+), 69 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 774f1440b34e..5d009736c342 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -154,7 +154,6 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add the file path of the instance lock on the error when it's is already locked {pull}33788[33788] - Add DropFields processor to js API {pull}33458[33458] - Add support for different folders when testing data {pull}34467[34467] -- Allow non-AWS endpoints for testing Filebeat awss3 input. {issue}35496[35496] {pull}35520[35520] ==== Deprecated diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 481cca15c725..daa132d64656 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -294,6 +294,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507] - Add XML decoding support to CEL. {issue}34438[34438] {pull}35372[35372] - Mark CEL input as GA. {pull}35559[35559] +- Allow non-AWS endpoints for awss3 input. {issue}35496[35496] {pull}35520[35520] *Auditbeat* - Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 4e281c254716..f99e6ad01336 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -258,10 +258,10 @@ configuring multiline options. URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` and `non_aws_bucket_name` are not set). [float] -==== `region_name` +==== `region` -The name of the AWS region of the end point for testing purposes. This option -must not be set when using amazonaws end points that provide a region. +The name of the AWS region of the end point. If this option is given it +takes precedence over the region name obtained from the `queue_url` value. [float] ==== `visibility_timeout` diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 12cc16efc281..6d2517e53efc 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -27,7 +27,7 @@ type config struct { SQSScript *scriptConfig `config:"sqs.notification_parsing_script"` MaxNumberOfMessages int `config:"max_number_of_messages"` QueueURL string `config:"queue_url"` - RegionName string `config:"region_name"` + RegionName string `config:"region"` BucketARN string `config:"bucket_arn"` NonAWSBucketName string `config:"non_aws_bucket_name"` BucketListInterval time.Duration `config:"bucket_list_interval"` @@ -79,13 +79,6 @@ func (c *config) Validate() error { return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers) } - if c.QueueURL != "" { - region, _ := getRegionFromQueueURL(c.QueueURL, c.AWSConfig.Endpoint) - if region != "" && c.RegionName != "" { - return fmt.Errorf("region_name <%s> must not be set with a queue_url containing a region name", c.RegionName) - } - } - if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) { return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+ "less than or equal to 12h", c.VisibilityTimeout) diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index b93c4583382b..880412ad377f 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -57,7 +57,7 @@ func TestConfig(t *testing.T) { nonAWSS3Bucket string config mapstr.M expectedErr string - expectedCfg func(queueURL, s3Bucket string, nonAWSS3Bucket string) config + expectedCfg func(queueURL, s3Bucket, nonAWSS3Bucket string) config }{ { name: "input with defaults for queueURL", @@ -80,7 +80,7 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { c := makeConfig("", s3Bucket, "") c.NumberOfWorkers = 5 return c @@ -100,7 +100,7 @@ func TestConfig(t *testing.T) { }, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { c := makeConfig(queueURL, "", "") regex := match.MustCompile("/CloudTrail/") c.FileSelectors = []fileSelectorConfig{ @@ -113,17 +113,17 @@ func TestConfig(t *testing.T) { }, }, { - name: "non-AWS endpoint with explicit region", + name: "non-AWS_endpoint_with_explicit_region", queueURL: queueURL, s3Bucket: "", nonAWSS3Bucket: "", config: mapstr.M{ - "queue_url": queueURL, - "region_name": "region", - "endpoint": "ep", + "queue_url": queueURL, + "region": "region", + "endpoint": "ep", }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { c := makeConfig(queueURL, "", "") c.RegionName = "region" c.AWSConfig.Endpoint = "ep" @@ -131,29 +131,40 @@ func TestConfig(t *testing.T) { }, }, { - name: "explicit AWS endpoint with explicit region", + name: "explicit_AWS_endpoint_with_explicit_region", queueURL: queueURL, s3Bucket: "", nonAWSS3Bucket: "", config: mapstr.M{ - "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", - "region_name": "region", - "endpoint": "amazonaws.com", + "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + "region": "region", + "endpoint": "amazonaws.com", + }, + expectedErr: "", + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "") + c.QueueURL = "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs" + c.AWSConfig.Endpoint = "amazonaws.com" + c.RegionName = "region" + return c }, - expectedErr: "region_name must not be set with a queue_url containing a region name", - expectedCfg: nil, }, { - name: "inferred AWS endpoint with explicit region", + name: "inferred_AWS_endpoint_with_explicit_region", queueURL: queueURL, s3Bucket: "", nonAWSS3Bucket: "", config: mapstr.M{ - "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", - "region_name": "region", + "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + "region": "region", + }, + expectedErr: "", + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "") + c.QueueURL = "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs" + c.RegionName = "region" + return c }, - expectedErr: "region_name must not be set with a queue_url containing a region name", - expectedCfg: nil, }, { name: "localstack_with_region_name", @@ -161,11 +172,11 @@ func TestConfig(t *testing.T) { s3Bucket: "", nonAWSS3Bucket: "", config: mapstr.M{ - "queue_url": "http://localhost:4566/000000000000/sample-queue", - "region_name": "myregion", + "queue_url": "http://localhost:4566/000000000000/sample-queue", + "region": "myregion", }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { c := makeConfig(queueURL, "", "") c.RegionName = "myregion" return c @@ -365,7 +376,7 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { c := makeConfig("", "", nonAWSS3Bucket) c.NumberOfWorkers = 5 return c @@ -448,7 +459,7 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { c := makeConfig("", s3Bucket, "") c.BackupConfig.BackupToBucketArn = "arn:aws:s3:::bBucket" c.BackupConfig.BackupToBucketPrefix = "backup" @@ -468,7 +479,7 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { c := makeConfig("", "", nonAWSS3Bucket) c.NonAWSBucketName = nonAWSS3Bucket c.BackupConfig.NonAWSBackupToBucketName = "bBucket" diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index c290d13140d3..9ba508d439c5 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -115,12 +115,14 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { defer cancelInputCtx() if in.config.QueueURL != "" { - regionName := in.config.RegionName - if regionName == "" { - regionName, err = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint) - if err != nil { - return fmt.Errorf("failed to get AWS region from queue_url: %w", err) - } + regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint) + if err != nil && in.config.RegionName == "" { + return fmt.Errorf("failed to get AWS region from queue_url: %w", err) + } + if in.config.RegionName != "" && regionName != in.config.RegionName { + inputContext.Logger.Warnf("configured region disagrees with queue_url region: %q != %q: using %[1]q", + in.config.RegionName, regionName) + regionName = in.config.RegionName } in.awsConfig.Region = regionName @@ -303,6 +305,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli return s3Poller, nil } +var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") + func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) { // get region from queueURL // Example: https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs @@ -318,7 +322,7 @@ func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) { } } } - return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}") + return "", errBadQueueURL } func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) { diff --git a/x-pack/filebeat/input/awss3/input_test.go b/x-pack/filebeat/input/awss3/input_test.go index 953e8388fc0e..efa7519c3d5e 100644 --- a/x-pack/filebeat/input/awss3/input_test.go +++ b/x-pack/filebeat/input/awss3/input_test.go @@ -5,35 +5,108 @@ package awss3 import ( + "errors" "testing" "github.com/stretchr/testify/assert" ) func TestGetProviderFromDomain(t *testing.T) { - assert.Equal(t, "aws", getProviderFromDomain("", "")) - assert.Equal(t, "aws", getProviderFromDomain("c2s.ic.gov", "")) - assert.Equal(t, "abc", getProviderFromDomain("abc.com", "abc")) - assert.Equal(t, "xyz", getProviderFromDomain("oraclecloud.com", "xyz")) - assert.Equal(t, "aws", getProviderFromDomain("amazonaws.com", "")) - assert.Equal(t, "aws", getProviderFromDomain("c2s.sgov.gov", "")) - assert.Equal(t, "aws", getProviderFromDomain("c2s.ic.gov", "")) - assert.Equal(t, "aws", getProviderFromDomain("amazonaws.com.cn", "")) - assert.Equal(t, "backblaze", getProviderFromDomain("https://backblazeb2.com", "")) - assert.Equal(t, "cloudflare", getProviderFromDomain("https://1234567890.r2.cloudflarestorage.com", "")) - assert.Equal(t, "wasabi", getProviderFromDomain("https://wasabisys.com", "")) - assert.Equal(t, "digitalocean", getProviderFromDomain("https://digitaloceanspaces.com", "")) - assert.Equal(t, "dreamhost", getProviderFromDomain("https://dream.io", "")) - assert.Equal(t, "scaleway", getProviderFromDomain("https://scw.cloud", "")) - assert.Equal(t, "gcp", getProviderFromDomain("https://googleapis.com", "")) - assert.Equal(t, "arubacloud", getProviderFromDomain("https://cloud.it", "")) - assert.Equal(t, "linode", getProviderFromDomain("https://linodeobjects.com", "")) - assert.Equal(t, "vultr", getProviderFromDomain("https://vultrobjects.com", "")) - assert.Equal(t, "ibm", getProviderFromDomain("https://appdomain.cloud", "")) - assert.Equal(t, "alibaba", getProviderFromDomain("https://aliyuncs.com", "")) - assert.Equal(t, "oracle", getProviderFromDomain("https://oraclecloud.com", "")) - assert.Equal(t, "exoscale", getProviderFromDomain("https://exo.io", "")) - assert.Equal(t, "upcloud", getProviderFromDomain("https://upcloudobjects.com", "")) - assert.Equal(t, "iland", getProviderFromDomain("https://ilandcloud.com", "")) - assert.Equal(t, "zadara", getProviderFromDomain("https://zadarazios.com", "")) + tests := []struct { + endpoint string + override string + want string + }{ + {endpoint: "", override: "", want: "aws"}, + {endpoint: "c2s.ic.gov", want: "aws"}, + {endpoint: "abc.com", override: "abc", want: "abc"}, + {endpoint: "oraclecloud.com", override: "xyz", want: "xyz"}, + {endpoint: "amazonaws.com", want: "aws"}, + {endpoint: "c2s.sgov.gov", want: "aws"}, + {endpoint: "c2s.ic.gov", want: "aws"}, + {endpoint: "amazonaws.com.cn", want: "aws"}, + {endpoint: "https://backblazeb2.com", want: "backblaze"}, + {endpoint: "https://1234567890.r2.cloudflarestorage.com", want: "cloudflare"}, + {endpoint: "https://wasabisys.com", want: "wasabi"}, + {endpoint: "https://digitaloceanspaces.com", want: "digitalocean"}, + {endpoint: "https://dream.io", want: "dreamhost"}, + {endpoint: "https://scw.cloud", want: "scaleway"}, + {endpoint: "https://googleapis.com", want: "gcp"}, + {endpoint: "https://cloud.it", want: "arubacloud"}, + {endpoint: "https://linodeobjects.com", want: "linode"}, + {endpoint: "https://vultrobjects.com", want: "vultr"}, + {endpoint: "https://appdomain.cloud", want: "ibm"}, + {endpoint: "https://aliyuncs.com", want: "alibaba"}, + {endpoint: "https://oraclecloud.com", want: "oracle"}, + {endpoint: "https://exo.io", want: "exoscale"}, + {endpoint: "https://upcloudobjects.com", want: "upcloud"}, + {endpoint: "https://ilandcloud.com", want: "iland"}, + {endpoint: "https://zadarazios.com", want: "zadara"}, + } + + for _, test := range tests { + assert.Equal(t, test.want, getProviderFromDomain(test.endpoint, test.override), + "for endpoint=%q and override=%q", test.endpoint, test.override) + } +} + +func TestGetRegionFromQueueURL(t *testing.T) { + tests := []struct { + name string + queueURL string + endpoint string + want string + wantErr error + }{ + { + name: "amazonaws.com_domain_with_blank_endpoint", + queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + want: "us-east-1", + }, + { + name: "abc.xyz_and_domain_with_matching_endpoint", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "abc.xyz", + want: "us-east-1", + }, + { + name: "abc.xyz_and_domain_with_blank_endpoint", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + wantErr: errBadQueueURL, + }, + { + name: "abc.xyz_and_domain_with_different_endpoint", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "googlecloud.com", + wantErr: errBadQueueURL, + }, + { + name: "invalid_queue_url", + queueURL: ":foo", + wantErr: errors.New(":foo is not a valid URL"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := getRegionFromQueueURL(test.queueURL, test.endpoint) + if !sameError(err, test.wantErr) { + t.Errorf("unexpected error: got:%v want:%v", err, test.wantErr) + } + if got != test.want { + t.Errorf("unexpected result: got:%q want:%q", got, test.want) + } + }) + } +} + +func sameError(a, b error) bool { + switch { + case a == nil && b == nil: + return true + case a == nil, b == nil: + return false + default: + return a.Error() == b.Error() + } }