Skip to content

Commit

Permalink
Replace sync Kafka Producers with confluent_kafka one
Browse files Browse the repository at this point in the history
This change replaces all synchronous Kafka producers (from the
kafka-python library), with a new implementation based on
confluent-kafka-python's `Producer`. The aim is to keep the same
interface as much as possible.

The change so far doesn't intend to fully remove all references to
kafka-python, most notably developer friendly errors and exceptions are
still coming from kafka-python.

A new `karapace.kafka` module is introduced, splitting the previously
added admin client and the new producer into their own modules.

Resources:
* confluent-kafka-python documentation:
  https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
* librdkafka configuration documentation:
  https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
Mátyás Kuti committed Nov 29, 2023

Unverified

No user is associated with the committer email.
1 parent 190f1af commit b606514
Showing 31 changed files with 658 additions and 425 deletions.
36 changes: 20 additions & 16 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
@@ -21,25 +21,27 @@
)
from .poll_timeout import PollTimeout
from .topic_configurations import ConfigSource, get_topic_configurations
from concurrent.futures import Future
from enum import Enum
from functools import partial
from kafka import KafkaConsumer, KafkaProducer
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import PartitionMetadata, TopicPartition
from kafka.structs import TopicPartition
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
from karapace.config import Config
from karapace.kafka_admin import KafkaAdminClient
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
from karapace.key_format import KeyFormatter
from karapace.utils import assert_never
from pathlib import Path
from rich.console import Console
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, TypeVar
from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar

import contextlib
import datetime
@@ -170,7 +172,7 @@ def before_sleep(it: RetryCallState) -> None:
return before_sleep


def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[PartitionMetadata]]) -> None:
def __check_partition_count(topic: str, supplier: Callable[[str], Sized]) -> None:
"""Checks that the given topic has exactly one partition.
:param topic: to check.
@@ -375,7 +377,7 @@ def _handle_restore_topic(
def _handle_producer_send(
instruction: ProducerSend,
producer: KafkaProducer,
producer_error_callback: Callable[[Exception], None],
producer_callback: Callable[[Future], None],
) -> None:
LOG.debug(
"Sending kafka msg key: %r, value: %r",
@@ -388,11 +390,11 @@ def _handle_producer_send(
key=instruction.key,
value=instruction.value,
partition=instruction.partition_index,
headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
timestamp_ms=instruction.timestamp,
).add_errback(producer_error_callback)
except (KafkaError, AssertionError) as ex:
raise BackupDataRestorationError("Error while calling send on restoring messages") from ex
headers=[(key.decode(), value) for key, value in instruction.headers if key is not None],
timestamp=instruction.timestamp,
).add_done_callback(producer_callback)
except (KafkaError, AssertionError) as exc:
raise BackupDataRestorationError("Error while calling send on restoring messages") from exc


def restore_backup(
@@ -433,10 +435,12 @@ def restore_backup(
with contextlib.ExitStack() as stack:
producer = None

def _producer_error_callback(exception: Exception) -> None:
LOG.error("Producer error", exc_info=exception)
nonlocal _producer_exception
_producer_exception = exception
def _producer_callback(future: Future) -> None:
exception = future.exception()
if exception is not None:
LOG.error("Producer error", exc_info=exception)
nonlocal _producer_exception
_producer_exception = exception

def _check_producer_exception() -> None:
if _producer_exception is not None:
@@ -452,7 +456,7 @@ def _check_producer_exception() -> None:
elif isinstance(instruction, ProducerSend):
if producer is None:
raise RuntimeError("Backend has not yet sent RestoreTopic.")
_handle_producer_send(instruction, producer, _producer_error_callback)
_handle_producer_send(instruction, producer, _producer_callback)
# Immediately check if producer.send() generated an exception. This call is
# only an optimization, as producing is asynchronous and no sends might
# have been executed once we reach this line.
2 changes: 1 addition & 1 deletion karapace/backup/topic_configurations.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
"""
from __future__ import annotations

from karapace.kafka_admin import ConfigSource, KafkaAdminClient
from karapace.kafka.admin import ConfigSource, KafkaAdminClient
from typing import Container, Final

ALL_CONFIG_SOURCES: Final = ConfigSource
Empty file added karapace/kafka/__init__.py
Empty file.
178 changes: 178 additions & 0 deletions karapace/kafka/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from __future__ import annotations

from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka import TopicPartition
from confluent_kafka.admin import (
AdminClient,
BrokerMetadata,
ClusterMetadata,
ConfigResource,
ConfigSource,
NewTopic,
OffsetSpec,
ResourceType,
TopicMetadata,
)
from confluent_kafka.error import KafkaException
from karapace.constants import TOPIC_CREATION_TIMEOUT_S
from karapace.kafka.common import (
_KafkaConfigMixin,
raise_from_kafkaexception,
single_futmap_result,
UnknownTopicOrPartitionError,
)
from typing import Container


class KafkaAdminClient(_KafkaConfigMixin, AdminClient):
def new_topic(
self,
name: str,
*,
num_partitions: int = 1,
replication_factor: int = 1,
config: dict[str, str] | None = None,
request_timeout: float = TOPIC_CREATION_TIMEOUT_S,
) -> NewTopic:
new_topic = NewTopic(
topic=name,
num_partitions=num_partitions,
replication_factor=replication_factor,
config=config if config is not None else {},
)
self.log.info("Creating new topic %s with replication factor %s", new_topic, replication_factor)
futmap: dict[str, Future] = self.create_topics([new_topic], request_timeout=request_timeout)
try:
single_futmap_result(futmap)
return new_topic
except KafkaException as exc:
raise_from_kafkaexception(exc)

def update_topic_config(self, name: str, config: dict[str, str]) -> None:
self.log.info("Updating topic '%s' configuration with %s", name, config)
futmap = self.alter_configs([ConfigResource(ResourceType.TOPIC, name, set_config=config)])
try:
single_futmap_result(futmap)
except KafkaException as exc:
raise_from_kafkaexception(exc)

def delete_topic(self, name: str) -> None:
self.log.info("Deleting topic '%s'", name)
futmap = self.delete_topics([name])
try:
single_futmap_result(futmap)
except KafkaException as exc:
raise_from_kafkaexception(exc)

def cluster_metadata(self, topics: Iterable[str] | None = None) -> dict:
"""Fetch cluster metadata and topic information for given topics or all topics if not given.
Using the `list_topics` method of the `AdminClient`, as this actually provides
metadata for the entire cluster, not just topics, as suggested by the name.
The topics filter is only applied _after_ fetching the cluster metadata,
due to `list_topics` only accepting a single topic as a filter.
"""
self.log.info("Fetching cluster metadata with topic filter: %s", topics)
cluster_metadata: ClusterMetadata = self.list_topics()
topics_metadata: dict[str, TopicMetadata] = cluster_metadata.topics
brokers_metadata: dict[int, BrokerMetadata] = cluster_metadata.brokers

if topics is not None and any(topic not in topics_metadata.keys() for topic in topics):
raise UnknownTopicOrPartitionError()

topics_data: dict[str, dict] = {}
for topic, topic_metadata in topics_metadata.items():
if topics is not None and topic not in topics:
continue

partitions_data = []
for partition_id, partition_metadata in topic_metadata.partitions.items():
partition_data = {
"partition": partition_id,
"leader": partition_metadata.leader,
"replicas": [
{
"broker": replica_id,
"leader": replica_id == partition_metadata.leader,
"in_sync": replica_id in partition_metadata.isrs,
}
for replica_id in partition_metadata.replicas
],
}
partitions_data.append(partition_data)

topics_data[topic] = {"partitions": partitions_data}

return {"topics": topics_data, "brokers": list(brokers_metadata.keys())}

def get_topic_config(
self,
name: str,
config_name_filter: Container[str] | None = None,
config_source_filter: Container[ConfigSource] | None = None,
) -> dict[str, str]:
"""Fetches, filters and returns topic configuration.
The two filters, `config_name_filter` and `config_source_filter` work together
so if a config entry matches either of them, it'll be returned.
If a filter is not provided (ie. is `None`), it'll act as if matching all
config entries.
"""
self.log.info(
"Fetching config for topic '%s' with name filter %s and source filter %s",
name,
config_name_filter,
config_source_filter,
)
futmap: dict[ConfigResource, Future] = self.describe_configs([ConfigResource(ResourceType.TOPIC, name)])
try:
topic_configs = single_futmap_result(futmap)
except KafkaException as exc:
raise_from_kafkaexception(exc)

config: dict[str, str] = {}
for config_name, config_entry in topic_configs.items():
matches_name_filter: bool = config_name_filter is None or config_name in config_name_filter
matches_source_filter: bool = (
config_source_filter is None or ConfigSource(config_entry.source) in config_source_filter
)

if matches_name_filter or matches_source_filter:
config[config_name] = config_entry.value

return config

def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]:
"""Returns the beginning and end offsets for a topic partition.
Making two separate requests for beginning and end offsets, due to the
`AdminClient.list_offsets` behaviour: it expects a dictionary of topic
partitions as keys, thus unable to fetch different values in one request
for the same topic and partition.
"""
try:
self.log.info("Fetching latest offset for topic '%s' partition %s", topic, partition_id)
latest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets(
{
TopicPartition(topic, partition_id): OffsetSpec.latest(),
}
)
endoffset = single_futmap_result(latest_offset_futmap)

self.log.info("Fetching earliest offset for topic '%s' partition %s", topic, partition_id)
earliest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets(
{
TopicPartition(topic, partition_id): OffsetSpec.earliest(),
}
)
startoffset = single_futmap_result(earliest_offset_futmap)
except KafkaException as exc:
raise_from_kafkaexception(exc)
return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset}
Loading

0 comments on commit b606514

Please sign in to comment.