diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6243b09ca30..96af2eeb330 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -350,6 +350,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `fingerprint` processor. {issue}11173[11173] {pull}14205[14205] - Add support for API keys in Elasticsearch outputs. {pull}14324[14324] - Ensure that init containers are no longer tailed after they stop {pull}14394[14394] +- Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822] *Auditbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index be4848bb7a6..086c928db71 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -15791,6 +15791,15 @@ type: keyword -- +*`kafka.consumergroup.consumer_lag`*:: ++ +-- +consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset + +type: long + +-- + *`kafka.consumergroup.error.code`*:: + -- diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index c016a43a6ce..1b5c446007b 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -43,6 +43,7 @@ func Version(version string) kafka.Version { type Broker struct { broker *sarama.Broker cfg *sarama.Config + client sarama.Client advertisedAddr string id int32 @@ -96,6 +97,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { return &Broker{ broker: sarama.NewBroker(host), cfg: cfg, + client: nil, id: noID, matchID: settings.MatchID, } @@ -104,6 +106,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { // Close the broker connection func (b *Broker) Close() error { closeBroker(b.broker) + b.client.Close() return nil } @@ -134,6 +137,13 @@ func (b *Broker) Connect() error { debugf("found matching broker %v with id %v", other.Addr(), other.ID()) b.id = other.ID() b.advertisedAddr = other.Addr() + + c, err := getClusterWideClient(b.Addr(), b.cfg) + if err != nil { + closeBroker(b.broker) + return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr()) + } + b.client = c return nil } @@ -270,7 +280,16 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32) return b.broker.FetchOffset(requ) } -// ID returns the broker or -1 if the broker id is unknown. +// FetchPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader. +func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { + offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest) + if err != nil { + return -1, err + } + return offset, nil +} + +// ID returns the broker ID or -1 if the broker id is unknown. func (b *Broker) ID() int32 { if b.id == noID { return b.broker.ID() @@ -524,6 +543,14 @@ func anyIPsMatch(as, bs []net.IP) bool { return false } +func getClusterWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) { + client, err := sarama.NewClient([]string{addr}, cfg) + if err != nil { + return nil, err + } + return client, nil +} + func brokerAddresses(brokers []*sarama.Broker) []string { addresses := make([]string, len(brokers)) for i, b := range brokers { diff --git a/metricbeat/module/kafka/consumergroup/_meta/data.json b/metricbeat/module/kafka/consumergroup/_meta/data.json index 55cc9cefda8..2bce75648f3 100644 --- a/metricbeat/module/kafka/consumergroup/_meta/data.json +++ b/metricbeat/module/kafka/consumergroup/_meta/data.json @@ -1,45 +1,49 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", - "beat": { - "hostname": "host.example.com", - "name": "host.example.com" + "event": { + "dataset": "kafka.consumergroup", + "duration": 115000, + "module": "kafka" }, "kafka": { "broker": { - "address": "172.18.0.2:9092", + "address": "localhost:32768", "id": 0 }, "consumergroup": { "broker": { - "address": "172.18.0.2:9092", + "address": "localhost:32768", "id": 0 }, "client": { - "host": "172.18.0.1", - "id": "sarama", - "member_id": "sarama-fcb5a5db-0474-4f3a-a5af-29e2f14549c5" + "host": "127.0.0.1", + "id": "consumer-1", + "member_id": "consumer-1-a12ac7d4-00aa-45a0-8b35-0a9c6e880bf4" }, + "consumer_lag": 1059, "error": { "code": 0 }, - "id": "test-group", + "id": "console-consumer-50413", "meta": "", "offset": -1, "partition": 0, - "topic": "metricbeat-test" + "topic": "test" }, "partition": { "id": 0, - "topic_id": "0-metricbeat-test" + "topic_id": "0-test" }, "topic": { - "name": "metricbeat-test" + "name": "test" } }, "metricset": { - "host": "kafka:9092", - "module": "kafka", "name": "consumergroup", - "rtt": 115 + "period": 10000 + }, + "service": { + "address": "localhost:32768", + "type": "kafka" } -} +} \ No newline at end of file diff --git a/metricbeat/module/kafka/consumergroup/_meta/fields.yml b/metricbeat/module/kafka/consumergroup/_meta/fields.yml index 963844e43b4..0f197d3c178 100644 --- a/metricbeat/module/kafka/consumergroup/_meta/fields.yml +++ b/metricbeat/module/kafka/consumergroup/_meta/fields.yml @@ -43,6 +43,10 @@ type: keyword description: custom consumer meta data string + - name: consumer_lag + type: long + description: consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset + - name: error.code type: long description: > diff --git a/metricbeat/module/kafka/consumergroup/mock_test.go b/metricbeat/module/kafka/consumergroup/mock_test.go index c22062ae9c3..030af091bff 100644 --- a/metricbeat/module/kafka/consumergroup/mock_test.go +++ b/metricbeat/module/kafka/consumergroup/mock_test.go @@ -27,9 +27,10 @@ import ( ) type mockClient struct { - listGroups func() ([]string, error) - describeGroups func(group []string) (map[string]kafka.GroupDescription, error) - fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error) + listGroups func() ([]string, error) + describeGroups func(group []string) (map[string]kafka.GroupDescription, error) + fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error) + getPartitionOffsetFromTheLeader func(topic string, partitionID int32) (int64, error) } type mockState struct { @@ -45,6 +46,9 @@ func defaultMockClient(state mockState) *mockClient { listGroups: makeListGroups(state), describeGroups: makeDescribeGroups(state), fetchGroupOffsets: makeFetchGroupOffsets(state), + getPartitionOffsetFromTheLeader: func(topic string, partitionID int32) (int64, error) { + return 42, nil + }, } } @@ -145,3 +149,6 @@ func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDesc func (c *mockClient) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) { return c.fetchGroupOffsets(group) } +func (c *mockClient) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { + return c.getPartitionOffsetFromTheLeader(topic, partitionID) +} diff --git a/metricbeat/module/kafka/consumergroup/query.go b/metricbeat/module/kafka/consumergroup/query.go index 1a2514c5d8e..8b9b98e3ec0 100644 --- a/metricbeat/module/kafka/consumergroup/query.go +++ b/metricbeat/module/kafka/consumergroup/query.go @@ -29,6 +29,7 @@ type client interface { ListGroups() ([]string, error) DescribeGroups(group []string) (map[string]kafka.GroupDescription, error) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) + FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) } func fetchGroupInfo( @@ -113,12 +114,19 @@ func fetchGroupInfo( for topic, partitions := range ret.off.Blocks { for partition, info := range partitions { + partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition) + if err != nil { + logp.Err("failed to fetch offset for (topic, partition): ('%v', %v)", topic, partition) + continue + } + consumerLag := partitionOffset - info.Offset event := common.MapStr{ - "id": ret.group, - "topic": topic, - "partition": partition, - "offset": info.Offset, - "meta": info.Metadata, + "id": ret.group, + "topic": topic, + "partition": partition, + "offset": info.Offset, + "meta": info.Metadata, + "consumer_lag": consumerLag, "error": common.MapStr{ "code": info.Err, }, @@ -133,7 +141,6 @@ func fetchGroupInfo( } } } - emit(event) } } @@ -145,6 +152,14 @@ func fetchGroupInfo( return err } +func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32) (int64, error) { + offset, err := b.FetchPartitionOffsetFromTheLeader(topic, partitionID) + if err != nil { + return -1, err + } + return offset, nil +} + func listGroups(b client, filter func(string) bool) ([]string, error) { groups, err := b.ListGroups() if err != nil { diff --git a/metricbeat/module/kafka/consumergroup/query_test.go b/metricbeat/module/kafka/consumergroup/query_test.go index 6d696dbeada..febf616b67e 100644 --- a/metricbeat/module/kafka/consumergroup/query_test.go +++ b/metricbeat/module/kafka/consumergroup/query_test.go @@ -67,36 +67,44 @@ func TestFetchGroupInfo(t *testing.T) { }), expected: []common.MapStr{ testEvent("group1", "topic1", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(10), + "client": clientMeta(0), + "offset": int64(10), + "consumer_lag": int64(42) - int64(10), }), testEvent("group1", "topic1", 1, common.MapStr{ - "client": clientMeta(1), - "offset": int64(11), + "client": clientMeta(1), + "offset": int64(11), + "consumer_lag": int64(42) - int64(11), }), testEvent("group1", "topic1", 2, common.MapStr{ - "client": clientMeta(0), - "offset": int64(12), + "client": clientMeta(0), + "offset": int64(12), + "consumer_lag": int64(42) - int64(12), }), testEvent("group1", "topic3", 0, common.MapStr{ - "client": clientMeta(1), - "offset": int64(6), + "client": clientMeta(1), + "offset": int64(6), + "consumer_lag": int64(42) - int64(6), }), testEvent("group1", "topic3", 1, common.MapStr{ - "client": clientMeta(0), - "offset": int64(7), + "client": clientMeta(0), + "offset": int64(7), + "consumer_lag": int64(42) - int64(7), }), testEvent("group2", "topic2", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(3), + "client": clientMeta(0), + "offset": int64(3), + "consumer_lag": int64(42) - int64(3), }), testEvent("group2", "topic3", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(9), + "client": clientMeta(0), + "offset": int64(9), + "consumer_lag": int64(42) - int64(9), }), testEvent("group2", "topic3", 1, common.MapStr{ - "client": clientMeta(0), - "offset": int64(10), + "client": clientMeta(0), + "offset": int64(10), + "consumer_lag": int64(42) - int64(10), }), }, }, @@ -127,12 +135,14 @@ func TestFetchGroupInfo(t *testing.T) { topics: []string{"topic1"}, expected: []common.MapStr{ testEvent("group1", "topic1", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(1), + "client": clientMeta(0), + "offset": int64(1), + "consumer_lag": int64(42) - int64(1), }), testEvent("group1", "topic1", 1, common.MapStr{ - "client": clientMeta(0), - "offset": int64(2), + "client": clientMeta(0), + "offset": int64(2), + "consumer_lag": int64(42) - int64(2), }), }, }, @@ -224,7 +234,7 @@ func TestFetchGroupInfo(t *testing.T) { for key, expected := range indexEvents(test.expected) { event, found := indexed[key] if !found { - t.Errorf("Missing event: %v", key) + t.Errorf("Missing key %v from events: %v", key, events) continue } assertEvent(t, expected, event) diff --git a/metricbeat/module/kafka/fields.go b/metricbeat/module/kafka/fields.go index fcbffca68c4..38e2bfb076b 100644 --- a/metricbeat/module/kafka/fields.go +++ b/metricbeat/module/kafka/fields.go @@ -32,5 +32,5 @@ func init() { // AssetKafka returns asset data. // This is the base64 encoded gzipped contents of ../metricbeat/module/kafka. func AssetKafka() string { - return "eJzUWs2O3DYSvs9TFHwaHyyfdg9zWGDXXgQT27HhOECQi8AmS93MSKRMUj3TfvqApKTWL0W1epx4TtOSqr6PVcVischX8ICnO3gg2QO5ATDc5HgHL97Z3y9uABhqqnhpuBR38J8bAAD3DgrJqhxvAPRBKpNSKTK+v4OM5No+VZgj0XgHe6s245gzfefEX4EgBZ4h7Z85lfZTJauyfjKB21fTVbVT8gFV+3hK36xO//c/pwHeSKGrAhX8ZEXhXmRSFcQKwIEcEXaIAhQSBpmSBdzWYgciWM7FvqfSHBBoo89ReZl0PhiOpTseznqPm/HkcgARHFJnWJzdTOIQxhRqPQn2gKdHqYZE4vAIO6IyXCNrIUY+M7LkNLH/j/w2hg7AfrF6nM45DFRKqoRKNkYaWHQRxqkCqyoZo5VEGW5lk57/1iJ9atQAZ0EUN7p0Aitsvx7Yb4J/rRA4A5m5iC3P6MI98DZc5uHn4PehA0Qw98uDJldJCHXsFmgUp9pPcJ/q6jc/f/i9I9smuB0aEjmvix0SEZpRH+wHYA7EgDlwDXhEYYBri0YMMjAyerI2oAq/VqhNQg9ECMyTrxVWmGj+DUNMvhwQ7DeNI2ot4KTjstOQQKkkqygmGeE5srRElWqkUgRzjOWhiHE8vCDUehq9GkpUMKnJE8tySUyQWYaGHi7nRXNu3eS0tIay2iqFV2DXt9sSKVEVO1QBc13BRvEcgqZZzaTMOXWrcZIjYahSzJHa30NVI0b+e2i+d67bAF8JmiMR6Voatdw16GjU2lL5JuUDYokqYVxTKQRSs0TjDynfORmgubSrdK1sQ7CO6eBTydVijjlT8d8/DxdbskmRn+LZNBLPQkefBF3jIzeHat9u45LLfZLllT6kEyE3njVyD+7rSwK0LvDQJFwku5NB3aTWJVguqCy42IOV8lnWDtgpvJiErMw6FrIye3ltFgr/RGqQraPSSF2NSoFakz3qlAfLkZ4zaplt8NcJhwtAr+D+C1Cv5e6V0FvdGwHXQDU73HW1drvPnqi223c/aL3tap2o9FpwwYuq8DOKGHg8cHro9w00Cqb75ZMGI4GMtzgxgeHDsNa+WMaRIyobEudyzsk37BhkUgEBXSLlGaf13mxDvUulYlvo1RrOBM9cJrmuJLg2cTX7g8Zqbp7ZfazsOXnt5CZPaU6CjSAXXOTJBVcbSmOZJaS2YEmpLAo+2jrMDlhmmUZbsTgpO962mllJwTUJt8O/6/Qao+0cn0TbjeA5rflk6h+4Ly/IqcMUurqrOaeo35xt/kKZNJBthso9p1IhtRn0Dv6d/Ctkv9km4jV7sbDcj52zAIT6shDqzUYMFXo92ubJLP50vxYWerbreAwxpjPhdG6P6BkPvfo2DDSfpuewogOv0zgOUmhbkHFdr1Uczr3eJUP4nBZPoQPSBn2dF7kwstNY3aFdluxECjMo+nUXrPE6rbSRRTc/GgKMGALaqO5knUSebODHj38c7m5Vadm8Ptui298PUvJ78hX5MMjnv1rzvUDWbPWtM6xTXDlaF8dzYbghZ0Wmizee1P1buPWG02iMpefZJpy9XE5dB6mH5rqUSE/VLGCBtipMtw+fC4NKkHyweNQA3Ykbyhir1+4pJevX7UDauCROj4TnZJdjrVc3pwN7fkTRORJaGaMCHzEQHpevrb84xU3iG54pDWl2zJaz5yH00SleJnTBSnSBP8/Lj83F36MaCq2MEYRhdD7aPJ1l6Vvtz+DK976Hzxnc+up4VE52TKXnGWwq2t5zF00WADjT8wzq04tnsMNnr3naEPMWEfokaLpEaydlPu7hRDK7F4zbaNPAs8YAwDVwQfOKIWvOlLl4Zcm0JzxoVzi4vf/1c9RIdLoQY88yCNOeai1TnC2g4Ar+/39bM/lCxTWrbHkQm9YCtzSC/DZtMs3oPsc8tcElFbjeHmTMK3ZXUt98WHNTaPuWnE93PGsul9wm2s5pdr86Lsv8QfjKVvWnWmqqVd2++0Fb1aSp59JdlWWoUtepDG6a3RUcQ3IghayEW3u8rK2HpRqesC42pImhh1Tzb5iS42JPc64href2YlHABXlaAm6aqdHAgcswVCqWahQs6oxgvsltsbc221OFRp0uJmIUR1arqo8qthJyGfkfRMg3++ujl7/ZWWunSWOH8VWvNYArpscS4NIVsSi/n43bJPSr3ATTpRQaL2fg5TdQ4DJ9JHwxxlrI+9cfwQqA4TPlyTzW6tPw/lGWPxiXlXGnaqbDaiWP+pwkyurleSGOOqn+KwAA//8jv6eW" + return "eJzUWs2O3DYSvs9TFHwaHyyfdg9zWGDXXgQT27HhOECQi8AmS93MSKRMUj3TfvqApKTWL0W1epx4TtOSqr6PxWKxWMVX8ICnO3gg2QO5ATDc5HgHL97Z3y9uABhqqnhpuBR38J8bAAD3DgrJqhxvAPRBKpNSKTK+v4OM5No+VZgj0XgHe6s245gzfefEX4EgBZ4h7Z85lfZTJauyfjKB21fTVbVT8gFV+3hK36xO//c/pwHeSKGrAhX8ZEXhXmRSFcQKwIEcEXaIAhQSBpmSBdzWYgciWM7FvqfSHBBoo89ReZl0PhiOpTseznqPm/HkcgARHFJnWJzdTOIQxhRqPQn2gKdHqYZE4vAIO6IyXCNrIUZzZmTJaWL/H83bGDoA+8XqcTrnMFApqRIq2RhpYNFFGKcKrKpkjFYSZbiVTXrztxbpU6MGOAuiuNGlE1hh+/XAfhP8a4XAGcjMeWx5RhfugbfhMg+/Br8PHSCCuV8eNLlKQKh9t0CjONV+gftQV7/5+cPvHdk2wO3QkMh1XeyQiNCK+mA/AHMgBsyBa8AjCgNcWzRikIGR0Yu1AVX4tUJtEnogQmCefK2wwkTzbxhi8uWAYL9pJqLWAk46LjoNCZRKsopikhGeI0tLVKlGKkUwxlgeihjHwwtCrafRq6FEBZOaPLEsl8QEmWVo6OFyXjTndpqcltZQVlul8Ars+nZbIiWqYocqYK4r2CieQ9A0q5mUOaduN05yJAxVijlS+3uoasTIfw/N927qNsBXguZIRLqWRi13DToatbZUvkn5gFiiShjXVAqB1CzR+EPKd04GaC7tLl0r2+CsYzr4VHK1GGPOVPz3z8PFpmxS5Kd4No3Es9DRJ0HXzJFbQ/XcbuOSy32S5ZU+pBMuN141cg/u60sctE7w0CRcJLuTQd2E1iVYLqgsuNiDlfJR1g7YKbyYhKzMOhayMnt5bRYK/0RqkK2j0khdjUqBWpM96pQH05HeZNQy2+Cv4w4XgF5h+i9AvdZ0r4TeOr0RcA1Uc8Jdl2u35+yJbLt994Pm2y7XiQqvBRe8qAq/ooiBxwOnh37dQKNgup8+aTASyPiIE+MY3g1r7YtpHDmisi5xTuecfMOOQSYVENAlUp5xWp/NNuS7VCq2hV6t4UzwzGWS60qCawNXcz5orObWmT3Hyt4kr13c5CnNSbAQ5JyLPDnnal1pLLOE1CYsKZVFwUdHh9kByyzTaDMWJ2XH22YzKym4IuF2+HedWmO0neODaHsQPIc1H0z9A/flBTF1GEJXVzXnFPWLs81fKJIGos1QuedUKqQ2gt7Bv5N/hew3W0S8Zi0WluuxcxaAUF0WQrXZiKFCr0bbPJnFn67XwkLNdh2PIcZ0JJyO7RE14+Gsvg0DzYfpOaxox+sUjoMU2hJkXNVrFYdzrXfJED6mxVPogLROX8dFLozsFFZ3aLclu5DCDIp+3gVrZp1W2siiGx8NAUYMAW1Ud7FOIjdiE9veSgvkZO8SgXb0r33OQklOK5/xEe0iBeNZhgoFtYHGPNpY069H18Ykgg0NHB7MZDcifijjteu2yJbD6zPDbrMibF9XYFgR3IN8/qs13wtkTd3Cepb1MJdb15n+3JraEIAjY98bT+r+Ldx6w2k0xtLzbBPOXi7H4YPUQ3NdSqSnahawQJviptuHz4VBJUg+2AlrgG4UCoW/1YnIlJL1SUggBl7ip0fCc7LLsdarm1bHnh9RdPpbK31U4CMG3OPyROEXp7gJPMMG2ZBmx2w5ex5CH53iZUIXbKsXzOd5L7Uby/dI7ULbfARhGDV7m6ezLH3f4Bmm8r1vSHAGtz7VH+XGHVPpeQabMtD33HmTBQDO9DyDuhXzDHb47DVPG2LeIkKfBE2XaO2kzMcFqUhm94Jx620aeNYYALgGLmheMWRNg5yLV5ZM265Cu8PB7f2vn6NGotMFH3uWQZi2RbdMcTaBgivM///bnMknKq7yZtOD2LAWuHIS5LfpxGxGl1PmqQ1u3MD1DlRjXrFHrPoax5prT9vrC3y6fFtzueRq1HZOs4fvcVrmu/or6+6faqmpunv77getu5Mmn0t3lT27pa7sGmLxxd0nMiQHUshKuL3Hy9p8WKphu3ixuk4MPaSaf8OUHBcLtHPVdT13FosCLsjTEnBTGY4GDtzsoVKxVKNgUQ2P+Yq9xd7aOUgVGnW6mIhRHFmtqu67bCXkIvI/iJDvXNR9pL95stYuk8YO43trawBXLI8lwKX7blHzfjZuE9Cvcq1Nl1JovJyBl99Agcv0kfBFH2sh719/BCsAhs+kJ/NYq1v7/b6c7/LLyrjKoOmwWsmjbvpEWb08b8RRbfe/AgAA//9LpueC" }