Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka common fields at the module level #7767

Merged
merged 13 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
*Metricbeat*

- Add metrics about cache size to memcached module {pull}7740[7740]
- Move common kafka fields (broker, topic and partition.id) to the module level to facilitate events correlation. Old fields will be removed in future versions {pull}7767[7767]

*Packetbeat*

Expand Down
29 changes: 29 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,35 @@ COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

--------------------------------------------------------------------
Dependency: github.com/bsm/sarama-cluster
Revision: 7e67d87a6b3f83fe08c096fd084691bd9dca112f
License type (autodetected): MIT
./vendor/github.com/bsm/sarama-cluster/LICENSE:
--------------------------------------------------------------------
(The MIT License)

Copyright (c) 2017 Black Square Media Ltd

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

--------------------------------------------------------------------
Dependency: github.com/coreos/bbolt
Revision: af9db2027c98c61ecd8e17caa5bd265792b9b9a2
Expand Down
8 changes: 8 additions & 0 deletions dev-tools/generate_notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ def create_notice(filename, beat, copyright, vendor_dirs, csvfile, overrides=Non
re.sub(r"\s+", " ", """Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies."""),
re.sub(r"\s+", " ", """Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
"""),
]

BSD_LICENSE_CONTENTS = [
Expand Down
88 changes: 88 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6964,6 +6964,63 @@ Kafka module



[float]
== broker fields

Broker Consumer Group Information have been read from (Broker handling the consumer group).



*`kafka.broker.id`*::
+
--
type: long

Broker id


--

*`kafka.broker.address`*::
+
--
type: keyword

Broker advertised address


--

*`kafka.topic.name`*::
+
--
type: keyword

Topic name


--

*`kafka.topic.error.code`*::
+
--
type: long

Topic error code.


--

*`kafka.partition.id`*::
+
--
type: long

Partition id.


--

[float]
== consumergroup fields

Expand Down Expand Up @@ -7010,6 +7067,9 @@ Consumer Group ID
*`kafka.consumergroup.topic`*::
+
--

deprecated[6.5]

type: keyword

Topic name
Expand All @@ -7019,6 +7079,9 @@ Topic name
*`kafka.consumergroup.partition`*::
+
--

deprecated[6.5]

type: long

Partition ID
Expand Down Expand Up @@ -7131,6 +7194,9 @@ Partition data.
*`kafka.partition.partition.id`*::
+
--

deprecated[6.5]

type: long

Partition id.
Expand Down Expand Up @@ -7176,6 +7242,16 @@ type: boolean
Indicates if replica is included in the in-sync replicate set (ISR).


--

*`kafka.partition.partition.is_leader`*::
+
--
type: boolean

Indicates if replica is the leader


--

*`kafka.partition.partition.error.code`*::
Expand All @@ -7191,6 +7267,9 @@ Error code from fetching partition.
*`kafka.partition.topic.error.code`*::
+
--

deprecated[6.5]

type: long

topic error code.
Expand All @@ -7201,6 +7280,9 @@ topic error code.
*`kafka.partition.topic.name`*::
+
--

deprecated[6.5]

type: keyword

Topic name
Expand All @@ -7211,6 +7293,9 @@ Topic name
*`kafka.partition.broker.id`*::
+
--

deprecated[6.5]

type: long

Broker id
Expand All @@ -7221,6 +7306,9 @@ Broker id
*`kafka.partition.broker.address`*::
+
--

deprecated[6.5]

type: keyword

Broker address
Expand Down
30 changes: 30 additions & 0 deletions metricbeat/module/kafka/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,33 @@
type: group
description: >
fields:
- name: broker
type: group
description: >
Broker Consumer Group Information have been read from (Broker handling
the consumer group).
fields:
- name: id
type: long
description: >
Broker id

- name: address
type: keyword
description: >
Broker advertised address

- name: topic.name
type: keyword
description: >
Topic name

- name: topic.error.code
type: long
description: >
Topic error code.

- name: partition.id
type: long
description: >
Partition id.
60 changes: 34 additions & 26 deletions metricbeat/module/kafka/consumergroup/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,36 +1,44 @@
{
"@timestamp":"2016-05-23T08:05:34.853Z",
"type":"metricsets",
"beat":{
"hostname":"localhost",
"name":"localhost",
"version": "6.0.0-alpha1"
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"metricset":{
"host":"localhost",
"module":"kafka",
"name":"consumergroup",
"rtt":269
},
"kafka":{
"consumergroup":{
"id": "group",
"topic": "test",
"partition": 0,
"client": {
"host": "127.0.0.1",
"id": "client0",
"member_id": "client0-d20b677a-5740-433e-a7f8-fbdab1f0f150"
},
"kafka": {
"broker": {
"address": "172.18.0.2:9092",
"id": 0
},
"consumergroup": {
"broker": {
"address": "kafka0:9092",
"address": "172.18.0.2:9092",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, in my laptop the broker address is different for consumergroup and partition metricsets, this is not good for correlations 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using advertised address in both metricsets now, so it can be more easily used for correlations.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, in my laptop the broker address is different for consumergroup and partition metricsets, this is not good for correlations

This is correct behavior. In kafka a broker can fulfill a many tasks. E.g. being the leader of a partition producers write to, or manage a consumer group. Consumer groups can be handled by very different brokers. Yet, the consumer group index used to store committed offsets from consumers is another internal "topic" potentially owned by yet another broker. State is very localized.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the problem was that in one metricset we were using the address we use to connect, and on the other the advertised address, so the broker was the same, but different addresses were being collected.

"id": 0
},
"client": {
"host": "172.18.0.1",
"id": "sarama",
"member_id": "sarama-714cfb8b-39e5-4128-9109-e5056c5e8f56"
},
"error": {
"code": 0
"code": 0
},
"id": "test-group",
"meta": "",
"offset": 0
"offset": -1,
"partition": 0,
"topic": "metricbeat-test"
},
"partition": {
"id": 0
},
"topic": {
"name": "metricbeat-test"
}
},
"metricset": {
"host": "kafka:9092",
"module": "kafka",
"name": "consumergroup",
"rtt": 115
}
}
}
3 changes: 3 additions & 0 deletions metricbeat/module/kafka/consumergroup/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
fields:
- name: broker
type: group
deprecated: 6.5
description: >
Broker Consumer Group Information have been read from (Broker handling
the consumer group).
Expand All @@ -26,10 +27,12 @@

- name: topic
type: keyword
deprecated: 6.5
description: Topic name

- name: partition
type: long
deprecated: 6.5
description: Partition ID

- name: offset
Expand Down
38 changes: 26 additions & 12 deletions metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package consumergroup
import (
"crypto/tls"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
Expand Down Expand Up @@ -95,25 +97,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}, nil
}

func (m *MetricSet) Fetch() ([]common.MapStr, error) {
// Fetch consumer group metrics from kafka
func (m *MetricSet) Fetch(r mb.ReporterV2) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method MetricSet.Fetch should have comment or be unexported

if err := m.broker.Connect(); err != nil {
logp.Err("broker connect failed: %v", err)
return nil, err
r.Error(errors.Wrap(err, "broker connection failed"))
return
}

b := m.broker
defer b.Close()
defer m.broker.Close()

brokerInfo := common.MapStr{
"id": b.ID(),
"address": b.AdvertisedAddr(),
"id": m.broker.ID(),
"address": m.broker.AdvertisedAddr(),
}

var events []common.MapStr
emitEvent := func(event common.MapStr) {
// TODO (deprecation): Remove fields from MetricSetFields moved to ModuleFields
event["broker"] = brokerInfo
events = append(events, event)
r.Event(mb.Event{
ModuleFields: common.MapStr{
"broker": brokerInfo,
"topic": common.MapStr{
"name": event["topic"],
},
"partition": common.MapStr{
"id": event["partition"],
},
},
MetricSetFields: event,
})
}
err := fetchGroupInfo(emitEvent, m.broker, m.groups.pred(), m.topics.pred())
if err != nil {
r.Error(err)
}
err := fetchGroupInfo(emitEvent, b, m.groups.pred(), m.topics.pred())
return events, err
}
Loading