Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka consumer metricset #2977

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions libbeat/scripts/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ build-image: write-environment
start-environment: stop-environment
${DOCKER_COMPOSE} up -d

start-env:
${DOCKER_COMPOSE} run beat bash

.PHONY: stop-environment
stop-environment:
-${DOCKER_COMPOSE} stop
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/_meta/kibana/index-pattern/metricbeat.json

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions metricbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,3 @@ services:

zookeeper:
image: jplock/zookeeper:3.4.8


30 changes: 30 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,36 @@ kafka Module



[float]
== broker Fields

broker



[float]
=== kafka.broker.example

type: keyword

Example field


[float]
== consumer Fields

consumer



[float]
=== kafka.consumer.example

type: keyword

Example field


[float]
== partition Fields

Expand Down
8 changes: 8 additions & 0 deletions metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ metricbeat.modules:

The following metricsets are available:

* <<metricbeat-metricset-kafka-broker,broker>>

* <<metricbeat-metricset-kafka-consumer,consumer>>

* <<metricbeat-metricset-kafka-partition,partition>>

include::kafka/broker.asciidoc[]

include::kafka/consumer.asciidoc[]

include::kafka/partition.asciidoc[]

19 changes: 19 additions & 0 deletions metricbeat/docs/modules/kafka/broker.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-kafka-broker]]
include::../../../module/kafka/broker/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-kafka,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/kafka/broker/_meta/data.json[]
----
19 changes: 19 additions & 0 deletions metricbeat/docs/modules/kafka/consumer.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-kafka-consumer]]
include::../../../module/kafka/consumer/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-kafka,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/kafka/consumer/_meta/data.json[]
----
2 changes: 2 additions & 0 deletions metricbeat/include/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
_ "github.com/elastic/beats/metricbeat/module/haproxy/info"
_ "github.com/elastic/beats/metricbeat/module/haproxy/stat"
_ "github.com/elastic/beats/metricbeat/module/kafka"
_ "github.com/elastic/beats/metricbeat/module/kafka/broker"
_ "github.com/elastic/beats/metricbeat/module/kafka/consumer"
_ "github.com/elastic/beats/metricbeat/module/kafka/partition"
_ "github.com/elastic/beats/metricbeat/module/mongodb"
_ "github.com/elastic/beats/metricbeat/module/mongodb/status"
Expand Down
18 changes: 18 additions & 0 deletions metricbeat/metricbeat.template-es2x.json
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,24 @@
},
"kafka": {
"properties": {
"broker": {
"properties": {
"example": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
}
}
},
"consumer": {
"properties": {
"example": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
}
}
},
"partition": {
"properties": {
"broker": {
Expand Down
16 changes: 16 additions & 0 deletions metricbeat/metricbeat.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,22 @@
},
"kafka": {
"properties": {
"broker": {
"properties": {
"example": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"consumer": {
"properties": {
"example": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"partition": {
"properties": {
"broker": {
Expand Down
19 changes: 19 additions & 0 deletions metricbeat/module/kafka/consumer/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"@timestamp":"2016-05-23T08:05:34.853Z",
"beat":{
"hostname":"beathost",
"name":"beathost"
},
"metricset":{
"host":"localhost",
"module":"mysql",
"name":"status",
"rtt":44269
},
"kafka":{
"consumer":{
"example": "consumer"
}
},
"type":"metricsets"
}
3 changes: 3 additions & 0 deletions metricbeat/module/kafka/consumer/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
=== kafka consumer MetricSet

This is the consumer metricset of the Kafka module. It reports committed consumer-group offsets per topic and partition.
9 changes: 9 additions & 0 deletions metricbeat/module/kafka/consumer/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- name: consumer
type: group
description: >
consumer
fields:
- name: example
type: keyword
description: >
Example field
115 changes: 115 additions & 0 deletions metricbeat/module/kafka/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package consumer

import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/kafka"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kazoo-go"
)

// init registers the MetricSet with the central registry so that it is
// instantiated whenever a "kafka" module configuration enables the
// "consumer" metricset. The New method will be called after the setup of
// the module and before starting to fetch data. A registration error
// (e.g. a duplicate metricset name) is a programmer error, so panicking
// at startup is appropriate.
func init() {
	if err := mb.Registry.AddMetricSet("kafka", "consumer", New); err != nil {
		panic(err)
	}
}

// MetricSet type defines all fields of the MetricSet.
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be
// extended with additional entries. These variables can be used to persist
// data or configuration between multiple fetch calls.
type MetricSet struct {
	mb.BaseMetricSet
	// client is the sarama Kafka client; it is obtained via
	// kafka.GetClient in Fetch and kept here so it can be reused
	// across fetch calls.
	client sarama.Client
}

// New creates a new instance of the MetricSet.
// It unpacks (and thereby validates) the module configuration before
// returning; additional configuration entries can be processed here when
// the metricset grows settings of its own.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	var config struct{}
	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}

	ms := &MetricSet{BaseMetricSet: base}
	return ms, nil
}

// Fetch implements the data gathering and data conversion to the right
// format. It returns one event per (consumer group, topic, partition)
// combination, which is then forwarded to the output. In case of an
// error, a descriptive error must be returned.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {

	var err error
	m.client, err = kafka.GetClient(m.client, m)
	if err != nil {
		return nil, err
	}

	// TODO: the zookeeper hosts are hard-coded here; they should come
	// from the module configuration instead.
	hosts := []string{"kafka:2181"}

	zookeeperClient, err := kazoo.NewKazoo(hosts, nil)
	if err != nil {
		return nil, err
	}
	// Close the zookeeper connection when done so it is not leaked on
	// every fetch cycle.
	defer zookeeperClient.Close()

	groups, err := zookeeperClient.Consumergroups()
	if err != nil {
		return nil, err
	}

	topics, err := m.client.Topics()
	if err != nil {
		return nil, err
	}

	events := []common.MapStr{}
	for _, group := range groups {
		// The coordinating broker holds the committed offsets for
		// this consumer group.
		broker, err := m.client.Coordinator(group.Name)
		if err != nil {
			logp.Err("Broker error: %s", err)
			continue
		}

		offsetRequest := &sarama.OffsetFetchRequest{
			ConsumerGroup: group.Name,
			Version:       0,
		}
		response, err := broker.FetchOffset(offsetRequest)
		if err != nil {
			// Without a response, GetBlock below would dereference
			// nil; skip this group and keep going.
			logp.Err("Fetch offsets for group %s: %s", group.Name, err)
			continue
		}
		for _, topic := range topics {
			partitions, err := m.client.Partitions(topic)
			if err != nil {
				logp.Err("Fetch partition info for topic %s: %s", topic, err)
				// partitions is nil on error, so the inner loop
				// below simply runs zero times.
			}

			for _, partition := range partitions {

				// Could we use group.FetchOffset() instead?
				offset := response.GetBlock(topic, partition)

				event := common.MapStr{
					"@timestamp": common.Time(time.Now()),
					"type":       "consumer",
					"partition":  partition,
					"topic":      topic,
					"group":      group.Name,
					"offset":     offset,
				}
				events = append(events, event)
			}
		}
	}

	return events, nil
}
70 changes: 70 additions & 0 deletions metricbeat/module/kafka/consumer/consumer_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// +build integration

package consumer

import (
"testing"

mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/kafka"
)

// TestData generates sample Kafka data and then writes the events
// produced by the consumer metricset to the metricset's example data
// file (used for the generated documentation).
func TestData(t *testing.T) {

	kafka.GenerateKafkaData(t)

	f := mbtest.NewEventsFetcher(t, getConfig())
	err := mbtest.WriteEvents(f, t)
	if err != nil {
		t.Fatal("write", err)
	}
}

// TestTopic fetches consumer-group offset events against a live Kafka
// instance. The before/after offset comparison is still disabled
// (commented out) until the metricset output stabilizes.
func TestTopic(t *testing.T) {

	// Create initial topic
	kafka.GenerateKafkaData(t)

	f := mbtest.NewEventsFetcher(t, getConfig())
	dataBefore, err := f.Fetch()
	if err != nil {
		t.Fatal("write", err)
	}

	// Log the fetched events for inspection. This was t.Errorf, which
	// unconditionally failed the test; t.Logf reports without failing.
	t.Logf("%v", dataBefore)
	/*

		var n int64 = 10
		var i int64 = 0
		// Create n messages
		for ; i < n; i++ {
			kafka.GenerateKafkaData(t)
		}

		dataAfter, err := f.Fetch()
		if err != nil {
			t.Fatal("write", err)
		}*/

	// Checks that no new topics / partitions were added
	//assert.True(t, len(dataBefore) == len(dataAfter))

	// Compares offset before and after
	/*offsetBefore := dataBefore[0]["offset"].(common.MapStr)["newest"].(int64)
	offsetAfter := dataAfter[0]["offset"].(common.MapStr)["newest"].(int64)

	if offsetBefore+n != offsetAfter {
		t.Errorf("Offset before: %v", offsetBefore)
		t.Errorf("Offset after: %v", offsetAfter)
	}
	assert.True(t, offsetBefore+n == offsetAfter)*/

}

// getConfig returns the metricbeat module configuration used by the
// integration tests: the kafka module with only the consumer metricset
// enabled, pointed at the test Kafka host.
func getConfig() map[string]interface{} {
	config := map[string]interface{}{
		"module":     "kafka",
		"metricsets": []string{"consumer"},
		"hosts":      []string{kafka.GetTestKafkaHost()},
	}
	return config
}
4 changes: 4 additions & 0 deletions metricbeat/module/kafka/consumer/glide.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package: github.com/elastic/beats/metricbeat/module/kafka/consumer
import:
- package: github.com/wvanbergen/kazoo-go
version: 968957352185472eacb69215fa3dbfcfdbac1096
Submodule kazoo-go added at 968957
Loading