Skip to content

Commit

Permalink
Implement get_partitions_for_topic
Browse files Browse the repository at this point in the history
  • Loading branch information
yzhan289 committed Mar 3, 2023
1 parent 74e3b19 commit 966b3cf
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def reset_offsets(self):
raise NotImplementedError

def get_partitions_for_topic(self, topic):
raise NotImplementedError
cluster_metadata = self.kafka_client.list_topics(topic)
topics = cluster_metadata.topics
partitions = list(topics[topic].partitions.keys())
return partitions or []

def request_metadata_update(self):
raise NotImplementedError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ def reset_offsets(self):
return self.python_kafka_client.reset_offsets()

def get_partitions_for_topic(self, topic):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.get_partitions_for_topic(topic)
# return self.confluent_kafka_client.get_partitions_for_topic(topic)
return self.python_kafka_client.get_partitions_for_topic(topic)
if self.use_legacy_client:
return self.python_kafka_client.get_partitions_for_topic(topic)
return self.confluent_kafka_client.get_partitions_for_topic(topic)

def request_metadata_update(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
Expand Down

0 comments on commit 966b3cf

Please sign in to comment.