
Kafka_zookeeper_commands

kaliraja edited this page Feb 16, 2021 · 20 revisions

To check the Kafka broker cluster status

Use the command below to list the broker nodes currently registered in the Kafka cluster. For a 3-node cluster, the output will be [1, 2, 3] when all three nodes are in the cluster.

/opt/kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
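The zookeeper-shell output ends with a Python-style list of broker ids after some connection log lines. A minimal sketch (the helper name is hypothetical) for parsing that output programmatically, e.g. in a health-check script:

```python
import ast

def parse_broker_ids(zk_shell_output):
    """Parse the broker id list from zookeeper-shell output.

    The command prints connection logs followed by a final line
    such as '[1, 2, 3]'; take the last non-empty line and parse it.
    """
    last_line = [l for l in zk_shell_output.strip().splitlines() if l.strip()][-1]
    return ast.literal_eval(last_line)

# Example: all three brokers of a 3-node cluster are registered
output = "Connecting to localhost:2181\n\n[1, 2, 3]\n"
print(parse_broker_ids(output))  # → [1, 2, 3]
```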

To list the topics

/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

To run the console consumer

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic name>
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ntpprod.learning.events.failed --from-beginning

To get the topic's offsets

/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic name> --time -1
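With --time -1, GetOffsetShell prints the latest (log-end) offset for each partition, one `topic:partition:offset` line per partition (--time -2 would print the earliest). Summing the latest offsets gives an upper bound on the messages in the topic; a sketch of parsing that output (the function name is hypothetical):

```python
def total_latest_offset(get_offset_shell_output):
    """Sum the per-partition offsets from GetOffsetShell output.

    Each output line has the form 'topic:partition:offset'.
    """
    total = 0
    for line in get_offset_shell_output.strip().splitlines():
        topic, partition, offset = line.rsplit(":", 2)
        total += int(offset)
    return total

sample = "mytopic:0:120\nmytopic:1:95\nmytopic:2:110\n"
print(total_latest_offset(sample))  # → 325
```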

To describe the topic

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic <topic name>

To alter a topic or increase its partition count

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic name> --partitions 6

To reassign partitions or increase the replication factor of topics

/opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/sample.json --execute
/opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/sample.json --verify

To consume events from a specific offset

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition <partition number> --offset <offset number> --topic <topic name>

To load data into a specific topic

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic name> < pipeline_test_data.json
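The console producer sends one message per input line, so the input file must be newline-delimited JSON. A sketch for generating such a test-data file (the event shape and function name are made up for illustration):

```python
import json

def write_test_data(path, num_events):
    """Write one JSON event per line; the console producer
    sends each line as a separate message."""
    with open(path, "w", encoding="utf-8") as f:
        for i in range(num_events):
            event = {"eid": i, "msg": "test event {}".format(i)}  # hypothetical event shape
            f.write(json.dumps(event) + "\n")

write_test_data("pipeline_test_data.json", 10)
```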

To describe the consumer group

/opt/kafka/bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group name>

To update the retention period (retention.ms, in milliseconds)

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic name> --config retention.ms=<>
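Note that retention.ms is specified in milliseconds, not hours. A quick converter sketch (helper name is hypothetical):

```python
def retention_ms(hours):
    """Convert a retention period in hours to the
    milliseconds expected by retention.ms."""
    return int(hours * 60 * 60 * 1000)

print(retention_ms(168))  # 7 days → 604800000
```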

To update the topic's max.message.bytes setting

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic name> --config max.message.bytes=<>

To delete a topic forcefully at the ZooKeeper level

Note: It is not advisable to execute these commands in production.

Run inside an interactive zookeeper-shell session (/opt/kafka/bin/zookeeper-shell.sh localhost:2181):

rmr /brokers/topics/{topic_name}
rmr /admin/delete_topics/{topic_name}

Or as a one-liner:

sudo ./zookeeper-shell.sh localhost:2181 rmr /brokers/topics/your_topic

To purge a topic's data (temporarily set retention to zero)

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topicname> --config retention.ms=0

To increase the replication factor for topic partition or partition reassignment

NOTE: These steps can be used when scaling the Kafka cluster and reassigning partitions to a new node

  • Describe the topic whose replication factor you want to increase and check the current replication factor with the command below.

    • /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic <topic name>
  • Save the JSON below in a file (it will be used in the next step) and fill in the topic name, partition numbers, and replicas as required.

{
  "version": 1, 
  "partitions":[
     {"topic":"ntpprod.telemetry.ingest","partition":0,"replicas":[3,1]},
     {"topic":"ntpprod.telemetry.ingest","partition":1,"replicas":[1,2]},
     {"topic":"ntpprod.telemetry.ingest","partition":2,"replicas":[2,3]},
     {"topic":"ntpprod.telemetry.ingest","partition":3,"replicas":[3,1]}
  ]
}
  • Execute the below command on one of the Kafka brokers to start the partition reassignment.

    • /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file <json-file-name> --execute
  • Check the status of partition reassignment by using the below command.

    • /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file <json-file-name> --verify
  • If the partition reassignment completed successfully, the output will look like the below.

Status of partition reassignment: 
Reassignment of partition env.telemetry.ingest-0 completed successfully
Reassignment of partition env.telemetry.ingest-1 completed successfully
Reassignment of partition env.telemetry.ingest-2 completed successfully
Reassignment of partition env.telemetry.ingest-3 completed successfully
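For topics with many partitions, the reassignment JSON above can be generated programmatically. A minimal sketch using round-robin replica placement (the function name, broker ids, and replication factor are assumptions for illustration, not part of the original steps):

```python
import json

def build_reassignment(topic, num_partitions, brokers, replication):
    """Build a kafka-reassign-partitions.sh JSON plan.

    Round-robin placement: partition p gets `replication`
    consecutive brokers starting at index p, wrapping around.
    """
    partitions = []
    for p in range(num_partitions):
        replicas = [brokers[(p + i) % len(brokers)] for i in range(replication)]
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}

plan = build_reassignment("ntpprod.telemetry.ingest", 4, [1, 2, 3], 2)
print(json.dumps(plan, indent=2))
```

Write the printed JSON to a file and pass it to kafka-reassign-partitions.sh with --execute as in the steps above.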

To find oversized events in a specific topic:

  • Describe the topic to find how many partitions it has.
  • Get the current offset for the failed Flink job by describing its consumer group. The consumer group name can be found in the values.j2 file of the Flink jobs deployment Helm chart. Take the current offset from the CURRENT-OFFSET column of the output for each partition and use it in the script below.
#!/usr/bin/env python3
import json
import os

partition_num = 14
file_path = "/home/kaliraja/extractor_{}.json".format(partition_num)

# CURRENT-OFFSET per partition, taken from the consumer-group describe output
offsets = {
    6: 1094182919,
    7: 1102557342,
    8: 725922918,
    9: 711410778,
    10: 726199863,
    11: 717579325,
    12: 696646219,
    13: 725824982,
    14: 711230106,
    15: 725814842
}

# Dump 5000 messages from the chosen partition, starting at the recorded offset
kafka_offsets_cmd = (
    "/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 "
    "--topic ntpprod.telemetry.ingest --partition {} --offset {} "
    "--max-messages 5000 > {}"
).format(partition_num, offsets[partition_num], file_path)
os.system(kafka_offsets_cmd)

output_file_name = "/home/kaliraja/extractor_error_events_{}.txt".format(partition_num)

def utf8len(string):
    return len(string.encode("utf-8"))

cnt = 0
with open(file_path, encoding="utf-8") as f, \
     open(output_file_name, "a", encoding="utf-8") as output_file:
    for line in f:
        cnt += 1
        # Only unpack batches whose raw line exceeds 4 MB (4194304 bytes)
        if utf8len(line) > 4194304:
            for event in json.loads(line)["events"]:
                output_file.write("{}\t{}\n".format(cnt, json.dumps(event)))