initial commit: added consumer offset metricset to kafka module
vas78 committed Nov 29, 2016
1 parent 86d6223 commit 0005802
Showing 4 changed files with 392 additions and 0 deletions.
15 changes: 15 additions & 0 deletions metricbeat/module/kafka/consumer/_meta/docs.asciidoc
@@ -0,0 +1,15 @@
=== kafka consumer MetricSet

This is the consumer metricset of the module kafka.

==== Configuration

The consumer metricset fetches Kafka consumer group offsets and lags from Burrow. Only the host and port where
Burrow is running are mandatory; the list of consumers is optional. If no consumers are specified, the metricset
fetches data for all connected consumers.
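
For example, a minimal configuration might look like the following sketch (the Burrow address, cluster name,
and consumer group are placeholders; the option names follow the config struct in consumer.go):

[source,yaml]
----
- module: kafka
  metricsets: ["consumer"]
  period: 10s
  # Host and port where Burrow is running (mandatory).
  host: "localhost:8000"
  # Name of the Kafka cluster as configured in Burrow.
  cluster: "local"
  # Optional. If omitted, all connected consumer groups are fetched.
  consumers: ["my-consumer-group"]
----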


==== Metricset

The current implementation of the consumer metricset fetches data from a single Kafka cluster. Support for
multiple clusters is planned for future versions.
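
A fetched event contains roughly the following fields (a sketch based on the event construction in data.go;
the `<topic>_<partition>` keys carry the raw partition status reported by Burrow):

[source,json]
----
{
  "cluster": "local",
  "group": "my-consumer-group",
  "status": "OK",
  "complete": true,
  "partition_count": 1,
  "max_lag": 0,
  "total_lag": 0,
  "test-topic_0": { "topic": "test-topic", "partition": 0, "status": "OK" }
}
----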
109 changes: 109 additions & 0 deletions metricbeat/module/kafka/consumer/consumer.go
@@ -0,0 +1,109 @@
package consumer

import (
	"fmt"
	"io/ioutil"
	"net/http"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/metricbeat/mb"
)

var (
debugf = logp.MakeDebug("kafka-consumer")
)

// init registers the consumer MetricSet with the central registry.
func init() {
if err := mb.Registry.AddMetricSet("kafka", "consumer", New); err != nil {
panic(err)
}
}

// MetricSet type defines all fields of the consumer MetricSet
type MetricSet struct {
mb.BaseMetricSet
client *http.Client
host string
cluster string
consumers []string
}

// New creates a new instance of the consumer MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

logp.Warn("EXPERIMENTAL: The kafka consumer metricset is experimental")

	config := struct {
		Host      string   `config:"host"`
		Cluster   string   `config:"cluster"`
		Consumers []string `config:"consumers"`
	}{}

if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

return &MetricSet{
BaseMetricSet: base,
client: &http.Client{Timeout: base.Module().Config().Timeout},
host: config.Host,
consumers: config.Consumers,
cluster: config.Cluster,
}, nil
}

// Fetch consumer offset and lag stats from Burrow for all configured (or discovered) consumer groups
func (m *MetricSet) Fetch() ([]common.MapStr, error) {

	events := []common.MapStr{}
	var connectedConsumers []string
	var err error
	// Fetch connected consumer groups if no consumers are specified in the config.
	if len(m.consumers) == 0 {
		debugf("No consumer groups found in config, fetching all consumer groups from Burrow")
		connectedConsumers, err = fetchConsumerGroups(m)
		if err != nil {
			return nil, fmt.Errorf("error fetching connected consumer groups from Burrow: %v", err)
		}
	} else {
		debugf("Fetching consumer groups from config")
		connectedConsumers = m.consumers
	}
	if len(connectedConsumers) == 0 {
		return nil, fmt.Errorf("no consumer groups found")
	}
	debugf("Consumer groups to be fetched: %v", connectedConsumers)
	for _, consumer := range connectedConsumers {
		url := "http://" + m.host + "/v2/kafka/" + m.cluster + "/consumer/" + consumer + "/lag"
		debugf("Fetching url: %s", url)
		req, err := http.NewRequest("GET", url, nil)
		if err != nil {
			logp.Err("Error creating http request: %v", err)
			continue
		}
		resp, err := m.client.Do(req)
		if err != nil {
			logp.Err("Error making http request: %v", err)
			continue
		}

		if resp.StatusCode != http.StatusOK {
			resp.Body.Close()
			logp.Err("HTTP error %d: %s", resp.StatusCode, resp.Status)
			continue
		}

		// Read and close the body inside the loop; a deferred close would
		// leak connections until the whole Fetch returns.
		respBody, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			logp.Err("Error reading response body: %v", err)
			continue
		}

		event, err := eventMapping(respBody)
		if err != nil {
			continue
		}

		events = append(events, event)
	}

	return events, nil
}
140 changes: 140 additions & 0 deletions metricbeat/module/kafka/consumer/consumer_integration_test.go
@@ -0,0 +1,140 @@
// +build integration

package consumer

import (
"fmt"
"math/rand"
"os"
"strconv"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/elastic/beats/libbeat/common"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/stretchr/testify/assert"
)

const (
kafkaDefaultHost = "localhost"
kafkaDefaultPort = "9092"
)

func TestData(t *testing.T) {

generateKafkaData(t, "metricbeat-generate-data")

f := mbtest.NewEventsFetcher(t, getConfig())
err := mbtest.WriteEvents(f, t)
if err != nil {
t.Fatal("write", err)
}
}

func TestTopic(t *testing.T) {

id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
testTopic := fmt.Sprintf("test-metricbeat-%s", id)

// Create initial topic
generateKafkaData(t, testTopic)

f := mbtest.NewEventsFetcher(t, getConfig())
dataBefore, err := f.Fetch()
if err != nil {
t.Fatal("write", err)
}

var n int64 = 10
var i int64 = 0
// Create n messages
for ; i < n; i++ {
generateKafkaData(t, testTopic)
}

dataAfter, err := f.Fetch()
if err != nil {
t.Fatal("write", err)
}

// Checks that no new topics / partitions were added
assert.True(t, len(dataBefore) == len(dataAfter))

var offsetBefore int64 = 0
var offsetAfter int64 = 0

// It's possible that other topics exist -> select the right data
for _, data := range dataBefore {
if data["topic"] == testTopic {
offsetBefore = data["offset"].(common.MapStr)["newest"].(int64)
}
}

for _, data := range dataAfter {
if data["topic"] == testTopic {
offsetAfter = data["offset"].(common.MapStr)["newest"].(int64)
}
}

// Compares offset before and after
if offsetBefore+n != offsetAfter {
t.Errorf("Offset before: %v", offsetBefore)
t.Errorf("Offset after: %v", offsetAfter)
}
assert.True(t, offsetBefore+n == offsetAfter)

}

func generateKafkaData(t *testing.T, topic string) {

	config := sarama.NewConfig()
	// Required by sarama's sync producer; SendMessage blocks until the success is returned.
	config.Producer.Return.Successes = true
client, err := sarama.NewClient([]string{getTestKafkaHost()}, config)
if err != nil {
t.Errorf("%s", err)
}

producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
t.Error(err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder("Hello World"),
}

_, _, err = producer.SendMessage(msg)
if err != nil {
t.Errorf("FAILED to send message: %s\n", err)
}

client.RefreshMetadata(topic)
}

func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "kafka",
"metricsets": []string{"partition"},
"hosts": []string{getTestKafkaHost()},
}
}

func getTestKafkaHost() string {
return fmt.Sprintf("%v:%v",
getenv("KAFKA_HOST", kafkaDefaultHost),
getenv("KAFKA_PORT", kafkaDefaultPort),
)
}

func getenv(name, defaultValue string) string {
return strDefault(os.Getenv(name), defaultValue)
}

func strDefault(a, defaults string) string {
if len(a) == 0 {
return defaults
}
return a
}
128 changes: 128 additions & 0 deletions metricbeat/module/kafka/consumer/data.go
@@ -0,0 +1,128 @@
package consumer

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strconv"

	"github.com/elastic/beats/libbeat/common"
)

type ConsumerLagsResponse struct {
	Error   bool   `json:"error"`
	Message string `json:"message"`
	Status  Status `json:"status"`
}

type Status struct {
	Cluster        string            `json:"cluster"`
	Group          string            `json:"group"`
	Status         string            `json:"status"`
	Complete       bool              `json:"complete"`
	Partitions     []PartitionStatus `json:"partitions"`
	PartitionCount int               `json:"partition_count"`
	MaxLag         int               `json:"maxlag"`
	TotalLag       int               `json:"totallag"`
}

type PartitionStatus struct {
	Topic     string                 `json:"topic"`
	Partition int                    `json:"partition"`
	Status    string                 `json:"status"`
	Start     map[string]interface{} `json:"start"`
	End       map[string]interface{} `json:"end"`
}


// eventMapping maps a Burrow lag response body to a common.MapStr event.
func eventMapping(responseBody []byte) (common.MapStr, error) {

	debugf("Got response body: %s", responseBody)

	var lagsResponse ConsumerLagsResponse
	if err := json.Unmarshal(responseBody, &lagsResponse); err != nil {
		return nil, fmt.Errorf("cannot unmarshal json response: %s", err)
	}
	debugf("Unmarshalled json: %v", lagsResponse)
	if lagsResponse.Error {
		return nil, fmt.Errorf("got error from Burrow: %s", lagsResponse.Message)
	}

	event := common.MapStr{
		"cluster":         lagsResponse.Status.Cluster,
		"group":           lagsResponse.Status.Group,
		"status":          lagsResponse.Status.Status,
		"complete":        lagsResponse.Status.Complete,
		"partition_count": lagsResponse.Status.PartitionCount,
		"max_lag":         lagsResponse.Status.MaxLag,
		"total_lag":       lagsResponse.Status.TotalLag,
	}

	// Add one sub-element per partition, keyed by "<topic>_<partition>".
	for _, partitionStatus := range lagsResponse.Status.Partitions {
		subelementKey := partitionStatus.Topic + "_" + strconv.Itoa(partitionStatus.Partition)
		event[subelementKey] = partitionStatus
	}

	return event, nil
}

type ConsumerGroupsResponse struct {
	Error     bool     `json:"error"`
	Message   string   `json:"message"`
	Consumers []string `json:"consumers"`
}

// fetchConsumerGroups fetches all connected consumer groups from Burrow.
func fetchConsumerGroups(m *MetricSet) ([]string, error) {
	listConsumerURL := "http://" + m.host + "/v2/kafka/" + m.cluster + "/consumer/"
	debugf("Fetching consumer groups from: %s", listConsumerURL)
	req, err := http.NewRequest("GET", listConsumerURL, nil)
	if err != nil {
		return nil, fmt.Errorf("error creating http request: %v", err)
	}
	resp, err := m.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error making http request: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status)
	}

	respBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading response body: %v", err)
	}
	debugf("Got response body: %s", respBody)

	var consumerGroups ConsumerGroupsResponse
	if err := json.Unmarshal(respBody, &consumerGroups); err != nil {
		return nil, fmt.Errorf("cannot unmarshal json response: %s", err)
	}

	debugf("Unmarshalled json: %v", consumerGroups)

	if consumerGroups.Error {
		return nil, fmt.Errorf("got error from Burrow: %s", consumerGroups.Message)
	}

	return consumerGroups.Consumers, nil
}
