Skip to content

Commit

Permalink
Making tests pass for networkClient as well
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Apr 13, 2020
1 parent 2854b4a commit dabff5e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (

func TestOutputReload(t *testing.T) {
tests := map[string]func(mockPublishFn) outputs.Client{
"client": newMockClient,
//"network_client": newMockNetworkClient,
"client": newMockClient,
"network_client": newMockNetworkClient,
}

for name, ctor := range tests {
Expand Down
24 changes: 18 additions & 6 deletions libbeat/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func (w *clientWorker) run() {
continue
}

w.lf("received batch of %v events", len(batch.events))
w.lf("clientWorker: received batch of %v events", len(batch.events))
if w.closed.Load() {
if batch != nil {
w.lf("canceling batch of %v events", len(batch.events))
w.lf("clientWorker: canceling batch of %v events", len(batch.events))
batch.Cancelled()
}
return
Expand All @@ -136,12 +136,19 @@ func (w *netClientWorker) Close() error {
}

func (w *netClientWorker) run() {
defer func() {
w.lf("netClientWorker closed")
}()
for !w.closed.Load() {
reconnectAttempts := 0

// start initial connect loop from first batch, but return
// batch to pipeline for other outputs to catch up while we're trying to connect
for batch := range w.qu {
if batch == nil {
continue
}

batch.Cancelled()

if w.closed.Load() {
Expand Down Expand Up @@ -169,19 +176,24 @@ func (w *netClientWorker) run() {

// send loop
for batch := range w.qu {
if batch == nil {
continue
}

w.lf("netClientWorker: received batch of %v events", len(batch.events))
if w.closed.Load() {
if batch != nil {
w.lf("netClientWorker: canceling batch of %v events", len(batch.events))
batch.Cancelled()
}
return
}

w.inflight = make(chan struct{})
err := w.client.Publish(batch)
if err != nil {
close(w.inflight)
w.logger.Errorf("Failed to publish events: %v", err)
if err := w.client.Publish(batch); err != nil {
// on error return to connect loop
w.logger.Errorf("Failed to publish events: %v", err)
close(w.inflight)
break
}
close(w.inflight)
Expand Down

0 comments on commit dabff5e

Please sign in to comment.