diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py index 7b18f6768b530..e78e932bebdee 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -75,7 +75,8 @@ def get_partitions_for_topic(self, topic): return None def request_metadata_update(self): - raise NotImplementedError + # https://github.com/confluentinc/confluent-kafka-python/issues/594 + self.kafka_client.list_topics(None, timeout=self.config._request_timeout_ms / 1000) def get_consumer_offsets(self): # {(consumer_group, topic, partition): offset} diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py index d31fc2505208f..9398a91e9792c 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py @@ -42,11 +42,9 @@ def get_partitions_for_topic(self, topic): return self.confluent_kafka_client.get_partitions_for_topic(topic) def request_metadata_update(self): - # TODO when this method is implemented in ConfluentKafkaClient, replace this with: - # if self.use_legacy_client: - # return self.python_kafka_client.request_metadata_update() - # return self.confluent_kafka_client.request_metadata_update() - return self.python_kafka_client.request_metadata_update() + if self.use_legacy_client: + return self.python_kafka_client.request_metadata_update() + return self.confluent_kafka_client.request_metadata_update() def get_consumer_offsets_dict(self): if self.use_legacy_client: