Skip to content

Commit

Permalink
libbeat/publisher/pipeline: fix Client.Close (elastic#20125)
Browse files Browse the repository at this point in the history
Set the "closing" variable to true, not false,
upon signalling the waiter to close.

(cherry picked from commit e7b42d8)
  • Loading branch information
axw authored Jul 23, 2020
1 parent 80fcb07 commit ab41986
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 1 deletion.
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (w *clientCloseWaiter) signalClose() {
return
}

w.closing.Store(false)
w.closing.Store(true)
if w.events.Load() == 0 {
w.finishClose()
return
Expand Down
92 changes: 92 additions & 0 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/beats/v7/libbeat/tests/resources"
)

Expand Down Expand Up @@ -113,3 +116,92 @@ func TestClient(t *testing.T) {
}
})
}

func TestClientWaitClose(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
func(queue.ACKListener) (queue.Queue, error) { return qu, nil },
outputs.Group{},
settings,
)
if err != nil {
panic(err)
}

return p
}
if testing.Verbose() {
logp.TestingSetup()
}

q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
pipeline := makePipeline(Settings{}, q)
defer pipeline.Close()

t.Run("WaitClose blocks", func(t *testing.T) {
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: 500 * time.Millisecond,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Send an event which never gets acknowledged.
client.Publish(beat.Event{})

closed := make(chan struct{})
go func() {
defer close(closed)
client.Close()
}()

select {
case <-closed:
t.Fatal("expected Close to wait for event acknowledgement")
case <-time.After(100 * time.Millisecond):
}

select {
case <-closed:
case <-time.After(10 * time.Second):
t.Fatal("expected Close to stop waiting after WaitClose elapses")
}
})

t.Run("ACKing events unblocks WaitClose", func(t *testing.T) {
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: time.Minute,
})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Send an event which gets acknowledged immediately.
client.Publish(beat.Event{})
output := newMockClient(func(batch publisher.Batch) error {
batch.ACK()
return nil
})
defer output.Close()
pipeline.output.Set(outputs.Group{Clients: []outputs.Client{output}})
defer pipeline.output.Set(outputs.Group{})

closed := make(chan struct{})
go func() {
defer close(closed)
client.Close()
}()

select {
case <-closed:
case <-time.After(10 * time.Second):
t.Fatal("expected Close to stop waiting after event acknowledgement")
}
})
}

0 comments on commit ab41986

Please sign in to comment.