diff --git a/core/mq/kafka/kafka_consumer.py b/core/mq/kafka/kafka_consumer.py index 06c5e0b0..c015cf88 100644 --- a/core/mq/kafka/kafka_consumer.py +++ b/core/mq/kafka/kafka_consumer.py @@ -42,21 +42,6 @@ def __init__(self, config: Dict[str, Any], loop: AbstractEventLoop): self._consumer = AIOKafkaConsumer(**conf, loop=loop) loop.run_until_complete(self._consumer.start()) - def on_assign_offset_end(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None: - for p in partitions: - p.offset = consumer.last_stable_offset(p) - self.on_assign_log(consumer, partitions) - try: - consumer.assign(partitions) - except KafkaError as e: - self._error_callback(e) - - def on_coop_assign_offset_end(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None: - for p in partitions: - p.offset = consumer.last_stable_offset(p) - self.on_assign_log(consumer, partitions) - consumer.assign(consumer.assignment().update(partitions)) - def on_assign_log(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None: log_level = "WARNING" params = { @@ -77,19 +62,11 @@ def subscribe(self, topics: Optional[Iterable[str]] = None) -> None: try: self._consumer.subscribe(topics, listener=CoreConsumerRebalanceListener( consumer=self._consumer, - on_assign_callback=(self.get_on_assign_callback() if self.assign_offset_end - else self.on_assign_log) + on_assign_callback=self.on_assign_log )) except KafkaError as e: self._error_callback(e) - def get_on_assign_callback(self) -> Callable[[AIOKafkaConsumer, List[TopicPartition]], None]: - if "cooperative" in self._config["conf"].get("partition_assignment_strategy", ""): - callback = self.on_coop_assign_offset_end - else: - callback = self.on_assign_offset_end - return callback - def unsubscribe(self) -> None: self._consumer.unsubscribe()