Skip to content

Commit

Permalink
[filebeat][gcs] - Added missing locks for safe concurrency (#34914)
Browse files Browse the repository at this point in the history
* added missing locks for safe concurrency

* updated asciidoc

* updated asciidoc

* updated locks for publishing events

* added nil state fail safe mechanism

---------

Co-authored-by: Denis <[email protected]>
  • Loading branch information
2 people authored and chrisberkhout committed Jun 1, 2023
1 parent 74327f5 commit cf74871
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748]
- Fix errors and panics due to re-used processors {pull}34761[34761]
- Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689]
- [Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914]
- Fix the ignore_inactive option being ignored in Filebeat's filestream input {pull}34770[34770]

*Heartbeat*
Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/gcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"unicode"

Expand All @@ -27,6 +28,8 @@ import (
)

type job struct {
// Mutex lock for concurrent publishes
mu sync.Mutex
// gcs bucket handle
bucket *storage.BucketHandle
// gcs object attribute struct
Expand Down Expand Up @@ -107,9 +110,12 @@ func (j *job) do(ctx context.Context, id string) {
}
event.SetID(objectID(j.hash, 0))
j.state.save(j.object.Name, j.object.Updated)
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
}
}

Expand Down Expand Up @@ -217,9 +223,12 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
// partially saves read state using offset
j.state.savePartial(j.object.Name, offset+relativeOffset)
}
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/gcs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (l *limiter) wait() {
l.wg.Wait()
}

// release puts pack a worker thread.
// release puts back a worker thread.
func (l *limiter) release() {
<-l.limit
l.wg.Done()
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *scheduler) fetchObjectPager(ctx context.Context, pageSize int) *iterato
}

// moveToLastSeenJob, moves to the latest job position past the last seen job
// Jobs are stored in lexicographical order always , hence the latest position can be found either on the basis of job name or timestamp
// Jobs are stored in lexicographical order always, hence the latest position can be found either on the basis of job name or timestamp
func (s *scheduler) moveToLastSeenJob(jobs []*job) []*job {
var latestJobs []*job
jobsToReturn := make([]*job, 0)
Expand Down
17 changes: 16 additions & 1 deletion x-pack/filebeat/input/gcs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ func (s *state) save(name string, lastModifiedOn time.Time) {

// setRootArray, sets boolean true for objects that have their roots defined as an array type
func (s *state) setRootArray(name string) {
s.mu.Lock()
s.cp.IsRootArray[name] = true
s.mu.Unlock()
}

// savePartial, partially saves/updates the current state for cursor checkpoint
func (s *state) savePartial(name string, offset int64) {
s.mu.Lock()
s.cp.LastProcessedOffset[name] = offset
s.mu.Unlock()
}

// updateFailedJobs, adds a job name to a failedJobs map, which helps
Expand All @@ -87,11 +91,11 @@ func (s *state) savePartial(name string, offset int64) {
// A failed job will be re-tried a maximum of 3 times after which the
// entry is removed from the map
func (s *state) updateFailedJobs(jobName string) {
s.mu.Lock()
// we do not store partially processed jobs as failed jobs
if _, ok := s.cp.LastProcessedOffset[jobName]; ok {
return
}
s.mu.Lock()
s.cp.FailedJobs[jobName]++
if s.cp.FailedJobs[jobName] > maxFailedJobRetries {
delete(s.cp.FailedJobs, jobName)
Expand All @@ -100,7 +104,18 @@ func (s *state) updateFailedJobs(jobName string) {
}

// setCheckpoint, sets checkpoint from source to current state instance
// If for some reason the current state is empty, assigns new states as
// a fail safe mechanism
func (s *state) setCheckpoint(chkpt *Checkpoint) {
if chkpt.FailedJobs == nil {
chkpt.FailedJobs = make(map[string]int)
}
if chkpt.IsRootArray == nil {
chkpt.IsRootArray = make(map[string]bool)
}
if chkpt.LastProcessedOffset == nil {
chkpt.LastProcessedOffset = make(map[string]int64)
}
s.cp = chkpt
}

Expand Down

0 comments on commit cf74871

Please sign in to comment.