Skip to content

Commit

Permalink
Fix awscloudwatch worker allocation (#38953) (#39164)
Browse files Browse the repository at this point in the history
Fix a bug in cloudwatch worker allocation that could cause data loss (#38918).

The previous behavior wasn't really tested, since worker tasks were computed in cloudwatchPoller's polling loop which required live AWS connections. So in addition to the basic logical fix, I did some refactoring to cloudwatchPoller that makes the task iteration visible to unit tests.

(cherry picked from commit deece39)

Co-authored-by: Fae Charlton <[email protected]>
  • Loading branch information
mergify[bot] and faec authored Apr 24, 2024
1 parent 11357af commit 56c2828
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 230 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
- [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917]
- Fix a bug in cloudwatch task allocation that could skip some logs {issue}38918[38918] {pull}38953[38953]
- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985]
- entity-analytics input: Improve structured logging. {pull}38990[38990]
- Fix config validation for CEL and HTTPJSON inputs when using password grant authentication and `client.id` or `client.secret` are not present. {pull}38962[38962]
Expand Down
134 changes: 104 additions & 30 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,69 @@ import (
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"

awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
)

type cloudwatchPoller struct {
numberOfWorkers int
apiSleep time.Duration
config config
region string
logStreams []*string
logStreamPrefix string
startTime int64
endTime int64
workerSem *awscommon.Sem
log *logp.Logger
metrics *inputMetrics
workersListingMap *sync.Map
workersProcessingMap *sync.Map

// When a worker is ready for its next task, it should
// send to workRequestChan and then read from workResponseChan.
// The worker can cancel the request based on other context
// cancellations, but if the write succeeds it _must_ read from
// workResponseChan to avoid deadlocking the main loop.
workRequestChan chan struct{}
workResponseChan chan workResponse

workerWg sync.WaitGroup
}

type workResponse struct {
logGroup string
startTime, endTime time.Time
}

func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
awsRegion string, apiSleep time.Duration,
numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller {
awsRegion string, config config) *cloudwatchPoller {
if metrics == nil {
metrics = newInputMetrics("", nil)
}

return &cloudwatchPoller{
numberOfWorkers: numberOfWorkers,
apiSleep: apiSleep,
region: awsRegion,
logStreams: logStreams,
logStreamPrefix: logStreamPrefix,
startTime: int64(0),
endTime: int64(0),
workerSem: awscommon.NewSem(numberOfWorkers),
log: log,
metrics: metrics,
region: awsRegion,
config: config,
workersListingMap: new(sync.Map),
workersProcessingMap: new(sync.Map),
// workRequestChan is unbuffered to guarantee that
// the worker and main loop agree whether a request
// was sent. workerResponseChan is buffered so the
// main loop doesn't have to block on the workers
// while distributing new data.
workRequestChan: make(chan struct{}),
workResponseChan: make(chan workResponse, 10),
}
}

func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) {
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
if err != nil {
var errRequestCanceled *awssdk.RequestCanceledError
if errors.As(err, &errRequestCanceled) {
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err)
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled)
}
p.log.Error("getLogEventsFromCloudWatch failed: ", err)
}
}

// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error {
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error {
// construct FilterLogEventsInput
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
Expand All @@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents)))

// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep)
time.Sleep(p.apiSleep)
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep)
time.Sleep(p.config.APISleep)
p.log.Debug("done sleeping")

p.log.Debugf("Processing #%v events", len(logEvents))
Expand All @@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
return nil
}

func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String(logGroup),
StartTime: awssdk.Int64(startTime),
EndTime: awssdk.Int64(endTime),
StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)),
EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)),
}

if len(p.logStreams) > 0 {
for _, stream := range p.logStreams {
if len(p.config.LogStreams) > 0 {
for _, stream := range p.config.LogStreams {
filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream)
}
}

if p.logStreamPrefix != "" {
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix)
if p.config.LogStreamPrefix != "" {
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix)
}
return filterLogEventsInput
}

func (p *cloudwatchPoller) startWorkers(
ctx context.Context,
svc *cloudwatchlogs.Client,
logProcessor *logProcessor,
) {
for i := 0; i < p.config.NumberOfWorkers; i++ {
p.workerWg.Add(1)
go func() {
defer p.workerWg.Done()
for {
var work workResponse
select {
case <-ctx.Done():
return
case p.workRequestChan <- struct{}{}:
work = <-p.workResponseChan
}

p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup)
p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor)
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup)
}
}()
}
}

// receive implements the main run loop that distributes tasks to the worker
// goroutines. It accepts a "clock" callback (which on a live input should
// equal time.Now) to allow deterministic unit tests.
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) {
defer p.workerWg.Wait()
// startTime and endTime are the bounds of the current scanning interval.
// If we're starting at the end of the logs, advance the start time to the
// most recent scan window
var startTime time.Time
endTime := clock().Add(-p.config.Latency)
if p.config.StartPosition == "end" {
startTime = endTime.Add(-p.config.ScanFrequency)
}
for ctx.Err() == nil {
for _, lg := range logGroupNames {
select {
case <-ctx.Done():
return
case <-p.workRequestChan:
p.workResponseChan <- workResponse{
logGroup: lg,
startTime: startTime,
endTime: endTime,
}
}
}

// Delay for ScanFrequency after finishing a time span
p.log.Debugf("sleeping for %v before checking new logs", p.config.ScanFrequency)
select {
case <-time.After(p.config.ScanFrequency):
case <-ctx.Done():
}
p.log.Debug("done sleeping")

// Advance to the next time span
startTime, endTime = endTime, clock().Add(-p.config.Latency)
}
}
Loading

0 comments on commit 56c2828

Please sign in to comment.