From d9b5704fc427cb14b8b72c8e54ec2a72b9cdb7cf Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 27 Nov 2019 15:27:14 +0200 Subject: [PATCH 1/9] Add consumer_lag in Kafka consumergroup metricset Signed-off-by: chrismark --- metricbeat/docs/fields.asciidoc | 9 +++++ metricbeat/module/kafka/broker.go | 5 +++ .../kafka/consumergroup/_meta/fields.yml | 4 +++ .../kafka/consumergroup/consumergroup.go | 3 +- .../module/kafka/consumergroup/query.go | 34 +++++++++++++++---- .../module/kafka/consumergroup/query_test.go | 4 ++- metricbeat/module/kafka/fields.go | 2 +- 7 files changed, 52 insertions(+), 9 deletions(-) diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 8923f910ce2..0a40ef22245 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -15761,6 +15761,15 @@ type: keyword -- +*`kafka.consumergroup.consumer_lag`*:: ++ +-- +consumer lag for partition/topic + +type: long + +-- + *`kafka.consumergroup.error.code`*:: + -- diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index c016a43a6ce..be74f864f0c 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -148,6 +148,11 @@ func (b *Broker) AdvertisedAddr() string { return b.advertisedAddr } +// BrokerCfg returns the client configuration attached to the broker +func (b *Broker) BrokerCfg() *sarama.Config { + return b.cfg +} + // GetMetadata fetches most recent cluster metadata from the broker. func (b *Broker) GetMetadata(topics ...string) (*sarama.MetadataResponse, error) { return queryMetadataWithRetry(b.broker, b.cfg, topics) diff --git a/metricbeat/module/kafka/consumergroup/_meta/fields.yml b/metricbeat/module/kafka/consumergroup/_meta/fields.yml index 963844e43b4..4d7b2cc4c27 100644 --- a/metricbeat/module/kafka/consumergroup/_meta/fields.yml +++ b/metricbeat/module/kafka/consumergroup/_meta/fields.yml @@ -43,6 +43,10 @@ type: keyword description: custom consumer meta data string + - name: consumer_lag + type: long + description: consumer lag for partition/topic + - name: error.code type: long description: > diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index 045276ed9d5..d56000f95ae 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -80,6 +80,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch consumer group metrics from kafka func (m *MetricSet) Fetch(r mb.ReporterV2) error { broker, err := m.Connect() + if err != nil { return errors.Wrap(err, "error in connect") } @@ -110,7 +111,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { MetricSetFields: event, }) } - err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred()) + err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred(), broker.BrokerCfg(), broker.Addr()) if err != nil { return errors.Wrap(err, "error in fetch") } diff --git a/metricbeat/module/kafka/consumergroup/query.go b/metricbeat/module/kafka/consumergroup/query.go index 1a2514c5d8e..16f18055cb7 100644 --- a/metricbeat/module/kafka/consumergroup/query.go +++ b/metricbeat/module/kafka/consumergroup/query.go @@ -34,7 +34,7 @@ type client interface { func fetchGroupInfo( emit func(common.MapStr), b client, - groupsFilter, topicsFilter func(string) bool, + groupsFilter, topicsFilter func(string) bool, cfg *sarama.Config, brokerAddr string, ) error { type result struct { err error @@ -113,12 +113,22 @@ func fetchGroupInfo( for topic, partitions := range ret.off.Blocks { for partition, info := range partitions { + partitionOffset, err := getPartitonOffsetFromTheLeader(topic, partition, cfg, brokerAddr) + if err != nil { + logp.Err("failed to fetch offset for (topic, partition): ('%v', %v)", topic, partition) + continue + } + consumerLag := info.Offset - partitionOffset + if consumerLag < 0 { + consumerLag = 0 + } event := common.MapStr{ - "id": ret.group, - "topic": topic, - "partition": partition, - "offset": info.Offset, - "meta": info.Metadata, + "id": ret.group, + "topic": topic, + "partition": partition, + "offset": info.Offset, + "meta": info.Metadata, + "consumer_lag": consumerLag, "error": common.MapStr{ "code": info.Err, }, @@ -145,6 +155,18 @@ func fetchGroupInfo( return err } +func getPartitonOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { + client, err := sarama.NewClient([]string{brokerAddr}, cfg) + if err != nil { + return -1, err + } + offset, err := client.GetOffset(topic, partitionID, sarama.OffsetNewest) + if err != nil { + return -1, err + } + return offset, nil +} + func listGroups(b client, filter func(string) bool) ([]string, error) { groups, err := b.ListGroups() if err != nil { diff --git a/metricbeat/module/kafka/consumergroup/query_test.go b/metricbeat/module/kafka/consumergroup/query_test.go index 6d696dbeada..c18922687cb 100644 --- a/metricbeat/module/kafka/consumergroup/query_test.go +++ b/metricbeat/module/kafka/consumergroup/query_test.go @@ -23,6 +23,8 @@ import ( "reflect" "testing" + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" @@ -209,7 +211,7 @@ func TestFetchGroupInfo(t *testing.T) { groups := makeNameSet(test.groups...).pred() topics := makeNameSet(test.topics...).pred() - err := fetchGroupInfo(collectEvents, test.client, groups, topics) + err := fetchGroupInfo(collectEvents, test.client, groups, topics, sarama.NewConfig(), "somehost") if err != nil { switch { case test.err == nil: diff --git a/metricbeat/module/kafka/fields.go b/metricbeat/module/kafka/fields.go index fcbffca68c4..d3c8d5ca1eb 100644 --- a/metricbeat/module/kafka/fields.go +++ b/metricbeat/module/kafka/fields.go @@ -32,5 +32,5 @@ func init() { // AssetKafka returns asset data. // This is the base64 encoded gzipped contents of ../metricbeat/module/kafka. func AssetKafka() string { - return "eJzUWs2O3DYSvs9TFHwaHyyfdg9zWGDXXgQT27HhOECQi8AmS93MSKRMUj3TfvqApKTWL0W1epx4TtOSqr6PVcVischX8ICnO3gg2QO5ATDc5HgHL97Z3y9uABhqqnhpuBR38J8bAAD3DgrJqhxvAPRBKpNSKTK+v4OM5No+VZgj0XgHe6s245gzfefEX4EgBZ4h7Z85lfZTJauyfjKB21fTVbVT8gFV+3hK36xO//c/pwHeSKGrAhX8ZEXhXmRSFcQKwIEcEXaIAhQSBpmSBdzWYgciWM7FvqfSHBBoo89ReZl0PhiOpTseznqPm/HkcgARHFJnWJzdTOIQxhRqPQn2gKdHqYZE4vAIO6IyXCNrIUY+M7LkNLH/j/w2hg7AfrF6nM45DFRKqoRKNkYaWHQRxqkCqyoZo5VEGW5lk57/1iJ9atQAZ0EUN7p0Aitsvx7Yb4J/rRA4A5m5iC3P6MI98DZc5uHn4PehA0Qw98uDJldJCHXsFmgUp9pPcJ/q6jc/f/i9I9smuB0aEjmvix0SEZpRH+wHYA7EgDlwDXhEYYBri0YMMjAyerI2oAq/VqhNQg9ECMyTrxVWmGj+DUNMvhwQ7DeNI2ot4KTjstOQQKkkqygmGeE5srRElWqkUgRzjOWhiHE8vCDUehq9GkpUMKnJE8tySUyQWYaGHi7nRXNu3eS0tIay2iqFV2DXt9sSKVEVO1QBc13BRvEcgqZZzaTMOXWrcZIjYahSzJHa30NVI0b+e2i+d67bAF8JmiMR6Voatdw16GjU2lL5JuUDYokqYVxTKQRSs0TjDynfORmgubSrdK1sQ7CO6eBTydVijjlT8d8/DxdbskmRn+LZNBLPQkefBF3jIzeHat9u45LLfZLllT6kEyE3njVyD+7rSwK0LvDQJFwku5NB3aTWJVguqCy42IOV8lnWDtgpvJiErMw6FrIye3ltFgr/RGqQraPSSF2NSoFakz3qlAfLkZ4zaplt8NcJhwtAr+D+C1Cv5e6V0FvdGwHXQDU73HW1drvPnqi223c/aL3tap2o9FpwwYuq8DOKGHg8cHro9w00Cqb75ZMGI4GMtzgxgeHDsNa+WMaRIyobEudyzsk37BhkUgEBXSLlGaf13mxDvUulYlvo1RrOBM9cJrmuJLg2cTX7g8Zqbp7ZfazsOXnt5CZPaU6CjSAXXOTJBVcbSmOZJaS2YEmpLAo+2jrMDlhmmUZbsTgpO962mllJwTUJt8O/6/Qao+0cn0TbjeA5rflk6h+4Ly/IqcMUurqrOaeo35xt/kKZNJBthso9p1IhtRn0Dv6d/Ctkv9km4jV7sbDcj52zAIT6shDqzUYMFXo92ubJLP50vxYWerbreAwxpjPhdG6P6BkPvfo2DDSfpuewogOv0zgOUmhbkHFdr1Uczr3eJUP4nBZPoQPSBn2dF7kwstNY3aFdluxECjMo+nUXrPE6rbSRRTc/GgKMGALaqO5knUSebODHj38c7m5Vadm8Ptui298PUvJ78hX5MMjnv1rzvUDWbPWtM6xTXDlaF8dzYbghZ0Wmizee1P1buPWG02iMpefZJpy9XE5dB6mH5rqUSE/VLGCBtipMtw+fC4NKkHyweNQA3Ykbyhir1+4pJevX7UDauCROj4TnZJdjrVc3pwN7fkTRORJaGaMCHzEQHpevrb84xU3iG54pDWl2zJaz5yH00SleJnTBSnSBP8/Lj83F36MaCq2MEYRhdD7aPJ1l6Vvtz+DK976Hzxnc+up4VE52TKXnGWwq2t5zF00WADjT8wzq04tnsMNnr3naEPMWEfokaLpEaydlPu7hRDK7F4zbaNPAs8YAwDVwQfOKIWvOlLl4Zcm0JzxoVzi4vf/1c9RIdLoQY88yCNOeai1TnC2g4Ar+/39bM/lCxTWrbHkQm9YCtzSC/DZtMs3oPsc8tcElFbjeHmTMK3ZXUt98WHNTaPuWnE93PGsul9wm2s5pdr86Lsv8QfjKVvWnWmqqVd2++0Fb1aSp59JdlWWoUtepDG6a3RUcQ3IghayEW3u8rK2HpRqesC42pImhh1Tzb5iS42JPc64href2YlHABXlaAm6aqdHAgcswVCqWahQs6oxgvsltsbc221OFRp0uJmIUR1arqo8qthJyGfkfRMg3++ujl7/ZWWunSWOH8VWvNYArpscS4NIVsSi/n43bJPSr3ATTpRQaL2fg5TdQ4DJ9JHwxxlrI+9cfwQqA4TPlyTzW6tPw/lGWPxiXlXGnaqbDaiWP+pwkyurleSGOOqn+KwAA//8jv6eW" + return "eJzUWs2O3DYSvs9TFHwaHyyfdg9zWGDXXgQT27HhOECQi8AmS93MSKRMUj3TfvqApKTWL0W1epx4TtOSqr6PxWKxWMVX8ICnO3gg2QO5ATDc5HgHL97Z3y9uABhqqnhpuBR38J8bAAD3DgrJqhxvAPRBKpNSKTK+v4OM5No+VZgj0XgHe6s245gzfefEX4EgBZ4h7Z85lfZTJauyfjKB21fTVbVT8gFV+3hK36xO//c/pwHeSKGrAhX8ZEXhXmRSFcQKwIEcEXaIAhQSBpmSBdzWYgciWM7FvqfSHBBoo89ReZl0PhiOpTseznqPm/HkcgARHFJnWJzdTOIQxhRqPQn2gKdHqYZE4vAIO6IyXCNrIUZzZmTJaWL/H83bGDoA+8XqcTrnMFApqRIq2RhpYNFFGKcKrKpkjFYSZbiVTXrztxbpU6MGOAuiuNGlE1hh+/XAfhP8a4XAGcjMeWx5RhfugbfhMg+/Br8PHSCCuV8eNLlKQKh9t0CjONV+gftQV7/5+cPvHdk2wO3QkMh1XeyQiNCK+mA/AHMgBsyBa8AjCgNcWzRikIGR0Yu1AVX4tUJtEnogQmCefK2wwkTzbxhi8uWAYL9pJqLWAk46LjoNCZRKsopikhGeI0tLVKlGKkUwxlgeihjHwwtCrafRq6FEBZOaPLEsl8QEmWVo6OFyXjTndpqcltZQVlul8Ars+nZbIiWqYocqYK4r2CieQ9A0q5mUOaduN05yJAxVijlS+3uoasTIfw/N927qNsBXguZIRLqWRi13DToatbZUvkn5gFiiShjXVAqB1CzR+EPKd04GaC7tLl0r2+CsYzr4VHK1GGPOVPz3z8PFpmxS5Kd4No3Es9DRJ0HXzJFbQ/XcbuOSy32S5ZU+pBMuN141cg/u60sctE7w0CRcJLuTQd2E1iVYLqgsuNiDlfJR1g7YKbyYhKzMOhayMnt5bRYK/0RqkK2j0khdjUqBWpM96pQH05HeZNQy2+Cv4w4XgF5h+i9AvdZ0r4TeOr0RcA1Uc8Jdl2u35+yJbLt994Pm2y7XiQqvBRe8qAq/ooiBxwOnh37dQKNgup8+aTASyPiIE+MY3g1r7YtpHDmisi5xTuecfMOOQSYVENAlUp5xWp/NNuS7VCq2hV6t4UzwzGWS60qCawNXcz5orObWmT3Hyt4kr13c5CnNSbAQ5JyLPDnnal1pLLOE1CYsKZVFwUdHh9kByyzTaDMWJ2XH22YzKym4IuF2+HedWmO0neODaHsQPIc1H0z9A/flBTF1GEJXVzXnFPWLs81fKJIGos1QuedUKqQ2gt7Bv5N/hew3W0S8Zi0WluuxcxaAUF0WQrXZiKFCr0bbPJnFn67XwkLNdh2PIcZ0JJyO7RE14+Gsvg0DzYfpOaxox+sUjoMU2hJkXNVrFYdzrXfJED6mxVPogLROX8dFLozsFFZ3aLclu5DCDIp+3gVrZp1W2siiGx8NAUYMAW1Ud7FOIjdiE9veSgvkZO8SgXb0r/sONgk/2T+IBx+vNreptaRen6ei214IW8SVBFaE4yCf/2rN9wJZU2mwvmB9wmXDdW4+two2hMzIaPXGk7p/C7fecBqNsfQ824Szl8uR8yD10FyXEumpmgUs0Cal6fbhc2FQCZIP9q4aoBs3QgFrdeowpWR92hCIWpf46ZHwnOxyrPXqpjmx50cUnY7USh8V+IgB97h8a//FKW7i7rClNaTZMVvOnofQR6d4mdAFG+EF83ne/exW8D2SsdDGHEEYRu3Z5uksS1/pf4apfO9bCJzBrU/OR9lsx1R6nsGmnPE9d95kAYAzPc+gbp48gx0+e83Thpi3iNAnQdMlWjsp83EJKZLZvWDcepsGnjUGAK6BC5pXDFnT0ubilSXTNpjQ7nBwe//r56iR6HTBx55lEKZtqi1TnE2g4Arz//82Z/KJiquV2fQgNqwFLokE+W0645rRdZJ5aoM7MnC9I9CYV+yhqL54seai0vaKAJ8uuNZcLrnMtJ3T7HF5nJb5PvzKSvmnWmqqUt6++0Er5aTJ59JdlWWoUlcoDbH44m4AGZIDKWQl3N7jZW0+LNWwwbtYDyeGHlLNv2FKjosl1bl6uJ47i0UBF+RpCbip5UYDB+7iUKlYqlGwqBbFfI3dYm+t9acKjTpdTMQojqxWVXdKthJyEfkfRMj3GurOz988WWuXSWOH8U2zNYArlscS4NINtah5Pxu3CehXuYimSyk0Xs7Ay2+gwGX6SPiij7WQ968/ghUAw2fSk3ms1c34fifN9+VlZVwtz3RYreRRt2mirF6eN+KoRvlfAQAA//93rco+" } From 558a39b961d2531e0601304541f9a6c2e0deebe5 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 27 Nov 2019 17:44:25 +0200 Subject: [PATCH 2/9] Fix sub and tests Signed-off-by: chrismark --- metricbeat/module/kafka/broker.go | 13 +++++++++++++ .../module/kafka/consumergroup/mock_test.go | 19 ++++++++++++++++--- .../module/kafka/consumergroup/query.go | 14 +++++--------- .../module/kafka/consumergroup/query_test.go | 2 +- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index be74f864f0c..f740f8673cb 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -275,6 +275,19 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32) return b.broker.FetchOffset(requ) } +// GetPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader. +func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { + client, err := sarama.NewClient([]string{brokerAddr}, cfg) + if err != nil { + return -1, err + } + offset, err := client.GetOffset(topic, partitionID, sarama.OffsetNewest) + if err != nil { + return -1, err + } + return offset, nil +} + // ID returns the broker or -1 if the broker id is unknown. func (b *Broker) ID() int32 { if b.id == noID { diff --git a/metricbeat/module/kafka/consumergroup/mock_test.go b/metricbeat/module/kafka/consumergroup/mock_test.go index c22062ae9c3..7a8e958406f 100644 --- a/metricbeat/module/kafka/consumergroup/mock_test.go +++ b/metricbeat/module/kafka/consumergroup/mock_test.go @@ -27,9 +27,10 @@ import ( ) type mockClient struct { - listGroups func() ([]string, error) - describeGroups func(group []string) (map[string]kafka.GroupDescription, error) - fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error) + listGroups func() ([]string, error) + describeGroups func(group []string) (map[string]kafka.GroupDescription, error) + fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error) + getPartitionOffsetFromTheLeader func(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) } type mockState struct { @@ -45,6 +46,9 @@ func defaultMockClient(state mockState) *mockClient { listGroups: makeListGroups(state), describeGroups: makeDescribeGroups(state), fetchGroupOffsets: makeFetchGroupOffsets(state), + getPartitionOffsetFromTheLeader: func(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { + return 42, nil + }, } } @@ -53,6 +57,12 @@ func (c *mockClient) with(fn func(*mockClient)) *mockClient { return c } +func makePartitonOffset(state mockState) func() (int64, error) { + return func() (int64, error) { + return 42, nil + } +} + func makeListGroups(state mockState) func() ([]string, error) { names := make([]string, 0, len(state.groups)) for name := range state.groups { @@ -145,3 +155,6 @@ func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDesc func (c *mockClient) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) { return c.fetchGroupOffsets(group) } +func (c *mockClient) GetPartitionOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { + return c.getPartitionOffsetFromTheLeader(topic, partitionID, cfg, brokerAddr) +} diff --git a/metricbeat/module/kafka/consumergroup/query.go b/metricbeat/module/kafka/consumergroup/query.go index 16f18055cb7..4d14b566019 100644 --- a/metricbeat/module/kafka/consumergroup/query.go +++ b/metricbeat/module/kafka/consumergroup/query.go @@ -29,6 +29,7 @@ type client interface { ListGroups() ([]string, error) DescribeGroups(group []string) (map[string]kafka.GroupDescription, error) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) + GetPartitionOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) } func fetchGroupInfo( @@ -113,12 +114,12 @@ func fetchGroupInfo( for topic, partitions := range ret.off.Blocks { for partition, info := range partitions { - partitionOffset, err := getPartitonOffsetFromTheLeader(topic, partition, cfg, brokerAddr) + partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition, cfg, brokerAddr) if err != nil { logp.Err("failed to fetch offset for (topic, partition): ('%v', %v)", topic, partition) continue } - consumerLag := info.Offset - partitionOffset + consumerLag := partitionOffset - info.Offset if consumerLag < 0 { consumerLag = 0 } @@ -143,7 +144,6 @@ func fetchGroupInfo( } } } - emit(event) } } @@ -155,12 +155,8 @@ func fetchGroupInfo( return err } -func getPartitonOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { - client, err := sarama.NewClient([]string{brokerAddr}, cfg) - if err != nil { - return -1, err - } - offset, err := client.GetOffset(topic, partitionID, sarama.OffsetNewest) +func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { + offset, err := b.GetPartitionOffsetFromTheLeader(topic, partitionID, cfg, brokerAddr) if err != nil { return -1, err } diff --git a/metricbeat/module/kafka/consumergroup/query_test.go b/metricbeat/module/kafka/consumergroup/query_test.go index c18922687cb..0437c353f3c 100644 --- a/metricbeat/module/kafka/consumergroup/query_test.go +++ b/metricbeat/module/kafka/consumergroup/query_test.go @@ -226,7 +226,7 @@ func TestFetchGroupInfo(t *testing.T) { for key, expected := range indexEvents(test.expected) { event, found := indexed[key] if !found { - t.Errorf("Missing event: %v", key) + t.Errorf("Missing key %v from events: %v", key, events) continue } assertEvent(t, expected, event) From 3f8a29ea2bc47bec2ae5a16506fbdc76fa02d95a Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 27 Nov 2019 17:52:05 +0200 Subject: [PATCH 3/9] Include consumer_lag in unit_test checks Signed-off-by: chrismark --- .../module/kafka/consumergroup/query_test.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/metricbeat/module/kafka/consumergroup/query_test.go b/metricbeat/module/kafka/consumergroup/query_test.go index 0437c353f3c..bc7e2516df4 100644 --- a/metricbeat/module/kafka/consumergroup/query_test.go +++ b/metricbeat/module/kafka/consumergroup/query_test.go @@ -70,35 +70,35 @@ func TestFetchGroupInfo(t *testing.T) { expected: []common.MapStr{ testEvent("group1", "topic1", 0, common.MapStr{ "client": clientMeta(0), - "offset": int64(10), + "offset": int64(10), "consumer_lag": int64(42) - int64(10), }), testEvent("group1", "topic1", 1, common.MapStr{ "client": clientMeta(1), - "offset": int64(11), + "offset": int64(11), "consumer_lag": int64(42) - int64(11), }), testEvent("group1", "topic1", 2, common.MapStr{ "client": clientMeta(0), - "offset": int64(12), + "offset": int64(12), "consumer_lag": int64(42) - int64(12), }), testEvent("group1", "topic3", 0, common.MapStr{ "client": clientMeta(1), - "offset": int64(6), + "offset": int64(6), "consumer_lag": int64(42) - int64(6), }), testEvent("group1", "topic3", 1, common.MapStr{ "client": clientMeta(0), - "offset": int64(7), + "offset": int64(7), "consumer_lag": int64(42) - int64(7), }), testEvent("group2", "topic2", 0, common.MapStr{ "client": clientMeta(0), - "offset": int64(3), + "offset": int64(3), "consumer_lag": int64(42) - int64(3), }), testEvent("group2", "topic3", 0, common.MapStr{ "client": clientMeta(0), - "offset": int64(9), + "offset": int64(9), "consumer_lag": int64(42) - int64(9), }), testEvent("group2", "topic3", 1, common.MapStr{ "client": clientMeta(0), - "offset": int64(10), + "offset": int64(10), "consumer_lag": int64(42) - int64(10), }), }, }, @@ -130,11 +130,11 @@ func TestFetchGroupInfo(t *testing.T) { expected: []common.MapStr{ testEvent("group1", "topic1", 0, common.MapStr{ "client": clientMeta(0), - "offset": int64(1), + "offset": int64(1), "consumer_lag": int64(42) - int64(1), }), testEvent("group1", "topic1", 1, common.MapStr{ "client": clientMeta(0), - "offset": int64(2), + "offset": int64(2), "consumer_lag": int64(42) - int64(2), }), }, }, From 2eec175f057127f38390b2dcae1fb354ba47c10a Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 27 Nov 2019 18:15:25 +0200 Subject: [PATCH 4/9] Refactor methods to reuse struct fields Signed-off-by: chrismark --- metricbeat/module/kafka/broker.go | 9 ++------- .../module/kafka/consumergroup/consumergroup.go | 2 +- metricbeat/module/kafka/consumergroup/mock_test.go | 14 ++++---------- metricbeat/module/kafka/consumergroup/query.go | 10 +++++----- .../module/kafka/consumergroup/query_test.go | 4 +--- 5 files changed, 13 insertions(+), 26 deletions(-) diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index f740f8673cb..5942333ef1d 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -148,11 +148,6 @@ func (b *Broker) AdvertisedAddr() string { return b.advertisedAddr } -// BrokerCfg returns the client configuration attached to the broker -func (b *Broker) BrokerCfg() *sarama.Config { - return b.cfg -} - // GetMetadata fetches most recent cluster metadata from the broker. func (b *Broker) GetMetadata(topics ...string) (*sarama.MetadataResponse, error) { return queryMetadataWithRetry(b.broker, b.cfg, topics) @@ -276,8 +271,8 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32) } // GetPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader. -func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { - client, err := sarama.NewClient([]string{brokerAddr}, cfg) +func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { + client, err := sarama.NewClient([]string{b.Addr()}, b.cfg) if err != nil { return -1, err } diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index d56000f95ae..f9b1397a497 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -111,7 +111,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { MetricSetFields: event, }) } - err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred(), broker.BrokerCfg(), broker.Addr()) + err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred()) if err != nil { return errors.Wrap(err, "error in fetch") } diff --git a/metricbeat/module/kafka/consumergroup/mock_test.go b/metricbeat/module/kafka/consumergroup/mock_test.go index 7a8e958406f..3755cc10f9f 100644 --- a/metricbeat/module/kafka/consumergroup/mock_test.go +++ b/metricbeat/module/kafka/consumergroup/mock_test.go @@ -30,7 +30,7 @@ type mockClient struct { listGroups func() ([]string, error) describeGroups func(group []string) (map[string]kafka.GroupDescription, error) fetchGroupOffsets func(group string) (*sarama.OffsetFetchResponse, error) - getPartitionOffsetFromTheLeader func(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) + getPartitionOffsetFromTheLeader func(topic string, partitionID int32) (int64, error) } type mockState struct { @@ -46,7 +46,7 @@ func defaultMockClient(state mockState) *mockClient { listGroups: makeListGroups(state), describeGroups: makeDescribeGroups(state), fetchGroupOffsets: makeFetchGroupOffsets(state), - getPartitionOffsetFromTheLeader: func(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { + getPartitionOffsetFromTheLeader: func(topic string, partitionID int32) (int64, error) { return 42, nil }, } @@ -57,12 +57,6 @@ func (c *mockClient) with(fn func(*mockClient)) *mockClient { return c } -func makePartitonOffset(state mockState) func() (int64, error) { - return func() (int64, error) { - return 42, nil - } -} - func makeListGroups(state mockState) func() ([]string, error) { names := make([]string, 0, len(state.groups)) for name := range state.groups { @@ -155,6 +149,6 @@ func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDesc func (c *mockClient) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) { return c.fetchGroupOffsets(group) } -func (c *mockClient) GetPartitionOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { - return c.getPartitionOffsetFromTheLeader(topic, partitionID, cfg, brokerAddr) +func (c *mockClient) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { + return c.getPartitionOffsetFromTheLeader(topic, partitionID) } diff --git a/metricbeat/module/kafka/consumergroup/query.go b/metricbeat/module/kafka/consumergroup/query.go index 4d14b566019..11426eb5b7f 100644 --- a/metricbeat/module/kafka/consumergroup/query.go +++ b/metricbeat/module/kafka/consumergroup/query.go @@ -29,13 +29,13 @@ type client interface { ListGroups() ([]string, error) DescribeGroups(group []string) (map[string]kafka.GroupDescription, error) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) - GetPartitionOffsetFromTheLeader(topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) + GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) } func fetchGroupInfo( emit func(common.MapStr), b client, - groupsFilter, topicsFilter func(string) bool, cfg *sarama.Config, brokerAddr string, + groupsFilter, topicsFilter func(string) bool, ) error { type result struct { err error @@ -114,7 +114,7 @@ func fetchGroupInfo( for topic, partitions := range ret.off.Blocks { for partition, info := range partitions { - partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition, cfg, brokerAddr) + partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition) if err != nil { logp.Err("failed to fetch offset for (topic, partition): ('%v', %v)", topic, partition) continue @@ -155,8 +155,8 @@ func fetchGroupInfo( return err } -func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32, cfg *sarama.Config, brokerAddr string) (int64, error) { - offset, err := b.GetPartitionOffsetFromTheLeader(topic, partitionID, cfg, brokerAddr) +func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32) (int64, error) { + offset, err := b.GetPartitionOffsetFromTheLeader(topic, partitionID) if err != nil { return -1, err } diff --git a/metricbeat/module/kafka/consumergroup/query_test.go b/metricbeat/module/kafka/consumergroup/query_test.go index bc7e2516df4..534fe81fdeb 100644 --- a/metricbeat/module/kafka/consumergroup/query_test.go +++ b/metricbeat/module/kafka/consumergroup/query_test.go @@ -23,8 +23,6 @@ import ( "reflect" "testing" - "github.com/Shopify/sarama" - "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" @@ -211,7 +209,7 @@ func TestFetchGroupInfo(t *testing.T) { groups := makeNameSet(test.groups...).pred() topics := makeNameSet(test.topics...).pred() - err := fetchGroupInfo(collectEvents, test.client, groups, topics, sarama.NewConfig(), "somehost") + err := fetchGroupInfo(collectEvents, test.client, groups, topics) if err != nil { switch { case test.err == nil: From 85a29aa3d14b38958c672103b7b7497163007eee Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 28 Nov 2019 11:01:18 +0200 Subject: [PATCH 5/9] Close client connection Signed-off-by: chrismark --- metricbeat/module/kafka/broker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 5942333ef1d..c3c610e25b1 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -280,6 +280,7 @@ func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32 if err != nil { return -1, err } + defer client.Close() return offset, nil } From 0f6a8eff9a5347858dce1b7f9df2eb9b72617251 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 28 Nov 2019 12:11:09 +0200 Subject: [PATCH 6/9] Store cluster client in Broker struct for reusability Signed-off-by: chrismark --- metricbeat/module/kafka/broker.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index c3c610e25b1..8a49efc5253 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -43,6 +43,7 @@ func Version(version string) kafka.Version { type Broker struct { broker *sarama.Broker cfg *sarama.Config + client sarama.Client advertisedAddr string id int32 @@ -96,6 +97,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { return &Broker{ broker: sarama.NewBroker(host), cfg: cfg, + client: nil, id: noID, matchID: settings.MatchID, } @@ -104,6 +106,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { // Close the broker connection func (b *Broker) Close() error { closeBroker(b.broker) + b.client.Close() return nil } @@ -134,6 +137,13 @@ func (b *Broker) Connect() error { debugf("found matching broker %v with id %v", other.Addr(), other.ID()) b.id = other.ID() b.advertisedAddr = other.Addr() + + c, err := getClusteWideClient(b.Addr(), b.cfg) + if err != nil { + closeBroker(b.broker) + return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr()) + } + b.client = c return nil } @@ -272,15 +282,10 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32) // GetPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader. func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { - client, err := sarama.NewClient([]string{b.Addr()}, b.cfg) - if err != nil { - return -1, err - } - offset, err := client.GetOffset(topic, partitionID, sarama.OffsetNewest) + offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest) if err != nil { return -1, err } - defer client.Close() return offset, nil } @@ -538,6 +543,14 @@ func anyIPsMatch(as, bs []net.IP) bool { return false } +func getClusteWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) { + client, err := sarama.NewClient([]string{addr}, cfg) + if err != nil { + return nil, err + } + return client, nil +} + func brokerAddresses(brokers []*sarama.Broker) []string { addresses := make([]string, len(brokers)) for i, b := range brokers { From 6500bf702e280ab9c26fd8b10c59bbbc1ea1ce6e Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 28 Nov 2019 12:28:15 +0200 Subject: [PATCH 7/9] Update data.json Signed-off-by: chrismark --- .../kafka/consumergroup/_meta/data.json | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/metricbeat/module/kafka/consumergroup/_meta/data.json b/metricbeat/module/kafka/consumergroup/_meta/data.json index 55cc9cefda8..2bce75648f3 100644 --- a/metricbeat/module/kafka/consumergroup/_meta/data.json +++ b/metricbeat/module/kafka/consumergroup/_meta/data.json @@ -1,45 +1,49 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", - "beat": { - "hostname": "host.example.com", - "name": "host.example.com" + "event": { + "dataset": "kafka.consumergroup", + "duration": 115000, + "module": "kafka" }, "kafka": { "broker": { - "address": "172.18.0.2:9092", + "address": "localhost:32768", "id": 0 }, "consumergroup": { "broker": { - "address": "172.18.0.2:9092", + "address": "localhost:32768", "id": 0 }, "client": { - "host": "172.18.0.1", - "id": "sarama", - "member_id": "sarama-fcb5a5db-0474-4f3a-a5af-29e2f14549c5" + "host": "127.0.0.1", + "id": "consumer-1", + "member_id": "consumer-1-a12ac7d4-00aa-45a0-8b35-0a9c6e880bf4" }, + "consumer_lag": 1059, "error": { "code": 0 }, - "id": "test-group", + "id": "console-consumer-50413", "meta": "", "offset": -1, "partition": 0, - "topic": "metricbeat-test" + "topic": "test" }, "partition": { "id": 0, - "topic_id": "0-metricbeat-test" + "topic_id": "0-test" }, "topic": { - "name": "metricbeat-test" + "name": "test" } }, "metricset": { - "host": "kafka:9092", - "module": "kafka", "name": "consumergroup", - "rtt": 115 + "period": 10000 + }, + "service": { + "address": "localhost:32768", + "type": "kafka" } -} +} \ No newline at end of file From 7061004c6fd31578157ad04765b850baab9d4dd5 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 28 Nov 2019 15:06:42 +0200 Subject: [PATCH 8/9] Review fixes Signed-off-by: chrismark --- metricbeat/docs/fields.asciidoc | 2 +- metricbeat/module/kafka/broker.go | 10 ++-- .../kafka/consumergroup/_meta/fields.yml | 2 +- .../kafka/consumergroup/consumergroup.go | 1 - .../module/kafka/consumergroup/mock_test.go | 2 +- .../module/kafka/consumergroup/query.go | 7 +-- .../module/kafka/consumergroup/query_test.go | 50 +++++++++++-------- metricbeat/module/kafka/fields.go | 2 +- 8 files changed, 41 insertions(+), 35 deletions(-) diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index f2c0c44d88f..086c928db71 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -15794,7 +15794,7 @@ type: keyword *`kafka.consumergroup.consumer_lag`*:: + -- -consumer lag for partition/topic +consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset type: long diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 8a49efc5253..1b5c446007b 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -138,7 +138,7 @@ func (b *Broker) Connect() error { b.id = other.ID() b.advertisedAddr = other.Addr() - c, err := getClusteWideClient(b.Addr(), b.cfg) + c, err := getClusterWideClient(b.Addr(), b.cfg) if err != nil { closeBroker(b.broker) return fmt.Errorf("Could not get cluster client for advertised broker with address %v", b.Addr()) @@ -280,8 +280,8 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32) return b.broker.FetchOffset(requ) } -// GetPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader. -func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { +// FetchPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader. +func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { offset, err := b.client.GetOffset(topic, partitionID, sarama.OffsetNewest) if err != nil { return -1, err @@ -289,7 +289,7 @@ func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32 return offset, nil } -// ID returns the broker or -1 if the broker id is unknown. +// ID returns the broker ID or -1 if the broker id is unknown. func (b *Broker) ID() int32 { if b.id == noID { return b.broker.ID() @@ -543,7 +543,7 @@ func anyIPsMatch(as, bs []net.IP) bool { return false } -func getClusteWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) { +func getClusterWideClient(addr string, cfg *sarama.Config) (sarama.Client, error) { client, err := sarama.NewClient([]string{addr}, cfg) if err != nil { return nil, err diff --git a/metricbeat/module/kafka/consumergroup/_meta/fields.yml b/metricbeat/module/kafka/consumergroup/_meta/fields.yml index 4d7b2cc4c27..0f197d3c178 100644 --- a/metricbeat/module/kafka/consumergroup/_meta/fields.yml +++ b/metricbeat/module/kafka/consumergroup/_meta/fields.yml @@ -45,7 +45,7 @@ - name: consumer_lag type: long - description: consumer lag for partition/topic + description: consumer lag for partition/topic calculated as the difference between the partition offset and consumer offset - name: error.code type: long diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index f9b1397a497..045276ed9d5 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -80,7 +80,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch consumer group metrics from kafka func (m *MetricSet) Fetch(r mb.ReporterV2) error { broker, err := m.Connect() - if err != nil { return errors.Wrap(err, "error in connect") } diff --git a/metricbeat/module/kafka/consumergroup/mock_test.go b/metricbeat/module/kafka/consumergroup/mock_test.go index 3755cc10f9f..030af091bff 100644 --- a/metricbeat/module/kafka/consumergroup/mock_test.go +++ b/metricbeat/module/kafka/consumergroup/mock_test.go @@ -149,6 +149,6 @@ func (c *mockClient) DescribeGroups(groups []string) (map[string]kafka.GroupDesc func (c *mockClient) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) { return c.fetchGroupOffsets(group) } -func (c *mockClient) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { +func (c *mockClient) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) { return c.getPartitionOffsetFromTheLeader(topic, partitionID) } diff --git a/metricbeat/module/kafka/consumergroup/query.go b/metricbeat/module/kafka/consumergroup/query.go index 11426eb5b7f..8b9b98e3ec0 100644 --- a/metricbeat/module/kafka/consumergroup/query.go +++ b/metricbeat/module/kafka/consumergroup/query.go @@ -29,7 +29,7 @@ type client interface { ListGroups() ([]string, error) DescribeGroups(group []string) (map[string]kafka.GroupDescription, error) FetchGroupOffsets(group string, partitions map[string][]int32) (*sarama.OffsetFetchResponse, error) - GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) + FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) } func fetchGroupInfo( @@ -120,9 +120,6 @@ func fetchGroupInfo( continue } consumerLag := partitionOffset - info.Offset - if consumerLag < 0 { - consumerLag = 0 - } event := common.MapStr{ "id": ret.group, "topic": topic, @@ -156,7 +153,7 @@ func fetchGroupInfo( } func getPartitionOffsetFromTheLeader(b client, topic string, partitionID int32) (int64, error) { - offset, err := b.GetPartitionOffsetFromTheLeader(topic, partitionID) + offset, err := b.FetchPartitionOffsetFromTheLeader(topic, partitionID) if err != nil { return -1, err } diff --git a/metricbeat/module/kafka/consumergroup/query_test.go b/metricbeat/module/kafka/consumergroup/query_test.go index 534fe81fdeb..febf616b67e 100644 --- a/metricbeat/module/kafka/consumergroup/query_test.go +++ b/metricbeat/module/kafka/consumergroup/query_test.go @@ -67,36 +67,44 @@ func TestFetchGroupInfo(t *testing.T) { }), expected: []common.MapStr{ testEvent("group1", "topic1", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(10), "consumer_lag": int64(42) - int64(10), + "client": clientMeta(0), + "offset": int64(10), + "consumer_lag": int64(42) - int64(10), }), testEvent("group1", "topic1", 1, common.MapStr{ - "client": clientMeta(1), - "offset": int64(11), "consumer_lag": int64(42) - int64(11), + "client": clientMeta(1), + "offset": int64(11), + "consumer_lag": int64(42) - int64(11), }), testEvent("group1", "topic1", 2, common.MapStr{ - "client": clientMeta(0), - "offset": int64(12), "consumer_lag": int64(42) - int64(12), + "client": clientMeta(0), + "offset": int64(12), + "consumer_lag": int64(42) - int64(12), }), testEvent("group1", "topic3", 0, common.MapStr{ - "client": clientMeta(1), - "offset": int64(6), "consumer_lag": int64(42) - int64(6), + "client": clientMeta(1), + "offset": int64(6), + "consumer_lag": int64(42) - int64(6), }), testEvent("group1", "topic3", 1, common.MapStr{ - "client": clientMeta(0), - "offset": int64(7), "consumer_lag": int64(42) - int64(7), + "client": clientMeta(0), + "offset": int64(7), + "consumer_lag": int64(42) - int64(7), }), testEvent("group2", "topic2", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(3), "consumer_lag": int64(42) - int64(3), + "client": clientMeta(0), + "offset": int64(3), + "consumer_lag": int64(42) - int64(3), }), testEvent("group2", "topic3", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(9), "consumer_lag": int64(42) - int64(9), + "client": clientMeta(0), + "offset": int64(9), + "consumer_lag": int64(42) - int64(9), }), testEvent("group2", "topic3", 1, common.MapStr{ - "client": clientMeta(0), - "offset": int64(10), "consumer_lag": int64(42) - int64(10), + "client": clientMeta(0), + "offset": int64(10), + "consumer_lag": int64(42) - int64(10), }), }, }, @@ -127,12 +135,14 @@ func TestFetchGroupInfo(t *testing.T) { topics: []string{"topic1"}, expected: []common.MapStr{ testEvent("group1", "topic1", 0, common.MapStr{ - "client": clientMeta(0), - "offset": int64(1), "consumer_lag": int64(42) - int64(1), + "client": clientMeta(0), + "offset": int64(1), + "consumer_lag": int64(42) - int64(1), }), testEvent("group1", "topic1", 1, common.MapStr{ - "client": clientMeta(0), - "offset": int64(2), "consumer_lag": int64(42) - int64(2), + "client": clientMeta(0), + "offset": int64(2), + "consumer_lag": int64(42) - int64(2), }), }, }, diff --git a/metricbeat/module/kafka/fields.go b/metricbeat/module/kafka/fields.go index d3c8d5ca1eb..38e2bfb076b 100644 --- a/metricbeat/module/kafka/fields.go +++ b/metricbeat/module/kafka/fields.go @@ -32,5 +32,5 @@ func init() { // AssetKafka returns asset data. // This is the base64 encoded gzipped contents of ../metricbeat/module/kafka. func AssetKafka() string { - return "eJzUWs2O3DYSvs9TFHwaHyyfdg9zWGDXXgQT27HhOECQi8AmS93MSKRMUj3TfvqApKTWL0W1epx4TtOSqr6PxWKxWMVX8ICnO3gg2QO5ATDc5HgHL97Z3y9uABhqqnhpuBR38J8bAAD3DgrJqhxvAPRBKpNSKTK+v4OM5No+VZgj0XgHe6s245gzfefEX4EgBZ4h7Z85lfZTJauyfjKB21fTVbVT8gFV+3hK36xO//c/pwHeSKGrAhX8ZEXhXmRSFcQKwIEcEXaIAhQSBpmSBdzWYgciWM7FvqfSHBBoo89ReZl0PhiOpTseznqPm/HkcgARHFJnWJzdTOIQxhRqPQn2gKdHqYZE4vAIO6IyXCNrIUZzZmTJaWL/H83bGDoA+8XqcTrnMFApqRIq2RhpYNFFGKcKrKpkjFYSZbiVTXrztxbpU6MGOAuiuNGlE1hh+/XAfhP8a4XAGcjMeWx5RhfugbfhMg+/Br8PHSCCuV8eNLlKQKh9t0CjONV+gftQV7/5+cPvHdk2wO3QkMh1XeyQiNCK+mA/AHMgBsyBa8AjCgNcWzRikIGR0Yu1AVX4tUJtEnogQmCefK2wwkTzbxhi8uWAYL9pJqLWAk46LjoNCZRKsopikhGeI0tLVKlGKkUwxlgeihjHwwtCrafRq6FEBZOaPLEsl8QEmWVo6OFyXjTndpqcltZQVlul8Ars+nZbIiWqYocqYK4r2CieQ9A0q5mUOaduN05yJAxVijlS+3uoasTIfw/N927qNsBXguZIRLqWRi13DToatbZUvkn5gFiiShjXVAqB1CzR+EPKd04GaC7tLl0r2+CsYzr4VHK1GGPOVPz3z8PFpmxS5Kd4No3Es9DRJ0HXzJFbQ/XcbuOSy32S5ZU+pBMuN141cg/u60sctE7w0CRcJLuTQd2E1iVYLqgsuNiDlfJR1g7YKbyYhKzMOhayMnt5bRYK/0RqkK2j0khdjUqBWpM96pQH05HeZNQy2+Cv4w4XgF5h+i9AvdZ0r4TeOr0RcA1Uc8Jdl2u35+yJbLt994Pm2y7XiQqvBRe8qAq/ooiBxwOnh37dQKNgup8+aTASyPiIE+MY3g1r7YtpHDmisi5xTuecfMOOQSYVENAlUp5xWp/NNuS7VCq2hV6t4UzwzGWS60qCawNXcz5orObWmT3Hyt4kr13c5CnNSbAQ5JyLPDnnal1pLLOE1CYsKZVFwUdHh9kByyzTaDMWJ2XH22YzKym4IuF2+HedWmO0neODaHsQPIc1H0z9A/flBTF1GEJXVzXnFPWLs81fKJIGos1QuedUKqQ2gt7Bv5N/hew3W0S8Zi0WluuxcxaAUF0WQrXZiKFCr0bbPJnFn67XwkLNdh2PIcZ0JJyO7RE14+Gsvg0DzYfpOaxox+sUjoMU2hJkXNVrFYdzrXfJED6mxVPogLROX8dFLozsFFZ3aLclu5DCDIp+3gVrZp1W2siiGx8NAUYMAW1Ud7FOIjdiE9veSgvkZO8SgXb0r/sONgk/2T+IBx+vNreptaRen6ei214IW8SVBFaE4yCf/2rN9wJZU2mwvmB9wmXDdW4+two2hMzIaPXGk7p/C7fecBqNsfQ824Szl8uR8yD10FyXEumpmgUs0Cal6fbhc2FQCZIP9q4aoBs3QgFrdeowpWR92hCIWpf46ZHwnOxyrPXqpjmx50cUnY7USh8V+IgB97h8a//FKW7i7rClNaTZMVvOnofQR6d4mdAFG+EF83ne/exW8D2SsdDGHEEYRu3Z5uksS1/pf4apfO9bCJzBrU/OR9lsx1R6nsGmnPE9d95kAYAzPc+gbp48gx0+e83Thpi3iNAnQdMlWjsp83EJKZLZvWDcepsGnjUGAK6BC5pXDFnT0ubilSXTNpjQ7nBwe//r56iR6HTBx55lEKZtqi1TnE2g4Arz//82Z/KJiquV2fQgNqwFLokE+W0645rRdZJ5aoM7MnC9I9CYV+yhqL54seai0vaKAJ8uuNZcLrnMtJ3T7HF5nJb5PvzKSvmnWmqqUt6++0Er5aTJ59JdlWWoUlcoDbH44m4AGZIDKWQl3N7jZW0+LNWwwbtYDyeGHlLNv2FKjosl1bl6uJ47i0UBF+RpCbip5UYDB+7iUKlYqlGwqBbFfI3dYm+t9acKjTpdTMQojqxWVXdKthJyEfkfRMj3GurOz988WWuXSWOH8U2zNYArlscS4NINtah5Pxu3CehXuYimSyk0Xs7Ay2+gwGX6SPiij7WQ968/ghUAw2fSk3ms1c34fifN9+VlZVwtz3RYreRRt2mirF6eN+KoRvlfAQAA//93rco+" + return "eJzUWs2O3DYSvs9TFHwaHyyfdg9zWGDXXgQT27HhOECQi8AmS93MSKRMUj3TfvqApKTWL0W1epx4TtOSqr6PxWKxWMVX8ICnO3gg2QO5ATDc5HgHL97Z3y9uABhqqnhpuBR38J8bAAD3DgrJqhxvAPRBKpNSKTK+v4OM5No+VZgj0XgHe6s245gzfefEX4EgBZ4h7Z85lfZTJauyfjKB21fTVbVT8gFV+3hK36xO//c/pwHeSKGrAhX8ZEXhXmRSFcQKwIEcEXaIAhQSBpmSBdzWYgciWM7FvqfSHBBoo89ReZl0PhiOpTseznqPm/HkcgARHFJnWJzdTOIQxhRqPQn2gKdHqYZE4vAIO6IyXCNrIUZzZmTJaWL/H83bGDoA+8XqcTrnMFApqRIq2RhpYNFFGKcKrKpkjFYSZbiVTXrztxbpU6MGOAuiuNGlE1hh+/XAfhP8a4XAGcjMeWx5RhfugbfhMg+/Br8PHSCCuV8eNLlKQKh9t0CjONV+gftQV7/5+cPvHdk2wO3QkMh1XeyQiNCK+mA/AHMgBsyBa8AjCgNcWzRikIGR0Yu1AVX4tUJtEnogQmCefK2wwkTzbxhi8uWAYL9pJqLWAk46LjoNCZRKsopikhGeI0tLVKlGKkUwxlgeihjHwwtCrafRq6FEBZOaPLEsl8QEmWVo6OFyXjTndpqcltZQVlul8Ars+nZbIiWqYocqYK4r2CieQ9A0q5mUOaduN05yJAxVijlS+3uoasTIfw/N927qNsBXguZIRLqWRi13DToatbZUvkn5gFiiShjXVAqB1CzR+EPKd04GaC7tLl0r2+CsYzr4VHK1GGPOVPz3z8PFpmxS5Kd4No3Es9DRJ0HXzJFbQ/XcbuOSy32S5ZU+pBMuN141cg/u60sctE7w0CRcJLuTQd2E1iVYLqgsuNiDlfJR1g7YKbyYhKzMOhayMnt5bRYK/0RqkK2j0khdjUqBWpM96pQH05HeZNQy2+Cv4w4XgF5h+i9AvdZ0r4TeOr0RcA1Uc8Jdl2u35+yJbLt994Pm2y7XiQqvBRe8qAq/ooiBxwOnh37dQKNgup8+aTASyPiIE+MY3g1r7YtpHDmisi5xTuecfMOOQSYVENAlUp5xWp/NNuS7VCq2hV6t4UzwzGWS60qCawNXcz5orObWmT3Hyt4kr13c5CnNSbAQ5JyLPDnnal1pLLOE1CYsKZVFwUdHh9kByyzTaDMWJ2XH22YzKym4IuF2+HedWmO0neODaHsQPIc1H0z9A/flBTF1GEJXVzXnFPWLs81fKJIGos1QuedUKqQ2gt7Bv5N/hew3W0S8Zi0WluuxcxaAUF0WQrXZiKFCr0bbPJnFn67XwkLNdh2PIcZ0JJyO7RE14+Gsvg0DzYfpOaxox+sUjoMU2hJkXNVrFYdzrXfJED6mxVPogLROX8dFLozsFFZ3aLclu5DCDIp+3gVrZp1W2siiGx8NAUYMAW1Ud7FOIjdiE9veSgvkZO8SgXb0r33OQklOK5/xEe0iBeNZhgoFtYHGPNpY069H18Ykgg0NHB7MZDcifijjteu2yJbD6zPDbrMibF9XYFgR3IN8/qs13wtkTd3Cepb1MJdb15n+3JraEIAjY98bT+r+Ldx6w2k0xtLzbBPOXi7H4YPUQ3NdSqSnahawQJviptuHz4VBJUg+2AlrgG4UCoW/1YnIlJL1SUggBl7ip0fCc7LLsdarm1bHnh9RdPpbK31U4CMG3OPyROEXp7gJPMMG2ZBmx2w5ex5CH53iZUIXbKsXzOd5L7Uby/dI7ULbfARhGDV7m6ezLH3f4Bmm8r1vSHAGtz7VH+XGHVPpeQabMtD33HmTBQDO9DyDuhXzDHb47DVPG2LeIkKfBE2XaO2kzMcFqUhm94Jx620aeNYYALgGLmheMWRNg5yLV5ZM265Cu8PB7f2vn6NGotMFH3uWQZi2RbdMcTaBgivM///bnMknKq7yZtOD2LAWuHIS5LfpxGxGl1PmqQ1u3MD1DlRjXrFHrPoax5prT9vrC3y6fFtzueRq1HZOs4fvcVrmu/or6+6faqmpunv77getu5Mmn0t3lT27pa7sGmLxxd0nMiQHUshKuL3Hy9p8WKphu3ixuk4MPaSaf8OUHBcLtHPVdT13FosCLsjTEnBTGY4GDtzsoVKxVKNgUQ2P+Yq9xd7aOUgVGnW6mIhRHFmtqu67bCXkIvI/iJDvXNR9pL95stYuk8YO43trawBXLI8lwKX7blHzfjZuE9Cvcq1Nl1JovJyBl99Agcv0kfBFH2sh719/BCsAhs+kJ/NYq1v7/b6c7/LLyrjKoOmwWsmjbvpEWb08b8RRbfe/AgAA//9LpueC" } From 240c16e7839c104f8aece2f1b432e49c64d1a25e Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 28 Nov 2019 15:10:05 +0200 Subject: [PATCH 9/9] Add changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6243b09ca30..96af2eeb330 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -350,6 +350,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `fingerprint` processor. {issue}11173[11173] {pull}14205[14205] - Add support for API keys in Elasticsearch outputs. {pull}14324[14324] - Ensure that init containers are no longer tailed after they stop {pull}14394[14394] +- Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822] *Auditbeat*