From 6c279ebf2789655725889f37820c959a8f2ea969 Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Fri, 29 Nov 2019 16:21:30 +0200 Subject: [PATCH] Fix cluster-wide Kafka client bug (#14857) --- metricbeat/module/kafka/broker.go | 13 +++++++------ .../consumergroup/consumergroup_integration_test.go | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 1b5c446007b..fa61b2dc0c5 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -116,6 +116,13 @@ func (b *Broker) Connect() error { return errors.Wrap(err, "broker.Open failed") } + c, err := getClusterWideClient(b.Addr(), b.cfg) + if err != nil { + closeBroker(b.broker) + return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr()) + } + b.client = c + if b.id != noID || !b.matchID { return nil } @@ -138,12 +145,6 @@ func (b *Broker) Connect() error { b.id = other.ID() b.advertisedAddr = other.Addr() - c, err := getClusterWideClient(b.Addr(), b.cfg) - if err != nil { - closeBroker(b.broker) - return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr()) - } - b.client = c return nil } diff --git a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go index 4f1abe6092f..a30b5d10828 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go @@ -81,7 +81,7 @@ func TestFetch(t *testing.T) { for retries := 0; retries < 3; retries++ { data, errors = mbtest.ReportingFetchV2Error(f) if len(data) > 0 { - break + continue } time.Sleep(500 * time.Millisecond) }