From d47fdab9ee1ad23c24b2006ba670ae93dc61bcc9 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Wed, 1 Mar 2023 18:34:34 -0500 Subject: [PATCH] Implement get_partitions_for_topic --- .../kafka_consumer/client/confluent_kafka_client.py | 5 ++++- .../kafka_consumer/client/generic_kafka_client.py | 8 +++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py index 09ad7b1b44823..f61f6c2762288 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -35,7 +35,10 @@ def reset_offsets(self): raise NotImplementedError def get_partitions_for_topic(self, topic): - raise NotImplementedError + cluster_metadata = self.kafka_client.list_topics(topic) + topics = cluster_metadata.topics + partitions = list(topics[topic].partitions.keys()) + return partitions or [] def request_metadata_update(self): raise NotImplementedError diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py index e186877eed599..8a969c212b5c9 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py @@ -46,11 +46,9 @@ def reset_offsets(self): return self.python_kafka_client.reset_offsets() def get_partitions_for_topic(self, topic): - # TODO when this method is implemented in ConfluentKafkaClient, replace this with: - # if self.use_legacy_client: - # return self.python_kafka_client.get_partitions_for_topic(topic) - # return self.confluent_kafka_client.get_partitions_for_topic(topic) - return self.python_kafka_client.get_partitions_for_topic(topic) + if self.use_legacy_client: + return self.python_kafka_client.get_partitions_for_topic(topic) + return self.confluent_kafka_client.get_partitions_for_topic(topic) def request_metadata_update(self): # TODO when this method is implemented in ConfluentKafkaClient, replace this with: