From b606514604e32349d52ed757f5e0a2ad02851452 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Tue, 21 Nov 2023 12:41:17 +0100 Subject: [PATCH] Replace sync Kafka Producers with confluent_kafka one This change replaces all synchronous Kafka producers (from the kafka-python library), with a new implementation based on confluent-kafka-python's `Producer`. The aim is to keep the same interface as much as possible. The change so far doesn't intend to fully remove all references to kafka-python, most notably developer friendly errors and exceptions are still coming from kafka-python. A new `karapace.kafka` module is introduced, splitting the previously added admin client and the new producer into their own modules. Resources: * confluent-kafka-python documentation: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html# * librdkafka configuration documentation: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md --- karapace/backup/api.py | 36 +- karapace/backup/topic_configurations.py | 2 +- karapace/kafka/__init__.py | 0 karapace/kafka/admin.py | 178 ++++++++++ karapace/kafka/common.py | 178 ++++++++++ karapace/kafka/producer.py | 61 ++++ karapace/kafka_admin.py | 307 ------------------ karapace/kafka_rest_apis/__init__.py | 2 +- karapace/kafka_utils.py | 13 +- karapace/messaging.py | 18 +- karapace/schema_reader.py | 2 +- karapace/schema_registry_apis.py | 1 + stubs/confluent_kafka/__init__.pyi | 4 +- stubs/confluent_kafka/admin/__init__.pyi | 5 +- stubs/confluent_kafka/cimpl.pyi | 28 +- .../backup/test_get_topic_configurations.py | 2 +- .../integration/backup/test_legacy_backup.py | 2 +- tests/integration/backup/test_v3_backup.py | 63 ++-- tests/integration/conftest.py | 9 +- tests/integration/kafka/__init__.py | 0 .../test_admin.py} | 9 +- tests/integration/kafka/test_producer.py | 71 ++++ tests/integration/test_rest.py | 7 +- tests/integration/test_rest_consumer.py | 12 +- .../test_rest_consumer_protobuf.py | 2 +- tests/integration/test_schema.py | 16 +- tests/integration/test_schema_reader.py | 28 +- .../integration/test_schema_registry_auth.py | 2 +- tests/integration/utils/kafka_server.py | 2 +- tests/unit/backup/test_api.py | 21 +- tests/utils.py | 2 +- 31 files changed, 658 insertions(+), 425 deletions(-) create mode 100644 karapace/kafka/__init__.py create mode 100644 karapace/kafka/admin.py create mode 100644 karapace/kafka/common.py create mode 100644 karapace/kafka/producer.py delete mode 100644 karapace/kafka_admin.py create mode 100644 tests/integration/kafka/__init__.py rename tests/integration/{test_kafka_admin.py => kafka/test_admin.py} (96%) create mode 100644 tests/integration/kafka/test_producer.py diff --git a/karapace/backup/api.py b/karapace/backup/api.py index b117127f8..04cc8f989 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -21,25 +21,27 @@ ) from .poll_timeout import PollTimeout from .topic_configurations import ConfigSource, get_topic_configurations +from concurrent.futures import Future from enum import Enum from functools import partial -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaConsumer from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import KafkaError, TopicAlreadyExistsError -from kafka.structs import PartitionMetadata, TopicPartition +from kafka.structs import TopicPartition from karapace import constants from karapace.backup.backends.v1 import SchemaBackupV1Reader from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess from karapace.config import Config -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.producer import KafkaProducer from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config from karapace.key_format import KeyFormatter from karapace.utils import assert_never from pathlib import Path from rich.console import Console from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed -from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, TypeVar +from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar import contextlib import datetime @@ -170,7 +172,7 @@ def before_sleep(it: RetryCallState) -> None: return before_sleep -def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[PartitionMetadata]]) -> None: +def __check_partition_count(topic: str, supplier: Callable[[str], Sized]) -> None: """Checks that the given topic has exactly one partition. :param topic: to check. @@ -375,7 +377,7 @@ def _handle_restore_topic( def _handle_producer_send( instruction: ProducerSend, producer: KafkaProducer, - producer_error_callback: Callable[[Exception], None], + producer_callback: Callable[[Future], None], ) -> None: LOG.debug( "Sending kafka msg key: %r, value: %r", @@ -388,11 +390,11 @@ def _handle_producer_send( key=instruction.key, value=instruction.value, partition=instruction.partition_index, - headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers], - timestamp_ms=instruction.timestamp, - ).add_errback(producer_error_callback) - except (KafkaError, AssertionError) as ex: - raise BackupDataRestorationError("Error while calling send on restoring messages") from ex + headers=[(key.decode(), value) for key, value in instruction.headers if key is not None], + timestamp=instruction.timestamp, + ).add_done_callback(producer_callback) + except (KafkaError, AssertionError) as exc: + raise BackupDataRestorationError("Error while calling send on restoring messages") from exc def restore_backup( @@ -433,10 +435,12 @@ def restore_backup( with contextlib.ExitStack() as stack: producer = None - def _producer_error_callback(exception: Exception) -> None: - LOG.error("Producer error", exc_info=exception) - nonlocal _producer_exception - _producer_exception = exception + def _producer_callback(future: Future) -> None: + exception = future.exception() + if exception is not None: + LOG.error("Producer error", exc_info=exception) + nonlocal _producer_exception + _producer_exception = exception def _check_producer_exception() -> None: if _producer_exception is not None: @@ -452,7 +456,7 @@ def _check_producer_exception() -> None: elif isinstance(instruction, ProducerSend): if producer is None: raise RuntimeError("Backend has not yet sent RestoreTopic.") - _handle_producer_send(instruction, producer, _producer_error_callback) + _handle_producer_send(instruction, producer, _producer_callback) # Immediately check if producer.send() generated an exception. This call is # only an optimization, as producing is asynchronous and no sends might # have been executed once we reach this line. diff --git a/karapace/backup/topic_configurations.py b/karapace/backup/topic_configurations.py index 9a46d5b9a..93b9ceacf 100644 --- a/karapace/backup/topic_configurations.py +++ b/karapace/backup/topic_configurations.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from karapace.kafka_admin import ConfigSource, KafkaAdminClient +from karapace.kafka.admin import ConfigSource, KafkaAdminClient from typing import Container, Final ALL_CONFIG_SOURCES: Final = ConfigSource diff --git a/karapace/kafka/__init__.py b/karapace/kafka/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/karapace/kafka/admin.py b/karapace/kafka/admin.py new file mode 100644 index 000000000..5b9d9e5ad --- /dev/null +++ b/karapace/kafka/admin.py @@ -0,0 +1,178 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from collections.abc import Iterable +from concurrent.futures import Future +from confluent_kafka import TopicPartition +from confluent_kafka.admin import ( + AdminClient, + BrokerMetadata, + ClusterMetadata, + ConfigResource, + ConfigSource, + NewTopic, + OffsetSpec, + ResourceType, + TopicMetadata, +) +from confluent_kafka.error import KafkaException +from karapace.constants import TOPIC_CREATION_TIMEOUT_S +from karapace.kafka.common import ( + _KafkaConfigMixin, + raise_from_kafkaexception, + single_futmap_result, + UnknownTopicOrPartitionError, +) +from typing import Container + + +class KafkaAdminClient(_KafkaConfigMixin, AdminClient): + def new_topic( + self, + name: str, + *, + num_partitions: int = 1, + replication_factor: int = 1, + config: dict[str, str] | None = None, + request_timeout: float = TOPIC_CREATION_TIMEOUT_S, + ) -> NewTopic: + new_topic = NewTopic( + topic=name, + num_partitions=num_partitions, + replication_factor=replication_factor, + config=config if config is not None else {}, + ) + self.log.info("Creating new topic %s with replication factor %s", new_topic, replication_factor) + futmap: dict[str, Future] = self.create_topics([new_topic], request_timeout=request_timeout) + try: + single_futmap_result(futmap) + return new_topic + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def update_topic_config(self, name: str, config: dict[str, str]) -> None: + self.log.info("Updating topic '%s' configuration with %s", name, config) + futmap = self.alter_configs([ConfigResource(ResourceType.TOPIC, name, set_config=config)]) + try: + single_futmap_result(futmap) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def delete_topic(self, name: str) -> None: + self.log.info("Deleting topic '%s'", name) + futmap = self.delete_topics([name]) + try: + single_futmap_result(futmap) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def cluster_metadata(self, topics: Iterable[str] | None = None) -> dict: + """Fetch cluster metadata and topic information for given topics or all topics if not given. + + Using the `list_topics` method of the `AdminClient`, as this actually provides + metadata for the entire cluster, not just topics, as suggested by the name. + + The topics filter is only applied _after_ fetching the cluster metadata, + due to `list_topics` only accepting a single topic as a filter. + """ + self.log.info("Fetching cluster metadata with topic filter: %s", topics) + cluster_metadata: ClusterMetadata = self.list_topics() + topics_metadata: dict[str, TopicMetadata] = cluster_metadata.topics + brokers_metadata: dict[int, BrokerMetadata] = cluster_metadata.brokers + + if topics is not None and any(topic not in topics_metadata.keys() for topic in topics): + raise UnknownTopicOrPartitionError() + + topics_data: dict[str, dict] = {} + for topic, topic_metadata in topics_metadata.items(): + if topics is not None and topic not in topics: + continue + + partitions_data = [] + for partition_id, partition_metadata in topic_metadata.partitions.items(): + partition_data = { + "partition": partition_id, + "leader": partition_metadata.leader, + "replicas": [ + { + "broker": replica_id, + "leader": replica_id == partition_metadata.leader, + "in_sync": replica_id in partition_metadata.isrs, + } + for replica_id in partition_metadata.replicas + ], + } + partitions_data.append(partition_data) + + topics_data[topic] = {"partitions": partitions_data} + + return {"topics": topics_data, "brokers": list(brokers_metadata.keys())} + + def get_topic_config( + self, + name: str, + config_name_filter: Container[str] | None = None, + config_source_filter: Container[ConfigSource] | None = None, + ) -> dict[str, str]: + """Fetches, filters and returns topic configuration. + + The two filters, `config_name_filter` and `config_source_filter` work together + so if a config entry matches either of them, it'll be returned. + If a filter is not provided (ie. is `None`), it'll act as if matching all + config entries. + """ + self.log.info( + "Fetching config for topic '%s' with name filter %s and source filter %s", + name, + config_name_filter, + config_source_filter, + ) + futmap: dict[ConfigResource, Future] = self.describe_configs([ConfigResource(ResourceType.TOPIC, name)]) + try: + topic_configs = single_futmap_result(futmap) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + config: dict[str, str] = {} + for config_name, config_entry in topic_configs.items(): + matches_name_filter: bool = config_name_filter is None or config_name in config_name_filter + matches_source_filter: bool = ( + config_source_filter is None or ConfigSource(config_entry.source) in config_source_filter + ) + + if matches_name_filter or matches_source_filter: + config[config_name] = config_entry.value + + return config + + def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]: + """Returns the beginning and end offsets for a topic partition. + + Making two separate requests for beginning and end offsets, due to the + `AdminClient.list_offsets` behaviour: it expects a dictionary of topic + partitions as keys, thus unable to fetch different values in one request + for the same topic and partition. + """ + try: + self.log.info("Fetching latest offset for topic '%s' partition %s", topic, partition_id) + latest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets( + { + TopicPartition(topic, partition_id): OffsetSpec.latest(), + } + ) + endoffset = single_futmap_result(latest_offset_futmap) + + self.log.info("Fetching earliest offset for topic '%s' partition %s", topic, partition_id) + earliest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets( + { + TopicPartition(topic, partition_id): OffsetSpec.earliest(), + } + ) + startoffset = single_futmap_result(earliest_offset_futmap) + except KafkaException as exc: + raise_from_kafkaexception(exc) + return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset} diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py new file mode 100644 index 000000000..cb38165c8 --- /dev/null +++ b/karapace/kafka/common.py @@ -0,0 +1,178 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from collections.abc import Iterable +from concurrent.futures import Future +from confluent_kafka.error import KafkaError, KafkaException +from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError +from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar +from typing_extensions import Unpack + +import logging + +T = TypeVar("T") + + +def single_futmap_result(futmap: dict[Any, Future[T]]) -> T: + """Extract the result of a future wrapped in a dict. + + Bulk operations of the `confluent_kafka` library's Kafka clients return results + wrapped in a dictionary of futures. Most often we use these bulk operations to + operate on a single resource/entity. This function makes sure the dictionary of + futures contains a single future and returns its result. + """ + (future,) = futmap.values() + return future.result() + + +def translate_from_kafkaerror(error: KafkaError) -> Exception: + """Translate a `KafkaError` from `confluent_kafka` to a friendlier exception. + + `kafka.errors.for_code` is used to translate the original exception's error code + to a domain specific error class from `kafka-python`. + + In some cases `KafkaError`s are created with error codes internal to `confluent_kafka`, + such as various internal error codes for unknown topics or partitions: + `_NOENT`, `_UNKNOWN_PARTITION`, `_UNKNOWN_TOPIC` - these internal errors + have negative error codes that needs to be handled separately. + """ + code = error.code() + if code in ( + KafkaError._NOENT, # pylint: disable=protected-access + KafkaError._UNKNOWN_PARTITION, # pylint: disable=protected-access + KafkaError._UNKNOWN_TOPIC, # pylint: disable=protected-access + ): + return UnknownTopicOrPartitionError() + + return for_code(code) + + +def raise_from_kafkaexception(exc: KafkaException) -> NoReturn: + """Raises a more developer-friendly error from a `KafkaException`. + + The `confluent_kafka` library's `KafkaException` is a wrapper around its internal + `KafkaError`. The resulting, raised exception however is coming from + `kafka-python`, due to these exceptions having human-readable names, providing + better context for error handling. + """ + raise translate_from_kafkaerror(exc.args[0]) from exc + + +# For now this is a bit of a trick to replace an explicit usage of +# `karapace.kafka_rest_apis.authentication.SimpleOauthTokenProvider` +# to avoid circular imports +class TokenWithExpiryProvider(Protocol): + def token_with_expiry(self, config: str | None) -> tuple[str, int | None]: + ... + + +class KafkaClientParams(TypedDict, total=False): + client_id: str | None + connections_max_idle_ms: int | None + max_block_ms: int | None + metadata_max_age_ms: int | None + retries: int | None + sasl_mechanism: str | None + sasl_plain_password: str | None + sasl_plain_username: str | None + security_protocol: str | None + socket_timeout_ms: int | None + ssl_cafile: str | None + ssl_certfile: str | None + ssl_keyfile: str | None + sasl_oauth_token_provider: TokenWithExpiryProvider + + +class _KafkaConfigMixin: + """A mixin-class for Kafka client initialization. + + This mixin assumes that it'll be used in conjunction with a Kafka client + from `confluent_kafka`, eg. `AdminClient`, `Producer`, etc. The goal is to + extract configuration, initialization and connection verification. + """ + + def __init__( + self, + bootstrap_servers: Iterable[str] | str, + verify_connection: bool = True, + **params: Unpack[KafkaClientParams], + ) -> None: + self._errors: set[KafkaError] = set() + self.log = logging.getLogger(f"{self.__module__}.{self.__class__.__qualname__}") + + super().__init__(self._get_config_from_params(bootstrap_servers, **params)) # type: ignore[call-arg] + self._activate_callbacks() + if verify_connection: + self._verify_connection() + + def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **params: Unpack[KafkaClientParams]) -> dict: + if not isinstance(bootstrap_servers, str): + bootstrap_servers = ",".join(bootstrap_servers) + + config: dict[str, int | str | Callable | None] = { + "bootstrap.servers": bootstrap_servers, + "client.id": params.get("client_id"), + "connections.max.idle.ms": params.get("connections_max_idle_ms"), + "metadata.max.age.ms": params.get("metadata_max_age_ms"), + "retries": params.get("retries"), + "sasl.mechanism": params.get("sasl_mechanism"), + "sasl.password": params.get("sasl_plain_password"), + "sasl.username": params.get("sasl_plain_username"), + "security.protocol": params.get("security_protocol"), + "socket.timeout.ms": params.get("socket_timeout_ms"), + "ssl.ca.location": params.get("ssl_cafile"), + "ssl.certificate.location": params.get("ssl_certfile"), + "ssl.key.location": params.get("ssl_keyfile"), + "error_cb": self._error_callback, + } + config = {key: value for key, value in config.items() if value is not None} + + if "sasl_oauth_token_provider" in params: + config["oauth_cb"] = params["sasl_oauth_token_provider"].token_with_expiry + + return config + + def _error_callback(self, error: KafkaError) -> None: + self._errors.add(error) + + def _activate_callbacks(self) -> None: + # Any client in the `confluent_kafka` library needs `poll` called to + # trigger any callbacks registered (eg. for errors, OAuth tokens, etc.) + self.poll(timeout=0.0) # type: ignore[attr-defined] + + def _verify_connection(self) -> None: + """Attempts to call `list_topics` a few times. + + The `list_topics` method is the only meaningful synchronous method of + the `confluent_kafka`'s client classes that can be used to verify that + a connection and authentication has been established with a Kafka + cluster. + + Just instantiating and initializing the client doesn't result in + anything in its main thread in case of errors, only error logs from another + thread otherwise. + """ + for _ in range(3): + try: + self.list_topics(timeout=1) # type: ignore[attr-defined] + except KafkaException as exc: + # Other than `list_topics` throwing a `KafkaException` with an underlying + # `KafkaError` with code `_TRANSPORT` (`-195`), if the address or port is + # incorrect, we get no symptoms + # Authentication errors however do show up in the errors passed + # to the callback function defined in the `error_cb` config + self._activate_callbacks() + self.log.info("Could not establish connection due to errors: %s", self._errors) + if any( + error.code() == KafkaError._AUTHENTICATION for error in self._errors # pylint: disable=protected-access + ): + raise AuthenticationFailedError() from exc + continue + else: + break + else: + raise NoBrokersAvailable() diff --git a/karapace/kafka/producer.py b/karapace/kafka/producer.py new file mode 100644 index 000000000..0caecb4c2 --- /dev/null +++ b/karapace/kafka/producer.py @@ -0,0 +1,61 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from concurrent.futures import Future +from confluent_kafka import Message, Producer +from confluent_kafka.admin import PartitionMetadata +from confluent_kafka.error import KafkaError, KafkaException +from functools import partial +from karapace.kafka.common import _KafkaConfigMixin, raise_from_kafkaexception, translate_from_kafkaerror +from typing import cast, TypedDict +from typing_extensions import Unpack + +import logging + +LOG = logging.getLogger(__name__) + + +def _on_delivery_callback(future: Future, error: KafkaError | None, msg: Message | None) -> None: + if error is not None: + LOG.info("Kafka producer delivery error: %s", error) + future.set_exception(translate_from_kafkaerror(error)) + else: + future.set_result(msg) + + +class ProducerSendParams(TypedDict, total=False): + value: str | bytes | None + key: str | bytes | None + partition: int + timestamp: int | None + headers: dict[str, bytes | None] | list[tuple[str, bytes | None]] | None + + +class KafkaProducer(_KafkaConfigMixin, Producer): + def send(self, topic: str, **params: Unpack[ProducerSendParams]) -> Future[Message]: + """A convenience wrapper around `Producer.produce`, to be able to access the message via a Future.""" + result: Future[Message] = Future() + + params = cast(ProducerSendParams, {key: value for key, value in params.items() if value is not None}) + + try: + self.produce( + topic, + on_delivery=partial(_on_delivery_callback, result), + **params, + ) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + return result + + def partitions_for(self, topic: str) -> dict[int, PartitionMetadata]: + """Returns all partition metadata for the given topic.""" + try: + return self.list_topics(topic).topics[topic].partitions + except KafkaException as exc: + raise_from_kafkaexception(exc) diff --git a/karapace/kafka_admin.py b/karapace/kafka_admin.py deleted file mode 100644 index cc0cb286d..000000000 --- a/karapace/kafka_admin.py +++ /dev/null @@ -1,307 +0,0 @@ -""" -Copyright (c) 2023 Aiven Ltd -See LICENSE for details -""" - -from __future__ import annotations - -from collections.abc import Iterable -from concurrent.futures import Future -from confluent_kafka import TopicPartition -from confluent_kafka.admin import ( - AdminClient, - BrokerMetadata, - ClusterMetadata, - ConfigResource, - ConfigSource, - NewTopic, - OffsetSpec, - ResourceType, - TopicMetadata, -) -from confluent_kafka.error import KafkaError, KafkaException -from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError -from karapace.constants import TOPIC_CREATION_TIMEOUT_S -from typing import Any, Callable, Container, NoReturn, Protocol, TypedDict, TypeVar -from typing_extensions import Unpack - -import logging - -LOG = logging.getLogger(__name__) - - -T = TypeVar("T") - - -def single_futmap_result(futmap: dict[Any, Future[T]]) -> T: - """Extract the result of a future wrapped in a dict. - - Bulk operations of the `confluent_kafka` library's Kafka clients return results - wrapped in a dictionary of futures. Most often we use these bulk operations to - operate on a single resource/entity. This function makes sure the dictionary of - futures contains a single future and returns its result. - """ - (future,) = futmap.values() - return future.result() - - -def raise_from_kafkaexception(exc: KafkaException) -> NoReturn: - """Raises a more developer-friendly error from a `KafkaException`. - - The `confluent_kafka` library's `KafkaException` is a wrapper around its internal - `KafkaError`. The resulting, raised exception however is coming from - `kafka-python`, due to these exceptions having human-readable names, providing - better context for error handling. - - `kafka.errors.for_code` is used to translate the original exception's error code - to a domain specific error class from `kafka-python`. - """ - raise for_code(exc.args[0].code()) from exc - - -# For now this is a bit of a trick to replace an explicit usage of -# `karapace.kafka_rest_apis.authentication.SimpleOauthTokenProvider` -# to avoid circular imports -class TokenWithExpiryProvider(Protocol): - def token_with_expiry(self, config: str | None) -> tuple[str, int | None]: - ... - - -class AdminClientParams(TypedDict, total=False): - client_id: str | None - connections_max_idle_ms: int | None - metadata_max_age_ms: int | None - sasl_mechanism: str | None - sasl_plain_password: str | None - sasl_plain_username: str | None - security_protocol: str | None - ssl_cafile: str | None - ssl_certfile: str | None - ssl_keyfile: str | None - sasl_oauth_token_provider: TokenWithExpiryProvider - - -class KafkaAdminClient(AdminClient): - def __init__(self, bootstrap_servers: Iterable[str] | str, **params: Unpack[AdminClientParams]) -> None: - self._errors: set[KafkaError] = set() - - super().__init__(self._get_config_from_params(bootstrap_servers, **params)) - self._activate_callbacks() - self._verify_connection() - - def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **params: Unpack[AdminClientParams]) -> dict: - if not isinstance(bootstrap_servers, str): - bootstrap_servers = ",".join(bootstrap_servers) - - config: dict[str, int | str | Callable | None] = { - "bootstrap.servers": bootstrap_servers, - "client.id": params.get("client_id"), - "connections.max.idle.ms": params.get("connections_max_idle_ms"), - "metadata.max.age.ms": params.get("metadata_max_age_ms"), - "sasl.mechanism": params.get("sasl_mechanism"), - "sasl.password": params.get("sasl_plain_password"), - "sasl.username": params.get("sasl_plain_username"), - "security.protocol": params.get("security_protocol"), - "ssl.ca.location": params.get("ssl_cafile"), - "ssl.certificate.location": params.get("ssl_certfile"), - "ssl.key.location": params.get("ssl_keyfile"), - "error_cb": self._error_callback, - } - config = {key: value for key, value in config.items() if value is not None} - - if "sasl_oauth_token_provider" in params: - config["oauth_cb"] = params["sasl_oauth_token_provider"].token_with_expiry - - return config - - def _error_callback(self, error: KafkaError) -> None: - self._errors.add(error) - - def _activate_callbacks(self) -> None: - # Any client in the `confluent_kafka` library needs `poll` called to - # trigger any callbacks registered (eg. for errors, OAuth tokens, etc.) - self.poll(timeout=0.0) - - def _verify_connection(self) -> None: - """Attempts to call `AdminClient.list_topics` a few times. - - The `list_topics` method is the only meaningful synchronous method of - the `AdminClient` class that can be used to verify that a connection and - authentication has been established with a Kafka cluster. - - Just instantiating and initializing the admin client doesn't result in - anything in its main thread in case of errors, only error logs from another - thread otherwise. - """ - for _ in range(3): - try: - self.list_topics(timeout=1) - except KafkaException as exc: - # Other than `list_topics` throwing a `KafkaException` with an underlying - # `KafkaError` with code `_TRANSPORT` (`-195`), if the address or port is - # incorrect, we get no symptoms - # Authentication errors however do show up in the errors passed - # to the callback function defined in the `error_cb` config - self._activate_callbacks() - LOG.info("Could not establish connection due to errors: %s", self._errors) - if any( - error.code() == KafkaError._AUTHENTICATION for error in self._errors # pylint: disable=protected-access - ): - raise AuthenticationFailedError() from exc - continue - else: - break - else: - raise NoBrokersAvailable() - - def new_topic( - self, - name: str, - *, - num_partitions: int = 1, - replication_factor: int = 1, - config: dict[str, str] | None = None, - request_timeout: float = TOPIC_CREATION_TIMEOUT_S, - ) -> NewTopic: - new_topic = NewTopic( - topic=name, - num_partitions=num_partitions, - replication_factor=replication_factor, - config=config if config is not None else {}, - ) - LOG.info("Creating new topic %s with replication factor %s", new_topic, replication_factor) - futmap: dict[str, Future] = self.create_topics([new_topic], request_timeout=request_timeout) - try: - single_futmap_result(futmap) - return new_topic - except KafkaException as exc: - raise_from_kafkaexception(exc) - - def update_topic_config(self, name: str, config: dict[str, str]) -> None: - LOG.info("Updating topic '%s' configuration with %s", name, config) - futmap = self.alter_configs([ConfigResource(ResourceType.TOPIC, name, set_config=config)]) - try: - single_futmap_result(futmap) - except KafkaException as exc: - raise_from_kafkaexception(exc) - - def delete_topic(self, name: str) -> None: - LOG.info("Deleting topic '%s'", name) - futmap = self.delete_topics([name]) - try: - single_futmap_result(futmap) - except KafkaException as exc: - raise_from_kafkaexception(exc) - - def cluster_metadata(self, topics: Iterable[str] | None = None) -> dict: - """Fetch cluster metadata and topic information for given topics or all topics if not given. - - Using the `list_topics` method of the `AdminClient`, as this actually provides - metadata for the entire cluster, not just topics, as suggested by the name. - - The topics filter is only applied _after_ fetching the cluster metadata, - due to `list_topics` only accepting a single topic as a filter. - """ - LOG.info("Fetching cluster metadata with topic filter: %s", topics) - cluster_metadata: ClusterMetadata = self.list_topics() - topics_metadata: dict[str, TopicMetadata] = cluster_metadata.topics - brokers_metadata: dict[int, BrokerMetadata] = cluster_metadata.brokers - - if topics is not None and any(topic not in topics_metadata.keys() for topic in topics): - raise UnknownTopicOrPartitionError() - - topics_data: dict[str, dict] = {} - for topic, topic_metadata in topics_metadata.items(): - if topics is not None and topic not in topics: - continue - - partitions_data = [] - for partition_id, partition_metadata in topic_metadata.partitions.items(): - partition_data = { - "partition": partition_id, - "leader": partition_metadata.leader, - "replicas": [ - { - "broker": replica_id, - "leader": replica_id == partition_metadata.leader, - "in_sync": replica_id in partition_metadata.isrs, - } - for replica_id in partition_metadata.replicas - ], - } - partitions_data.append(partition_data) - - topics_data[topic] = {"partitions": partitions_data} - - return {"topics": topics_data, "brokers": list(brokers_metadata.keys())} - - def get_topic_config( - self, - name: str, - config_name_filter: Container[str] | None = None, - config_source_filter: Container[ConfigSource] | None = None, - ) -> dict[str, str]: - """Fetches, filters and returns topic configuration. - - The two filters, `config_name_filter` and `config_source_filter` work together - so if a config entry matches either of them, it'll be returned. - If a filter is not provided (ie. is `None`), it'll act as if matching all - config entries. - """ - LOG.info( - "Fetching config for topic '%s' with name filter %s and source filter %s", - name, - config_name_filter, - config_source_filter, - ) - futmap: dict[ConfigResource, Future] = self.describe_configs([ConfigResource(ResourceType.TOPIC, name)]) - try: - topic_configs = single_futmap_result(futmap) - except KafkaException as exc: - raise_from_kafkaexception(exc) - - config: dict[str, str] = {} - for config_name, config_entry in topic_configs.items(): - matches_name_filter: bool = config_name_filter is None or config_name in config_name_filter - matches_source_filter: bool = ( - config_source_filter is None or ConfigSource(config_entry.source) in config_source_filter - ) - - if matches_name_filter or matches_source_filter: - config[config_name] = config_entry.value - - return config - - def get_offsets(self, topic: str, partition_id: int) -> dict[str, int]: - """Returns the beginning and end offsets for a topic partition. - - Making two separate requests for beginning and end offsets, due to the - `AdminClient.list_offsets` behaviour: it expects a dictionary of topic - partitions as keys, thus unable to fetch different values in one request - for the same topic and partition. - """ - try: - LOG.info("Fetching latest offset for topic '%s' partition %s", topic, partition_id) - latest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets( - { - TopicPartition(topic, partition_id): OffsetSpec.latest(), - } - ) - endoffset = single_futmap_result(latest_offset_futmap) - - LOG.info("Fetching earliest offset for topic '%s' partition %s", topic, partition_id) - earliest_offset_futmap: dict[TopicPartition, Future] = self.list_offsets( - { - TopicPartition(topic, partition_id): OffsetSpec.earliest(), - } - ) - startoffset = single_futmap_result(earliest_offset_futmap) - except KafkaException as exc: - code = exc.args[0].code() - # In some cases `list_offsets` raises an error with a `_NOENT`, code `-156` error - # with the message "Failed to query partition leaders: No leaders found", which is - # "internal" to `confluent_kafka` and has to be handled separately. - if code == KafkaError._NOENT: # pylint: disable=protected-access - raise UnknownTopicOrPartitionError() from exc - raise_from_kafkaexception(exc) - return {"beginning_offset": startoffset.offset, "end_offset": endoffset.offset} diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 3f3d08dd7..b9b7b28aa 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -14,7 +14,7 @@ ) from karapace.config import Config, create_client_ssl_context from karapace.errors import InvalidSchema -from karapace.kafka_admin import KafkaAdminClient, KafkaException +from karapace.kafka.admin import KafkaAdminClient, KafkaException from karapace.kafka_rest_apis.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py index 6b79bbf36..c70cd530c 100644 --- a/karapace/kafka_utils.py +++ b/karapace/kafka_utils.py @@ -4,8 +4,9 @@ """ from .config import Config from .utils import KarapaceKafkaClient -from kafka import KafkaConsumer, KafkaProducer -from karapace.kafka_admin import KafkaAdminClient +from kafka import KafkaConsumer +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.producer import KafkaProducer from typing import Iterator import contextlib @@ -50,7 +51,7 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons @contextlib.contextmanager -def kafka_producer_from_config(config: Config) -> KafkaProducer: +def kafka_producer_from_config(config: Config) -> Iterator[KafkaProducer]: producer = KafkaProducer( bootstrap_servers=config["bootstrap_uri"], security_protocol=config["security_protocol"], @@ -60,13 +61,9 @@ def kafka_producer_from_config(config: Config) -> KafkaProducer: sasl_mechanism=config["sasl_mechanism"], sasl_plain_username=config["sasl_plain_username"], sasl_plain_password=config["sasl_plain_password"], - kafka_client=KarapaceKafkaClient, retries=0, ) try: yield producer finally: - try: - producer.flush() - finally: - producer.close() + producer.flush() diff --git a/karapace/messaging.py b/karapace/messaging.py index a543515c0..36d07c638 100644 --- a/karapace/messaging.py +++ b/karapace/messaging.py @@ -4,13 +4,13 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from kafka import KafkaProducer from kafka.errors import MessageSizeTooLargeError from karapace.config import Config from karapace.errors import SchemaTooLargeException +from karapace.kafka.producer import KafkaProducer from karapace.key_format import KeyFormatter from karapace.offset_watcher import OffsetWatcher -from karapace.utils import json_encode, KarapaceKafkaClient +from karapace.utils import json_encode from karapace.version import __version__ from typing import Any, Dict, Final, Optional, Union @@ -38,6 +38,7 @@ def initialize_karapace_producer( try: self._producer = KafkaProducer( bootstrap_servers=self._config["bootstrap_uri"], + verify_connection=False, security_protocol=self._config["security_protocol"], ssl_cafile=self._config["ssl_cafile"], ssl_certfile=self._config["ssl_certfile"], @@ -45,11 +46,9 @@ def initialize_karapace_producer( sasl_mechanism=self._config["sasl_mechanism"], sasl_plain_username=self._config["sasl_plain_username"], sasl_plain_password=self._config["sasl_plain_password"], - api_version=(1, 0, 0), metadata_max_age_ms=self._config["metadata_max_age_ms"], - max_block_ms=2000, # missing topics will block unless we cache cluster metadata and pre-check + socket_timeout_ms=2000, # missing topics will block unless we cache cluster metadata and pre-check connections_max_idle_ms=self._config["connections_max_idle_ms"], # helps through cluster upgrades ?? - kafka_client=KarapaceKafkaClient, ) return except: # pylint: disable=bare-except @@ -57,8 +56,9 @@ def initialize_karapace_producer( time.sleep(1) def close(self) -> None: + LOG.info("Closing karapace_producer") if self._producer is not None: - self._producer.close() + self._producer.flush() def _send_kafka_message(self, key: Union[bytes, str], value: Union[bytes, str]) -> None: assert self._producer is not None @@ -76,14 +76,14 @@ def _send_kafka_message(self, key: Union[bytes, str], value: Union[bytes, str]) ) self._producer.flush(timeout=self._kafka_timeout) try: - msg = future.get(self._kafka_timeout) + msg = future.result(self._kafka_timeout) except MessageSizeTooLargeError as ex: raise SchemaTooLargeException from ex - sent_offset = msg.offset + sent_offset = msg.offset() LOG.info( - "Waiting for schema reader to caught up. key: %r, value: %r, offset: %r", + "Waiting for schema reader to catch up. key: %r, value: %r, offset: %r", key, value, sent_offset, diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 9b25b3ad4..b2fde9bbf 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -24,7 +24,7 @@ from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 0ba18f5cd..e8bbaf969 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -304,6 +304,7 @@ def _add_schema_registry_routes(self) -> None: ) async def close(self) -> None: + self.log.info("Closing karapace_schema_registry_controller") async with AsyncExitStack() as stack: stack.push_async_callback(super().close) stack.push_async_callback(self.schema_registry.close) diff --git a/stubs/confluent_kafka/__init__.pyi b/stubs/confluent_kafka/__init__.pyi index 94c2c13ff..5762cb52a 100644 --- a/stubs/confluent_kafka/__init__.pyi +++ b/stubs/confluent_kafka/__init__.pyi @@ -1,4 +1,4 @@ from ._model import IsolationLevel -from .cimpl import TopicPartition +from .cimpl import Message, Producer, TopicPartition -__all__ = ("IsolationLevel", "TopicPartition") +__all__ = ("IsolationLevel", "Message", "Producer", "TopicPartition") diff --git a/stubs/confluent_kafka/admin/__init__.pyi b/stubs/confluent_kafka/admin/__init__.pyi index ad4d09af8..02abcc033 100644 --- a/stubs/confluent_kafka/admin/__init__.pyi +++ b/stubs/confluent_kafka/admin/__init__.pyi @@ -1,7 +1,7 @@ from ..cimpl import NewTopic from ._config import ConfigEntry, ConfigResource, ConfigSource from ._listoffsets import ListOffsetsResultInfo, OffsetSpec -from ._metadata import BrokerMetadata, ClusterMetadata, TopicMetadata +from ._metadata import BrokerMetadata, ClusterMetadata, PartitionMetadata, TopicMetadata from ._resource import ResourceType from concurrent.futures import Future from confluent_kafka import IsolationLevel, TopicPartition @@ -15,13 +15,14 @@ __all__ = ( "ConfigSource", "NewTopic", "OffsetSpec", + "PartitionMetadata", "ResourceType", "TopicMetadata", ) class AdminClient: def __init__(self, config: dict[str, str | int | Callable]) -> None: ... - def poll(self, timeout: float) -> int: ... + def poll(self, timeout: float = -1) -> int: ... def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... def create_topics( self, diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index f32df8e0d..9b573c5b9 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -1,8 +1,11 @@ -from typing import Any +from confluent_kafka.admin._metadata import ClusterMetadata +from typing import Any, Callable class KafkaError: _NOENT: int _AUTHENTICATION: int + _UNKNOWN_TOPIC: int + _UNKNOWN_PARTITION: int def code(self) -> int: ... @@ -30,3 +33,26 @@ class TopicPartition: metadata: str | None = None, leader_epoc: int | None = None, ) -> None: ... + +class Message: + def offset(self) -> int: ... + def timestamp(self) -> tuple[int, int]: ... + def key(self) -> str | bytes | None: ... + def value(self) -> str | bytes | None: ... + def topic(self) -> str: ... + def partition(self) -> int: ... + +class Producer: + def produce( + self, + topic: str, + value: str | bytes | None = None, + key: str | bytes | None = None, + partition: int = -1, + on_delivery: Callable[[KafkaError, Message], Any] | None = None, + timestamp: int | None = -1, + headers: dict[str, bytes | None] | list[tuple[str, bytes | None]] | None = None, + ) -> None: ... + def flush(self, timeout: float = -1) -> None: ... + def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... + def poll(self, timeout: float = -1) -> int: ... diff --git a/tests/integration/backup/test_get_topic_configurations.py b/tests/integration/backup/test_get_topic_configurations.py index 884033cc2..16592aed3 100644 --- a/tests/integration/backup/test_get_topic_configurations.py +++ b/tests/integration/backup/test_get_topic_configurations.py @@ -5,7 +5,7 @@ from __future__ import annotations from karapace.backup.topic_configurations import ALL_CONFIG_SOURCES, ConfigSource, DEFAULT_CONFIGS, get_topic_configurations -from karapace.kafka_admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient, NewTopic import pytest diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 2c6e70df4..3d732bd58 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -12,7 +12,7 @@ from karapace.backup.poll_timeout import PollTimeout from karapace.client import Client from karapace.config import set_config_defaults -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient from karapace.key_format import is_key_in_canonical_format from karapace.utils import Expiration from pathlib import Path diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 817b5f970..be336afcf 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -5,7 +5,7 @@ from __future__ import annotations from dataclasses import fields -from kafka import KafkaProducer, TopicPartition +from kafka import TopicPartition from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import UnknownTopicOrPartitionError from karapace.backup import api @@ -16,9 +16,9 @@ from karapace.backup.poll_timeout import PollTimeout from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations from karapace.config import Config, set_config_defaults -from karapace.kafka_admin import KafkaAdminClient, NewTopic -from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config -from karapace.utils import KarapaceKafkaClient +from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.producer import KafkaProducer +from karapace.kafka_utils import kafka_consumer_from_config, kafka_producer_from_config from karapace.version import __version__ from pathlib import Path from tempfile import mkdtemp @@ -75,15 +75,6 @@ def config_file_fixture( shutil.rmtree(directory_path) -@pytest.fixture(scope="function", name="kafka_admin") -def admin_fixture(karapace_config: Config) -> Iterator[KafkaAdminClient]: - admin = kafka_admin_from_config(karapace_config) - try: - yield admin - finally: - admin.close() - - @pytest.fixture(scope="function", name="producer") def producer_fixture(karapace_config: Config) -> Iterator[KafkaProducer]: with kafka_producer_from_config(karapace_config) as producer: @@ -111,8 +102,8 @@ def test_roundtrip_from_kafka_state( key=b"bar", value=b"foo", partition=0, - timestamp_ms=1683474641, - ).add_errback(_raise) + timestamp=1683474641, + ) producer.send( new_topic.topic, key=b"foo", @@ -122,8 +113,8 @@ def test_roundtrip_from_kafka_state( ("some-header", b"some header value"), ("other-header", b"some other header value"), ], - timestamp_ms=1683474657, - ).add_errback(_raise) + timestamp=1683474657, + ) producer.flush() topic_config = get_topic_configurations(admin_client, new_topic.topic, {ConfigSource.DYNAMIC_TOPIC_CONFIG}) @@ -571,15 +562,14 @@ def __init__(self): sasl_mechanism=config["sasl_mechanism"], sasl_plain_username=config["sasl_plain_username"], sasl_plain_password=config["sasl_plain_password"], - kafka_client=KarapaceKafkaClient, - max_block_ms=5000, + socket_timeout_ms=5000, ) def __enter__(self): return self._producer def __exit__(self, exc_type, exc_value, exc_traceback): - self._producer.close() + self._producer.flush() with patch("karapace.backup.api._producer") as p: p.return_value = LowTimeoutProducer() @@ -592,7 +582,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback): ) -def test_producer_raises_exceptions( +def test_backup_restoration_fails_when_producer_send_fails( admin_client: KafkaAdminClient, kafka_servers: KafkaServers, ) -> None: @@ -614,8 +604,31 @@ def test_producer_raises_exceptions( } ) - with patch("kafka.producer.record_accumulator.RecordAccumulator.append") as p: - p.side_effect = UnknownTopicOrPartitionError() + class FailToSendProducer(KafkaProducer): + def send(self, *args, **kwargs): + raise UnknownTopicOrPartitionError() + + class FailToSendProducerContext: + def __init__(self): + self._producer = FailToSendProducer( + bootstrap_servers=config["bootstrap_uri"], + security_protocol=config["security_protocol"], + ssl_cafile=config["ssl_cafile"], + ssl_certfile=config["ssl_certfile"], + ssl_keyfile=config["ssl_keyfile"], + sasl_mechanism=config["sasl_mechanism"], + sasl_plain_username=config["sasl_plain_username"], + sasl_plain_password=config["sasl_plain_password"], + ) + + def __enter__(self): + return self._producer + + def __exit__(self, exc_type, exc_value, exc_traceback): + self._producer.flush() + + with patch("karapace.backup.api._producer") as p: + p.return_value = FailToSendProducerContext() with pytest.raises(BackupDataRestorationError): api.restore_backup( config=config, @@ -839,7 +852,7 @@ def test_can_verify_file_integrity_from_large_topic( key=1000 * b"a", value=1000 * b"b", partition=0, - ).add_errback(_raise) + ) producer.flush() # Execute backup creation. @@ -894,7 +907,7 @@ def test_can_verify_record_integrity_from_large_topic( key=1000 * b"a", value=1000 * b"b", partition=0, - ).add_errback(_raise) + ) producer.flush() # Execute backup creation. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 688f24fe1..d9ab70d40 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,13 +7,13 @@ from _pytest.fixtures import SubRequest from aiohttp.pytest_plugin import AiohttpClient from aiohttp.test_utils import TestClient -from contextlib import closing, ExitStack +from contextlib import ExitStack from dataclasses import asdict from filelock import FileLock -from kafka import KafkaProducer from karapace.client import Client from karapace.config import Config, set_config_defaults, write_config -from karapace.kafka_admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.producer import KafkaProducer from karapace.kafka_rest_apis import KafkaRest from pathlib import Path from tests.conftest import KAFKA_VERSION @@ -206,8 +206,7 @@ def fixture_kafka_server( @pytest.fixture(scope="function", name="producer") def fixture_producer(kafka_servers: KafkaServers) -> KafkaProducer: - with closing(KafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers)) as prod: - yield prod + yield KafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers) @pytest.fixture(scope="function", name="admin_client") diff --git a/tests/integration/kafka/__init__.py b/tests/integration/kafka/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/test_kafka_admin.py b/tests/integration/kafka/test_admin.py similarity index 96% rename from tests/integration/test_kafka_admin.py rename to tests/integration/kafka/test_admin.py index 542694d20..d6d586a6c 100644 --- a/tests/integration/test_kafka_admin.py +++ b/tests/integration/kafka/test_admin.py @@ -5,9 +5,9 @@ from __future__ import annotations -from kafka import KafkaProducer from kafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError -from karapace.kafka_admin import ConfigSource, KafkaAdminClient, NewTopic +from karapace.kafka.admin import ConfigSource, KafkaAdminClient, NewTopic +from karapace.kafka.producer import KafkaProducer from tests.utils import new_topic as create_new_topic import pytest @@ -122,9 +122,8 @@ def test_get_offsets(self, admin_client: KafkaAdminClient, new_topic: NewTopic, partition_id = 0 number_of_messages = 5 for _ in range(number_of_messages): - fut = producer.send(topic_name, value=b"test-message") - producer.flush() - fut.get() + producer.send(topic_name) + producer.flush() offsets = admin_client.get_offsets(topic_name, partition_id) diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py new file mode 100644 index 000000000..69e06ea08 --- /dev/null +++ b/tests/integration/kafka/test_producer.py @@ -0,0 +1,71 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError +from karapace.kafka.admin import NewTopic +from karapace.kafka.producer import KafkaProducer + +import pytest +import time + + +class TestSend: + def test_send(self, producer: KafkaProducer, new_topic: NewTopic) -> None: + key = b"key" + value = b"value" + partition = 0 + timestamp = int(time.time() * 1000) + headers = [(b"something", b"123")] + + fut = producer.send( + new_topic.topic, + key=key, + value=value, + partition=partition, + timestamp=timestamp, + headers=headers, + ) + producer.flush() + message = fut.result() + + assert message.offset() == 0 + assert message.partition() == partition + assert message.topic() == new_topic.topic + assert message.key() == key + assert message.value() == value + assert message.timestamp()[1] == timestamp + + def test_send_raises_for_unknown_topic(self, producer: KafkaProducer) -> None: + fut = producer.send("nonexistent") + producer.flush() + + with pytest.raises(UnknownTopicOrPartitionError): + fut.result() + + def test_send_raises_for_unknown_partition(self, producer: KafkaProducer, new_topic: NewTopic) -> None: + fut = producer.send(new_topic.topic, partition=99) + producer.flush() + + with pytest.raises(UnknownTopicOrPartitionError): + fut.result() + + def test_send_raises_for_too_large_message(self, producer: KafkaProducer, new_topic: NewTopic) -> None: + with pytest.raises(MessageSizeTooLargeError): + producer.send(new_topic.topic, value=b"x" * 1000001) + + +class TestPartitionsFor: + def test_partitions_for_returns_empty_for_unknown_topic(self, producer: KafkaProducer) -> None: + assert producer.partitions_for("nonexistent") == {} + + def test_partitions_for(self, producer: KafkaProducer, new_topic: NewTopic) -> None: + partitions = producer.partitions_for(new_topic.topic) + + assert len(partitions) == 1 + assert partitions[0].id == 0 + assert partitions[0].replicas == [1] + assert partitions[0].isrs == [1] diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index e7282309d..d2287f05f 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -4,9 +4,9 @@ """ from __future__ import annotations -from kafka import KafkaProducer from karapace.client import Client -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.producer import KafkaProducer from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX from karapace.version import __version__ from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES @@ -660,7 +660,8 @@ async def test_partitions( assert res.status_code == 404 assert res.json()["error_code"] == 40402 for _ in range(5): - producer.send(topic_name, value=b"foo_val").get() + producer.send(topic_name, value=b"foo_val") + producer.flush() offset_res = await rest_async_client.get(f"/topics/{topic_name}/partitions/0/offsets", headers=header) assert offset_res.ok, f"Status code {offset_res.status_code!r} is not expected: {offset_res.json()!r}" data = offset_res.json() diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index ed0186f9f..e1620d1b2 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -102,7 +102,8 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): "topics" in data and len(data["topics"]) == 1 and data["topics"][0] == topic_name ), f"expecting {topic_name} in {data}" for _ in range(3): - producer.send(topic_name, b"foo").get() + producer.send(topic_name, value=b"foo") + producer.flush() resp = await rest_async_client.get(consume_path, headers=header) data = resp.json() assert resp.ok, f"Expected a successful response: {data['message']}" @@ -136,7 +137,8 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): # writing to all 3 will get us results from all 3 for t in pattern_topics: for _ in range(3): - producer.send(t, b"bar").get() + producer.send(t, value=b"bar") + producer.flush() resp = await rest_async_client.get(consume_path, headers=header) data = resp.json() assert resp.ok, f"Expected a successful response: {data['message']}" @@ -266,7 +268,8 @@ async def test_consume(rest_async_client, admin_client, producer, trail): res = await rest_async_client.post(assign_path, json=assign_payload, headers=header) assert res.ok for i in range(len(values[fmt])): - producer.send(topic_name, value=values[fmt][i]).get() + producer.send(topic_name, value=values[fmt][i]) + producer.flush() seek_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} resp = await rest_async_client.post(seek_path, headers=header, json=seek_payload) assert resp.ok @@ -303,7 +306,8 @@ async def test_consume_timeout(rest_async_client, admin_client, producer): res = await rest_async_client.post(assign_path, json=assign_payload, headers=header) assert res.ok for i in range(len(values[fmt])): - producer.send(topic_name, value=values[fmt][i]).get() + producer.send(topic_name, value=values[fmt][i]) + producer.flush() seek_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} resp = await rest_async_client.post(seek_path, headers=header, json=seek_payload) assert resp.ok diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py index 52662aeb9..903d94327 100644 --- a/tests/integration/test_rest_consumer_protobuf.py +++ b/tests/integration/test_rest_consumer_protobuf.py @@ -4,7 +4,7 @@ """ from karapace.client import Client -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient from karapace.protobuf.kotlin_wrapper import trim_margin from tests.integration.test_rest import NEW_TOPIC_TIMEOUT from tests.utils import ( diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 4e325a2a0..90d4172bd 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -5,8 +5,8 @@ See LICENSE for details """ from http import HTTPStatus -from kafka import KafkaProducer from karapace.client import Client +from karapace.kafka.producer import KafkaProducer from karapace.rapu import is_success from karapace.schema_registry_apis import SchemaErrorMessages from karapace.utils import json_encode @@ -2382,7 +2382,8 @@ async def test_malformed_kafka_message( message_value.update(payload) producer.send( registry_cluster.schemas_topic, key=json.dumps(message_key).encode(), value=json.dumps(message_value).encode() - ).get() + ) + producer.flush() path = f"schemas/ids/{schema_id}" res = await repeat_until_successful_request( @@ -2876,9 +2877,10 @@ async def test_schema_recreate_after_odd_hard_delete( registry_cluster.schemas_topic, key=message_key, value=json_encode(message_value, sort_keys=False, compact=True, binary=True), - ).get() + ) # Produce manual hard delete without soft delete first - producer.send(registry_cluster.schemas_topic, key=message_key, value=None).get() + producer.send(registry_cluster.schemas_topic, key=message_key, value=None) + producer.flush() res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) assert res.status_code == 200 @@ -3018,7 +3020,8 @@ async def test_schema_non_compliant_namespace_in_existing( registry_cluster.schemas_topic, key=message_key, value=json_encode(message_value, sort_keys=False, compact=True, binary=True), - ).get() + ) + producer.flush() evolved_schema = { "type": "record", @@ -3114,7 +3117,8 @@ async def test_schema_non_compliant_name_in_existing( registry_cluster.schemas_topic, key=message_key, value=json_encode(message_value, sort_keys=False, compact=True, binary=True), - ).get() + ) + producer.flush() evolved_schema = { "type": "record", diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 9fc482c9b..ea39e663a 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -4,11 +4,11 @@ """ from contextlib import closing from dataclasses import dataclass -from kafka import KafkaProducer from karapace.config import set_config_defaults from karapace.constants import DEFAULT_SCHEMA_TOPIC from karapace.in_memory_database import InMemoryDatabase -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.producer import KafkaProducer from karapace.key_format import KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher @@ -105,9 +105,10 @@ def test_regression_soft_delete_schemas_should_be_registered( key=json_encode(key, binary=True), value=json_encode(value, binary=True), ) - msg = future.get() + producer.flush() + msg = future.result() - schema_reader._offset_watcher.wait_for_offset(msg.offset, timeout=5) # pylint: disable=protected-access + schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access schemas = database.find_subject_schemas(subject=subject, include_deleted=True) assert len(schemas) == 1, "Deleted schemas must have been registered" @@ -132,11 +133,11 @@ def test_regression_soft_delete_schemas_should_be_registered( key=json_encode(key, binary=True), value=json_encode(value, binary=True), ) - msg = future.get() + producer.flush() + msg = future.result() - assert ( - schema_reader._offset_watcher.wait_for_offset(msg.offset, timeout=5) is True # pylint: disable=protected-access - ) + seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access + assert seen is True assert database.global_schema_id == test_global_schema_id schemas = database.find_subject_schemas(subject=subject, include_deleted=True) @@ -187,11 +188,11 @@ def test_regression_config_for_inexisting_object_should_not_throw( key=json_encode(key, binary=True), value=json_encode(value, binary=True), ) - msg = future.get() + producer.flush() + msg = future.result() - assert ( - schema_reader._offset_watcher.wait_for_offset(msg.offset, timeout=5) is True # pylint: disable=protected-access - ) + seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) # pylint: disable=protected-access + assert seen is True assert database.find_subject(subject=subject) is not None, "The above message should be handled gracefully" @@ -244,12 +245,11 @@ def test_key_format_detection( test_topic = new_topic(admin_client) for message in testcase.raw_msgs: - future = producer.send( + producer.send( test_topic, key=message[0], value=message[1], ) - future.get() producer.flush() config = set_config_defaults( diff --git a/tests/integration/test_schema_registry_auth.py b/tests/integration/test_schema_registry_auth.py index 7706596b4..5bcfc3dd0 100644 --- a/tests/integration/test_schema_registry_auth.py +++ b/tests/integration/test_schema_registry_auth.py @@ -5,7 +5,7 @@ See LICENSE for details """ from karapace.client import Client -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient from karapace.schema_models import SchemaType, ValidatedTypedSchema from tests.utils import ( new_random_name, diff --git a/tests/integration/utils/kafka_server.py b/tests/integration/utils/kafka_server.py index 9a54698d0..cc882f157 100644 --- a/tests/integration/utils/kafka_server.py +++ b/tests/integration/utils/kafka_server.py @@ -4,7 +4,7 @@ """ from dataclasses import dataclass from kafka.errors import AuthenticationFailedError, NoBrokersAvailable -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient from karapace.utils import Expiration from pathlib import Path from subprocess import Popen diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index e952aede5..c112d5ffc 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaConsumer from kafka.errors import KafkaError, TopicAlreadyExistsError from kafka.structs import PartitionMetadata from karapace import config @@ -24,6 +24,7 @@ from karapace.backup.errors import BackupError, PartitionCountError from karapace.config import Config from karapace.constants import DEFAULT_SCHEMA_TOPIC +from karapace.kafka.producer import KafkaProducer from pathlib import Path from types import FunctionType from typing import Callable, cast, ContextManager @@ -150,10 +151,10 @@ def _partition_metadata(c: int = 1) -> set[PartitionMetadata]: return {PartitionMetadata("topic", i, 0, tuple(), tuple(), None) for i in range(0, c)} @pytest.mark.parametrize( - "ctx_mng,client_class,partitions_method", + "ctx_mng,client_class,partitions_method,close_method_name", ( - (_consumer, KafkaConsumer, KafkaConsumer.partitions_for_topic), - (_producer, KafkaProducer, KafkaProducer.partitions_for), + (_consumer, KafkaConsumer, KafkaConsumer.partitions_for_topic, "close"), + (_producer, KafkaProducer, KafkaProducer.partitions_for, "flush"), ), ) def test_auto_closing( @@ -161,20 +162,21 @@ def test_auto_closing( ctx_mng: Callable[[Config, str], ContextManager[KafkaConsumer | KafkaProducer]], client_class: type[KafkaConsumer | KafkaProducer], partitions_method: FunctionType, + close_method_name: str, ) -> None: with mock.patch(f"{client_class.__module__}.{client_class.__qualname__}.__new__", autospec=True) as client_ctor: client_mock = client_ctor.return_value getattr(client_mock, partitions_method.__name__).return_value = self._partition_metadata() with ctx_mng(config.DEFAULTS, "topic") as client: assert client is client_mock - assert client_mock.close.call_count == 1 + assert getattr(client_mock, close_method_name).call_count == 1 @pytest.mark.parametrize("partition_count", (0, 2)) @pytest.mark.parametrize( - "ctx_mng,client_class,partitions_method", + "ctx_mng,client_class,partitions_method,close_method_name", ( - (_consumer, KafkaConsumer, KafkaConsumer.partitions_for_topic), - (_producer, KafkaProducer, KafkaProducer.partitions_for), + (_consumer, KafkaConsumer, KafkaConsumer.partitions_for_topic, "close"), + (_producer, KafkaProducer, KafkaProducer.partitions_for, "flush"), ), ) def test_raises_partition_count_error_for_unexpected_count( @@ -183,6 +185,7 @@ def test_raises_partition_count_error_for_unexpected_count( client_class: type[KafkaConsumer | KafkaProducer], partitions_method: FunctionType, partition_count: int, + close_method_name: str, ) -> None: with mock.patch(f"{client_class.__module__}.{client_class.__qualname__}.__new__", autospec=True) as client_ctor: client_mock = client_ctor.return_value @@ -190,7 +193,7 @@ def test_raises_partition_count_error_for_unexpected_count( with pytest.raises(PartitionCountError): with ctx_mng(config.DEFAULTS, "topic") as client: assert client == client_mock - assert client_mock.close.call_count == 1 + assert getattr(client_mock, close_method_name).call_count == 1 class TestNormalizeLocation: diff --git a/tests/utils.py b/tests/utils.py index 24e3e8bcf..12e544408 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -5,7 +5,7 @@ from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError from kafka.errors import TopicAlreadyExistsError from karapace.client import Client -from karapace.kafka_admin import KafkaAdminClient +from karapace.kafka.admin import KafkaAdminClient from karapace.protobuf.kotlin_wrapper import trim_margin from karapace.utils import Expiration from pathlib import Path