Skip to content

Commit

Permalink
x-pack/filebeat/input/awss3: allow cross-region bucket configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Jul 23, 2024
1 parent 7263696 commit 927a67a
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ func newConstantS3(t testing.TB) *constantS3 {
}
}

func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
func (c constantS3) GetObject(ctx context.Context, _, bucket, key string) (*s3.GetObjectOutput, error) {
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
func (c constantS3) CopyObject(ctx context.Context, _, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
return nil, nil
}

func (c constantS3) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) {
func (c constantS3) DeleteObject(ctx context.Context, _, bucket, key string) (*s3.DeleteObjectOutput, error) {
return nil, nil
}

Expand Down
31 changes: 22 additions & 9 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
//go:generate go install github.com/golang/mock/[email protected]
//go:generate mockgen -source=interfaces.go -destination=mock_interfaces_test.go -package awss3 -mock_names=sqsAPI=MockSQSAPI,sqsProcessor=MockSQSProcessor,s3API=MockS3API,s3Pager=MockS3Pager,s3ObjectHandlerFactory=MockS3ObjectHandlerFactory,s3ObjectHandler=MockS3ObjectHandler
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient,Pipeline=MockBeatPipeline github.com/elastic/beats/v7/libbeat/beat Client,Pipeline
//go:generate go-licenser -license Elastic .
//go:generate goimports -w -local github.com/elastic .

// ------
// SQS interfaces
Expand Down Expand Up @@ -79,12 +81,12 @@ type s3API interface {
}

type s3Getter interface {
GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error)
GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error)
}

type s3Mover interface {
CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error)
CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error)
}

type s3Lister interface {
Expand Down Expand Up @@ -229,8 +231,8 @@ type awsS3API struct {
client *s3.Client
}

func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
getObjectOutput, err := a.client.GetObject(ctx, &s3.GetObjectInput{
func (a *awsS3API) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) {
getObjectOutput, err := a.clientFor(region).GetObject(ctx, &s3.GetObjectInput{
Bucket: awssdk.String(bucket),
Key: awssdk.String(key),
}, s3.WithAPIOptions(
Expand Down Expand Up @@ -262,8 +264,8 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb
return getObjectOutput, nil
}

func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
copyObjectOutput, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{
func (a *awsS3API) CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
copyObjectOutput, err := a.clientFor(region).CopyObject(ctx, &s3.CopyObjectInput{
Bucket: awssdk.String(to_bucket),
CopySource: awssdk.String(fmt.Sprintf("%s/%s", from_bucket, from_key)),
Key: awssdk.String(to_key),
Expand All @@ -274,8 +276,8 @@ func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_
return copyObjectOutput, nil
}

func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) {
deleteObjectOutput, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
func (a *awsS3API) DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error) {
deleteObjectOutput, err := a.clientFor(region).DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: awssdk.String(bucket),
Key: awssdk.String(key),
})
Expand All @@ -285,6 +287,17 @@ func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.De
return deleteObjectOutput, nil
}

func (a *awsS3API) clientFor(region string) *s3.Client {
// Conditionally replace the client if the region of
// the request does not match the pre-prepared client.
opts := a.client.Options()
if opts.Region == region {
return a.client
}
opts.Region = region
return s3.New(opts)
}

func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
pager := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{
Bucket: awssdk.String(bucket),
Expand Down
48 changes: 24 additions & 24 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
// Content-Type and reader to get the object's contents. The caller must
// close the returned reader.
func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) {
getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
if err != nil {
return "", nil, nil, err
}
Expand Down Expand Up @@ -441,14 +441,14 @@ func (p *s3ObjectProcessor) FinalizeS3Object() error {
return nil
}
backupKey := p.backupConfig.BackupToBucketPrefix + p.s3Obj.S3.Object.Key
_, err := p.s3.CopyObject(p.ctx, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
_, err := p.s3.CopyObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
if err != nil {
return fmt.Errorf("failed to copy object to backup bucket: %w", err)
}
if !p.backupConfig.Delete {
return nil
}
_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
if err != nil {
return fmt.Errorf("failed to delete object from bucket: %w", err)
}
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestS3ObjectProcessor(t *testing.T) {
s3Event := newS3Event("log.txt")

mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
Expand All @@ -175,7 +175,7 @@ func TestS3ObjectProcessor(t *testing.T) {
s3Event := newS3Event("log.txt")

mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
Expand All @@ -197,7 +197,7 @@ func TestS3ObjectProcessor(t *testing.T) {
var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
)

Expand All @@ -254,10 +254,10 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
mockS3API.EXPECT().
DeleteObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
DeleteObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
)

Expand All @@ -284,7 +284,7 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")).
Return(nil, nil),
)

Expand Down Expand Up @@ -326,7 +326,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int,
var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Expand Down
22 changes: 11 additions & 11 deletions x-pack/filebeat/input/awss3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,27 +103,27 @@ func TestS3Poller(t *testing.T) {
})

mockAPI.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key1")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key1")).
Return(nil, errFakeConnectivityFailure)

mockAPI.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key2")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key2")).
Return(nil, errFakeConnectivityFailure)

mockAPI.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key3")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key3")).
Return(nil, errFakeConnectivityFailure)

mockAPI.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key4")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key4")).
Return(nil, errFakeConnectivityFailure)

mockAPI.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key5")).
Return(nil, errFakeConnectivityFailure)

mockAPI.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")).
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{})
Expand Down Expand Up @@ -245,23 +245,23 @@ func TestS3Poller(t *testing.T) {
})

mockS3.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key1")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key1")).
Return(nil, errFakeConnectivityFailure)

mockS3.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key2")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key2")).
Return(nil, errFakeConnectivityFailure)

mockS3.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key3")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key3")).
Return(nil, errFakeConnectivityFailure)

mockS3.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key4")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key4")).
Return(nil, errFakeConnectivityFailure)

mockS3.EXPECT().
GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")).
GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key5")).
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3, nil, backupConfig{})
Expand Down

0 comments on commit 927a67a

Please sign in to comment.