Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.14] Fix handling of custom Endpoint when using S3 + SQS #39709

Merged
merged 14 commits into from
May 28, 2024
Merged
71 changes: 58 additions & 13 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,29 @@ type s3Input struct {
func newInput(config config, store beater.StateStore) (*s3Input, error) {
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)

strawgate marked this conversation as resolved.
Show resolved Hide resolved
if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
}

endpointUri, err := url.Parse(config.AWSConfig.Endpoint)
strawgate marked this conversation as resolved.
Show resolved Hide resolved

// A custom endpoint has been specified!
if err == nil && config.AWSConfig.Endpoint != "" && !strings.HasPrefix(endpointUri.Hostname(), "s3") {
strawgate marked this conversation as resolved.
Show resolved Hide resolved
strawgate marked this conversation as resolved.
Show resolved Hide resolved

// For backwards compat:
// If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint
// If the endpoint does start with S3, we will use the default resolver, which uses the endpoint field but can replace s3 with the desired service name (e.g. sqs)

awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
HostnameImmutable: true,
strawgate marked this conversation as resolved.
Show resolved Hide resolved
}, nil
})
}

if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
}

return &s3Input{
Expand Down Expand Up @@ -112,16 +122,23 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
defer cancelInputCtx()

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)
if err != nil && in.config.RegionName == "" {
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion)
strawgate marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's uncommon to return an error and a value that the caller should use. Typically these are mutually exclusive: you either get an error OR you get values that you should use. I suggest trying to do a small bit of refactoring to keep with those conventions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@faec has refactored basically all of this plugin on main including undoing this but it's too different to backport.

I made a series of integration tests which cover all the various combinations of settings, but I'm worried refactoring this might not be worth it, given it's all going away soon.


// If we can't get a region from anywhere, error out
if err != nil && regionName == "" && in.config.RegionName == "" {
return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", err)
}
var warn regionMismatchError
if errors.As(err, &warn) {
// Warn of mismatch, but go ahead with configured region name.
inputContext.Logger.Warnf("%v: using %q", err, regionName)
}
in.awsConfig.Region = regionName

// Ensure we don't overwrite region when getRegionFromURL fails
// Ensure we don't overwrite a user-specified region with a parsed region.
if regionName != "" && in.config.RegionName == "" {
strawgate marked this conversation as resolved.
Show resolved Hide resolved
in.awsConfig.Region = regionName
}

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, pipeline)
Expand Down Expand Up @@ -322,17 +339,45 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_

func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) {
// get region from queueURL
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
u, err := url.Parse(queueURL)
if err != nil {
return "", fmt.Errorf(queueURL + " is not a valid URL")
}

e, err := url.Parse(endpoint)
if err != nil {
return "", fmt.Errorf(endpoint + " is not a valid URL")
}

if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" {
queueHostSplit := strings.SplitN(u.Host, ".", 3)
endpointSplit := strings.SplitN(e.Host, ".", 3)
// check for sqs queue url

// Parse a user-provided custom endpoint
if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) == 3 && len(endpointSplit) == 3 {
// Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint
endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2]
strawgate marked this conversation as resolved.
Show resolved Hide resolved
if !endpointMatchesQueueUrl {
// We couldn't resolve the URL
// We cannot infer the region by matching the endpoint and queue url, return the default region with a region mismatch warning
return defaultRegion, regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]}
}

region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return region, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
}
return region, nil
}

// Parse a standard SQS url
if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" {
if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
// handle endpoint with no scheme, handle endpoint with scheme
if queueHostSplit[2] == endpoint || queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) {
region = queueHostSplit[1]
if defaultRegion != "" && region != defaultRegion {
return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion}
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/awss3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ func TestGetRegionFromQueueURL(t *testing.T) {
endpoint: "abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_url_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "https://s3.us-east-1.abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
Expand Down
Loading