Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consumer_lag in Kafka consumergroup metricset #14822

Merged
merged 10 commits into from
Nov 28, 2019
Prev Previous commit
Next Next commit
Store cluster client in Broker struct for reusability
Signed-off-by: chrismark <chrismarkou92@gmail.com>
ChrsMark committed Nov 28, 2019
commit 0f6a8eff9a5347858dce1b7f9df2eb9b72617251
25 changes: 19 additions & 6 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ func Version(version string) kafka.Version {
type Broker struct {
broker *sarama.Broker
cfg *sarama.Config
client sarama.Client

advertisedAddr string
id int32
@@ -96,6 +97,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
return &Broker{
broker: sarama.NewBroker(host),
cfg: cfg,
client: nil,
id: noID,
matchID: settings.MatchID,
}
@@ -104,6 +106,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
// Close the broker connection
func (b *Broker) Close() error {
closeBroker(b.broker)
b.client.Close()
return nil
}

@@ -134,6 +137,13 @@ func (b *Broker) Connect() error {
debugf("found matching broker %v with id %v", other.Addr(), other.ID())
b.id = other.ID()
b.advertisedAddr = other.Addr()

c, err := getClusteWideClient(b.Addr(), b.cfg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that we could use this client for everything, but not, we may still need to fetch offsets from non-leader partition replicas for monitoring purpouses.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though we can also use client.Leader(topic, partitionID) for that.

Copy link
Member Author

@ChrsMark ChrsMark Nov 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on revisiting this in a followup PR with refactoring purpose

if err != nil {
closeBroker(b.broker)
return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr())
}
b.client = c
return nil
}

@@ -272,15 +282,10 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32)

// GetPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader.
func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Use Fetch for consistency with other methods here.

Suggested change
func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {

client, err := sarama.NewClient([]string{b.Addr()}, b.cfg)
if err != nil {
return -1, err
}
offset, err := client.GetOffset(topic, partitionID, sarama.OffsetNewest)
offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest)
if err != nil {
return -1, err
}
defer client.Close()
return offset, nil
}

@@ -538,6 +543,14 @@ func anyIPsMatch(as, bs []net.IP) bool {
return false
}

func getClusteWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) {
mtojek marked this conversation as resolved.
Show resolved Hide resolved
client, err := sarama.NewClient([]string{addr}, cfg)
if err != nil {
return nil, err
}
return client, nil
}

func brokerAddresses(brokers []*sarama.Broker) []string {
addresses := make([]string, len(brokers))
for i, b := range brokers {