From be8ea9e963e873a326d88bcfae047bdac4096196 Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Thu, 21 Nov 2019 17:18:41 +0200 Subject: [PATCH] Cherry-pick #14330 to 7.x: [Metricbeat] Add Kafka JMX metricsets (#14656) --- CHANGELOG.next.asciidoc | 1 + metricbeat/docker-compose.yml | 11 +- metricbeat/docs/fields.asciidoc | 324 ++++++++++++++++++ metricbeat/docs/modules/kafka.asciidoc | 45 ++- metricbeat/docs/modules/kafka/broker.asciidoc | 24 ++ .../docs/modules/kafka/consumer.asciidoc | 24 ++ .../docs/modules/kafka/producer.asciidoc | 24 ++ metricbeat/docs/modules_list.asciidoc | 5 +- metricbeat/metricbeat.reference.yml | 27 +- metricbeat/module/kafka/_meta/Dockerfile | 14 +- .../module/kafka/_meta/config.reference.yml | 26 -- metricbeat/module/kafka/_meta/config.yml | 22 ++ metricbeat/module/kafka/_meta/docs.asciidoc | 6 + metricbeat/module/kafka/_meta/healthcheck.sh | 2 +- .../_meta/jaas-kafka-client-consumer.conf | 5 + .../_meta/jaas-kafka-client-producer.conf | 5 + metricbeat/module/kafka/_meta/run.sh | 18 +- .../kafka/_meta/sasl-consumer.properties | 2 + .../kafka/_meta/sasl-producer.properties | 2 + .../module/kafka/broker/_meta/data.json | 82 +++++ .../module/kafka/broker/_meta/docs.asciidoc | 20 ++ .../module/kafka/broker/_meta/fields.yml | 50 +++ metricbeat/module/kafka/broker/manifest.yml | 65 ++++ .../module/kafka/consumer/_meta/data.json | 53 +++ .../module/kafka/consumer/_meta/docs.asciidoc | 20 ++ .../module/kafka/consumer/_meta/fields.yml | 20 ++ metricbeat/module/kafka/consumer/manifest.yml | 21 ++ .../consumergroup_integration_test.go | 4 +- metricbeat/module/kafka/fields.go | 2 +- metricbeat/module/kafka/module.yml | 5 + .../partition/partition_integration_test.go | 4 +- .../module/kafka/producer/_meta/data.json | 60 ++++ .../module/kafka/producer/_meta/docs.asciidoc | 20 ++ .../module/kafka/producer/_meta/fields.yml | 47 +++ metricbeat/module/kafka/producer/manifest.yml | 39 +++ metricbeat/modules.d/kafka.yml.disabled | 22 ++ x-pack/metricbeat/metricbeat.reference.yml | 27 +- 37 files changed, 1102 insertions(+), 46 deletions(-) create mode 100644 metricbeat/docs/modules/kafka/broker.asciidoc create mode 100644 metricbeat/docs/modules/kafka/consumer.asciidoc create mode 100644 metricbeat/docs/modules/kafka/producer.asciidoc delete mode 100644 metricbeat/module/kafka/_meta/config.reference.yml create mode 100644 metricbeat/module/kafka/_meta/jaas-kafka-client-consumer.conf create mode 100644 metricbeat/module/kafka/_meta/jaas-kafka-client-producer.conf create mode 100644 metricbeat/module/kafka/_meta/sasl-consumer.properties create mode 100644 metricbeat/module/kafka/_meta/sasl-producer.properties create mode 100644 metricbeat/module/kafka/broker/_meta/data.json create mode 100644 metricbeat/module/kafka/broker/_meta/docs.asciidoc create mode 100644 metricbeat/module/kafka/broker/_meta/fields.yml create mode 100644 metricbeat/module/kafka/broker/manifest.yml create mode 100644 metricbeat/module/kafka/consumer/_meta/data.json create mode 100644 metricbeat/module/kafka/consumer/_meta/docs.asciidoc create mode 100644 metricbeat/module/kafka/consumer/_meta/fields.yml create mode 100644 metricbeat/module/kafka/consumer/manifest.yml create mode 100644 metricbeat/module/kafka/module.yml create mode 100644 metricbeat/module/kafka/producer/_meta/data.json create mode 100644 metricbeat/module/kafka/producer/_meta/docs.asciidoc create mode 100644 metricbeat/module/kafka/producer/_meta/fields.yml create mode 100644 metricbeat/module/kafka/producer/manifest.yml diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3db2c5ec9cef..595248c2293a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -484,6 +484,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `elasticsearch/enrich` metricset. {pull}14243[14243] {issue}14221[14221] - Add support for Application ELB and Network ELB. {pull}14123[14123] {issue}13538[13538] {issue}13539[13539] - Release aws ebs metricset as GA. {pull}14312[14312] {issue}14060[14060] +- Add Kafka JMX metricsets. {pull}14330[14330] - Add metrics to envoyproxy server metricset and support for envoy proxy 1.12. {pull}14416[14416] {issue}13642[13642] *Packetbeat* diff --git a/metricbeat/docker-compose.yml b/metricbeat/docker-compose.yml index ccbba1ef42a9..ebbcc352c769 100644 --- a/metricbeat/docker-compose.yml +++ b/metricbeat/docker-compose.yml @@ -129,7 +129,7 @@ services: haproxy: image: docker.elastic.co/observability-ci/beats-integration-haproxy:${HAPROXY_VERSION:-1.8}-1 - build: + build: context: ./module/haproxy/_meta args: HAPROXY_VERSION: ${HAPROXY_VERSION:-1.8} @@ -155,13 +155,16 @@ services: - 8778 kafka: - image: docker.elastic.co/observability-ci/beats-integration-kafka:${KAFKA_VERSION:-2.1.1}-1 + image: docker.elastic.co/observability-ci/beats-integration-kafka:${KAFKA_VERSION:-2.1.1}-2 build: context: ./module/kafka/_meta args: KAFKA_VERSION: ${KAFKA_VERSION:-2.1.1} ports: - 9092 + - 8779 + - 8775 + - 8774 kibana: image: docker.elastic.co/observability-ci/beats-integration-kibana:${KIBANA_VERSION:-7.4.0}-1 @@ -253,7 +256,7 @@ services: nginx: image: docker.elastic.co/observability-ci/beats-integration-nginx:${NGINX_VERSION:-1.9}-1 - build: + build: context: ./module/nginx/_meta args: NGINX_VERSION: ${NGINX_VERSION:-1.9} @@ -262,7 +265,7 @@ services: phpfpm: image: docker.elastic.co/observability-ci/beats-integration-phpfpm:${PHPFPM_VERSION:-7.1}-1 - build: + build: context: ./module/php_fpm/_meta args: PHPFPM_VERSION: ${PHPFPM_VERSION:-7.1} diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index c911736a7df9..4130e383b7b1 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -14965,6 +14965,198 @@ type: keyword -- +[float] +=== broker + +Broker metrics from Kafka Broker JMX + + +*`kafka.broker.mbean`*:: ++ +-- +Mbean that this event is related to + +type: keyword + +-- + +*`kafka.broker.request.channel.queue.size`*:: ++ +-- +The size of the request queue + +type: long + +-- + +*`kafka.broker.request.produce.failed_per_second`*:: ++ +-- +The rate of failed produce requests per second + +type: float + +-- + +*`kafka.broker.request.fetch.failed_per_second`*:: ++ +-- +The rate of client fetch request failures per second + +type: float + +-- + +*`kafka.broker.replication.leader_elections`*:: ++ +-- +The leader election rate + +type: float + +-- + +*`kafka.broker.replication.unclean_leader_elections`*:: ++ +-- +The unclean leader election rate + +type: float + +-- + +*`kafka.broker.session.zookeeper.disconnect`*:: ++ +-- +The ZooKeeper closed sessions per second + +type: float + +-- + +*`kafka.broker.session.zookeeper.expire`*:: ++ +-- +The ZooKeeper expired sessions per second + +type: float + +-- + +*`kafka.broker.session.zookeeper.readonly`*:: ++ +-- +The ZooKeeper readonly sessions per second + +type: float + +-- + +*`kafka.broker.session.zookeeper.sync`*:: ++ +-- +The ZooKeeper client connections per second + +type: float + +-- + +*`kafka.broker.log.flush_rate`*:: ++ +-- +The log flush rate + +type: float + +-- + +*`kafka.broker.topic.net.bytes_in`*:: ++ +-- +The incoming byte rate + +type: float + +-- + +*`kafka.broker.topic.net.bytes_out`*:: ++ +-- +The outgoing byte rate + +type: float + +-- + +*`kafka.broker.topic.net.bytes_rejected`*:: ++ +-- +The rejected byte rate + +type: float + +-- + +*`kafka.broker.topic.messages_in`*:: ++ +-- +The incoming message rate + +type: float + +-- + +[float] +=== consumer + +Consumer metrics from Kafka Consumer JMX + + +*`kafka.consumer.mbean`*:: ++ +-- +Mbean that this event is related to + +type: keyword + +-- + +*`kafka.consumer.fetch_rate`*:: ++ +-- +The minimum rate at which the consumer sends fetch requests to a broker + +type: float + +-- + +*`kafka.consumer.bytes_consumed`*:: ++ +-- +The average number of bytes consumed for a specific topic per second + +type: float + +-- + +*`kafka.consumer.records_consumed`*:: ++ +-- +The average number of records consumed per second for a specific topic + +type: float + +-- + +*`kafka.consumer.bytes_in`*:: ++ +-- +The rate of bytes coming in to the consumer + +type: float + +-- + [float] === consumergroup @@ -15260,6 +15452,138 @@ type: keyword -- +[float] +=== producer + +Producer metrics from Kafka Producer JMX + + +*`kafka.producer.mbean`*:: ++ +-- +Mbean that this event is related to + +type: keyword + +-- + +*`kafka.producer.available_buffer_bytes`*:: ++ +-- +The total amount of buffer memory + +type: float + +-- + +*`kafka.producer.batch_size_avg`*:: ++ +-- +The average number of bytes sent + +type: float + +-- + +*`kafka.producer.batch_size_max`*:: ++ +-- +The maximum number of bytes sent + +type: long + +-- + +*`kafka.producer.record_send_rate`*:: ++ +-- +The average number of records sent per second + +type: float + +-- + +*`kafka.producer.record_retry_rate`*:: ++ +-- +The average number of retried record sends per second + +type: float + +-- + +*`kafka.producer.record_error_rate`*:: ++ +-- +The average number of retried record sends per second + +type: float + +-- + +*`kafka.producer.records_per_request`*:: ++ +-- +The average number of records sent per second + +type: float + +-- + +*`kafka.producer.record_size_avg`*:: ++ +-- +The average record size + +type: float + +-- + +*`kafka.producer.record_size_max`*:: ++ +-- +The maximum record size + +type: long + +-- + +*`kafka.producer.request_rate`*:: ++ +-- +The number of producer requests per second + +type: float + +-- + +*`kafka.producer.response_rate`*:: ++ +-- +The number of producer responses per second + +type: float + +-- + +*`kafka.producer.io_wait`*:: ++ +-- +The producer I/O wait time + +type: float + +-- + +*`kafka.producer.bytes_out`*:: ++ +-- +The rate of bytes going out for the producer + +type: float + +-- + [[exported-fields-kibana]] == Kibana fields diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc index 4f97262b1f4f..6da37428fa96 100644 --- a/metricbeat/docs/modules/kafka.asciidoc +++ b/metricbeat/docs/modules/kafka.asciidoc @@ -29,6 +29,12 @@ kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allo This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.1.1. +The Broker, Producer, Consumer metricsets require <>to fetch JMX metrics. Refer to the link for Jolokia's compatibility notes. + +[float] +=== Usage +The Broker, Producer, Consumer metricsets require <>to fetch JMX metrics. Refer to those Metricsets' documentation about how to use Jolokia. + [float] === Example configuration @@ -39,11 +45,13 @@ in <>. Here is an example configuration: [source,yaml] ---- metricbeat.modules: +# Kafka metrics collected using the Kafka protocol - module: kafka - metricsets: ["consumergroup", "partition"] + #metricsets: + # - partition + # - consumergroup period: 10s hosts: ["localhost:9092"] - enabled: true #client_id: metricbeat #retries: 3 @@ -65,6 +73,27 @@ metricbeat.modules: # SASL authentication #username: "" #password: "" + +# Metrics collected from a Kafka broker using Jolokia +#- module: kafka +# metricsets: +# - broker +# period: 10s +# hosts: ["localhost:8779"] + +# Metrics collected from a Java Kafka consumer using Jolokia +#- module: kafka +# metricsets: +# - consumer +# period: 10s +# hosts: ["localhost:8774"] + +# Metrics collected from a Java Kafka producer using Jolokia +#- module: kafka +# metricsets: +# - producer +# period: 10s +# hosts: ["localhost:8775"] ---- [float] @@ -72,11 +101,23 @@ metricbeat.modules: The following metricsets are available: +* <> + +* <> + * <> * <> +* <> + +include::kafka/broker.asciidoc[] + +include::kafka/consumer.asciidoc[] + include::kafka/consumergroup.asciidoc[] include::kafka/partition.asciidoc[] +include::kafka/producer.asciidoc[] + diff --git a/metricbeat/docs/modules/kafka/broker.asciidoc b/metricbeat/docs/modules/kafka/broker.asciidoc new file mode 100644 index 000000000000..9ef1c702ce2a --- /dev/null +++ b/metricbeat/docs/modules/kafka/broker.asciidoc @@ -0,0 +1,24 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-kafka-broker]] +=== Kafka broker metricset + +beta[] + +include::../../../module/kafka/broker/_meta/docs.asciidoc[] + +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/kafka/broker/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules/kafka/consumer.asciidoc b/metricbeat/docs/modules/kafka/consumer.asciidoc new file mode 100644 index 000000000000..a30bd9001188 --- /dev/null +++ b/metricbeat/docs/modules/kafka/consumer.asciidoc @@ -0,0 +1,24 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-kafka-consumer]] +=== Kafka consumer metricset + +beta[] + +include::../../../module/kafka/consumer/_meta/docs.asciidoc[] + +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/kafka/consumer/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules/kafka/producer.asciidoc b/metricbeat/docs/modules/kafka/producer.asciidoc new file mode 100644 index 000000000000..d7e9e6b9d536 --- /dev/null +++ b/metricbeat/docs/modules/kafka/producer.asciidoc @@ -0,0 +1,24 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +[[metricbeat-metricset-kafka-producer]] +=== Kafka producer metricset + +beta[] + +include::../../../module/kafka/producer/_meta/docs.asciidoc[] + +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../module/kafka/producer/_meta/data.json[] +---- diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index 4d5ace9d8b6e..953541bf49fb 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -92,8 +92,11 @@ This file is generated! See scripts/mage/docs_collector.go |<> |image:./images/icon-no.png[No prebuilt dashboards] | .1+| .1+| |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | -.2+| .2+| |<> +.5+| .5+| |<> beta[] +|<> beta[] +|<> |<> +|<> beta[] |<> |image:./images/icon-no.png[No prebuilt dashboards] | .2+| .2+| |<> |<> diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index e7399cb6a18b..ac3b96cf0c97 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -374,11 +374,13 @@ metricbeat.modules: jmx.instance: #-------------------------------- Kafka Module -------------------------------- +# Kafka metrics collected using the Kafka protocol - module: kafka - metricsets: ["consumergroup", "partition"] + #metricsets: + # - partition + # - consumergroup period: 10s hosts: ["localhost:9092"] - enabled: true #client_id: metricbeat #retries: 3 @@ -401,6 +403,27 @@ metricbeat.modules: #username: "" #password: "" +# Metrics collected from a Kafka broker using Jolokia +#- module: kafka +# metricsets: +# - broker +# period: 10s +# hosts: ["localhost:8779"] + +# Metrics collected from a Java Kafka consumer using Jolokia +#- module: kafka +# metricsets: +# - consumer +# period: 10s +# hosts: ["localhost:8774"] + +# Metrics collected from a Java Kafka producer using Jolokia +#- module: kafka +# metricsets: +# - producer +# period: 10s +# hosts: ["localhost:8775"] + #-------------------------------- Kibana Module -------------------------------- - module: kibana metricsets: ["status"] diff --git a/metricbeat/module/kafka/_meta/Dockerfile b/metricbeat/module/kafka/_meta/Dockerfile index ebfdbf295caf..45f75c9016fc 100644 --- a/metricbeat/module/kafka/_meta/Dockerfile +++ b/metricbeat/module/kafka/_meta/Dockerfile @@ -16,14 +16,24 @@ RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && \ "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \ tar xzf ${INSTALL_DIR}/kafka.tgz -C ${KAFKA_HOME} --strip-components 1 +RUN retry --min 1 --max 180 -- curl -J -L -s -f --show-error -o /opt/jolokia-jvm-1.5.0-agent.jar \ + http://search.maven.org/remotecontent\?filepath\=org/jolokia/jolokia-jvm/1.5.0/jolokia-jvm-1.5.0-agent.jar + ADD kafka_server_jaas.conf /etc/kafka/server_jaas.conf +ADD jaas-kafka-client-producer.conf /kafka/bin/jaas-kafka-client-producer.conf +ADD sasl-producer.properties /kafka/bin/sasl-producer.properties +ADD jaas-kafka-client-consumer.conf /kafka/bin/jaas-kafka-client-consumer.conf +ADD sasl-consumer.properties /kafka/bin/sasl-consumer.properties ADD run.sh /run.sh ADD healthcheck.sh /healthcheck.sh EXPOSE 9092 EXPOSE 2181 +EXPOSE 8779 +EXPOSE 8775 +EXPOSE 8774 -# Healthcheck creates an empty topic foo. As soon as a topic is created, it assumes broke is available -HEALTHCHECK --interval=1s --retries=90 CMD /healthcheck.sh +# Healthcheck creates an empty topic foo. As soon as a topic is created, it assumes broker is available +HEALTHCHECK --interval=1s --retries=700 CMD /healthcheck.sh ENTRYPOINT ["/run.sh"] diff --git a/metricbeat/module/kafka/_meta/config.reference.yml b/metricbeat/module/kafka/_meta/config.reference.yml deleted file mode 100644 index 0a9d9fe88caf..000000000000 --- a/metricbeat/module/kafka/_meta/config.reference.yml +++ /dev/null @@ -1,26 +0,0 @@ -- module: kafka - metricsets: ["consumergroup", "partition"] - period: 10s - hosts: ["localhost:9092"] - enabled: true - - #client_id: metricbeat - #retries: 3 - #backoff: 250ms - - # List of Topics to query metadata for. If empty, all topics will be queried. - #topics: [] - - # Optional SSL. By default is off. - # List of root certificates for HTTPS server verifications - #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] - - # Certificate for SSL client authentication - #ssl.certificate: "/etc/pki/client/cert.pem" - - # Client Certificate Key - #ssl.key: "/etc/pki/client/cert.key" - - # SASL authentication - #username: "" - #password: "" diff --git a/metricbeat/module/kafka/_meta/config.yml b/metricbeat/module/kafka/_meta/config.yml index 9ed476ffe6ff..fa900f7b6666 100644 --- a/metricbeat/module/kafka/_meta/config.yml +++ b/metricbeat/module/kafka/_meta/config.yml @@ -1,3 +1,4 @@ +# Kafka metrics collected using the Kafka protocol - module: kafka #metricsets: # - partition @@ -25,3 +26,24 @@ # SASL authentication #username: "" #password: "" + +# Metrics collected from a Kafka broker using Jolokia +#- module: kafka +# metricsets: +# - broker +# period: 10s +# hosts: ["localhost:8779"] + +# Metrics collected from a Java Kafka consumer using Jolokia +#- module: kafka +# metricsets: +# - consumer +# period: 10s +# hosts: ["localhost:8774"] + +# Metrics collected from a Java Kafka producer using Jolokia +#- module: kafka +# metricsets: +# - producer +# period: 10s +# hosts: ["localhost:8775"] diff --git a/metricbeat/module/kafka/_meta/docs.asciidoc b/metricbeat/module/kafka/_meta/docs.asciidoc index 89f8e6a3f827..d6a59cbb8aef 100644 --- a/metricbeat/module/kafka/_meta/docs.asciidoc +++ b/metricbeat/module/kafka/_meta/docs.asciidoc @@ -21,3 +21,9 @@ kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allo === Compatibility This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.1.1. + +The Broker, Producer, Consumer metricsets require <>to fetch JMX metrics. Refer to the link for Jolokia's compatibility notes. + +[float] +=== Usage +The Broker, Producer, Consumer metricsets require <>to fetch JMX metrics. Refer to those Metricsets' documentation about how to use Jolokia. diff --git a/metricbeat/module/kafka/_meta/healthcheck.sh b/metricbeat/module/kafka/_meta/healthcheck.sh index 97d70b812edc..9314577c9523 100755 --- a/metricbeat/module/kafka/_meta/healthcheck.sh +++ b/metricbeat/module/kafka/_meta/healthcheck.sh @@ -10,5 +10,5 @@ if [[ $rc != 0 ]]; then exit $rc fi -${KAFKA_HOME}/bin/kafka-topic.sh --zookeeper=127.0.0.1:2181 --delete --topic "${TOPIC}" +${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper=127.0.0.1:2181 --delete --topic "${TOPIC}" exit 0 diff --git a/metricbeat/module/kafka/_meta/jaas-kafka-client-consumer.conf b/metricbeat/module/kafka/_meta/jaas-kafka-client-consumer.conf new file mode 100644 index 000000000000..80b39c1e16ca --- /dev/null +++ b/metricbeat/module/kafka/_meta/jaas-kafka-client-consumer.conf @@ -0,0 +1,5 @@ +KafkaClient { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="consumer" + password="consumer-secret"; +}; diff --git a/metricbeat/module/kafka/_meta/jaas-kafka-client-producer.conf b/metricbeat/module/kafka/_meta/jaas-kafka-client-producer.conf new file mode 100644 index 000000000000..fa9193ae05b4 --- /dev/null +++ b/metricbeat/module/kafka/_meta/jaas-kafka-client-producer.conf @@ -0,0 +1,5 @@ +KafkaClient { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="producer" + password="producer-secret"; +}; diff --git a/metricbeat/module/kafka/_meta/run.sh b/metricbeat/module/kafka/_meta/run.sh index 68df3e8a4c6a..a598cec6012e 100755 --- a/metricbeat/module/kafka/_meta/run.sh +++ b/metricbeat/module/kafka/_meta/run.sh @@ -34,6 +34,7 @@ wait_for_port() { nc -z localhost $port } +${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper=127.0.0.1:2181 --create --partitions 1 --topic test --replication-factor 1 echo "Starting ZooKeeper" ${KAFKA_HOME}/bin/zookeeper-server-start.sh ${KAFKA_HOME}/config/zookeeper.properties & @@ -41,7 +42,7 @@ wait_for_port 2181 echo "Starting Kafka broker" mkdir -p ${KAFKA_LOGS_DIR} -export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/server_jaas.conf +export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/server_jaas.conf -javaagent:/opt/jolokia-jvm-1.5.0-agent.jar=port=8779,host=0.0.0.0" ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \ --override authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer \ --override super.users=User:admin \ @@ -55,6 +56,8 @@ ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \ --override logs.dir=${KAFKA_LOGS_DIR} & wait_for_port 9092 +wait_for_port 8779 + echo "Kafka load status code $?" @@ -68,5 +71,18 @@ ${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localh touch /tmp/.acls_loaded + +# Start a forever producer +{ while sleep 1; do echo message; done } | KAFKA_OPTS="-Djava.security.auth.login.config=/kafka/bin/jaas-kafka-client-producer.conf -javaagent:/opt/jolokia-jvm-1.5.0-agent.jar=port=8775,host=0.0.0.0" \ + ${KAFKA_HOME}/bin/kafka-console-producer.sh --topic test --broker-list localhost:9091 --producer.config ${KAFKA_HOME}/bin/sasl-producer.properties > /dev/null & + +wait_for_port 8775 + +# Start a forever consumer +KAFKA_OPTS="-Djava.security.auth.login.config=/kafka/bin/jaas-kafka-client-consumer.conf -javaagent:/opt/jolokia-jvm-1.5.0-agent.jar=port=8774,host=0.0.0.0" \ + ${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic=test --bootstrap-server=localhost:9091 --consumer.config ${KAFKA_HOME}/bin/sasl-producer.properties > /dev/null & + +wait_for_port 8774 + # Make sure the container keeps running tail -f /dev/null diff --git a/metricbeat/module/kafka/_meta/sasl-consumer.properties b/metricbeat/module/kafka/_meta/sasl-consumer.properties new file mode 100644 index 000000000000..74ec7eba0fde --- /dev/null +++ b/metricbeat/module/kafka/_meta/sasl-consumer.properties @@ -0,0 +1,2 @@ +security.protocol=SASL_PLAINTEXT +sasl.mechanism=PLAIN diff --git a/metricbeat/module/kafka/_meta/sasl-producer.properties b/metricbeat/module/kafka/_meta/sasl-producer.properties new file mode 100644 index 000000000000..74ec7eba0fde --- /dev/null +++ b/metricbeat/module/kafka/_meta/sasl-producer.properties @@ -0,0 +1,2 @@ +security.protocol=SASL_PLAINTEXT +sasl.mechanism=PLAIN diff --git a/metricbeat/module/kafka/broker/_meta/data.json b/metricbeat/module/kafka/broker/_meta/data.json new file mode 100644 index 000000000000..15ac30d2f18d --- /dev/null +++ b/metricbeat/module/kafka/broker/_meta/data.json @@ -0,0 +1,82 @@ +{ + "@timestamp": "2019-10-30T14:22:37.475Z", + "@metadata": { + "beat": "metricbeat", + "type": "_doc", + "version": "8.0.0" + }, + "agent": { + "ephemeral_id": "f08b3bea-3631-4aaf-b35d-4cef29aeeb06", + "hostname": "MBP.lan", + "id": "79dd1677-1bea-4efd-9131-e8ca464eddf0", + "version": "8.0.0", + "type": "metricbeat" + }, + "ecs": { + "version": "1.2.0" + }, + "metricset": { + "name": "broker", + "period": 10000 + }, + "service": { + "address": "localhost:8779", + "type": "kafka" + }, + "kafka": { + "broker": { + "request": { + "fetch": { + "failed_per_second": 0 + }, + "channel": { + "queue": { + "size": 0 + } + }, + "produce": { + "failed_per_second": 0 + } + }, + "replication": { + "leader_elections": 0, + "unclean_leader_elections": 0 + }, + "session": { + "zookeeper": { + "expire": 0, + "readonly": 0, + "sync": 0.00017675970397749868, + "disconnect": 0 + } + }, + "topic": { + "net": { + "bytes_out": 0, + "bytes_rejected": 0, + "bytes_in": 0 + }, + "messages_in": 0 + } + } + }, + "event": { + "dataset": "kafka.broker", + "module": "kafka", + "duration": 7870293 + }, + "host": { + "name": "MBP.lan", + "hostname": "MBP.lan", + "architecture": "x86_64", + "os": { + "version": "10.14.6", + "family": "darwin", + "name": "Mac OS X", + "kernel": "18.7.0", + "build": "18G95", + "platform": "darwin" + }, + "id": "883134FF-0EC4-5E1B-9F9E-FD06FB681D84" + } +} diff --git a/metricbeat/module/kafka/broker/_meta/docs.asciidoc b/metricbeat/module/kafka/broker/_meta/docs.asciidoc new file mode 100644 index 000000000000..8f4187ba8def --- /dev/null +++ b/metricbeat/module/kafka/broker/_meta/docs.asciidoc @@ -0,0 +1,20 @@ +This metricset periodically fetches JMX metrics from Kafka Broker JMX. + +[float] +=== Compatibility +The module has been tested with Kafka 2.1.1. Other versions are expected to work. + +[float] +=== Usage +The Broker metricset requires <>to fetch JMX metrics. Refer to the link for instructions about how to use Jolokia. + +Note that the Jolokia agent is required to be deployed along with the Kafka JVM application. This can be achieved by +using the `KAFKA_OPTS` environment variable when starting the Kafka broker application: + +[source,shell] +---- +export KAFKA_OPTS=-javaagent:/opt/jolokia-jvm-1.5.0-agent.jar=port=8779,host=localhost +./bin/kafka-server-start.sh ./config/server.properties +---- + +Then it will be possible to collect the JMX metrics from `localhost:8779`. diff --git a/metricbeat/module/kafka/broker/_meta/fields.yml b/metricbeat/module/kafka/broker/_meta/fields.yml new file mode 100644 index 000000000000..4fa195e7b298 --- /dev/null +++ b/metricbeat/module/kafka/broker/_meta/fields.yml @@ -0,0 +1,50 @@ +- name: broker + type: group + description: Broker metrics from Kafka Broker JMX + release: beta + fields: + - name: mbean + description: Mbean that this event is related to + type: keyword + - name: request.channel.queue.size + description: The size of the request queue + type: long + - name: request.produce.failed_per_second + description: The rate of failed produce requests per second + type: float + - name: request.fetch.failed_per_second + description: The rate of client fetch request failures per second + type: float + - name: replication.leader_elections + description: The leader election rate + type: float + - name: replication.unclean_leader_elections + description: The unclean leader election rate + type: float + - name: session.zookeeper.disconnect + description: The ZooKeeper closed sessions per second + type: float + - name: session.zookeeper.expire + description: The ZooKeeper expired sessions per second + type: float + - name: session.zookeeper.readonly + description: The ZooKeeper readonly sessions per second + type: float + - name: session.zookeeper.sync + description: The ZooKeeper client connections per second + type: float + - name: log.flush_rate + description: The log flush rate + type: float + - name: topic.net.bytes_in + description: The incoming byte rate + type: float + - name: topic.net.bytes_out + description: The outgoing byte rate + type: float + - name: topic.net.bytes_rejected + description: The rejected byte rate + type: float + - name: topic.messages_in + description: The incoming message rate + type: float diff --git a/metricbeat/module/kafka/broker/manifest.yml b/metricbeat/module/kafka/broker/manifest.yml new file mode 100644 index 000000000000..e8c68c4459ed --- /dev/null +++ b/metricbeat/module/kafka/broker/manifest.yml @@ -0,0 +1,65 @@ +default: true +input: + module: jolokia + metricset: jmx + defaults: + namespace: "broker" + hosts: ["localhost:8779"] + path: "/jolokia/?ignoreErrors=true&canonicalNaming=false" + jmx.mappings: + - mbean: 'kafka.network:type=RequestChannel,name=RequestQueueSize' + attributes: + - attr: Value + field: request.channel.queue.size + - mbean: 'kafka.server:name=FailedProduceRequestsPerSec,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: request.produce.failed_per_second + - mbean: 'kafka.server:name=FailedFetchRequestsPerSec,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: request.fetch.failed_per_second + - mbean: 'kafka.controller:name=LeaderElectionRateAndTimeMs,type=ControllerStats' + attributes: + - attr: MeanRate + field: replication.leader_elections + - mbean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec' + attributes: + - attr: MeanRate + field: replication.unclean_leader_elections + - mbean: 'kafka.server:name=ZooKeeperDisconnectsPerSec,type=SessionExpireListener' + attributes: + - attr: MeanRate + field: session.zookeeper.disconnect + - mbean: 'kafka.server:name=ZooKeeperExpiresPerSec,type=SessionExpireListener' + attributes: + - attr: MeanRate + field: session.zookeeper.expire + - mbean: 'kafka.server:name=ZooKeeperReadOnlyConnectsPerSec,type=SessionExpireListener' + attributes: + - attr: MeanRate + field: session.zookeeper.readonly + - mbean: 'kafka.server:name=ZooKeeperSyncConnectsPerSec,type=SessionExpireListener' + attributes: + - attr: MeanRate + field: session.zookeeper.sync + - mbean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs' + attributes: + - attr: MeanRate + field: log.flush_rate + - mbean: 'kafka.server:name=BytesRejectedPerSec,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: topic.net.bytes_rejected + - mbean: 'kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: topic.net.bytes_in + - mbean: 'kafka.server:name=BytesOutPerSec,type=BrokerTopicMetrics' + attributes: + - attr: MeanRate + field: topic.net.bytes_out + - mbean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec' + attributes: + - attr: MeanRate + field: topic.messages_in diff --git a/metricbeat/module/kafka/consumer/_meta/data.json b/metricbeat/module/kafka/consumer/_meta/data.json new file mode 100644 index 000000000000..e5cd7d1e031f --- /dev/null +++ b/metricbeat/module/kafka/consumer/_meta/data.json @@ -0,0 +1,53 @@ +{ + "@timestamp": "2019-10-31T13:22:06.700Z", + "@metadata": { + "beat": "metricbeat", + "type": "_doc", + "version": "8.0.0" + }, + "kafka": { + "consumer": { + "bytes_consumed": 2.9521300228e+10, + "fetch_rate": 0, + "records_consumed": 1.23075656e+08, + "mbean": "kafka.consumer:client-id=consumer-1,type=consumer-fetch-manager-metrics" + } + }, + "event": { + "dataset": "kafka.consumer", + "module": "kafka", + "duration": 7042831 + }, + "ecs": { + "version": "1.2.0" + }, + "host": { + "name": "pr.local", + "hostname": "pr.local", + "architecture": "x86_64", + "os": { + "kernel": "18.7.0", + "build": "18G95", + "platform": "darwin", + "version": "10.14.6", + "family": "darwin", + "name": "Mac OS X" + }, + "id": "883134FF-0EC4-5E1B-9F9E-FD06FB681D84" + }, + "agent": { + "hostname": "pr.local", + "id": "79dd1677-1bea-4efd-9131-e8ca464eddf0", + "version": "8.0.0", + "type": "metricbeat", + "ephemeral_id": "e40f5843-d3aa-4bdc-a100-64022b70851b" + }, + "metricset": { + "name": "consumer", + "period": 10000 + }, + "service": { + "address": "localhost:8774", + "type": "kafka" + } +} diff --git a/metricbeat/module/kafka/consumer/_meta/docs.asciidoc b/metricbeat/module/kafka/consumer/_meta/docs.asciidoc new file mode 100644 index 000000000000..0fd42e7d26e7 --- /dev/null +++ b/metricbeat/module/kafka/consumer/_meta/docs.asciidoc @@ -0,0 +1,20 @@ +This metricset periodically fetches JMX metrics from Kafka Consumers implemented in java and expose JMX metrics through jolokia agent. + +[float] +=== Compatibility +The module has been tested with Kafka 2.1.1. Other versions are expected to work. + +[float] +=== Usage +The Consumer metricset requires <>to fetch JMX metrics. Refer to the link for more information about Jolokia. + +Note that the Jolokia agent is required to be deployed along with the JVM application. This can be achieved by +using the `KAFKA_OPTS` environment variable when starting the Kafka consumer application: + +[source,shell] +---- +export KAFKA_OPTS=-javaagent:/opt/jolokia-jvm-1.5.0-agent.jar=port=8774,host=localhost +./bin/kafka-console-consumer.sh --topic=test --bootstrap-server=localhost:9091 +---- + +Then it will be possible to collect the JMX metrics from `localhost:8774`. diff --git a/metricbeat/module/kafka/consumer/_meta/fields.yml b/metricbeat/module/kafka/consumer/_meta/fields.yml new file mode 100644 index 000000000000..c9946d6d801c --- /dev/null +++ b/metricbeat/module/kafka/consumer/_meta/fields.yml @@ -0,0 +1,20 @@ +- name: consumer + type: group + description: Consumer metrics from Kafka Consumer JMX + release: beta + fields: + - name: mbean + description: Mbean that this event is related to + type: keyword + - name: fetch_rate + description: The minimum rate at which the consumer sends fetch requests to a broker + type: float + - name: bytes_consumed + description: The average number of bytes consumed for a specific topic per second + type: float + - name: records_consumed + description: The average number of records consumed per second for a specific topic + type: float + - name: bytes_in + description: The rate of bytes coming in to the consumer + type: float diff --git a/metricbeat/module/kafka/consumer/manifest.yml b/metricbeat/module/kafka/consumer/manifest.yml new file mode 100644 index 000000000000..93b8b29ad70a --- /dev/null +++ b/metricbeat/module/kafka/consumer/manifest.yml @@ -0,0 +1,21 @@ +default: true +input: + module: jolokia + metricset: jmx + defaults: + namespace: "consumer" + hosts: ["localhost:8774"] + path: "/jolokia/?ignoreErrors=true&canonicalNaming=false" + jmx.mappings: + - mbean: 'kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics' + attributes: + - attr: fetch-rate + field: fetch_rate + - attr: bytes-consumed-total + field: bytes_consumed + - attr: records-consumed-total + field: records_consumed + - mbean: 'kafka.consumer:client-id=*,type=consumer-metrics' + attributes: + - attr: incoming-byte-total + field: bytes_in diff --git a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go index 1131b7a2d24d..4f1abe6092fd 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go @@ -41,7 +41,7 @@ const ( func TestData(t *testing.T) { service := compose.EnsureUp(t, "kafka", - compose.UpWithTimeout(120*time.Second), + compose.UpWithTimeout(600*time.Second), compose.UpWithAdvertisedHostEnvFileForPort(9092), ) @@ -64,7 +64,7 @@ func TestData(t *testing.T) { func TestFetch(t *testing.T) { service := compose.EnsureUp(t, "kafka", - compose.UpWithTimeout(120*time.Second), + compose.UpWithTimeout(600*time.Second), compose.UpWithAdvertisedHostEnvFileForPort(9092), ) diff --git a/metricbeat/module/kafka/fields.go b/metricbeat/module/kafka/fields.go index 7a7d0fa0bb20..4176d06ada76 100644 --- a/metricbeat/module/kafka/fields.go +++ b/metricbeat/module/kafka/fields.go @@ -32,5 +32,5 @@ func init() { // AssetKafka returns asset data. // This is the base64 encoded gzipped contents of ../metricbeat/module/kafka. func AssetKafka() string { - return "eJy8WM1u2zwQvPspFjk5h+g7fT34UKBNisJo0BZpezZocWUTpkiVpB347QtSli1RIvVjJzrFjDQzXC5nl3yAHR4XsCPZjswADDMcF3D3zf6+mwFQ1KlihWFSLODjDADA/Q9ySfccZwB6K5VZpVJkbLOAjHBtRxVyJBoXsLGwGUNO9cJ9/gCC5HihtI85FvZVJffFaaSDtwlTh1oruUN1Hu7CC2KWz2eHAI9S6H2OCr7aT2EpMqlyYj+ALTkgrBEFKCQUMiVzmJ8+2xJBORObBqTZIqQVnpNyn9Re8OdSnw+jjeFqPlx6FNEp1abF6KyTh1CqUOtOsh0eX6XyhQzjI/SAyjCN9EzRWjMjC5Ym9u/WurWpI7S/LY7DDHGgUlIlqaRtJi+ivTQOCixU0mYriDLMfps01m8s088KBhiNsrjZrTq44vFrkP0R7O8egVGQmcvY4sIu3EAZw34d5R58HzlABHW/StKkJa7ad74DjPaFEFDT3qontqdbHhXWU2oqFKbEIF3Ah+T/Sdvwlm4G/Y4WigDEnA1i7jZgqtBwuWokyN/teNDjeuN0+BxjLH6A6/qr+hQnchtmFNfgxKtZb1TCeRMPr2qDNVzcsi8QMss0mkmF9Zz0JQYwYWTNmtbIxMZtpLiCHA2ZuurpXhuZX5RYLKDEENBG1TdrJ3NnCRw+/3a6u+btrOa/SyzqFTIqKeUMRfdydPthVM8nrdlGID3BusWwi4IHFEaX/hZKwys8a6BdPJailk8wLwOn0Rgrr1SbMHrfb11bqf1wTRXSgAoS5piv/Yo+iZUJg0oQ7hWPE0F948YcY3Tt7gIZX7cjtjElTw+EcbLmeMLVVaOzYQcUtaZqZI4KfMVIekyvrd8dcGV8flfmy6yFjdO3EfTDAfcLmlCJJqznpfxYL36PbihWGQcIhtYJoxoNquRIaKtzHaC0V8ezA7bd/rzsjlvtZC1UOqzgqqbtmblssgTAqA4rUFhwlvoF/BZxeCmRuwMRjojQR5Gu+mStpeRI/IQfqGwpKLPZpoFlVQCAaWAi5XuKtDqVMfFgxVSvGLQVDubLXy+DZqJXPTn2JpOwwj3eoMRgAwU3WP8v556pbFQyNOnWtgdDbS1yzxHVd9Uh07RuRMLSvGseuN0ZpK1r6KnkdHcw5q7t+iO5x+ZpmXIfd72m4Hl19i8AAP//Xtd9qA==" + return "eJzUWk+P27YTve+nGOS0OUQ5/X6HPRRok6LYJmmCNAWKXgSaHFnsUqRCUt44n74gKcmy/lCS5U2bPa1lcd7jcGY4fPQLeMDjHTyQ7IHcAFhuBd7Bszfu87MbAIaGal5aruQd/HADAOC/g0KxSuANgMmVtilVMuP7O8iIMO6pRoHE4B3sndmMo2Dmzg9/AZIUeIJ0f/ZYule1qsr6yQjuuZmuqZ1WD6jbx2P2Jm2Gv5+8BXilpKkK1PCLGwr3MlO6IG4A5OSAsEOUoJEwyLQq4LYelhPJBJf7M5M2R6CNPU/ledJ5oT+X7nw4O3vczEeoHkR0Sp1pcXYzikMY02jMKNgDHh+V7hNZhkfYAbXlBlkLMVgzq0pOE/f/YN2G0BHYT86OtzmFgVornVDFhkg9j87CeFPgTCVDtJJoy93Y5Gz91iJ9aMwAZ1EUP7t0BCvuvzOwPyT/XCFwBirzEVue0KV/EHw4zyPk4LehA0Qy/ymAJlcpCHXsFmg1pyYkeCh19Te/vvuzM7YtcDu0ZGFeFzskMpZR79wLYHNiwebcAB5QWuDGoRGLDKxanKwNqMbPFRqb0JxIiSL5XGGFieFfMcbkU47g3mkWorYCfvSy6tQnUGrFKopJRrhAlpaoU4NUyWiNcTw0sZ5HGAi1ncaugRI1jFoKxDKhiI0yy9DS/HJeVHC3TN5K6yhnrdK4iV0pOPW7TyKQMNQpCqTuc79kD6iF96F531PdAF9JKpDIdC2Netw16Bg0xlH5qtQDYok6YdxQJSVSO0fjL6Xe+DFAhXK7Um1sw+IM6eCXkuvZnDpRCe8/DRfXoigpjsvZNCOehI45SrpmjXw61Wu7jYtQ+yQTlcnTkZAbZo3ag3/7kgCtGxq0ye5o0aQ8WukdHJdUFVzuwQ24BqSqZlNBVXavromp8W+kFueLZf3eRuACjSH7de6txywFbQCb5n1dG9EeIUYaifa777SV8DvcokwquORFVYQtklh4zDnNz49EBiUz55umAauADLu3JeERorG2PhuN5IDahYSsih1qt4n78Q07BpnSQMCUSHnGad12btjKqdJsC73awongicso14u8N59TTcvTeMvnl2vN1dniXpBl/aRafYSfMnSuRDR/sdyKxF/feOBUaqQup+7g/8n/LjoxX1N4gHnxYcoDEBMhICZELJgqnAkSzZNJ/HFxAmYEinU8+hhr1JgFAkl/VV/HgaYTdwprceB1VJIohfa8veyIt4rDSdiYc4TKMoP9VmaZBtYGfbABXFrVURF26AqWS6Q4g+J8J4Y1q04rY1VxYuJsASOWgLG6m6yjyKNq1fL5D8Pd66wtm5cnX3TFrCil0JCvqIdRPj8aw/cSWdPnu8Vwi+IblLpdmgrDDTVrYbl4FUjdv4bb4DiD1jp6gW3C2fP50pUr03fXpUTOTE0CFuj6hHT79Lm0qCURvc2jBugmbqxirN67x4ys37cjZeOSOD0QLshOYG3XNFLYnh9QdvTPlTEq8REj4XH53vqbN9wUvr6A2qfZcZtgT0PovTc8T+iCneiC9TxtP64Wf4tuKLYzLiAMg8uA5ukky6CzPcFSvg0CHmdwG7rjQTvZcZWZZrCpaXvLfTQ5AODMTDOopcsn8MPHYHncEdMekeYoaTpHa6eUGJ7qFzK7l4y7aDPAs8YB7rjPJRUVQ9ZcoHD5wpFp5V10Oxzc3v/+cdFMTDoTY08yCdtK2vMUJxsouML6/9z2TKFR8fKFaw+WlrXIlWSU36ZDph1cXk5T693IwvXOIENeS08l9TXfmmvx7UdyPq6B1VwuuTrfzmnyvDpsy8Il2Urx8kM9aky8bL/7TsVL0vRz6a7KMtSp17Cih2Z/32yJAFKoSvq9J4x1/bDS/euVWZGNWJqnhn/FlByip7iYRGmmzmKLgAvyZQ64IF+8drsYOHLzS5VmqUHJFqnG07Knw94qv6YarT5eTMRqjqw2VYvXWwn5ivwfImT85Xctxv/Li7U2TRo/DH/XsAZwRXrMAc79HmLRup+c2xT0q/zswZRKGrycQRi/gQJX6SPhszHWQt6/fA9uAFg+0Z7M3W0suBs9v9wI16Sqsv5+xXbYRPD/CQAA//8NHQ2M" } diff --git a/metricbeat/module/kafka/module.yml b/metricbeat/module/kafka/module.yml new file mode 100644 index 000000000000..91621d292096 --- /dev/null +++ b/metricbeat/module/kafka/module.yml @@ -0,0 +1,5 @@ +name: kafka +metricsets: +- broker +- producer +- consumer diff --git a/metricbeat/module/kafka/partition/partition_integration_test.go b/metricbeat/module/kafka/partition/partition_integration_test.go index 4ee53002884a..af5b0f72ea5b 100644 --- a/metricbeat/module/kafka/partition/partition_integration_test.go +++ b/metricbeat/module/kafka/partition/partition_integration_test.go @@ -44,7 +44,7 @@ const ( func TestData(t *testing.T) { service := compose.EnsureUp(t, "kafka", - compose.UpWithTimeout(120*time.Second), + compose.UpWithTimeout(600*time.Second), compose.UpWithAdvertisedHostEnvFileForPort(9092), ) @@ -60,7 +60,7 @@ func TestData(t *testing.T) { func TestTopic(t *testing.T) { service := compose.EnsureUp(t, "kafka", - compose.UpWithTimeout(120*time.Second), + compose.UpWithTimeout(600*time.Second), compose.UpWithAdvertisedHostEnvFileForPort(9092), ) diff --git a/metricbeat/module/kafka/producer/_meta/data.json b/metricbeat/module/kafka/producer/_meta/data.json new file mode 100644 index 000000000000..77273c67d7f0 --- /dev/null +++ b/metricbeat/module/kafka/producer/_meta/data.json @@ -0,0 +1,60 @@ +{ + "@timestamp": "2019-10-31T10:19:14.758Z", + "@metadata": { + "beat": "metricbeat", + "type": "_doc", + "version": "8.0.0" + }, + "metricset": { + "name": "producer", + "period": 10000 + }, + "ecs": { + "version": "1.2.0" + }, + "host": { + "os": { + "name": "Mac OS X", + "kernel": "18.7.0", + "build": "18G95", + "platform": "darwin", + "version": "10.14.6", + "family": "darwin" + }, + "id": "883134FF-0EC4-5E1B-9F9E-FD06FB681D84", + "hostname": "abc.local", + "name": "abc.local", + "architecture": "x86_64" + }, + "agent": { + "type": "metricbeat", + "ephemeral_id": "b95327e7-2737-4262-a1a6-ab8547fc8c8d", + "hostname": "abc.local", + "id": "79dd1677-1bea-4efd-9131-e8ca464eddf0", + "version": "8.0.0" + }, + "service": { + "address": "localhost:8775", + "type": "kafka" + }, + "event": { + "dataset": "kafka.producer", + "module": "kafka", + "duration": 4485726 + }, + "kafka": { + "producer": { + "response_rate": 0, + "request_rate": 0, + "record_send_rate": 0, + "batch_size_avg": 0, + "record_size_avg": 0, + "record_retry_rate": 0, + "records_per_request": 0, + "io_wait": 1.2487715219630156e+07, + "mbean": "kafka.producer:client-id=console-producer,type=producer-metrics", + "available_buffer_bytes": 0, + "record_error_rate": 737.5234685412391 + } + } +} diff --git a/metricbeat/module/kafka/producer/_meta/docs.asciidoc b/metricbeat/module/kafka/producer/_meta/docs.asciidoc new file mode 100644 index 000000000000..29bd793c708a --- /dev/null +++ b/metricbeat/module/kafka/producer/_meta/docs.asciidoc @@ -0,0 +1,20 @@ +This metricset periodically fetches JMX metrics from Kafka Producers implemented in java and expose JMX metrics through jolokia agent. + +[float] +=== Compatibility +The module has been tested with Kafka 2.1.1. Other versions are expected to work. + +[float] +=== Usage +The Producer metricset requires <>to fetch JMX metrics. Refer to the link for more information about Jolokia. + +Note that the Jolokia agent is required to be deployed along with the JVM application. This can be achieved by +using the `KAFKA_OPTS` environment variable when starting the Kafka producer application: + +[source,shell] +---- +export KAFKA_OPTS=-javaagent:/opt/jolokia-jvm-1.5.0-agent.jar=port=8775,host=localhost +./bin/kafka-console-producer.sh --topic test --broker-list localhost:9091 +---- + +Then it will be possible to collect the JMX metrics from `localhost:8775`. diff --git a/metricbeat/module/kafka/producer/_meta/fields.yml b/metricbeat/module/kafka/producer/_meta/fields.yml new file mode 100644 index 000000000000..1edd723e0a4b --- /dev/null +++ b/metricbeat/module/kafka/producer/_meta/fields.yml @@ -0,0 +1,47 @@ +- name: producer + type: group + description: Producer metrics from Kafka Producer JMX + release: beta + fields: + - name: mbean + description: Mbean that this event is related to + type: keyword + - name: available_buffer_bytes + description: The total amount of buffer memory + type: float + - name: batch_size_avg + description: The average number of bytes sent + type: float + - name: batch_size_max + description: The maximum number of bytes sent + type: long + - name: record_send_rate + description: The average number of records sent per second + type: float + - name: record_retry_rate + description: The average number of retried record sends per second + type: float + - name: record_error_rate + description: The average number of retried record sends per second + type: float + - name: records_per_request + description: The average number of records sent per second + type: float + - name: record_size_avg + description: The average record size + type: float + - name: record_size_max + description: The maximum record size + type: long + - name: request_rate + description: The number of producer requests per second + type: float + - name: response_rate + description: The number of producer responses per second + type: float + - name: io_wait + description: The producer I/O wait time + type: float + - name: bytes_out + description: The rate of bytes going out for the producer + type: float diff --git a/metricbeat/module/kafka/producer/manifest.yml b/metricbeat/module/kafka/producer/manifest.yml new file mode 100644 index 000000000000..9f8af3452f9b --- /dev/null +++ b/metricbeat/module/kafka/producer/manifest.yml @@ -0,0 +1,39 @@ +default: true +input: + module: jolokia + metricset: jmx + defaults: + namespace: "producer" + hosts: ["localhost:8775"] + path: "/jolokia/?ignoreErrors=true&canonicalNaming=false" + jmx.mappings: + - mbean: 'kafka.producer:type=producer-metrics,client-id=*' + attributes: + - attr: buffer-available-bytes + field: available_buffer_bytes + - attr: batch-size-avg + field: batch_size_avg + - attr: batch-size-max + field: batch_size_max + - attr: record-send-rate + field: record_send_rate + - attr: record-retry-rate + field: record_retry_rate + - attr: record-error-rate + field: record_error_rate + - attr: records-per-request-avg + field: records_per_request + - attr: record-size-avg + field: record_size_avg + - attr: record-size-max + field: record_size_max + - attr: request-rate + field: request_rate + - attr: response-rate + field: response_rate + - attr: io-wait-time-ns-avg + field: io_wait + - mbean: 'kafka.producer:client-id=console-producer,node-id=*,type=producer-node-metrics' + attributes: + - attr: outgoing-byte-total + field: bytes_out diff --git a/metricbeat/modules.d/kafka.yml.disabled b/metricbeat/modules.d/kafka.yml.disabled index 1d65b8f913a0..daf5e35cb412 100644 --- a/metricbeat/modules.d/kafka.yml.disabled +++ b/metricbeat/modules.d/kafka.yml.disabled @@ -1,6 +1,7 @@ # Module: kafka # Docs: https://www.elastic.co/guide/en/beats/metricbeat/7.x/metricbeat-module-kafka.html +# Kafka metrics collected using the Kafka protocol - module: kafka #metricsets: # - partition @@ -28,3 +29,24 @@ # SASL authentication #username: "" #password: "" + +# Metrics collected from a Kafka broker using Jolokia +#- module: kafka +# metricsets: +# - broker +# period: 10s +# hosts: ["localhost:8779"] + +# Metrics collected from a Java Kafka consumer using Jolokia +#- module: kafka +# metricsets: +# - consumer +# period: 10s +# hosts: ["localhost:8774"] + +# Metrics collected from a Java Kafka producer using Jolokia +#- module: kafka +# metricsets: +# - producer +# period: 10s +# hosts: ["localhost:8775"] diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index b0670dda2ed9..1c66d148ba70 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -476,11 +476,13 @@ metricbeat.modules: jmx.instance: #-------------------------------- Kafka Module -------------------------------- +# Kafka metrics collected using the Kafka protocol - module: kafka - metricsets: ["consumergroup", "partition"] + #metricsets: + # - partition + # - consumergroup period: 10s hosts: ["localhost:9092"] - enabled: true #client_id: metricbeat #retries: 3 @@ -503,6 +505,27 @@ metricbeat.modules: #username: "" #password: "" +# Metrics collected from a Kafka broker using Jolokia +#- module: kafka +# metricsets: +# - broker +# period: 10s +# hosts: ["localhost:8779"] + +# Metrics collected from a Java Kafka consumer using Jolokia +#- module: kafka +# metricsets: +# - consumer +# period: 10s +# hosts: ["localhost:8774"] + +# Metrics collected from a Java Kafka producer using Jolokia +#- module: kafka +# metricsets: +# - producer +# period: 10s +# hosts: ["localhost:8775"] + #-------------------------------- Kibana Module -------------------------------- - module: kibana metricsets: ["status"]