diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 1b5c446007b..fa61b2dc0c5 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -116,6 +116,13 @@ func (b *Broker) Connect() error { return errors.Wrap(err, "broker.Open failed") } + c, err := getClusterWideClient(b.Addr(), b.cfg) + if err != nil { + closeBroker(b.broker) + return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr()) + } + b.client = c + if b.id != noID || !b.matchID { return nil } @@ -138,12 +145,6 @@ func (b *Broker) Connect() error { b.id = other.ID() b.advertisedAddr = other.Addr() - c, err := getClusterWideClient(b.Addr(), b.cfg) - if err != nil { - closeBroker(b.broker) - return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr()) - } - b.client = c return nil } diff --git a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go index 4f1abe6092f..a30b5d10828 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go @@ -81,7 +81,7 @@ func TestFetch(t *testing.T) { for retries := 0; retries < 3; retries++ { data, errors = mbtest.ReportingFetchV2Error(f) if len(data) > 0 { - break + continue } time.Sleep(500 * time.Millisecond) }