diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py index 00237cc8eff0f..7d3a8646b84ee 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -1,6 +1,7 @@ # (C) Datadog, Inc. 2023-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +from confluent_kafka import KafkaException from confluent_kafka.admin import AdminClient from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient @@ -35,7 +36,15 @@ def reset_offsets(self): raise NotImplementedError def get_partitions_for_topic(self, topic): - raise NotImplementedError + + try: + cluster_metadata = self.kafka_client.list_topics(topic) + topic_metadata = cluster_metadata.topics[topic] + partitions = list(topic_metadata.partitions.keys()) + return partitions + except KafkaException as e: + self.log.error("Received exception when getting partitions for topic %s: %s", topic, e) + return None def request_metadata_update(self): raise NotImplementedError diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py index 109b3b2e2b158..836f949d78df8 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py @@ -46,11 +46,9 @@ def reset_offsets(self): return self.python_kafka_client.reset_offsets() def get_partitions_for_topic(self, topic): - # TODO when this method is implemented in ConfluentKafkaClient, replace this with: - # if self.use_legacy_client: - # return self.python_kafka_client.get_partitions_for_topic(topic) - # return self.confluent_kafka_client.get_partitions_for_topic(topic) - return self.python_kafka_client.get_partitions_for_topic(topic) + if self.use_legacy_client: + return self.python_kafka_client.get_partitions_for_topic(topic) + return self.confluent_kafka_client.get_partitions_for_topic(topic) def request_metadata_update(self): # TODO when this method is implemented in ConfluentKafkaClient, replace this with: