diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 22f5812d4be4..624fd3d515f4 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -36,8 +36,8 @@ import ( func TestOutputReload(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ - "client": newMockClient, - //"network_client": newMockNetworkClient, + "client": newMockClient, + "network_client": newMockNetworkClient, } for name, ctor := range tests { diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 2e45c9c37f5f..d170008d5a7a 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -108,10 +108,10 @@ func (w *clientWorker) run() { continue } - w.lf("received batch of %v events", len(batch.events)) + w.lf("clientWorker: received batch of %v events", len(batch.events)) if w.closed.Load() { if batch != nil { - w.lf("canceling batch of %v events", len(batch.events)) + w.lf("clientWorker: canceling batch of %v events", len(batch.events)) batch.Cancelled() } return @@ -136,12 +136,19 @@ func (w *netClientWorker) Close() error { } func (w *netClientWorker) run() { + defer func() { + w.lf("netClientWorker closed") + }() for !w.closed.Load() { reconnectAttempts := 0 // start initial connect loop from first batch, but return // batch to pipeline for other outputs to catch up while we're trying to connect for batch := range w.qu { + if batch == nil { + continue + } + batch.Cancelled() if w.closed.Load() { @@ -169,19 +176,24 @@ func (w *netClientWorker) run() { // send loop for batch := range w.qu { + if batch == nil { + continue + } + + w.lf("netClientWorker: received batch of %v events", len(batch.events)) if w.closed.Load() { if batch != nil { + w.lf("netClientWorker: canceling batch of %v events", len(batch.events)) batch.Cancelled() } return } w.inflight = make(chan struct{}) - err := w.client.Publish(batch) - if err != nil { - close(w.inflight) - w.logger.Errorf("Failed to publish events: %v", err) + if err := w.client.Publish(batch); err != nil { // on error return to connect loop + w.logger.Errorf("Failed to publish events: %v", err) + close(w.inflight) break } close(w.inflight)