From 7ef21b1a37f4cff441343775a54bce953d5b60aa Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Mon, 20 Jan 2020 16:04:37 +0100 Subject: [PATCH] Cherry-pick #15568 to 7.x: Fix panic: don't send events if client is nil (#15677) * Fix panic: don't send events if client is nil (#15568) * Fix panic: don't send events if client is nil * Use mutex * Add CHANGELOG entry * Rename changelog entry * Fix: changelog * Temporarily use specific logstash release (cherry picked from commit 3e39fdfa22bd0371c3c4418e62a1511bcb6009f5) * Fix CHANGELOG --- CHANGELOG.next.asciidoc | 1 + libbeat/outputs/logstash/async.go | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3e2aff8ea1b..0d4cbee543a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix spooling to disk blocking infinitely if the lock file can not be acquired. {pull}15338[15338] - Fix `metricbeat test output` with an ipv6 ES host in the output.hosts. {pull}15368[15368] - Fix `convert` processor conversion of string to integer with leading zeros. {issue}15513[15513] {pull}15557[15557] +- Fix panic in the Logstash output when trying to send events to closed connection. {pull}15568[15568] *Auditbeat* diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 96374e192d0..967ae7d0f6c 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -18,7 +18,9 @@ package logstash import ( + "errors" "net" + "sync" "time" "github.com/elastic/beats/libbeat/beat" @@ -37,6 +39,8 @@ type asyncClient struct { win *window connect func() error + + mutex sync.Mutex } type msgRef struct { @@ -113,7 +117,11 @@ func (c *asyncClient) Connect() error { } func (c *asyncClient) Close() error { + c.mutex.Lock() + defer c.mutex.Unlock() + logp.Debug("logstash", "close connection") + if c.client != nil { err := c.client.Close() c.client = nil @@ -197,12 +205,23 @@ func (c *asyncClient) publishWindowed( } func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { + client := c.getClient() + if client == nil { + return errors.New("connection closed") + } window := make([]interface{}, len(events)) for i := range events { window[i] = &events[i].Content } ref.count.Inc() - return c.client.Send(ref.callback, window) + return client.Send(ref.callback, window) +} + +func (c *asyncClient) getClient() *v2.AsyncClient { + c.mutex.Lock() + client := c.client + c.mutex.Unlock() + return client } func (r *msgRef) callback(seq uint32, err error) {