diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 2c0372fe561..b62f816fc3b 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -69,20 +69,38 @@ type s3Input struct { func newInput(config config, store beater.StateStore) (*s3Input, error) { awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) + } - if config.AWSConfig.Endpoint != "" { - // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint - awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { - return awssdk.Endpoint{ - PartitionID: "aws", - URL: config.AWSConfig.Endpoint, - SigningRegion: awsConfig.Region, - }, nil - }) + // The awsConfig now contains the region from the credential profile or default region + // if the region is explicitly set in the config, then it wins + if config.RegionName != "" { + awsConfig.Region = config.RegionName } - if err != nil { - return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) + // A custom endpoint has been specified! + if config.AWSConfig.Endpoint != "" { + + // Parse a URL for the host regardless of it missing the scheme + endpointUri, err := url.Parse(config.AWSConfig.Endpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse endpoint: %w", err) + } + + // For backwards compat: + // If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint + // If the endpoint does start with S3, we will use the default resolver uses the endpoint field but can replace s3 with the desired service name like sqs + if !strings.HasPrefix(endpointUri.Hostname(), "s3") { + awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { + return awssdk.Endpoint{ + PartitionID: "aws", + Source: awssdk.EndpointSourceCustom, + URL: config.AWSConfig.Endpoint, + SigningRegion: awsConfig.Region, + }, nil + }) + } } return &s3Input{ @@ -112,16 +130,23 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { defer cancelInputCtx() if in.config.QueueURL != "" { - regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName) - if err != nil && in.config.RegionName == "" { - return fmt.Errorf("failed to get AWS region from queue_url: %w", err) + regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion) + + // If we can't get a region from anywhere, error out + if err != nil && regionName == "" && in.config.RegionName == "" { + return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", err) } var warn regionMismatchError if errors.As(err, &warn) { // Warn of mismatch, but go ahead with configured region name. inputContext.Logger.Warnf("%v: using %q", err, regionName) } - in.awsConfig.Region = regionName + + // Ensure we don't overwrite region when getRegionFromURL fails + // Ensure we don't overwrite a user-specified region with a parsed region. + if regionName != "" && in.config.RegionName == "" { + in.awsConfig.Region = regionName + } // Create SQS receiver and S3 notification processor. receiver, err := in.createSQSReceiver(inputContext, pipeline) @@ -186,7 +211,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } + if in.config.AWSConfig.Endpoint != "" { + o.EndpointResolver = sqs.EndpointResolverFromURL(in.config.AWSConfig.Endpoint) + } }), + queueURL: in.config.QueueURL, apiTimeout: in.config.APITimeout, visibilityTimeout: in.config.VisibilityTimeout, @@ -198,6 +227,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } + if in.config.AWSConfig.Endpoint != "" { + o.EndpointResolver = s3.EndpointResolverFromURL(in.config.AWSConfig.Endpoint) + } o.UsePathStyle = in.config.PathStyle }), } @@ -322,17 +354,45 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) { // get region from queueURL + // Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs // Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs // Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue u, err := url.Parse(queueURL) if err != nil { return "", fmt.Errorf(queueURL + " is not a valid URL") } + + e, err := url.Parse(endpoint) + if err != nil { + return "", fmt.Errorf(endpoint + " is not a valid URL") + } + if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" { queueHostSplit := strings.SplitN(u.Host, ".", 3) + endpointSplit := strings.SplitN(e.Host, ".", 3) // check for sqs queue url + + // Parse a user-provided custom endpoint + if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) == 3 && len(endpointSplit) == 3 { + // Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint + endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2] + if !endpointMatchesQueueUrl { + // We couldn't resolve the URL + // We cannot infer the region by matching the endpoint and queue url, return the default region with a region mismatch warning + return defaultRegion, regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]} + } + + region = queueHostSplit[1] + if defaultRegion != "" && region != defaultRegion { + return region, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} + } + return region, nil + } + + // Parse a standard SQS url if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" { - if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { + // handle endpoint with no scheme, handle endpoint with scheme + if queueHostSplit[2] == endpoint || queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { region = queueHostSplit[1] if defaultRegion != "" && region != defaultRegion { return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 62cbc835011..aa6874b9216 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -262,6 +262,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } +func TestInputRunSQSWithConfig(t *testing.T) { + tests := []struct { + name string + queue_url string + endpoint string + region string + default_region string + want string + wantErr error + }{ + { + name: "no region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + want: "us-east-1", + }, + { + name: "no region but with long endpoint", + queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + want: "us-east-1", + }, + { + name: "no region but with short endpoint", + queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://abc.xyz", + want: "us-east-1", + }, + { + name: "no region custom queue domain", + queue_url: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs", + wantErr: errBadQueueURL, + }, + { + name: "region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + want: "us-west-2", + }, + { + name: "default_region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + default_region: "us-west-2", + want: "us-west-2", + }, + { + name: "region and default_region", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-east-2", + default_region: "us-east-3", + want: "us-east-2", + }, + { + name: "short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + endpoint: "https://amazonaws.com", + want: "us-east-1", + }, + { + name: "long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-east-1", + }, + { + name: "region and custom short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://.elastic.co", + want: "us-west-2", + }, + { + name: "region and custom long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://s3.us-east-1.elastic.co", + want: "us-west-2", + }, + { + name: "region and short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://amazonaws.com", + want: "us-west-2", + }, + { + name: "region and long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-west-2", + }, + { + name: "region and default region and short_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + default_region: "us-east-1", + endpoint: "https://amazonaws.com", + want: "us-west-2", + }, + { + name: "region and default region and long_endpoint", + queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + region: "us-west-2", + default_region: "us-east-1", + endpoint: "https://s3.us-east-1.amazonaws.com", + want: "us-west-2", + }, + } + + for _, test := range tests { + logp.TestingSetup() + + // Create a filebeat config using the provided test parameters + config := "" + if test.queue_url != "" { + config += fmt.Sprintf("queue_url: %s \n", test.queue_url) + } + if test.region != "" { + config += fmt.Sprintf("region: %s \n", test.region) + } + if test.default_region != "" { + config += fmt.Sprintf("default_region: %s \n", test.default_region) + } + if test.endpoint != "" { + config += fmt.Sprintf("endpoint: %s \n", test.endpoint) + } + + s3Input := createInput(t, conf.MustNewConfigFrom(config)) + + inputCtx, cancel := newV2Context() + t.Cleanup(cancel) + time.AfterFunc(5*time.Second, func() { + cancel() + }) + + var errGroup errgroup.Group + errGroup.Go(func() error { + return s3Input.Run(inputCtx, &fakePipeline{}) + }) + + if err := errGroup.Wait(); err != nil { + // assert that err == test.wantErr + if test.wantErr != nil { + continue + } + // Print the test name to help identify the failing test + t.Fatal(test.name, err) + } + + // If the endpoint starts with s3, the endpoint resolver should be null at this point + // If the endpoint does not start with s3, the endpointresolverwithoptions should be set + // If the endpoint is not set, the endpoint resolver should be null + if test.endpoint == "" { + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } else if strings.HasPrefix(test.endpoint, "https://s3") { + // S3 resolvers are added later in the code than this integration test covers + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } else { // If the endpoint is specified but is not s3 + assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name) + assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name) + } + + assert.EqualValues(t, test.want, s3Input.awsConfig.Region, test.name) + } +} + func TestInputRunSQS(t *testing.T) { logp.TestingSetup() @@ -353,7 +521,7 @@ func TestInputRunS3(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3ObjectsListedTotal.Get(), 8) assert.EqualValues(t, s3Input.metrics.s3ObjectsProcessedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 6) + assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) } diff --git a/x-pack/filebeat/input/awss3/input_test.go b/x-pack/filebeat/input/awss3/input_test.go index abc9f5c9a6a..fdd9d691a18 100644 --- a/x-pack/filebeat/input/awss3/input_test.go +++ b/x-pack/filebeat/input/awss3/input_test.go @@ -69,6 +69,18 @@ func TestGetRegionFromQueueURL(t *testing.T) { endpoint: "abc.xyz", want: "us-east-1", }, + { + name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://abc.xyz", + want: "us-east-1", + }, + { + name: "abc.xyz_and_domain_with_matching_url_endpoint", + queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs", + endpoint: "https://s3.us-east-1.abc.xyz", + want: "us-east-1", + }, { name: "abc.xyz_and_domain_with_blank_endpoint", queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",