Skip to content

Commit

Permalink
Add integration/system tests for Kafka JMX metricsets (#14677)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Nov 25, 2019
1 parent ba27b12 commit a5d86cb
Show file tree
Hide file tree
Showing 17 changed files with 531 additions and 21 deletions.
1 change: 1 addition & 0 deletions metricbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ services:
beat:
build: ${PWD}/.
environment:
- BEAT_STRICT_PERMS=false
- TEST_ENVIRONMENT=false
working_dir: /go/src/github.com/elastic/beats/metricbeat
volumes:
Expand Down
111 changes: 105 additions & 6 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15436,6 +15436,24 @@ type: float
--
*`kafka.broker.request.produce.failed`*::
+
--
The number of failed produce requests
type: float
--
*`kafka.broker.request.fetch.failed`*::
+
--
The number of client fetch request failures
type: float
--
*`kafka.broker.replication.leader_elections`*::
+
--
Expand Down Expand Up @@ -15499,7 +15517,43 @@ type: float
--
*`kafka.broker.topic.net.bytes_in`*::
*`kafka.broker.topic.net.in.bytes_per_sec`*::
+
--
The incoming byte rate per topic
type: float
--
*`kafka.broker.topic.net.out.bytes_per_sec`*::
+
--
The outgoing byte rate per topic
type: float
--
*`kafka.broker.topic.net.rejected.bytes_per_sec`*::
+
--
The rejected byte rate per topic
type: float
--
*`kafka.broker.topic.messages_in`*::
+
--
The incoming message rate per topic
type: float
--
*`kafka.broker.net.in.bytes_per_sec`*::
+
--
The incoming byte rate
Expand All @@ -15508,7 +15562,7 @@ type: float
--
*`kafka.broker.topic.net.bytes_out`*::
*`kafka.broker.net.out.bytes_per_sec`*::
+
--
The outgoing byte rate
Expand All @@ -15517,7 +15571,7 @@ type: float
--
*`kafka.broker.topic.net.bytes_rejected`*::
*`kafka.broker.net.rejected.bytes_per_sec`*::
+
--
The rejected byte rate
Expand All @@ -15526,7 +15580,7 @@ type: float
--
*`kafka.broker.topic.messages_in`*::
*`kafka.broker.messages_in`*::
+
--
The incoming message rate
Expand Down Expand Up @@ -15577,7 +15631,7 @@ type: float
--
*`kafka.consumer.bytes_in`*::
*`kafka.consumer.in.bytes_per_sec`*::
+
--
The rate of bytes coming in to the consumer
Expand All @@ -15586,6 +15640,42 @@ type: float
--
*`kafka.consumer.max_lag`*::
+
--
The maximum consumer lag
type: float
--
*`kafka.consumer.zookeeper_commits`*::
+
--
The rate of offset commits to ZooKeeper
type: float
--
*`kafka.consumer.kafka_commits`*::
+
--
The rate of offset commits to Kafka
type: float
--
*`kafka.consumer.messages_in`*::
+
--
The rate of consumer message consumption
type: float
--
[float]
=== consumergroup
Expand Down Expand Up @@ -16004,7 +16094,7 @@ type: float
--
*`kafka.producer.bytes_out`*::
*`kafka.producer.out.bytes_per_sec`*::
+
--
The rate of bytes going out for the producer
Expand All @@ -16013,6 +16103,15 @@ type: float
--
*`kafka.producer.message_rate`*::
+
--
The producer message rate
type: float
--
[[exported-fields-kibana]]
== Kibana fields
Expand Down
26 changes: 22 additions & 4 deletions metricbeat/module/kafka/broker/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
- name: request.fetch.failed_per_second
description: The rate of client fetch request failures per second
type: float
- name: request.produce.failed
description: The number of failed produce requests
type: float
- name: request.fetch.failed
description: The number of client fetch request failures
type: float
- name: replication.leader_elections
description: The leader election rate
type: float
Expand All @@ -36,15 +42,27 @@
- name: log.flush_rate
description: The log flush rate
type: float
- name: topic.net.bytes_in
- name: topic.net.in.bytes_per_sec
description: The incoming byte rate per topic
type: float
- name: topic.net.out.bytes_per_sec
description: The outgoing byte rate per topic
type: float
- name: topic.net.rejected.bytes_per_sec
description: The rejected byte rate per topic
type: float
- name: topic.messages_in
description: The incoming message rate per topic
type: float
- name: net.in.bytes_per_sec
description: The incoming byte rate
type: float
- name: topic.net.bytes_out
- name: net.out.bytes_per_sec
description: The outgoing byte rate
type: float
- name: topic.net.bytes_rejected
- name: net.rejected.bytes_per_sec
description: The rejected byte rate
type: float
- name: topic.messages_in
- name: messages_in
description: The incoming message rate
type: float
66 changes: 66 additions & 0 deletions metricbeat/module/kafka/broker/broker_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package broker

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/tests/compose"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
// Register input module and metricset
_ "github.com/elastic/beats/metricbeat/module/jolokia"
_ "github.com/elastic/beats/metricbeat/module/jolokia/jmx"
)

func TestData(t *testing.T) {
service := compose.EnsureUp(t, "kafka",
compose.UpWithTimeout(600*time.Second),
compose.UpWithAdvertisedHostEnvFileForPort(9092),
)

m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8779)))
m.WriteEvents(t, "")
}

func TestFetch(t *testing.T) {
service := compose.EnsureUp(t, "kafka",
compose.UpWithTimeout(600*time.Second),
compose.UpWithAdvertisedHostEnvFileForPort(9092),
)

m := mbtest.NewFetcher(t, getConfig(service.HostForPort(8779)))
events, errs := m.FetchEvents()
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
t.Logf("%s/%s event: %+v", m.Module().Name(), m.Name(), events[0])
}

func getConfig(host string) map[string]interface{} {
return map[string]interface{}{
"module": "kafka",
"metricsets": []string{"broker"},
"hosts": []string{host},
}
}
33 changes: 33 additions & 0 deletions metricbeat/module/kafka/broker/broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package broker

import (
"os"

"github.com/elastic/beats/metricbeat/mb"
// Register input module and metricset
_ "github.com/elastic/beats/metricbeat/module/jolokia"
_ "github.com/elastic/beats/metricbeat/module/jolokia/jmx"
)

func init() {
// To be moved to some kind of helper
os.Setenv("BEAT_STRICT_PERMS", "false")
mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module"))
}
34 changes: 30 additions & 4 deletions metricbeat/module/kafka/broker/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ input:
attributes:
- attr: MeanRate
field: request.fetch.failed_per_second
- mbean: 'kafka.server:name=FailedProduceRequestsPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: request.produce.failed
- mbean: 'kafka.server:name=FailedFetchRequestsPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: request.fetch.failed
- mbean: 'kafka.controller:name=LeaderElectionRateAndTimeMs,type=ControllerStats'
attributes:
- attr: MeanRate
Expand Down Expand Up @@ -47,19 +55,37 @@ input:
attributes:
- attr: MeanRate
field: log.flush_rate
- mbean: 'kafka.server:name=BytesRejectedPerSec,topic=*,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.rejected.bytes_per_sec
- mbean: 'kafka.server:name=BytesInPerSec,topic=*,type=BrokerTopicMetrics,topic=*'
attributes:
- attr: MeanRate
field: topic.net.in.bytes_per_sec
- mbean: 'kafka.server:name=BytesOutPerSec,topic=*,type=BrokerTopicMetrics,topic=*'
attributes:
- attr: MeanRate
field: topic.net.out.bytes_per_sec
- mbean: 'kafka.server:type=BrokerTopicMetrics,topic=*,name=MessagesInPerSec,topic=*'
attributes:
- attr: MeanRate
field: topic.messages_in
- mbean: 'kafka.server:name=BytesRejectedPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.bytes_rejected
field: net.rejected.bytes_per_sec
- mbean: 'kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.bytes_in
field: net.in.bytes_per_sec
- mbean: 'kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics'
attributes:
- attr: MeanRate
field: topic.net.bytes_out
field: net.out.bytes_per_sec
- mbean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
attributes:
- attr: MeanRate
field: topic.messages_in
field: messages_in


Loading

0 comments on commit a5d86cb

Please sign in to comment.