diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index 07b40f276fcb..2ce792ed8875 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -271,7 +271,7 @@ func (w *clientCloseWaiter) signalClose() { return } - w.closing.Store(false) + w.closing.Store(true) if w.events.Load() == 0 { w.finishClose() return diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index 88c3a67eb817..6c4c3006845f 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -21,11 +21,14 @@ import ( "context" "sync" "testing" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" "github.com/elastic/beats/v7/libbeat/tests/resources" ) @@ -113,3 +116,92 @@ func TestClient(t *testing.T) { } }) } + +func TestClientWaitClose(t *testing.T) { + routinesChecker := resources.NewGoroutinesChecker() + defer routinesChecker.Check(t) + + makePipeline := func(settings Settings, qu queue.Queue) *Pipeline { + p, err := New(beat.Info{}, + Monitors{}, + func(queue.ACKListener) (queue.Queue, error) { return qu, nil }, + outputs.Group{}, + settings, + ) + if err != nil { + panic(err) + } + + return p + } + if testing.Verbose() { + logp.TestingSetup() + } + + q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1}) + pipeline := makePipeline(Settings{}, q) + defer pipeline.Close() + + t.Run("WaitClose blocks", func(t *testing.T) { + client, err := pipeline.ConnectWith(beat.ClientConfig{ + WaitClose: 500 * time.Millisecond, + }) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + // Send an event which never gets acknowledged. + client.Publish(beat.Event{}) + + closed := make(chan struct{}) + go func() { + defer close(closed) + client.Close() + }() + + select { + case <-closed: + t.Fatal("expected Close to wait for event acknowledgement") + case <-time.After(100 * time.Millisecond): + } + + select { + case <-closed: + case <-time.After(10 * time.Second): + t.Fatal("expected Close to stop waiting after WaitClose elapses") + } + }) + + t.Run("ACKing events unblocks WaitClose", func(t *testing.T) { + client, err := pipeline.ConnectWith(beat.ClientConfig{ + WaitClose: time.Minute, + }) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + // Send an event which gets acknowledged immediately. + client.Publish(beat.Event{}) + output := newMockClient(func(batch publisher.Batch) error { + batch.ACK() + return nil + }) + defer output.Close() + pipeline.output.Set(outputs.Group{Clients: []outputs.Client{output}}) + defer pipeline.output.Set(outputs.Group{}) + + closed := make(chan struct{}) + go func() { + defer close(closed) + client.Close() + }() + + select { + case <-closed: + case <-time.After(10 * time.Second): + t.Fatal("expected Close to stop waiting after event acknowledgement") + } + }) +}