Skip to content

Commit

Permalink
[8.15](backport #40309) x-pack/filebeat/input/awss3: allow cross-regi…
Browse files Browse the repository at this point in the history
…on bucket configuration (#40371)

* x-pack/filebeat/input/awss3: allow cross-region bucket configuration (#40309)

(cherry picked from commit e177fc5)

* remove irrelevant changelog entries

---------

Co-authored-by: Dan Kortschak <[email protected]>
  • Loading branch information
mergify[bot] and efd6 authored Jul 29, 2024
1 parent f218b32 commit 5054962
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ func newConstantS3(t testing.TB) *constantS3 {
}
}

func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
func (c constantS3) GetObject(ctx context.Context, _, bucket, key string) (*s3.GetObjectOutput, error) {
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
func (c constantS3) CopyObject(ctx context.Context, _, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
return nil, nil
}

func (c constantS3) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) {
func (c constantS3) DeleteObject(ctx context.Context, _, bucket, key string) (*s3.DeleteObjectOutput, error) {
return nil, nil
}

Expand Down
77 changes: 68 additions & 9 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"net/url"
"sync"
"time"

smithyhttp "github.com/aws/smithy-go/transport/http"
Expand All @@ -30,6 +31,8 @@ import (
//go:generate go install github.com/golang/mock/[email protected]
//go:generate mockgen -source=interfaces.go -destination=mock_interfaces_test.go -package awss3 -mock_names=sqsAPI=MockSQSAPI,sqsProcessor=MockSQSProcessor,s3API=MockS3API,s3Pager=MockS3Pager,s3ObjectHandlerFactory=MockS3ObjectHandlerFactory,s3ObjectHandler=MockS3ObjectHandler
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient,Pipeline=MockBeatPipeline github.com/elastic/beats/v7/libbeat/beat Client,Pipeline
//go:generate go-licenser -license Elastic .
//go:generate goimports -w -local github.com/elastic .

// ------
// SQS interfaces
Expand Down Expand Up @@ -79,12 +82,12 @@ type s3API interface {
}

type s3Getter interface {
GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error)
GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error)
}

type s3Mover interface {
CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error)
CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error)
}

type s3Lister interface {
Expand Down Expand Up @@ -227,10 +230,23 @@ func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAt

type awsS3API struct {
client *s3.Client

// others is the set of other clients referred
// to by notifications seen by the API connection.
// The number of cached elements is limited to
// awsS3APIcacheMax.
mu sync.RWMutex
others map[string]*s3.Client
}

const awsS3APIcacheMax = 100

func newAWSs3API(cli *s3.Client) *awsS3API {
return &awsS3API{client: cli, others: make(map[string]*s3.Client)}
}

func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
getObjectOutput, err := a.client.GetObject(ctx, &s3.GetObjectInput{
func (a *awsS3API) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) {
getObjectOutput, err := a.clientFor(region).GetObject(ctx, &s3.GetObjectInput{
Bucket: awssdk.String(bucket),
Key: awssdk.String(key),
}, s3.WithAPIOptions(
Expand Down Expand Up @@ -262,8 +278,8 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb
return getObjectOutput, nil
}

func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
copyObjectOutput, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{
func (a *awsS3API) CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
copyObjectOutput, err := a.clientFor(region).CopyObject(ctx, &s3.CopyObjectInput{
Bucket: awssdk.String(to_bucket),
CopySource: awssdk.String(fmt.Sprintf("%s/%s", from_bucket, from_key)),
Key: awssdk.String(to_key),
Expand All @@ -274,8 +290,8 @@ func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_
return copyObjectOutput, nil
}

func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) {
deleteObjectOutput, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
func (a *awsS3API) DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error) {
deleteObjectOutput, err := a.clientFor(region).DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: awssdk.String(bucket),
Key: awssdk.String(key),
})
Expand All @@ -285,6 +301,49 @@ func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.De
return deleteObjectOutput, nil
}

func (a *awsS3API) clientFor(region string) *s3.Client {
// Conditionally replace the client if the region of
// the request does not match the pre-prepared client.
opts := a.client.Options()
if opts.Region == region {
return a.client
}
// Use a cached client if we have already seen this region.
a.mu.RLock()
cli, ok := a.others[region]
a.mu.RUnlock()
if ok {
return cli
}

a.mu.Lock()
defer a.mu.Unlock()

// Check that another writer did not beat us here.
cli, ok = a.others[region]
if ok {
// ... they did.
return cli
}

// Otherwise create a new client and cache it.
opts.Region = region
cli = s3.New(opts)
// We should never be in the situation that the cache
// grows unbounded, but ensure this is the case.
if len(a.others) >= awsS3APIcacheMax {
// Do a single iteration delete to perform a
// random cache eviction.
for r := range a.others {
delete(a.others, r)
break
}
}
a.others[region] = cli

return cli
}

func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
pager := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{
Bucket: awssdk.String(bucket),
Expand Down
48 changes: 24 additions & 24 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func createS3API(ctx context.Context, config config, awsConfig awssdk.Config) (*
s3Client = s3.NewFromConfig(awsConfig, config.s3ConfigModifier)
}

return &awsS3API{
client: s3Client,
}, nil
return newAWSs3API(s3Client), nil
}

func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
// Content-Type and reader to get the object's contents. The caller must
// close the returned reader.
func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) {
getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
if err != nil {
return "", nil, nil, err
}
Expand Down Expand Up @@ -441,14 +441,14 @@ func (p *s3ObjectProcessor) FinalizeS3Object() error {
return nil
}
backupKey := p.backupConfig.BackupToBucketPrefix + p.s3Obj.S3.Object.Key
_, err := p.s3.CopyObject(p.ctx, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
_, err := p.s3.CopyObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
if err != nil {
return fmt.Errorf("failed to copy object to backup bucket: %w", err)
}
if !p.backupConfig.Delete {
return nil
}
_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
if err != nil {
return fmt.Errorf("failed to delete object from bucket: %w", err)
}
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestS3ObjectProcessor(t *testing.T) {
s3Event := newS3Event("log.txt")

mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
Expand All @@ -175,7 +175,7 @@ func TestS3ObjectProcessor(t *testing.T) {
s3Event := newS3Event("log.txt")

mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
Expand All @@ -197,7 +197,7 @@ func TestS3ObjectProcessor(t *testing.T) {
var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
)

Expand All @@ -254,10 +254,10 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
mockS3API.EXPECT().
DeleteObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
DeleteObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
)

Expand All @@ -284,7 +284,7 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")).
Return(nil, nil),
)

Expand Down Expand Up @@ -326,7 +326,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int,
var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Expand Down
Loading

0 comments on commit 5054962

Please sign in to comment.