Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] aws-s3 - create beat.Client for each SQS worker #33658

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Add Common Expression Language input. {pull}31233[31233]

*Auditbeat*
Expand Down
31 changes: 16 additions & 15 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}()
defer cancelInputCtx()

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
ACKHandler: awscommon.NewEventACKHandler(),
})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
defer client.Close()

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
if err != nil {
Expand All @@ -127,7 +117,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
in.awsConfig.Region = regionName

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, client)
receiver, err := in.createSQSReceiver(inputContext, pipeline)
if err != nil {
return fmt.Errorf("failed to initialize sqs receiver: %w", err)
}
Expand All @@ -139,6 +129,16 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}

if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
ACKHandler: awscommon.NewEventACKHandler(),
})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
defer client.Close()

// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states)
if err != nil {
Expand All @@ -154,7 +154,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
return nil
}

func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsReader, error) {
func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*sqsReader, error) {
sqsAPI := &awsSQSAPI{
client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
if in.config.AWSConfig.FIPSEnabled {
Expand Down Expand Up @@ -192,8 +192,8 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
if err != nil {
return nil, err
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

return sqsReader, nil
Expand Down Expand Up @@ -267,10 +267,11 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
s3Poller := newS3Poller(log.Named("s3_poller"),
metrics,
s3API,
client,
s3EventHandlerFactory,
states,
persistentStore,
Expand Down
49 changes: 36 additions & 13 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,37 @@ func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
return c.pagerConstant
}

var _ beat.Pipeline = (*fakePipeline)(nil)

// fakePipeline returns new ackClients.
type fakePipeline struct{}

func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) {
return &ackClient{}, nil
}

func (c *fakePipeline) Connect() (beat.Client, error) {
panic("Connect() is not implemented.")
}

var _ beat.Client = (*ackClient)(nil)

// ackClient is a fake beat.Client that ACKs the published messages.
type ackClient struct{}

func (c *ackClient) Close() error { return nil }

func (c *ackClient) Publish(event beat.Event) {
// Fake the ACK handling.
event.Private.(*awscommon.EventACKTracker).ACK()
}

func (c *ackClient) PublishAll(event []beat.Event) {
for _, e := range event {
c.Publish(e)
}
}

func makeBenchmarkConfig(t testing.TB) config {
cfg := conf.MustNewConfigFrom(`---
queue_url: foo
Expand All @@ -171,21 +202,13 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
metrics := newInputMetrics(metricRegistry, "test_id")
sqsAPI := newConstantSQS()
s3API := newConstantS3(t)
client := pubtest.NewChanClient(100)
defer close(client.Channel)
pipeline := &fakePipeline{}
conf := makeBenchmarkConfig(t)

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler)

go func() {
for event := range client.Channel {
// Fake the ACK handling that's not implemented in pubtest.
event.Private.(*awscommon.EventACKTracker).ACK()
}
}()

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

Expand Down Expand Up @@ -313,8 +336,8 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
return
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, config.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
Expand Down
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/aws/smithy-go/middleware"

"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -28,7 +29,7 @@ import (
// Run 'go generate' to create mocks that are used in tests.
//go:generate go install github.com/golang/mock/[email protected]
//go:generate mockgen -source=interfaces.go -destination=mock_interfaces_test.go -package awss3 -mock_names=sqsAPI=MockSQSAPI,sqsProcessor=MockSQSProcessor,s3API=MockS3API,s3Pager=MockS3Pager,s3ObjectHandlerFactory=MockS3ObjectHandlerFactory,s3ObjectHandler=MockS3ObjectHandler
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient github.com/elastic/beats/v7/libbeat/beat Client
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient,Pipeline=MockBeatPipeline github.com/elastic/beats/v7/libbeat/beat Client,Pipeline

// ------
// SQS interfaces
Expand Down Expand Up @@ -88,7 +89,7 @@ type s3ObjectHandlerFactory interface {
// Create returns a new s3ObjectHandler that can be used to process the
// specified S3 object. If the handler is not configured to process the
// given S3 object (based on key name) then it will return nil.
Create(ctx context.Context, log *logp.Logger, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler
Create(ctx context.Context, log *logp.Logger, client beat.Client, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler
}

type s3ObjectHandler interface {
Expand Down
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 54 additions & 1 deletion x-pack/filebeat/input/awss3/mock_publisher_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gofrs/uuid"
"go.uber.org/multierr"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -53,6 +54,7 @@ type s3Poller struct {
s3 s3API
log *logp.Logger
metrics *inputMetrics
client beat.Client
s3ObjectHandler s3ObjectHandlerFactory
states *states
store *statestore.Store
Expand All @@ -63,6 +65,7 @@ type s3Poller struct {
func newS3Poller(log *logp.Logger,
metrics *inputMetrics,
s3 s3API,
client beat.Client,
s3ObjectHandler s3ObjectHandlerFactory,
states *states,
store *statestore.Store,
Expand All @@ -71,7 +74,8 @@ func newS3Poller(log *logp.Logger,
awsRegion string,
provider string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
bucketPollInterval time.Duration,
) *s3Poller {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
}
Expand All @@ -86,6 +90,7 @@ func newS3Poller(log *logp.Logger,
s3: s3,
log: log,
metrics: metrics,
client: client,
s3ObjectHandler: s3ObjectHandler,
states: states,
store: store,
Expand Down Expand Up @@ -214,7 +219,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-

acker := awscommon.NewEventACKTracker(ctx)

s3Processor := p.s3ObjectHandler.Create(ctx, p.log, acker, event)
s3Processor := p.s3ObjectHandler.Create(ctx, p.log, p.client, acker, event)
if s3Processor == nil {
p.log.Debugw("empty s3 processor.", "state", state)
continue
Expand Down
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ type s3ObjectProcessorFactory struct {
log *logp.Logger
metrics *inputMetrics
s3 s3Getter
publisher beat.Client
fileSelectors []fileSelectorConfig
}

func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, publisher beat.Client, sel []fileSelectorConfig) *s3ObjectProcessorFactory {
func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, sel []fileSelectorConfig) *s3ObjectProcessorFactory {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
}
Expand All @@ -59,7 +58,6 @@ func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3G
log: log,
metrics: metrics,
s3: s3,
publisher: publisher,
fileSelectors: sel,
}
}
Expand All @@ -75,7 +73,7 @@ func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig {

// Create returns a new s3ObjectProcessor. It returns nil when no file selectors
// match the S3 object key.
func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler {
func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, client beat.Client, ack *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler {
log = log.With(
"bucket_arn", obj.S3.Bucket.Name,
"object_key", obj.S3.Object.Key)
Expand All @@ -90,6 +88,7 @@ func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger,
s3ObjectProcessorFactory: f,
log: log,
ctx: ctx,
publisher: client,
acker: ack,
readerConfig: readerConfig,
s3Obj: obj,
Expand All @@ -102,6 +101,7 @@ type s3ObjectProcessor struct {

log *logp.Logger
ctx context.Context
publisher beat.Client
acker *awscommon.EventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object).
readerConfig *readerConfig // Config about how to process the object.
s3Obj s3EventV2 // S3 object information.
Expand Down
Loading