Skip to content

Commit

Permalink
Cherry-pick #15568 to 7.x: Fix panic: don't send events if client is …
Browse files Browse the repository at this point in the history
…nil (#15677)

* Fix panic: don't send events if client is nil (#15568)

* Fix panic: don't send events if client is nil

* Use mutex

* Add CHANGELOG entry

* Rename changelog entry

* Fix: changelog

* Temporarily use specific logstash release

(cherry picked from commit 3e39fdf)

* Fix CHANGELOG
  • Loading branch information
mtojek authored Jan 20, 2020
1 parent 4c28112 commit 7ef21b1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix spooling to disk blocking infinitely if the lock file can not be acquired. {pull}15338[15338]
- Fix `metricbeat test output` with an ipv6 ES host in the output.hosts. {pull}15368[15368]
- Fix `convert` processor conversion of string to integer with leading zeros. {issue}15513[15513] {pull}15557[15557]
- Fix panic in the Logstash output when trying to send events to closed connection. {pull}15568[15568]

*Auditbeat*

Expand Down
21 changes: 20 additions & 1 deletion libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package logstash

import (
"errors"
"net"
"sync"
"time"

"github.com/elastic/beats/libbeat/beat"
Expand All @@ -37,6 +39,8 @@ type asyncClient struct {
win *window

connect func() error

mutex sync.Mutex
}

type msgRef struct {
Expand Down Expand Up @@ -113,7 +117,11 @@ func (c *asyncClient) Connect() error {
}

func (c *asyncClient) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()

logp.Debug("logstash", "close connection")

if c.client != nil {
err := c.client.Close()
c.client = nil
Expand Down Expand Up @@ -197,12 +205,23 @@ func (c *asyncClient) publishWindowed(
}

func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error {
client := c.getClient()
if client == nil {
return errors.New("connection closed")
}
window := make([]interface{}, len(events))
for i := range events {
window[i] = &events[i].Content
}
ref.count.Inc()
return c.client.Send(ref.callback, window)
return client.Send(ref.callback, window)
}

func (c *asyncClient) getClient() *v2.AsyncClient {
c.mutex.Lock()
client := c.client
c.mutex.Unlock()
return client
}

func (r *msgRef) callback(seq uint32, err error) {
Expand Down

0 comments on commit 7ef21b1

Please sign in to comment.