Skip to content

Kafka_zookeeper_commands

kaliraja edited this page Feb 15, 2021 · 20 revisions

To check the Kafka broker cluster status

Use the below command to check the available broker nodes in the Kafka cluster. If you are running this command to check the 3 node Kafka cluster then the output will be [1, 2, 3] when all 3 nodes are in cluster.

/opt/kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

To list the topics

/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

To run the console consumer

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic name> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ntpprod.learning.events.failed --from-beginning

To get the topics offset

/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic name> --time -1

To describe the topic

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic <topic name>

To alter the topic or To increase the partion

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic name> --partitions 6

To reassign the partion or increase the replication of topics

/opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/sample.json --execute /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/sample.json --verify

To consume the events from specific offset

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition <partition number> --offset <offset number> --topic <topic name>

Load the data to specific topic

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic name> < pipeline_test_data.json

To describe the consumer group

/opt/kafka/bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group name>

TO update the retention hours

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic name> --config retention.ms=<>

TO update the topic max.message.bytes setting

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic name> --config max.message.bytes=<>

To delete the topics from the zookeeper level forcefully

Note: Not advisable to execute this command in production.

rmr /brokers/topics/{topic_name}
rmr /admin/delete_topics/{topic_name}
OR
sudo ./zookeeper-shell.sh localhost:2181 rmr /brokers/topics/your_topic

To update the retention period

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topicname> --config retention.ms=0

To increase the replication factor for topic partition or partition reassignment

NOTE: These steps can be used when we are scaling the Kafka cluster and want to reassign the partitions to the new node

  • Describe the topic for which you want to increase the replication factor and check the replication factor count by using the below command.

    • /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic <topic name>
  • Keep the below JSON in one file(This file will be used in next step) and fill out the topic name, partition number and replicas based on requirement.

{
  "version": 1, 
  "partitions":[
     {"topic":"ntpprod.telemetry.ingest","partition":0,"replicas":[3,1]},
     {"topic":"ntpprod.telemetry.ingest","partition":1,"replicas":[1,2]},
     {"topic":"ntpprod.telemetry.ingest","partition":2,"replicas":[2,3]},
     {"topic":"ntpprod.telemetry.ingest","partition":3,"replicas":[3,1]}
  ]
}
  • Execute the below command in one of the kafka broker to start the partition reassignment.

    • /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file <json-file-name> --execute
  • Check the status of partition reassignment by using below command.

    • /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file <json-file-name> --verify
  • You will see the output of above command as below if the partition reassignment is completed successfully.

Status of partition reassignment: 
Reassignment of partition env.telemetry.ingest-0 completed successfully
Reassignment of partition env.telemetry.ingest-1 completed successfully
Reassignment of partition env.telemetry.ingest-2 completed successfully
Reassignment of partition env.telemetry.ingest-3 completed successfully
Clone this wiki locally