PNLP-7915: replace confluent_kafka by aiokafka
SyrexMinus committed Oct 27, 2023
1 parent 0a7edd4 commit 54a5726
Showing 11 changed files with 1,876 additions and 2,008 deletions.
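The core of the migration is the move from confluent_kafka's synchronous, librdkafka-backed client to aiokafka's asyncio-native one. For orientation, a minimal sketch of the consumption model before and after — not code from this repository; the broker address, group id, topic name and handle() are placeholders, and error handling is elided:

def handle(value):  # placeholder message handler
    print(value)

# Before: confluent_kafka, blocking poll loop
from confluent_kafka import Consumer

consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "demo"})
consumer.subscribe(["demo-topic"])
while True:
    msg = consumer.poll(1.0)  # blocks up to 1 s
    if msg is None:
        continue
    handle(msg.value())  # value() is a method returning bytes

# After: aiokafka, awaitable consumer driven by an event loop
import asyncio
from aiokafka import AIOKafkaConsumer

async def run() -> None:
    consumer = AIOKafkaConsumer("demo-topic", bootstrap_servers="localhost:9092", group_id="demo")
    await consumer.start()
    try:
        async for msg in consumer:  # yields ConsumerRecord objects
            handle(msg.value)  # value is a plain attribute
    finally:
        await consumer.stop()

asyncio.run(run())

Note also the config-key convention change: librdkafka uses dotted keys ("group.id") while aiokafka takes snake_case keyword arguments (group_id); the kafka_consumer.py diff below handles this with _update_old_config.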
4 changes: 2 additions & 2 deletions core/message/from_message.py
@@ -1,5 +1,5 @@
 # coding=utf-8
-from typing import Iterable, Dict, Optional, Set, Any, List, Union, Tuple
+from typing import Iterable, Dict, Optional, Any, List, Union, Sequence
 import json
 import uuid

@@ -57,7 +57,7 @@ class SmartAppFromMessage:
     uuid: dict
 
     def __init__(self, value: Dict[str, Any], topic_key: str = None, creation_time: Optional[int] = None,
-                 kafka_key: Optional[str] = None, headers: Optional[Iterable[Tuple[Any, Any]]] = None,
+                 kafka_key: Optional[str] = None, headers: Optional[Sequence[tuple[str, bytes]]] = None,
                  masking_fields: Optional[Union[Dict[str, int], List[str]]] = None, headers_required: bool = True,
                  validators: Iterable[MessageValidator] = ()):
         self.logging_uuid = str(uuid.uuid4())
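One practical effect of this annotation change: aiokafka hands a record's headers over as a sequence of (str, bytes) pairs, and SmartAppFromMessage now declares exactly that. A hypothetical call site (the header name and payload are illustrative only):

from core.message.from_message import SmartAppFromMessage

headers = [("test-header", b"value")]  # str keys, bytes values, as aiokafka delivers them
message = SmartAppFromMessage(value={"some": "payload"}, headers=headers, headers_required=False)

The looser Tuple[Any, Any] fit the confluent_kafka era, in part because header values there were not guaranteed to be bytes.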
66 changes: 0 additions & 66 deletions core/mq/kafka/async_kafka_publisher.py

This file was deleted.

20 changes: 20 additions & 0 deletions core/mq/kafka/consumer_rebalance_listener.py
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from aiokafka.abc import ConsumerRebalanceListener
+
+if TYPE_CHECKING:
+    from typing import List, Callable
+    from kafka import TopicPartition
+    from aiokafka import AIOKafkaConsumer
+
+
+class CoreConsumerRebalanceListener(ConsumerRebalanceListener):
+    def __init__(self, consumer: AIOKafkaConsumer,
+                 on_assign_callback: Callable[[AIOKafkaConsumer, List[TopicPartition]], None]):
+        self._consumer = consumer
+        self._on_assign_callback = on_assign_callback
+
+    def on_partitions_assigned(self, assigned: List[TopicPartition]):
+        self._on_assign_callback(self._consumer, assigned)
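For orientation, a minimal standalone sketch of how such a listener plugs into aiokafka — not code from this commit; the broker address, group id and topic are placeholders. Note that aiokafka.abc.ConsumerRebalanceListener also declares on_partitions_revoked as abstract, so an independent subclass has to implement both hooks:

import asyncio
from aiokafka import AIOKafkaConsumer
from aiokafka.abc import ConsumerRebalanceListener

class PrintingRebalanceListener(ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print("revoked:", revoked)

    def on_partitions_assigned(self, assigned):
        print("assigned:", assigned)

async def main() -> None:
    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:9092", group_id="demo")
    await consumer.start()
    # subscribe() registers the listener; it fires on every rebalance
    consumer.subscribe(["demo-topic"], listener=PrintingRebalanceListener())
    try:
        async for record in consumer:
            print(record.offset, record.value)
    finally:
        await consumer.stop()

asyncio.run(main())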
176 changes: 97 additions & 79 deletions core/mq/kafka/kafka_consumer.py
@@ -1,108 +1,123 @@
 # coding: utf-8
+from __future__ import annotations
 
 import logging
 import os
 import time
 import uuid
+from typing import TYPE_CHECKING
 
-from confluent_kafka import Consumer, TIMESTAMP_NOT_AVAILABLE
-from confluent_kafka.cimpl import KafkaError, KafkaException, OFFSET_END, Message as KafkaMessage
+from aiokafka import AIOKafkaConsumer, TopicPartition
+from kafka.errors import KafkaError
 
 import core.logging.logger_constants as log_const
 from core.logging.logger_utils import log
 from core.monitoring.monitoring import monitoring
 from core.mq.kafka.base_kafka_consumer import BaseKafkaConsumer
+from core.mq.kafka.consumer_rebalance_listener import CoreConsumerRebalanceListener
+
+if TYPE_CHECKING:
+    from aiokafka import ConsumerRecord
+    from typing import Optional, Callable, Iterable, AsyncGenerator, Any, Dict, List
+    from asyncio import AbstractEventLoop
 
 
 class KafkaConsumer(BaseKafkaConsumer):
-    def __init__(self, config):
+    def __init__(self, config: Dict[str, Any], loop: AbstractEventLoop):
         self._config = config["consumer"]
         self.assign_offset_end = self._config.get("assign_offset_end", False)
         conf = self._config["conf"]
-        conf.setdefault("group.id", str(uuid.uuid1()))
-        self.autocommit_enabled = conf.get("enable.auto.commit", True)
+        self._update_old_config(conf)
+        conf.setdefault("group_id", str(uuid.uuid1()))
+        self.autocommit_enabled = conf.get("enable_auto_commit", True)
         internal_log_path = self._config.get("internal_log_path")
-        conf["error_cb"] = self._error_callback
         if internal_log_path:
-            debug_logger = logging.getLogger("debug_consumer")
+            debug_logger = logging.getLogger("debug_consumer")  # TODO add debug logger to _consumer events
             timestamp = time.strftime("_%d%m%Y_")
             debug_logger.addHandler(logging.FileHandler(
-                "{}/kafka_consumer_debug{}{}.log".format(internal_log_path, timestamp, os.getpid())))
-            conf["logger"] = debug_logger
-        self._consumer = Consumer(**conf)
+                "{}/kafka_consumer_debug{}{}.log".format(internal_log_path, timestamp, os.getpid())
+            ))
+        self._consumer = AIOKafkaConsumer(**conf, loop=loop)
+        loop.run_until_complete(self._consumer.start())

-    @staticmethod
-    def on_assign_offset_end(consumer, partitions):
+    def on_assign_offset_end(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None:
         for p in partitions:
-            p.offset = OFFSET_END
-        KafkaConsumer.on_assign_log(consumer, partitions)
-        consumer.assign(partitions)
+            p.offset = consumer.last_stable_offset(p)
+        self.on_assign_log(consumer, partitions)
+        try:
+            consumer.assign(partitions)
+        except KafkaError as e:
+            self._error_callback(e)
 
-    @staticmethod
-    def on_coop_assign_offset_end(consumer, partitions):
+    def on_coop_assign_offset_end(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None:
         for p in partitions:
-            p.offset = OFFSET_END
-        KafkaConsumer.on_assign_log(consumer, partitions)
-        consumer.incremental_assign(partitions)
+            p.offset = consumer.last_stable_offset(p)
+        self.on_assign_log(consumer, partitions)
+        consumer.incremental_assign(partitions)  # TODO support incremental_assign by our own: consume previous assignments and add new one
 
-    @staticmethod
-    def on_assign_log(consumer, partitions):
+    def on_assign_log(self, consumer: AIOKafkaConsumer, partitions: List[TopicPartition]) -> None:
         log_level = "WARNING"
         for p in partitions:
             if p.error:
                 log_level = "ERROR"
         params = {
             "partitions": str(list([str(partition) for partition in partitions or []])),
             log_const.KEY_NAME: log_const.KAFKA_ON_ASSIGN_VALUE,
             "log_level": log_level
         }
         log("KafkaConsumer.subscribe<on_assign>: assign %(partitions)s %(log_level)s", params=params, level=log_level)

-    def subscribe(self, topics=None):
+    def subscribe(self, topics: Iterable[str] = None) -> None:
         topics = list(set(topics or list(self._config["topics"].values())))
 
         params = {
             "topics": topics
         }
         log("Topics to subscribe: %(topics)s", params=params)
 
-        self._consumer.subscribe(
-            topics,
-            on_assign=self.get_on_assign_callback() if self.assign_offset_end else KafkaConsumer.on_assign_log
-        )
+        try:
+            self._consumer.subscribe(topics, listener=CoreConsumerRebalanceListener(
+                consumer=self._consumer,
+                on_assign_callback=(self.get_on_assign_callback() if self.assign_offset_end
+                                    else self.on_assign_log)
+            ))
+        except KafkaError as e:
+            self._error_callback(e)
 
-    def get_on_assign_callback(self):
-        if "cooperative" in self._config["conf"].get("partition.assignment.strategy", ""):
-            callback = KafkaConsumer.on_coop_assign_offset_end
+    def get_on_assign_callback(self) -> Callable[[AIOKafkaConsumer, List[TopicPartition]], None]:
+        if "cooperative" in self._config["conf"].get("partition_assignment_strategy", ""):
+            callback = self.on_coop_assign_offset_end
         else:
-            callback = KafkaConsumer.on_assign_offset_end
+            callback = self.on_assign_offset_end
         return callback

-    def unsubscribe(self):
+    def unsubscribe(self) -> None:
         self._consumer.unsubscribe()

-    def poll(self):
-        msg = self._consumer.poll(self._config["poll_timeout"])
-        if msg is not None:
-            return self._process_message(msg)
+    async def poll(self) -> Optional[ConsumerRecord]:
+        msg = await self._consumer.getone()
+        return self._process_message(msg)

-    def consume(self, num_messages: int = 1):
-        messages = self._consumer.consume(num_messages=num_messages, timeout=self._config["poll_timeout"])
-        for msg in messages:
-            yield self._process_message(msg)
+    async def consume(self, num_messages: int = 1) -> AsyncGenerator[Optional[ConsumerRecord], None]:
+        timeout_ms = self._config["poll_timeout"] * 1000
+        messages = await self._consumer.getmany(max_records=num_messages, timeout_ms=timeout_ms)
+        for partition_messages in messages.values():
+            for msg in partition_messages:
+                processed = self._process_message(msg)
+                yield processed

-    def commit_offset(self, msg):
+    async def commit_offset(self, msg: ConsumerRecord) -> None:
         if msg is not None:
-            if self.autocommit_enabled:
-                self._consumer.store_offsets(msg)
-            else:
-                self._consumer.commit(msg, **{"async": True})
-
-    def get_msg_create_time(self, mq_message):
-        timestamp_type, timestamp = mq_message.timestamp()
-        return timestamp if timestamp_type is not TIMESTAMP_NOT_AVAILABLE else None
-
-    def _error_callback(self, err):
+            if not self.autocommit_enabled:
+                tp = TopicPartition(msg.topic, msg.partition)
+                try:
+                    await self._consumer.commit({tp: msg.offset + 1})
+                except KafkaError as e:
+                    self._error_callback(e)
+
+    def get_msg_create_time(self, mq_message: ConsumerRecord) -> int:
+        timestamp = mq_message.timestamp
+        return timestamp
+
+    def _error_callback(self, err: Any) -> None:
         params = {
             "error": str(err),
             log_const.KEY_NAME: log_const.EXCEPTION_VALUE
@@ -111,30 +126,33 @@ def _error_callback(self, err):
         monitoring.got_counter("kafka_consumer_exception")
 
     # noinspection PyMethodMayBeStatic
-    def _process_message(self, msg: KafkaMessage):
-        err = msg.error()
-        if err:
-            if err.code() == KafkaError._PARTITION_EOF:
-                return None
-            else:
-                monitoring.got_counter("kafka_consumer_exception")
-                params = {
-                    "code": err.code(),
-                    "pid": os.getpid(),
-                    "topic": msg.topic(),
-                    "partition": msg.partition(),
-                    log_const.KEY_NAME: log_const.EXCEPTION_VALUE
-                }
-                log(
-                    "KafkaConsumer Error %(code)s at pid %(pid)s: topic=%(topic)s partition=[%(partition)s]\n",
-                    params=params, level="WARNING")
-                raise KafkaException(err)
-
-        if msg.value():
-            if msg.headers() is None:
-                msg.set_headers([])
+    def _process_message(self, msg: ConsumerRecord) -> Optional[ConsumerRecord]:
+        if msg.value:
+            if msg.headers is None:
+                msg.headers = list()
         return msg
 
-    def close(self):
-        self._consumer.close()
+    async def close(self) -> None:
+        await self._consumer.stop()
+        log(f"consumer to topics {self._config['topics']} closed.")
+
+    def _update_old_config(self, conf: Dict[str, Any]) -> None:
+        if "default.topic.config" in conf:
+            conf.update(conf["default.topic.config"])
+            del conf["default.topic.config"]
+        param_old_to_new = {
+            "group.id": "group_id",
+            "enable.auto.commit": "enable_auto_commit",
+            "partition.assignment.strategy": "partition_assignment_strategy",
+            "bootstrap.servers": "bootstrap_servers",
+            "topic.metadata.refresh.interval.ms": "metadata_max_age_ms",
+            "session.timeout.ms": "session_timeout_ms",
+            "auto.commit.interval.ms": "auto_commit_interval_ms",
+            "enable.auto.offset.store": None,
+            "auto.offset.reset": "auto_offset_reset"
+        }  # TODO map other old configs as well
+        for old, new in param_old_to_new.items():
+            if old in conf:
+                if new:
+                    conf[new] = conf[old]
+                del conf[old]
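To make the key translation concrete, a sketch of what _update_old_config would do to a legacy librdkafka-style conf dict (values are illustrative):

conf = {
    "bootstrap.servers": "localhost:9092",
    "enable.auto.commit": False,
    "enable.auto.offset.store": False,
    "default.topic.config": {"auto.offset.reset": "latest"},
}
# _update_old_config first flattens default.topic.config into the top level,
# then renames each dotted key to its snake_case equivalent; keys mapped to
# None (enable.auto.offset.store has no aiokafka counterpart) are dropped.
# Afterwards:
# conf == {
#     "bootstrap_servers": "localhost:9092",
#     "enable_auto_commit": False,
#     "auto_offset_reset": "latest",
# }
# which can then be splatted straight into AIOKafkaConsumer(**conf).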
