Merge pull request #782 from Aiven-Open/matyaskuti/confluent_kafka_async_consumer

Implement async confluent-kafka consumer
jjaakola-aiven authored Feb 15, 2024
2 parents 201428d + b17003f commit 2ff823b
Showing 14 changed files with 752 additions and 147 deletions.
29 changes: 23 additions & 6 deletions karapace/kafka/common.py
@@ -8,7 +8,14 @@
from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError
from kafka.errors import (
AuthenticationFailedError,
for_code,
IllegalStateError,
KafkaTimeoutError,
NoBrokersAvailable,
UnknownTopicOrPartitionError,
)
from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

@@ -47,6 +54,10 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception:
KafkaError._UNKNOWN_TOPIC, # pylint: disable=protected-access
):
return UnknownTopicOrPartitionError()
if code == KafkaError._TIMED_OUT: # pylint: disable=protected-access
return KafkaTimeoutError()
if code == KafkaError._STATE: # pylint: disable=protected-access
return IllegalStateError()

return for_code(code)
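
A minimal sketch of the two new translations (constructing `KafkaError` directly and touching the private codes is for illustration only; in practice the error arrives wrapped in a `KafkaException` raised by confluent-kafka):

from confluent_kafka import KafkaError
from kafka.errors import IllegalStateError, KafkaTimeoutError
from karapace.kafka.common import translate_from_kafkaerror

# Client-internal codes such as _TIMED_OUT and _STATE are private, hence
# the pylint pragmas in the implementation above.
assert isinstance(translate_from_kafkaerror(KafkaError(KafkaError._TIMED_OUT)), KafkaTimeoutError)
assert isinstance(translate_from_kafkaerror(KafkaError(KafkaError._STATE)), IllegalStateError)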

@@ -89,12 +100,15 @@ class KafkaClientParams(TypedDict, total=False):
ssl_crlfile: str | None
ssl_keyfile: str | None
sasl_oauth_token_provider: TokenWithExpiryProvider
topic_metadata_refresh_interval_ms: int | None
# Consumer-only
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"]
enable_auto_commit: bool
fetch_max_wait_ms: int
group_id: str
session_timeout_ms: int
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] | None
enable_auto_commit: bool | None
fetch_min_bytes: int | None
fetch_message_max_bytes: int | None
fetch_max_wait_ms: int | None
group_id: str | None
session_timeout_ms: int | None


class _KafkaConfigMixin:
@@ -142,10 +156,13 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para
"ssl.certificate.location": params.get("ssl_certfile"),
"ssl.crl.location": params.get("ssl_crlfile"),
"ssl.key.location": params.get("ssl_keyfile"),
"topic.metadata.refresh.interval.ms": params.get("topic_metadata_refresh_interval_ms"),
"error_cb": self._error_callback,
# Consumer-only
"auto.offset.reset": params.get("auto_offset_reset"),
"enable.auto.commit": params.get("enable_auto_commit"),
"fetch.min.bytes": params.get("fetch_min_bytes"),
"fetch.message.max.bytes": params.get("fetch_message_max_bytes"),
"fetch.wait.max.ms": params.get("fetch_max_wait_ms"),
"group.id": params.get("group_id"),
"session.timeout.ms": params.get("session_timeout_ms"),
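As a usage sketch, the new snake_case parameters map one-to-one onto the dotted librdkafka keys above (broker address and values are illustrative):

from karapace.kafka.consumer import KafkaConsumer

# fetch_min_bytes -> "fetch.min.bytes", fetch_message_max_bytes ->
# "fetch.message.max.bytes", topic_metadata_refresh_interval_ms ->
# "topic.metadata.refresh.interval.ms", and so on.
consumer = KafkaConsumer(
    "localhost:9092",
    group_id="example-group",
    auto_offset_reset="earliest",
    session_timeout_ms=10000,
    fetch_min_bytes=1,
    fetch_message_max_bytes=1048576,
    topic_metadata_refresh_interval_ms=60000,
)
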
220 changes: 215 additions & 5 deletions karapace/kafka/consumer.py
@@ -5,22 +5,23 @@

from __future__ import annotations

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka import Consumer, Message, TopicPartition
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.error import KafkaException
from kafka.errors import KafkaTimeoutError
from kafka.errors import IllegalStateError, KafkaTimeoutError
from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
from typing import Iterable
from typing import Any, Callable, Iterable, TypeVar
from typing_extensions import Unpack

import asyncio
import secrets


class KafkaConsumer(_KafkaConfigMixin, Consumer):
def __init__(
self,
topic: str,
bootstrap_servers: Iterable[str] | str,
topic: str | None = None,
verify_connection: bool = True,
**params: Unpack[KafkaClientParams],
) -> None:
@@ -32,7 +33,9 @@ def __init__(

super().__init__(bootstrap_servers, verify_connection, **params)

self.subscribe([topic])
self._subscription: frozenset[str] = frozenset()
if topic is not None:
self.subscribe([topic])

@staticmethod
def _create_group_id() -> str:
@@ -66,3 +69,210 @@ def get_watermark_offsets(
return result
except KafkaException as exc:
raise_from_kafkaexception(exc)

def commit( # type: ignore[override]
self,
message: Message | None = None,
offsets: list[TopicPartition] | None = None,
) -> list[TopicPartition] | None:
"""Commit offsets based on a message or offsets (topic partitions).
The `message` and `offsets` parameters are mutually exclusive.
"""
if message is not None and offsets is not None:
raise ValueError("Parameters message and offsets are mutually exclusive.")

try:
if message is not None:
return super().commit(message=message, asynchronous=False)

return super().commit(offsets=offsets, asynchronous=False)
except KafkaException as exc:
raise_from_kafkaexception(exc)
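
A short usage sketch of the two mutually exclusive commit modes (broker and topic names are illustrative):

from confluent_kafka import TopicPartition
from karapace.kafka.consumer import KafkaConsumer

consumer = KafkaConsumer("localhost:9092", topic="example-topic", enable_auto_commit=False)

# Mode 1: commit the offset of a single consumed message.
message = consumer.poll(timeout=1.0)
if message is not None and message.error() is None:
    consumer.commit(message=message)

# Mode 2: commit explicit offsets (topic, partition, offset).
consumer.commit(offsets=[TopicPartition("example-topic", 0, 42)])

# Passing both `message` and `offsets` raises ValueError.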

def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]:
try:
if timeout is not None:
return super().committed(partitions, timeout)

return super().committed(partitions)
except KafkaException as exc:
raise_from_kafkaexception(exc)

def subscribe( # type: ignore[override]
self,
topics: list[str] | None = None,
patterns: list[str] | None = None,
) -> None:
"""Subscribe to a list of topics and/or topic patterns.
Subscriptions are not incremental.
For `Consumer.subscribe`, topic patterns must start with "^", e.g.
"^this-is-a-regex-[0-9]", thus we prefix all strings in the `patterns`
list with "^".
The `on_assign` and `on_revoke` callbacks are set to keep track of
subscriptions (topics).
More in the confluent-kafka documentation:
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.subscribe
"""
topics = topics or []
patterns = patterns or []
self.log.info("Subscribing to topics %s and patterns %s", topics, patterns)
if self._subscription:
self.log.warning("Overriding existing subscription: %s", self.subscription())
try:
super().subscribe(
topics + [f"^{pattern}" for pattern in patterns],
on_assign=self._on_assign,
on_revoke=self._on_revoke,
)
except KafkaException as exc:
raise_from_kafkaexception(exc)

# Prefill the subscription set with fixed topic names, so only topic
# pattern updates need to wait for the callback.
self._subscription = frozenset(topics)

def _on_assign(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None:
topics = frozenset(partition.topic for partition in partitions)
self._subscription = self._subscription.union(topics)

def _on_revoke(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None:
topics = frozenset(partition.topic for partition in partitions)
self._subscription = self._subscription.difference(topics)

def subscription(self) -> frozenset[str]:
"""Returns the list of topic names the consumer is subscribed to.
The topic list is maintained by the `_on_assign` and `_on_revoke` callback
methods, which are set in `subscribe`. These callbacks are only called
when `poll` is called.
"""
return self._subscription
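
A sketch of the bookkeeping described above (topic names are illustrative; the pattern is subscribed as "^pattern-[0-9]+"):

from karapace.kafka.consumer import KafkaConsumer

consumer = KafkaConsumer("localhost:9092")
consumer.subscribe(topics=["fixed-topic"], patterns=["pattern-[0-9]+"])

# Fixed topic names are visible immediately.
assert consumer.subscription() == frozenset({"fixed-topic"})

# Pattern matches are only added once a poll triggers the on_assign callback.
consumer.poll(timeout=1.0)
print(consumer.subscription())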

def unsubscribe(self) -> None:
try:
super().unsubscribe()
except KafkaException as exc:
raise_from_kafkaexception(exc)

self._subscription = frozenset()

def assign(self, partitions: list[TopicPartition]) -> None:
"""Assign a list of topic partitions to the consumer.
Raises an `IllegalStateError` if `subscribe` has been previously called.
This is partly to match previous behaviour from `aiokafka`, but more
importantly to make sure we do not eventually reset the consumer by
calling `assign` after `subscribe` for the same topic - which would
result in the consumer starting from the beginning of a topic after an
unspecified time.
"""
if self._subscription:
raise IllegalStateError

try:
super().assign(partitions)
except KafkaException as exc:
raise_from_kafkaexception(exc)
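
A sketch of the guard described in the docstring (broker and topic names are illustrative):

from confluent_kafka import TopicPartition
from kafka.errors import IllegalStateError
from karapace.kafka.consumer import KafkaConsumer

fresh = KafkaConsumer("localhost:9092")
fresh.assign([TopicPartition("assigned-topic", 0)])  # no subscription yet: allowed

subscribed = KafkaConsumer("localhost:9092", topic="subscribed-topic")
try:
    subscribed.assign([TopicPartition("assigned-topic", 0)])
except IllegalStateError:
    print("assign after subscribe is rejected")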

def assignment(self) -> list[TopicPartition]:
try:
return super().assignment()
except KafkaException as exc:
raise_from_kafkaexception(exc)

def seek(self, partition: TopicPartition) -> None:
try:
super().seek(partition)
except KafkaException as exc:
raise_from_kafkaexception(exc)


T = TypeVar("T")


class AsyncKafkaConsumer:
"""An async wrapper around `KafkaConsumer` built on confluent-kafka.
Async methods are run in the event loop's executor. Calling `start`
instantiates the underlying `KafkaConsumer`.
"""

_START_ERROR: str = "Async consumer must be started"

def __init__(
self,
bootstrap_servers: Iterable[str] | str,
topic: str | None = None,
loop: asyncio.AbstractEventLoop | None = None,
**params: Unpack[KafkaClientParams],
) -> None:
self.loop = loop or asyncio.get_running_loop()

self.consumer: KafkaConsumer | None = None
self._bootstrap_servers = bootstrap_servers
self._topic = topic
self._consumer_params = params

async def _run_in_executor(self, func: Callable[..., T], *args: Any) -> T:
return await self.loop.run_in_executor(None, func, *args)
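# Passing None as the executor selects the event loop's default
# ThreadPoolExecutor: the blocking confluent-kafka call runs in a worker
# thread, so awaiting it never blocks the loop. Every public coroutine
# below funnels through this helper.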

def _start(self) -> None:
self.consumer = KafkaConsumer(
self._bootstrap_servers,
topic=self._topic,
**self._consumer_params,
)

async def start(self) -> None:
await self._run_in_executor(self._start)

async def poll(self, timeout: float) -> Message | None:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.poll, timeout)

async def commit(
self,
message: Message | None = None,
offsets: list[TopicPartition] | None = None,
) -> list[TopicPartition] | None:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.commit, message, offsets)

async def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.committed, partitions, timeout)

async def subscribe(self, topics: list[str] | None = None, patterns: list[str] | None = None) -> None:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.subscribe, topics, patterns)

def subscription(self) -> frozenset[str]:
assert self.consumer is not None, self._START_ERROR
return self.consumer.subscription()

async def unsubscribe(self) -> None:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.unsubscribe)

async def assign(self, partitions: list[TopicPartition]) -> None:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.assign, partitions)

async def assignment(self) -> list[TopicPartition]:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.assignment)

async def seek(self, partition: TopicPartition) -> None:
assert self.consumer is not None, self._START_ERROR
return await self._run_in_executor(self.consumer.seek, partition)

async def stop(self) -> None:
assert self.consumer is not None, self._START_ERROR
# After the `KafkaConsumer` is closed there is no further action to take:
# the underlying consumer performs its own checks and raises an error if
# a closed consumer is used.
return await self._run_in_executor(self.consumer.close)
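
A minimal end-to-end sketch of the async consumer's lifecycle (broker address, topic and group names are illustrative):

import asyncio

from karapace.kafka.consumer import AsyncKafkaConsumer

async def consume_a_few() -> None:
    consumer = AsyncKafkaConsumer("localhost:9092", topic="example-topic", group_id="example-group")
    await consumer.start()  # instantiates the blocking KafkaConsumer in the executor
    try:
        for _ in range(10):
            message = await consumer.poll(timeout=1.0)
            if message is not None and message.error() is None:
                print(message.topic(), message.offset())
    finally:
        await consumer.stop()

asyncio.run(consume_a_few())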
