From 0aa6f22d54d4ab6992fe3599f5ad63a2cd73f1d7 Mon Sep 17 00:00:00 2001 From: William Easton Date: Thu, 23 May 2024 20:54:13 -0500 Subject: [PATCH 01/12] Initial fix of endpoint in a backwards compat way --- x-pack/filebeat/input/awss3/input.go | 44 +++++++++++++++++++++------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 2c0372fe561..0b8e4576302 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -70,15 +70,31 @@ type s3Input struct { func newInput(config config, store beater.StateStore) (*s3Input, error) { awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) + // A custom endpoint has been specified! if config.AWSConfig.Endpoint != "" { - // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint - awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { - return awssdk.Endpoint{ - PartitionID: "aws", - URL: config.AWSConfig.Endpoint, - SigningRegion: awsConfig.Region, - }, nil - }) + endpointUri, err := url.Parse(config.AWSConfig.Endpoint) + + if err != nil { + // Log the error and continue with the default endpoint + fmt.Printf("Failed to parse the endpoint: %v", err) + } + + // For backwards compat: + // If the endpoint does not start with S3, we will use the endpoint resolver to all SDK requests through this endpoint + // If the endpoint does start with S3, we will use the default resolver which can replace s3 with the service name + + if !strings.HasPrefix(endpointUri.Hostname(), "s3") { + // Get the resolver from the endpoint url + awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { + return awssdk.Endpoint{ + PartitionID: "aws", + Source: awssdk.EndpointSourceCustom, + URL: config.AWSConfig.Endpoint, + SigningRegion: awsConfig.Region, + HostnameImmutable: true, + }, nil + }) + } } if err != nil { @@ -112,7 +128,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { defer cancelInputCtx() if in.config.QueueURL != "" { - regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName) + regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion) if err != nil && in.config.RegionName == "" { return fmt.Errorf("failed to get AWS region from queue_url: %w", err) } @@ -328,11 +344,17 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg if err != nil { return "", fmt.Errorf(queueURL + " is not a valid URL") } + + e, err := url.Parse(endpoint) + if err != nil { + return "", fmt.Errorf(endpoint + " is not a valid URL") + } + if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" { - queueHostSplit := strings.SplitN(u.Host, ".", 3) + queueHostSplit := strings.SplitN(u.Hostname(), ".", 3) // check for sqs queue url if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" { - if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { + if queueHostSplit[2] == e.Hostname() || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { region = queueHostSplit[1] if defaultRegion != "" && region != defaultRegion { return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} From 846de8c6c256d52ea1b977dc7153f39b4861d610 Mon Sep 17 00:00:00 2001 From: William Easton Date: Thu, 23 May 2024 21:16:54 -0500 Subject: [PATCH 02/12] do not overwrite a valid region with an invalid one, add parsing of user-provided custom endpoints --- x-pack/filebeat/input/awss3/input.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 0b8e4576302..29be94dd438 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -137,7 +137,10 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { // Warn of mismatch, but go ahead with configured region name. inputContext.Logger.Warnf("%v: using %q", err, regionName) } - in.awsConfig.Region = regionName + + if regionName != "" { + in.awsConfig.Region = regionName + } // Create SQS receiver and S3 notification processor. receiver, err := in.createSQSReceiver(inputContext, pipeline) @@ -353,6 +356,23 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" { queueHostSplit := strings.SplitN(u.Hostname(), ".", 3) // check for sqs queue url + + // Parse a user-provided custom endpoint + if endpoint != "" && len(queueHostSplit) >= 3 && queueHostSplit[0] == "sqs" { + // Check if everything after the first dot in the queue url matches everything after the first dot in the endpoint + endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 2)[1] == strings.SplitN(e.Hostname(), ".", 2)[1] + if !endpointMatchesQueueUrl { + return "", fmt.Errorf("endpoint %q does not match queue_url %q", e.Hostname(), u.Hostname()) + } + + region = queueHostSplit[1] + if defaultRegion != "" && region != defaultRegion { + return region, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} + } + return region, nil + } + + // Parse a standard SQS url if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" { if queueHostSplit[2] == e.Hostname() || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { region = queueHostSplit[1] From c66e2d9c5e3b2e453ffea3cf798442c5b0604007 Mon Sep 17 00:00:00 2001 From: William Easton Date: Thu, 23 May 2024 21:29:42 -0500 Subject: [PATCH 03/12] Reduce PR size --- x-pack/filebeat/input/awss3/input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 29be94dd438..932d62e83b0 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -354,7 +354,7 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg } if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" { - queueHostSplit := strings.SplitN(u.Hostname(), ".", 3) + queueHostSplit := strings.SplitN(u.Host, ".", 3) // check for sqs queue url // Parse a user-provided custom endpoint @@ -374,7 +374,7 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg // Parse a standard SQS url if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" { - if queueHostSplit[2] == e.Hostname() || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { + if queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { region = queueHostSplit[1] if defaultRegion != "" && region != defaultRegion { return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} From 282619b951242d522d5a23d7344c60d5137079fc Mon Sep 17 00:00:00 2001 From: William Easton Date: Thu, 23 May 2024 22:44:54 -0500 Subject: [PATCH 04/12] Add/Fix tests --- x-pack/filebeat/input/awss3/input.go | 49 +++++++++++------------ x-pack/filebeat/input/awss3/input_test.go | 12 ++++++ 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 932d62e83b0..2416327dbad 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -70,35 +70,30 @@ type s3Input struct { func newInput(config config, store beater.StateStore) (*s3Input, error) { awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) - // A custom endpoint has been specified! - if config.AWSConfig.Endpoint != "" { - endpointUri, err := url.Parse(config.AWSConfig.Endpoint) + if err != nil { + return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) + } - if err != nil { - // Log the error and continue with the default endpoint - fmt.Printf("Failed to parse the endpoint: %v", err) - } + endpointUri, err := url.Parse(config.AWSConfig.Endpoint) + + // A custom endpoint has been specified! + if err == nil && config.AWSConfig.Endpoint != "" && !strings.HasPrefix(endpointUri.Hostname(), "s3") { // For backwards compat: // If the endpoint does not start with S3, we will use the endpoint resolver to all SDK requests through this endpoint // If the endpoint does start with S3, we will use the default resolver which can replace s3 with the service name - if !strings.HasPrefix(endpointUri.Hostname(), "s3") { - // Get the resolver from the endpoint url - awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { - return awssdk.Endpoint{ - PartitionID: "aws", - Source: awssdk.EndpointSourceCustom, - URL: config.AWSConfig.Endpoint, - SigningRegion: awsConfig.Region, - HostnameImmutable: true, - }, nil - }) - } - } + // Get the resolver from the endpoint url + awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { + return awssdk.Endpoint{ + PartitionID: "aws", + Source: awssdk.EndpointSourceCustom, + URL: config.AWSConfig.Endpoint, + SigningRegion: awsConfig.Region, + HostnameImmutable: true, + }, nil + }) - if err != nil { - return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } return &s3Input{ @@ -355,12 +350,13 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" { queueHostSplit := strings.SplitN(u.Host, ".", 3) + endpointSplit := strings.SplitN(e.Host, ".", 3) // check for sqs queue url // Parse a user-provided custom endpoint - if endpoint != "" && len(queueHostSplit) >= 3 && queueHostSplit[0] == "sqs" { - // Check if everything after the first dot in the queue url matches everything after the first dot in the endpoint - endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 2)[1] == strings.SplitN(e.Hostname(), ".", 2)[1] + if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) >= 3 && len(endpointSplit) >= 3 { + // Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint + endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 2)[2] == strings.SplitN(e.Hostname(), ".", 2)[2] if !endpointMatchesQueueUrl { return "", fmt.Errorf("endpoint %q does not match queue_url %q", e.Hostname(), u.Hostname()) } @@ -374,7 +370,8 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg // Parse a standard SQS url if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" { - if queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { + // handle endpoint with no scheme, handle endpoint with scheme + if queueHostSplit[2] == endpoint || queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { region = queueHostSplit[1] if defaultRegion != "" && region != defaultRegion { return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} diff --git a/x-pack/filebeat/input/awss3/input_test.go b/x-pack/filebeat/input/awss3/input_test.go index abc9f5c9a6a..fdd9d691a18 100644 --- a/x-pack/filebeat/input/awss3/input_test.go +++ b/x-pack/filebeat/input/awss3/input_test.go @@ -69,6 +69,18 @@ func TestGetRegionFromQueueURL(t *testing.T) { endpoint: "abc.xyz", want: "us-east-1", }, + { + name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://abc.xyz", + want: "us-east-1", + }, + { + name: "abc.xyz_and_domain_with_matching_url_endpoint", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + want: "us-east-1", + }, { name: "abc.xyz_and_domain_with_blank_endpoint", queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", From 534610745a96d3c1cfca1541575937b183be1d33 Mon Sep 17 00:00:00 2001 From: William Easton Date: Thu, 23 May 2024 22:47:16 -0500 Subject: [PATCH 05/12] Comment Cleanup --- x-pack/filebeat/input/awss3/input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 2416327dbad..91e9d7f4772 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -81,9 +81,8 @@ func newInput(config config, store beater.StateStore) (*s3Input, error) { // For backwards compat: // If the endpoint does not start with S3, we will use the endpoint resolver to all SDK requests through this endpoint - // If the endpoint does start with S3, we will use the default resolver which can replace s3 with the service name + // If the endpoint does start with S3, we will use the default resolver which can replace s3 with the desired service name like sqs - // Get the resolver from the endpoint url awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { return awssdk.Endpoint{ PartitionID: "aws", @@ -336,6 +335,7 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) { // get region from queueURL + // Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs // Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs // Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue u, err := url.Parse(queueURL) From 702fe8a6d2e64a5ea073d84b71e33e06b1e255d7 Mon Sep 17 00:00:00 2001 From: William Easton Date: Thu, 23 May 2024 23:06:48 -0500 Subject: [PATCH 06/12] small fix for custom domain endpoint url matchin --- x-pack/filebeat/input/awss3/input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 91e9d7f4772..0ca015b5c8b 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -354,9 +354,9 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg // check for sqs queue url // Parse a user-provided custom endpoint - if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) >= 3 && len(endpointSplit) >= 3 { + if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) == 3 && len(endpointSplit) == 3 { // Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint - endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 2)[2] == strings.SplitN(e.Hostname(), ".", 2)[2] + endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2] if !endpointMatchesQueueUrl { return "", fmt.Errorf("endpoint %q does not match queue_url %q", e.Hostname(), u.Hostname()) } From c3c3806b113dce14b3217b2a88a02ed354d16fa3 Mon Sep 17 00:00:00 2001 From: William Easton Date: Thu, 23 May 2024 23:46:05 -0500 Subject: [PATCH 07/12] parsed region does not match default_region and region config is empty, warn, not error --- x-pack/filebeat/input/awss3/input.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 0ca015b5c8b..89e9bda1144 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -123,7 +123,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { if in.config.QueueURL != "" { regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion) - if err != nil && in.config.RegionName == "" { + if err != nil && regionName != "" && in.config.RegionName == "" { return fmt.Errorf("failed to get AWS region from queue_url: %w", err) } var warn regionMismatchError @@ -358,7 +358,8 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg // Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2] if !endpointMatchesQueueUrl { - return "", fmt.Errorf("endpoint %q does not match queue_url %q", e.Hostname(), u.Hostname()) + // We cannot infer the region by matching the endpoint and queue url + return "", regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]} } region = queueHostSplit[1] From 6416b6841973b6a6fa23ec0cfe8bf17eaca50479 Mon Sep 17 00:00:00 2001 From: William Easton Date: Fri, 24 May 2024 07:12:59 -0500 Subject: [PATCH 08/12] Small fixes before PR review --- x-pack/filebeat/input/awss3/input.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 89e9bda1144..a8da8e42b00 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -80,8 +80,8 @@ func newInput(config config, store beater.StateStore) (*s3Input, error) { if err == nil && config.AWSConfig.Endpoint != "" && !strings.HasPrefix(endpointUri.Hostname(), "s3") { // For backwards compat: - // If the endpoint does not start with S3, we will use the endpoint resolver to all SDK requests through this endpoint - // If the endpoint does start with S3, we will use the default resolver which can replace s3 with the desired service name like sqs + // If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint + // If the endpoint does start with S3, we will use the default resolver uses the endpoint field but can replace s3 with the desired service name like sqs awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { return awssdk.Endpoint{ @@ -123,8 +123,10 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { if in.config.QueueURL != "" { regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion) - if err != nil && regionName != "" && in.config.RegionName == "" { - return fmt.Errorf("failed to get AWS region from queue_url: %w", err) + + // If we can't get a region from anywhere, error out + if err != nil && regionName == "" && in.config.RegionName == "" { + return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", err) } var warn regionMismatchError if errors.As(err, &warn) { @@ -132,6 +134,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { inputContext.Logger.Warnf("%v: using %q", err, regionName) } + // Ensure we don't overwrite region when getRegionFromURL fails if regionName != "" { in.awsConfig.Region = regionName } From 46b89ec7eea83f8ef7cf93e662bc46256b3a10c2 Mon Sep 17 00:00:00 2001 From: William Easton Date: Fri, 24 May 2024 07:57:20 -0500 Subject: [PATCH 09/12] Don't overwrite user's specified region if it exists, and correctly use the default_region if a custom endpoint is parsed --- x-pack/filebeat/input/awss3/input.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index a8da8e42b00..cb8f87f1155 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -135,7 +135,8 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } // Ensure we don't overwrite region when getRegionFromURL fails - if regionName != "" { + // Ensure we don't overwrite a user-specified region with a parsed region. + if regionName != "" && in.config.RegionName == "" { in.awsConfig.Region = regionName } @@ -361,8 +362,9 @@ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (reg // Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2] if !endpointMatchesQueueUrl { - // We cannot infer the region by matching the endpoint and queue url - return "", regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]} + // We couldn't resolve the URL + // We cannot infer the region by matching the endpoint and queue url, return the default region with a region mismatch warning + return defaultRegion, regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]} } region = queueHostSplit[1] From 1f8e8d94c15aa095fed0abc83603efd6a35bd522 Mon Sep 17 00:00:00 2001 From: William Easton Date: Fri, 24 May 2024 15:57:41 -0500 Subject: [PATCH 10/12] Implementing PR feedback --- x-pack/filebeat/input/awss3/input.go | 30 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index cb8f87f1155..5e1008d9090 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -69,30 +69,30 @@ type s3Input struct { func newInput(config config, store beater.StateStore) (*s3Input, error) { awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) - if err != nil { return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } - endpointUri, err := url.Parse(config.AWSConfig.Endpoint) - // A custom endpoint has been specified! - if err == nil && config.AWSConfig.Endpoint != "" && !strings.HasPrefix(endpointUri.Hostname(), "s3") { + if config.AWSConfig.Endpoint != "" { + endpointUri, err := url.Parse(config.AWSConfig.Endpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse endpoint as url/domain: %w", err) + } // For backwards compat: // If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint // If the endpoint does start with S3, we will use the default resolver uses the endpoint field but can replace s3 with the desired service name like sqs - - awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { - return awssdk.Endpoint{ - PartitionID: "aws", - Source: awssdk.EndpointSourceCustom, - URL: config.AWSConfig.Endpoint, - SigningRegion: awsConfig.Region, - HostnameImmutable: true, - }, nil - }) - + if !strings.HasPrefix(endpointUri.Hostname(), "s3") { + awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { + return awssdk.Endpoint{ + PartitionID: "aws", + Source: awssdk.EndpointSourceCustom, + URL: config.AWSConfig.Endpoint, + SigningRegion: awsConfig.Region, + }, nil + }) + } } return &s3Input{ From 4956d1007173f5c38c25a111a0e99ce1bf2051c8 Mon Sep 17 00:00:00 2001 From: William Easton Date: Fri, 24 May 2024 19:04:10 -0500 Subject: [PATCH 11/12] Add working integration tests --- x-pack/filebeat/input/awss3/input.go | 17 +- .../input/awss3/input_integration_test.go | 168 ++++++++++++++++++ 2 files changed, 184 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 5e1008d9090..b62f816fc3b 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -73,11 +73,19 @@ func newInput(config config, store beater.StateStore) (*s3Input, error) { return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } + // The awsConfig now contains the region from the credential profile or default region + // if the region is explicitly set in the config, then it wins + if config.RegionName != "" { + awsConfig.Region = config.RegionName + } + // A custom endpoint has been specified! if config.AWSConfig.Endpoint != "" { + + // Parse a URL for the host regardless of it missing the scheme endpointUri, err := url.Parse(config.AWSConfig.Endpoint) if err != nil { - return nil, fmt.Errorf("failed to parse endpoint as url/domain: %w", err) + return nil, fmt.Errorf("failed to parse endpoint: %w", err) } // For backwards compat: @@ -203,7 +211,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } + if in.config.AWSConfig.Endpoint != "" { + o.EndpointResolver = sqs.EndpointResolverFromURL(in.config.AWSConfig.Endpoint) + } }), + queueURL: in.config.QueueURL, apiTimeout: in.config.APITimeout, visibilityTimeout: in.config.VisibilityTimeout, @@ -215,6 +227,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } + if in.config.AWSConfig.Endpoint != "" { + o.EndpointResolver = s3.EndpointResolverFromURL(in.config.AWSConfig.Endpoint) + } o.UsePathStyle = in.config.PathStyle }), } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 62cbc835011..f5755fac449 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -262,6 +262,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } +func TestInputRunSQSWithConfig(t *testing.T) { + tests := []struct { + name string + queue_url string + endpoint string + region string + default_region string + want string + wantErr error + }{ + { + name: "no region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + want: "us-east-1", + }, + { + name: "no region but with long endpoint", + queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + want: "us-east-1", + }, + { + name: "no region but with short endpoint", + queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://abc.xyz", + want: "us-east-1", + }, + { + name: "no region custom queue domain", + queue_url: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs", + wantErr: errBadQueueURL, + }, + { + name: "region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + want: "us-west-2", + }, + { + name: "default_region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + default_region: "us-west-2", + want: "us-west-2", + }, + { + name: "region and default_region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-east-2", + default_region: "us-east-3", + want: "us-east-2", + }, + { + name: "short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + endpoint: "https://amazonaws.com", + want: "us-east-1", + }, + { + name: "long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-east-1", + }, + { + name: "region and custom short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://.elastic.co", + want: "us-west-2", + }, + { + name: "region and custom long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://s3.us-east-1.elastic.co", + want: "us-west-2", + }, + { + name: "region and short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://amazonaws.com", + want: "us-west-2", + }, + { + name: "region and long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-west-2", + }, + { + name: "region and default region and short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + default_region: "us-east-1", + endpoint: "https://amazonaws.com", + want: "us-west-2", + }, + { + name: "region and default region and long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + default_region: "us-east-1", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-west-2", + }, + } + + for _, test := range tests { + logp.TestingSetup() + + // Create a filebeat config using the provided test parameters + config := "" + if test.queue_url != "" { + config += fmt.Sprintf("queue_url: %s \n", test.queue_url) + } + if test.region != "" { + config += fmt.Sprintf("region: %s \n", test.region) + } + if test.default_region != "" { + config += fmt.Sprintf("default_region: %s \n", test.default_region) + } + if test.endpoint != "" { + config += fmt.Sprintf("endpoint: %s \n", test.endpoint) + } + + s3Input := createInput(t, conf.MustNewConfigFrom(config)) + + inputCtx, cancel := newV2Context() + t.Cleanup(cancel) + time.AfterFunc(5*time.Second, func() { + cancel() + }) + + var errGroup errgroup.Group + errGroup.Go(func() error { + return s3Input.Run(inputCtx, &fakePipeline{}) + }) + + if err := errGroup.Wait(); err != nil { + // assert that err == test.wantErr + if test.wantErr != nil { + continue + } + // Print the test name to help identify the failing test + t.Fatal(test.name, err) + } + + // If the endpoint starts with s3, the endpoint resolver should be null at this point + // If the endpoint does not start with s3, the endpointresolverwithoptions should be set + // If the endpoint is not set, the endpoint resolver should be null + if test.endpoint == "" { + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } else if strings.HasPrefix(test.endpoint, "https://s3") { + // S3 resolvers are added later in the code than this integration test covers + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } else { // If the endpoint is specified but is not s3 + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } + + assert.EqualValues(t, test.want, s3Input.awsConfig.Region, test.name) + } +} + func TestInputRunSQS(t *testing.T) { logp.TestingSetup() From 685503ae6ff79091361c453fa9b24b63513caf39 Mon Sep 17 00:00:00 2001 From: William Easton Date: Tue, 28 May 2024 13:31:56 -0500 Subject: [PATCH 12/12] Change a 6 to a 7 --- x-pack/filebeat/input/awss3/input_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index f5755fac449..aa6874b9216 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -521,7 +521,7 @@ func TestInputRunS3(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3ObjectsListedTotal.Get(), 8) assert.EqualValues(t, s3Input.metrics.s3ObjectsProcessedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 6) + assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) }