Add consumer_lag in Kafka consumergroup metricset (elastic#14822)
(cherry picked from commit 23aaf5c)
ChrsMark committed Nov 29, 2019
1 parent 1741b30 commit 9226f08
Showing 9 changed files with 125 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -342,6 +342,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `fingerprint` processor. {issue}11173[11173] {pull}14205[14205]
- Add support for API keys in Elasticsearch outputs. {pull}14324[14324]
- Ensure that init containers are no longer tailed after they stop {pull}14394[14394]
- Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822]

*Auditbeat*

9 changes: 9 additions & 0 deletions metricbeat/docs/fields.asciidoc
@@ -15734,6 +15734,15 @@ type: keyword
--
*`kafka.consumergroup.consumer_lag`*::
+
--
consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset
type: long
--
*`kafka.consumergroup.error.code`*::
+
--
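For readers skimming the field documentation above, here is a minimal, self-contained Go sketch of the calculation it describes; the function name and the sample values are illustrative and are not part of the metricset code.

```go
package main

import "fmt"

// consumerLag mirrors the calculation documented above: the lag for a
// topic/partition is the partition's latest (log end) offset minus the
// offset last committed by the consumer group.
func consumerLag(partitionOffset, consumerOffset int64) int64 {
	return partitionOffset - consumerOffset
}

func main() {
	// Hypothetical values: the leader reports offset 1100 for the partition
	// and the group has committed offsets up to 1059, so 41 messages remain.
	fmt.Println(consumerLag(1100, 1059)) // 41
}
```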
29 changes: 28 additions & 1 deletion metricbeat/module/kafka/broker.go
@@ -43,6 +43,7 @@ func Version(version string) kafka.Version {
type Broker struct {
broker *sarama.Broker
cfg *sarama.Config
client sarama.Client

advertisedAddr string
id int32
@@ -96,6 +97,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
return &Broker{
broker: sarama.NewBroker(host),
cfg: cfg,
client: nil,
id: noID,
matchID: settings.MatchID,
}
@@ -104,6 +106,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
// Close the broker connection
func (b *Broker) Close() error {
closeBroker(b.broker)
b.client.Close()
return nil
}

@@ -134,6 +137,13 @@ func (b *Broker) Connect() error {
debugf("found matching broker %v with id %v", other.Addr(), other.ID())
b.id = other.ID()
b.advertisedAddr = other.Addr()

c, err := getClusterWideClient(b.Addr(), b.cfg)
if err != nil {
closeBroker(b.broker)
return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr())
}
b.client = c
return nil
}

@@ -270,7 +280,16 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32)
return b.broker.FetchOffset(requ)
}

// ID returns the broker or -1 if the broker id is unknown.
// FetchPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader.
func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest)
if err != nil {
return -1, err
}
return offset, nil
}

// ID returns the broker ID or -1 if the broker id is unknown.
func (b *Broker) ID() int32 {
if b.id == noID {
return b.broker.ID()
@@ -524,6 +543,14 @@ func anyIPsMatch(as, bs []net.IP) bool {
return false
}

func getClusterWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) {
client, err := sarama.NewClient([]string{addr}, cfg)
if err != nil {
return nil, err
}
return client, nil
}

func brokerAddresses(brokers []*sarama.Broker) []string {
addresses := make([]string, len(brokers))
for i, b := range brokers {
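The broker.go changes above keep a cluster-wide sarama client alongside the existing per-broker connection so that the newest (log end) offset can be fetched from the partition leader. Below is a minimal standalone sketch of the same sarama call, assuming a broker reachable at localhost:9092 and a topic named test (both illustrative); the import path shown is the one Beats used at the time, while newer code imports github.com/IBM/sarama.

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama" // import path used by Beats at the time of this commit
)

func main() {
	cfg := sarama.NewConfig()

	// A cluster-wide client, unlike a single *sarama.Broker connection, can
	// resolve the partition leader itself and ask it for the newest offset.
	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg) // address is illustrative
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// OffsetNewest asks the partition leader for the next offset to be
	// produced, i.e. the log end offset used as the minuend of consumer_lag.
	offset, err := client.GetOffset("test", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("newest offset:", offset)
}
```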
36 changes: 20 additions & 16 deletions metricbeat/module/kafka/consumergroup/_meta/data.json
@@ -1,45 +1,49 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
"event": {
"dataset": "kafka.consumergroup",
"duration": 115000,
"module": "kafka"
},
"kafka": {
"broker": {
"address": "172.18.0.2:9092",
"address": "localhost:32768",
"id": 0
},
"consumergroup": {
"broker": {
"address": "172.18.0.2:9092",
"address": "localhost:32768",
"id": 0
},
"client": {
"host": "172.18.0.1",
"id": "sarama",
"member_id": "sarama-fcb5a5db-0474-4f3a-a5af-29e2f14549c5"
"host": "127.0.0.1",
"id": "consumer-1",
"member_id": "consumer-1-a12ac7d4-00aa-45a0-8b35-0a9c6e880bf4"
},
"consumer_lag": 1059,
"error": {
"code": 0
},
"id": "test-group",
"id": "console-consumer-50413",
"meta": "",
"offset": -1,
"partition": 0,
"topic": "metricbeat-test"
"topic": "test"
},
"partition": {
"id": 0,
"topic_id": "0-metricbeat-test"
"topic_id": "0-test"
},
"topic": {
"name": "metricbeat-test"
"name": "test"
}
},
"metricset": {
"host": "kafka:9092",
"module": "kafka",
"name": "consumergroup",
"rtt": 115
"period": 10000
},
"service": {
"address": "localhost:32768",
"type": "kafka"
}
}
}
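An aside on the sample document above: the group has no committed offset for the partition (offset: -1), so with the subtraction used by the metricset, the reported consumer_lag of 1059 would imply a leader log end offset of 1058 at collection time. That figure is inferred from the two reported fields and is not stated anywhere in the commit.

```go
package main

import "fmt"

func main() {
	// consumer_lag = partitionOffset - committedOffset, so
	// partitionOffset = consumer_lag + committedOffset.
	committed := int64(-1) // no committed offset for the group/partition
	lag := int64(1059)     // value reported in the sample document
	fmt.Println("implied partition offset:", lag+committed) // 1058
}
```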
4 changes: 4 additions & 0 deletions metricbeat/module/kafka/consumergroup/_meta/fields.yml
@@ -43,6 +43,10 @@
type: keyword
description: custom consumer meta data string

- name: consumer_lag
type: long
description: consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset

- name: error.code
type: long
description: >
13 changes: 10 additions & 3 deletions metricbeat/module/kafka/consumergroup/mock_test.go
@@ -27,9 +27,10 @@ import (
)

type mockClient struct {
listGroups func() ([]string, error)
describeGroups func(group []string) (map[string]kafka.GroupDescription, error)
fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error)
listGroups func() ([]string, error)
describeGroups func(group []string) (map[string]kafka.GroupDescription, error)
fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error)
getPartitionOffsetFromTheLeader func(topic string, partitionID int32) (int64, error)
}

type mockState struct {
@@ -45,6 +46,9 @@ func defaultMockClient(state mockState) *mockClient {
listGroups: makeListGroups(state),
describeGroups: makeDescribeGroups(state),
fetchGroupOffsets: makeFetchGroupOffsets(state),
getPartitionOffsetFromTheLeader: func(topic string, partitionID int32) (int64, error) {
return 42, nil
},
}
}

@@ -145,3 +149,6 @@ func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDesc
func (c *mockClient) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
return c.fetchGroupOffsets(group)
}
func (c *mockClient) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
return c.getPartitionOffsetFromTheLeader(topic, partitionID)
}
27 changes: 21 additions & 6 deletions metricbeat/module/kafka/consumergroup/query.go
@@ -29,6 +29,7 @@ type client interface {
ListGroups() ([]string, error)
DescribeGroups(group []string) (map[string]kafka.GroupDescription, error)
FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error)
FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error)
}

func fetchGroupInfo(
@@ -113,12 +114,19 @@ func fetchGroupInfo(

for topic, partitions := range ret.off.Blocks {
for partition, info := range partitions {
partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition)
if err != nil {
logp.Err("failed to fetch offset for (topic, partition): ('%v', %v)", topic, partition)
continue
}
consumerLag := partitionOffset - info.Offset
event := common.MapStr{
"id": ret.group,
"topic": topic,
"partition": partition,
"offset": info.Offset,
"meta": info.Metadata,
"id": ret.group,
"topic": topic,
"partition": partition,
"offset": info.Offset,
"meta": info.Metadata,
"consumer_lag": consumerLag,
"error": common.MapStr{
"code": info.Err,
},
@@ -133,7 +141,6 @@ func fetchGroupInfo(
}
}
}

emit(event)
}
}
@@ -145,6 +152,14 @@ func fetchGroupInfo(
return err
}

func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32) (int64, error) {
offset, err := b.FetchPartitionOffsetFromTheLeader(topic, partitionID)
if err != nil {
return -1, err
}
return offset, nil
}

func listGroups(b client, filter func(string) bool) ([]string, error) {
groups, err := b.ListGroups()
if err != nil {
52 changes: 31 additions & 21 deletions metricbeat/module/kafka/consumergroup/query_test.go
@@ -67,36 +67,44 @@ func TestFetchGroupInfo(t *testing.T) {
}),
expected: []common.MapStr{
testEvent("group1", "topic1", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(10),
"client": clientMeta(0),
"offset": int64(10),
"consumer_lag": int64(42) - int64(10),
}),
testEvent("group1", "topic1", 1, common.MapStr{
"client": clientMeta(1),
"offset": int64(11),
"client": clientMeta(1),
"offset": int64(11),
"consumer_lag": int64(42) - int64(11),
}),
testEvent("group1", "topic1", 2, common.MapStr{
"client": clientMeta(0),
"offset": int64(12),
"client": clientMeta(0),
"offset": int64(12),
"consumer_lag": int64(42) - int64(12),
}),
testEvent("group1", "topic3", 0, common.MapStr{
"client": clientMeta(1),
"offset": int64(6),
"client": clientMeta(1),
"offset": int64(6),
"consumer_lag": int64(42) - int64(6),
}),
testEvent("group1", "topic3", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(7),
"client": clientMeta(0),
"offset": int64(7),
"consumer_lag": int64(42) - int64(7),
}),
testEvent("group2", "topic2", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(3),
"client": clientMeta(0),
"offset": int64(3),
"consumer_lag": int64(42) - int64(3),
}),
testEvent("group2", "topic3", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(9),
"client": clientMeta(0),
"offset": int64(9),
"consumer_lag": int64(42) - int64(9),
}),
testEvent("group2", "topic3", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(10),
"client": clientMeta(0),
"offset": int64(10),
"consumer_lag": int64(42) - int64(10),
}),
},
},
@@ -127,12 +135,14 @@
topics: []string{"topic1"},
expected: []common.MapStr{
testEvent("group1", "topic1", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(1),
"client": clientMeta(0),
"offset": int64(1),
"consumer_lag": int64(42) - int64(1),
}),
testEvent("group1", "topic1", 1, common.MapStr{
"client": clientMeta(0),
"offset": int64(2),
"client": clientMeta(0),
"offset": int64(2),
"consumer_lag": int64(42) - int64(2),
}),
},
},
@@ -224,7 +234,7 @@ func TestFetchGroupInfo(t *testing.T) {
for key, expected := range indexEvents(test.expected) {
event, found := indexed[key]
if !found {
t.Errorf("Missing event: %v", key)
t.Errorf("Missing key %v from events: %v", key, events)
continue
}
assertEvent(t, expected, event)
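The expected values in the test cases above all read int64(42) - int64(<committed offset>) because defaultMockClient in mock_test.go pins the leader's newest offset at 42 for every topic/partition. The following small, self-contained sketch illustrates that relationship; the fakeLeader type is illustrative and not part of the package.

```go
package main

import "fmt"

// fakeLeader mimics the mock above, which returns a fixed newest offset of 42
// regardless of topic and partition.
type fakeLeader struct{ newest int64 }

func (f fakeLeader) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
	return f.newest, nil
}

func main() {
	leader := fakeLeader{newest: 42}
	committed := []int64{10, 11, 12} // committed offsets from the first test case
	for p, off := range committed {
		newest, _ := leader.FetchPartitionOffsetFromTheLeader("topic1", int32(p))
		fmt.Printf("partition %d: consumer_lag = %d\n", p, newest-off) // 32, 31, 30
	}
}
```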
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/fields.go

Some generated files are not rendered by default.
