Skip to content

Commit

Permalink
libbeat/publisher/pipeline: fix data races (elastic#19821)
Browse files Browse the repository at this point in the history
Fix how we pass the initial queue consumer into
eventConsumer.loop; we were referencing c.consumer
in a background goroutine, which can race with
updates to the consumer.

Update tests to properly load atomic variables.
Changed serially updated numEvents vars to basic,
non-atomic types.
  • Loading branch information
axw authored and melchiormoulin committed Oct 14, 2020
1 parent 7b60010 commit e4d6327
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ func newEventConsumer(
queue queue.Queue,
ctx *batchContext,
) *eventConsumer {
consumer := queue.Consumer()
c := &eventConsumer{
logger: log,
sig: make(chan consumerSignal, 3),
out: nil,

queue: queue,
consumer: queue.Consumer(),
consumer: consumer,
ctx: ctx,
}

Expand All @@ -82,7 +83,7 @@ func newEventConsumer(
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.loop(c.consumer)
c.loop(consumer)
}()
return c
}
Expand Down
12 changes: 6 additions & 6 deletions libbeat/publisher/pipeline/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestMakeClientWorker(t *testing.T) {

err := quick.Check(func(i uint) bool {
numBatches := 300 + (i % 100) // between 300 and 399
numEvents := atomic.MakeUint(0)
var numEvents uint

logger := makeBufLogger(t)

Expand All @@ -69,7 +69,7 @@ func TestMakeClientWorker(t *testing.T) {

for i := uint(0); i < numBatches; i++ {
batch := randomBatch(50, 150).withRetryer(retryer)
numEvents.Add(uint(len(batch.Events())))
numEvents += uint(len(batch.Events()))
wqu <- batch
}

Expand All @@ -78,7 +78,7 @@ func TestMakeClientWorker(t *testing.T) {

// Make sure that all events have eventually been published
success := waitUntilTrue(timeout, func() bool {
return numEvents == published
return numEvents == published.Load()
})
if !success {
logger.Flush()
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestMakeClientTracer(t *testing.T) {
testutil.SeedPRNG(t)

numBatches := 10
numEvents := atomic.MakeUint(0)
var numEvents uint

logger := makeBufLogger(t)

Expand All @@ -226,7 +226,7 @@ func TestMakeClientTracer(t *testing.T) {

for i := 0; i < numBatches; i++ {
batch := randomBatch(10, 15).withRetryer(retryer)
numEvents.Add(uint(len(batch.Events())))
numEvents += uint(len(batch.Events()))
wqu <- batch
}

Expand All @@ -235,7 +235,7 @@ func TestMakeClientTracer(t *testing.T) {

// Make sure that all events have eventually been published
matches := waitUntilTrue(timeout, func() bool {
return numEvents == published
return numEvents == published.Load()
})
if !matches {
t.Errorf("expected %d events, got %d", numEvents, published)
Expand Down

0 comments on commit e4d6327

Please sign in to comment.