Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consumer_lag in Kafka consumergroup metricset #14822

Merged
merged 10 commits into from
Nov 28, 2019
9 changes: 9 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15791,6 +15791,15 @@ type: keyword

--

*`kafka.consumergroup.consumer_lag`*::
+
--
consumer lag for partition/topic, i.e. the difference between the partition offset and the consumer offset

type: long

--

*`kafka.consumergroup.error.code`*::
+
--
Expand Down
27 changes: 27 additions & 0 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func Version(version string) kafka.Version {
type Broker struct {
broker *sarama.Broker
cfg *sarama.Config
client sarama.Client

advertisedAddr string
id int32
Expand Down Expand Up @@ -96,6 +97,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
return &Broker{
broker: sarama.NewBroker(host),
cfg: cfg,
client: nil,
id: noID,
matchID: settings.MatchID,
}
Expand All @@ -104,6 +106,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
// Close shuts down the broker connection and, if one was created, the
// cluster-wide client.
//
// NewBroker leaves the client nil and Connect assigns it only once a matching
// broker has been found, so Close has to tolerate a nil client: calling a
// method on the nil interface would panic when a Broker is closed without a
// successful Connect. Close always returns nil.
func (b *Broker) Close() error {
	closeBroker(b.broker)
	if b.client != nil {
		// Best-effort shutdown: the client's close error is deliberately
		// ignored, matching the previous behaviour of this method.
		_ = b.client.Close()
	}
	return nil
}

Expand Down Expand Up @@ -134,6 +137,13 @@ func (b *Broker) Connect() error {
debugf("found matching broker %v with id %v", other.Addr(), other.ID())
b.id = other.ID()
b.advertisedAddr = other.Addr()

c, err := getClusteWideClient(b.Addr(), b.cfg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that we could use this client for everything, but no, we may still need to fetch offsets from non-leader partition replicas for monitoring purposes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though we can also use client.Leader(topic, partitionID) for that.

Copy link
Member Author

@ChrsMark ChrsMark Nov 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on revisiting this in a followup PR with refactoring purpose

if err != nil {
closeBroker(b.broker)
return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr())
}
b.client = c
return nil
}

Expand Down Expand Up @@ -270,6 +280,15 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32)
return b.broker.FetchOffset(requ)
}

// GetPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader.
func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Use Fetch for consistency with other methods here.

Suggested change
func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {

offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest)
if err != nil {
return -1, err
}
return offset, nil
}

// ID returns the broker or -1 if the broker id is unknown.
mtojek marked this conversation as resolved.
Show resolved Hide resolved
func (b *Broker) ID() int32 {
if b.id == noID {
Expand Down Expand Up @@ -524,6 +543,14 @@ func anyIPsMatch(as, bs []net.IP) bool {
return false
}

func getClusteWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) {
mtojek marked this conversation as resolved.
Show resolved Hide resolved
client, err := sarama.NewClient([]string{addr}, cfg)
if err != nil {
return nil, err
}
return client, nil
}

func brokerAddresses(brokers []*sarama.Broker) []string {
addresses := make([]string, len(brokers))
for i, b := range brokers {
Expand Down
36 changes: 20 additions & 16 deletions metricbeat/module/kafka/consumergroup/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,45 +1,49 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
"event": {
"dataset": "kafka.consumergroup",
"duration": 115000,
"module": "kafka"
},
"kafka": {
"broker": {
"address": "172.18.0.2:9092",
"address": "localhost:32768",
"id": 0
},
"consumergroup": {
"broker": {
"address": "172.18.0.2:9092",
"address": "localhost:32768",
"id": 0
},
"client": {
"host": "172.18.0.1",
"id": "sarama",
"member_id": "sarama-fcb5a5db-0474-4f3a-a5af-29e2f14549c5"
"host": "127.0.0.1",
"id": "consumer-1",
"member_id": "consumer-1-a12ac7d4-00aa-45a0-8b35-0a9c6e880bf4"
},
"consumer_lag": 1059,
"error": {
"code": 0
},
"id": "test-group",
"id": "console-consumer-50413",
"meta": "",
"offset": -1,
"partition": 0,
"topic": "metricbeat-test"
"topic": "test"
},
"partition": {
"id": 0,
"topic_id": "0-metricbeat-test"
"topic_id": "0-test"
},
"topic": {
"name": "metricbeat-test"
"name": "test"
}
},
"metricset": {
"host": "kafka:9092",
"module": "kafka",
"name": "consumergroup",
"rtt": 115
"period": 10000
},
"service": {
"address": "localhost:32768",
"type": "kafka"
}
}
}
4 changes: 4 additions & 0 deletions metricbeat/module/kafka/consumergroup/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
type: keyword
description: custom consumer meta data string

- name: consumer_lag
type: long
description: consumer lag for partition/topic, i.e. the difference between the partition offset and the consumer offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably worth it explaining what this is, as an important metric: the difference between the partition offset and consumer offset


- name: error.code
type: long
description: >
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch consumer group metrics from kafka
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
broker, err := m.Connect()

if err != nil {
return errors.Wrap(err, "error in connect")
mtojek marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down
13 changes: 10 additions & 3 deletions metricbeat/module/kafka/consumergroup/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
)

type mockClient struct {
listGroups func() ([]string, error)
describeGroups func(group []string) (map[string]kafka.GroupDescription, error)
fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error)
listGroups func() ([]string, error)
describeGroups func(group []string) (map[string]kafka.GroupDescription, error)
fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error)
getPartitionOffsetFromTheLeader func(topic string, partitionID int32) (int64, error)
}

type mockState struct {
Expand All @@ -45,6 +46,9 @@ func defaultMockClient(state mockState) *mockClient {
listGroups: makeListGroups(state),
describeGroups: makeDescribeGroups(state),
fetchGroupOffsets: makeFetchGroupOffsets(state),
getPartitionOffsetFromTheLeader: func(topic string, partitionID int32) (int64, error) {
return 42, nil
},
}
}

Expand Down Expand Up @@ -145,3 +149,6 @@ func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDesc
// FetchGroupOffsets forwards to the configured fetchGroupOffsets stub. Only
// the group name is handed to the stub; the partitions argument is not used.
func (c *mockClient) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
	stub := c.fetchGroupOffsets
	return stub(group)
}
// GetPartitionOffsetFromTheLeader forwards the topic and partition ID to the
// configured getPartitionOffsetFromTheLeader stub and returns its result.
func (c *mockClient) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
	stub := c.getPartitionOffsetFromTheLeader
	return stub(topic, partitionID)
}
30 changes: 24 additions & 6 deletions metricbeat/module/kafka/consumergroup/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type client interface {
ListGroups() ([]string, error)
DescribeGroups(group []string) (map[string]kafka.GroupDescription, error)
FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error)
GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error)
}

func fetchGroupInfo(
Expand Down Expand Up @@ -113,12 +114,22 @@ func fetchGroupInfo(

for topic, partitions := range ret.off.Blocks {
for partition, info := range partitions {
partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to explore here.

I guess that it may happen that the partition offset here is always going to be ahead of the group offset, because we get first the group offset, and then the partition offset. Between both operations the partition offset may have changed.

Starting on version 4 of ListOffsets (the API method used to get partition offsets), it is possible to indicate a current_leader_epoch to retrieve "old" metadata.

Starting on version 5 of OffsetFetch (the API method used to get consumer group offsets), its response contains a leader_epoch field.

I wonder if we can use the leader_epoch contained in the response of OffsetFetch when available to query for the offset of the partition in the same epoch. This way we could have a more accurate value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the implications for the support matrix? I'm not very familiar with API version vs Kafka versioning.

Copy link
Member Author

@ChrsMark ChrsMark Nov 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsoriano not sure if this can be achieved with the current implementation of GetOffset

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the implications for the support matrix? I'm not very familiar with API version vs Kafka versioning.

All messages in kafka protocol are versioned, each client and broker can support a different range of versions for each message. There is a message (ApiVersionsRequest) to query the versions supported by the broker, we could use this method to decide if we can use the methods aware of the epoch.

@jsoriano not sure if this can be achieved with the current implementation of GetOffset

No, we would need to forge our own request as we do to request partition offsets to the leader. Or we could contribute to Sarama the support for these versions. 🙂

Copy link
Member Author

@ChrsMark ChrsMark Nov 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if err != nil {
logp.Err("failed to fetch offset for (topic, partition): ('%v', %v)", topic, partition)
continue
}
consumerLag := partitionOffset - info.Offset
if consumerLag < 0 {
mtojek marked this conversation as resolved.
Show resolved Hide resolved
consumerLag = 0
}
event := common.MapStr{
"id": ret.group,
"topic": topic,
"partition": partition,
"offset": info.Offset,
"meta": info.Metadata,
"id": ret.group,
"topic": topic,
"partition": partition,
"offset": info.Offset,
"meta": info.Metadata,
"consumer_lag": consumerLag,
"error": common.MapStr{
"code": info.Err,
},
Expand All @@ -133,7 +144,6 @@ func fetchGroupInfo(
}
}
}

emit(event)
}
}
Expand All @@ -145,6 +155,14 @@ func fetchGroupInfo(
return err
}

// getPartitionOffsetFromTheLeader asks the client for the offset of the given
// topic partition. On success the offset is returned as-is; on failure the
// result is -1 together with the client's error.
func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32) (int64, error) {
	newest, fetchErr := b.GetPartitionOffsetFromTheLeader(topic, partitionID)
	if fetchErr == nil {
		return newest, nil
	}
	// Whatever offset came back alongside the error is discarded in favour of -1.
	return -1, fetchErr
}

func listGroups(b client, filter func(string) bool) ([]string, error) {
groups, err := b.ListGroups()
if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions metricbeat/module/kafka/consumergroup/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,35 @@ func TestFetchGroupInfo(t *testing.T) {
expected: []common.MapStr{
testEvent("group1", "topic1", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(10),
"offset": int64(10), "consumer_lag": int64(42) - int64(10),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit.

Suggested change
"offset": int64(10), "consumer_lag": int64(42) - int64(10),
"offset": int64(10),
"consumer_lag": int64(42) - int64(10),

}),
testEvent("group1", "topic1", 1, common.MapStr{
"client": clientMeta(1),
"offset": int64(11),
"offset": int64(11), "consumer_lag": int64(42) - int64(11),
}),
testEvent("group1", "topic1", 2, common.MapStr{
"client": clientMeta(0),
"offset": int64(12),
"offset": int64(12), "consumer_lag": int64(42) - int64(12),
}),
testEvent("group1", "topic3", 0, common.MapStr{
"client": clientMeta(1),
"offset": int64(6),
"offset": int64(6), "consumer_lag": int64(42) - int64(6),
}),
testEvent("group1", "topic3", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(7),
"offset": int64(7), "consumer_lag": int64(42) - int64(7),
}),
testEvent("group2", "topic2", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(3),
"offset": int64(3), "consumer_lag": int64(42) - int64(3),
}),
testEvent("group2", "topic3", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(9),
"offset": int64(9), "consumer_lag": int64(42) - int64(9),
}),
testEvent("group2", "topic3", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(10),
"offset": int64(10), "consumer_lag": int64(42) - int64(10),
}),
},
},
Expand Down Expand Up @@ -128,11 +128,11 @@ func TestFetchGroupInfo(t *testing.T) {
expected: []common.MapStr{
testEvent("group1", "topic1", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(1),
"offset": int64(1), "consumer_lag": int64(42) - int64(1),
}),
testEvent("group1", "topic1", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(2),
"offset": int64(2), "consumer_lag": int64(42) - int64(2),
}),
},
},
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestFetchGroupInfo(t *testing.T) {
for key, expected := range indexEvents(test.expected) {
event, found := indexed[key]
if !found {
t.Errorf("Missing event: %v", key)
t.Errorf("Missing key %v from events: %v", key, events)
continue
}
assertEvent(t, expected, event)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.