From b17003f95d960f9975ecb2d39f035f4994d1a3eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Mon, 22 Jan 2024 12:54:44 +0100 Subject: [PATCH] Replace async consumer with confluent-kafka one --- karapace/kafka/common.py | 29 +- karapace/kafka/consumer.py | 215 +++++++++++- karapace/kafka_rest_apis/consumer_manager.py | 192 ++++++----- stubs/confluent_kafka/__init__.pyi | 4 + stubs/confluent_kafka/cimpl.pyi | 22 +- .../integration/backup/test_legacy_backup.py | 2 +- tests/integration/conftest.py | 35 +- tests/integration/kafka/test_consumer.py | 318 +++++++++++++++++- tests/integration/kafka/test_producer.py | 14 - tests/integration/test_rest_consumer.py | 11 +- .../test_rest_consumer_protobuf.py | 2 +- tests/utils.py | 1 + 12 files changed, 728 insertions(+), 117 deletions(-) diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py index cf25d22dd..54dcb3665 100644 --- a/karapace/kafka/common.py +++ b/karapace/kafka/common.py @@ -8,7 +8,14 @@ from collections.abc import Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException -from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError +from kafka.errors import ( + AuthenticationFailedError, + for_code, + IllegalStateError, + KafkaTimeoutError, + NoBrokersAvailable, + UnknownTopicOrPartitionError, +) from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack @@ -47,6 +54,10 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception: KafkaError._UNKNOWN_TOPIC, # pylint: disable=protected-access ): return UnknownTopicOrPartitionError() + if code == KafkaError._TIMED_OUT: # pylint: disable=protected-access + return KafkaTimeoutError() + if code == KafkaError._STATE: # pylint: disable=protected-access + return IllegalStateError() return for_code(code) @@ -89,12 +100,15 @@ class KafkaClientParams(TypedDict, total=False): ssl_crlfile: str | None ssl_keyfile: str | None sasl_oauth_token_provider: TokenWithExpiryProvider + topic_metadata_refresh_interval_ms: int | None # Consumer-only - auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] - enable_auto_commit: bool - fetch_max_wait_ms: int - group_id: str - session_timeout_ms: int + auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] | None + enable_auto_commit: bool | None + fetch_min_bytes: int | None + fetch_message_max_bytes: int | None + fetch_max_wait_ms: int | None + group_id: str | None + session_timeout_ms: int | None class _KafkaConfigMixin: @@ -142,10 +156,13 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para "ssl.certificate.location": params.get("ssl_certfile"), "ssl.crl.location": params.get("ssl_crlfile"), "ssl.key.location": params.get("ssl_keyfile"), + "topic.metadata.refresh.interval.ms": params.get("topic_metadata_refresh_interval_ms"), "error_cb": self._error_callback, # Consumer-only "auto.offset.reset": params.get("auto_offset_reset"), "enable.auto.commit": params.get("enable_auto_commit"), + "fetch.min.bytes": params.get("fetch_min_bytes"), + "fetch.message.max.bytes": params.get("fetch_message_max_bytes"), "fetch.wait.max.ms": params.get("fetch_max_wait_ms"), "group.id": params.get("group_id"), "session.timeout.ms": params.get("session_timeout_ms"), diff --git a/karapace/kafka/consumer.py b/karapace/kafka/consumer.py index d7315b378..2899820cd 100644 --- a/karapace/kafka/consumer.py +++ b/karapace/kafka/consumer.py @@ -5,14 +5,15 @@ from __future__ import annotations -from confluent_kafka import Consumer, TopicPartition +from confluent_kafka import Consumer, Message, TopicPartition from confluent_kafka.admin import PartitionMetadata from confluent_kafka.error import KafkaException -from kafka.errors import KafkaTimeoutError +from kafka.errors import IllegalStateError, KafkaTimeoutError from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception -from typing import Iterable +from typing import Any, Callable, Iterable, TypeVar from typing_extensions import Unpack +import asyncio import secrets @@ -32,6 +33,7 @@ def __init__( super().__init__(bootstrap_servers, verify_connection, **params) + self._subscription: frozenset[str] = frozenset() if topic is not None: self.subscribe([topic]) @@ -67,3 +69,210 @@ def get_watermark_offsets( return result except KafkaException as exc: raise_from_kafkaexception(exc) + + def commit( # type: ignore[override] + self, + message: Message | None = None, + offsets: list[TopicPartition] | None = None, + ) -> list[TopicPartition] | None: + """Commit offsets based on a message or offsets (topic partitions). + + The `message` and `offsets` parameters are mutually exclusive. + """ + if message is not None and offsets is not None: + raise ValueError("Parameters message and offsets are mutually exclusive.") + + try: + if message is not None: + return super().commit(message=message, asynchronous=False) + + return super().commit(offsets=offsets, asynchronous=False) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]: + try: + if timeout is not None: + return super().committed(partitions, timeout) + + return super().committed(partitions) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def subscribe( # type: ignore[override] + self, + topics: list[str] | None = None, + patterns: list[str] | None = None, + ) -> None: + """Subscribe to a list of topics and/or topic patterns. + + Subscriptions are not incremental. + For `Consumer.subscribe`, Topic patterns must start with "^", eg. + "^this-is-a-regex-[0-9]", thus we prefix all strings in the `patterns` + list with "^". + + The `on_assign` and `on_revoke` callbacks are set to keep track of + subscriptions (topics). + + More in the confluent-kafka documentation: + https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.subscribe + """ + topics = topics or [] + patterns = patterns or [] + self.log.info("Subscribing to topics %s and patterns %s", topics, patterns) + if self._subscription: + self.log.warning("Overriding existing subscription: %s", self.subscription()) + try: + super().subscribe( + topics + [f"^{pattern}" for pattern in patterns], + on_assign=self._on_assign, + on_revoke=self._on_revoke, + ) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + # Prefill the subscription set with fixed topic names, so only topic + # pattern updates have the need to wait for the callback. + self._subscription = frozenset(topics) + + def _on_assign(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None: + topics = frozenset(partition.topic for partition in partitions) + self._subscription = self._subscription.union(topics) + + def _on_revoke(self, _consumer: Consumer, partitions: list[TopicPartition]) -> None: + topics = frozenset(partition.topic for partition in partitions) + self._subscription = self._subscription.difference(topics) + + def subscription(self) -> frozenset[str]: + """Returns the list of topic names the consumer is subscribed to. + + The topic list is maintained by the `_on_assign` and `_on_revoke` callback + methods, which are set in `subscribe`. These callbacks are only called + when `poll` is called. + """ + return self._subscription + + def unsubscribe(self) -> None: + try: + super().unsubscribe() + except KafkaException as exc: + raise_from_kafkaexception(exc) + + self._subscription = frozenset() + + def assign(self, partitions: list[TopicPartition]) -> None: + """Assign a list of topic partitions to the consumer. + + Raises an `IllegalStateError` if `subscribe` has been previously called. + This is partly to match previous behaviour from `aiokafka`, but more + importantly to make sure we do not eventually reset the consumer by + calling `assign` after `subscribe` for the same topic - which would + result in the consumer starting from the beginning of a topic after an + unspecified time. + """ + if self._subscription: + raise IllegalStateError + + try: + super().assign(partitions) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def assignment(self) -> list[TopicPartition]: + try: + return super().assignment() + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def seek(self, partition: TopicPartition) -> None: + try: + super().seek(partition) + except KafkaException as exc: + raise_from_kafkaexception(exc) + + +T = TypeVar("T") + + +class AsyncKafkaConsumer: + """An async wrapper around `KafkaConsumer` built on confluent-kafka. + + Async methods are ran in the event loop's executor. Calling `start` + instantiates the underlying `KafkaConsumer`. + """ + + _START_ERROR: str = "Async consumer must be started" + + def __init__( + self, + bootstrap_servers: Iterable[str] | str, + topic: str | None = None, + loop: asyncio.AbstractEventLoop | None = None, + **params: Unpack[KafkaClientParams], + ) -> None: + self.loop = loop or asyncio.get_running_loop() + + self.consumer: KafkaConsumer | None = None + self._bootstrap_servers = bootstrap_servers + self._topic = topic + self._consumer_params = params + + async def _run_in_executor(self, func: Callable[..., T], *args: Any) -> T: + return await self.loop.run_in_executor(None, func, *args) + + def _start(self) -> None: + self.consumer = KafkaConsumer( + self._bootstrap_servers, + topic=self._topic, + **self._consumer_params, + ) + + async def start(self) -> None: + await self._run_in_executor(self._start) + + async def poll(self, timeout: float) -> Message | None: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.poll, timeout) + + async def commit( + self, + message: Message | None = None, + offsets: list[TopicPartition] | None = None, + ) -> list[TopicPartition] | None: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.commit, message, offsets) + + async def committed(self, partitions: list[TopicPartition], timeout: float | None = None) -> list[TopicPartition]: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.committed, partitions, timeout) + + async def subscribe(self, topics: list[str] | None = None, patterns: list[str] | None = None) -> None: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.subscribe, topics, patterns) + + def subscription(self) -> frozenset[str]: + assert self.consumer is not None, self._START_ERROR + return self.consumer.subscription() + + async def unsubscribe(self) -> None: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.unsubscribe) + + async def assign(self, partitions: list[TopicPartition]) -> None: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.assign, partitions) + + async def assignment(self) -> list[TopicPartition]: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.assignment) + + async def seek(self, partition: TopicPartition) -> None: + assert self.consumer is not None, self._START_ERROR + return await self._run_in_executor(self.consumer.seek, partition) + + async def stop(self) -> None: + assert self.consumer is not None, self._START_ERROR + # After the `KafkaConsumer` is closed, there is no further action to + # be taken, as it has its own checks and errors are raised if a closed + # consumer is tried to be used + return await self._run_in_executor(self.consumer.close) diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index afb4bfe0f..3a248bc66 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -2,9 +2,9 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from aiokafka import AIOKafkaConsumer from asyncio import Lock from collections import defaultdict, namedtuple +from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition from functools import partial from http import HTTPStatus from kafka.errors import ( @@ -13,9 +13,12 @@ KafkaConfigurationError, KafkaError, TopicAuthorizationFailedError, + UnknownTopicOrPartitionError, ) -from kafka.structs import TopicPartition -from karapace.config import Config, create_client_ssl_context +from karapace.config import Config +from karapace.kafka.common import translate_from_kafkaerror +from karapace.kafka.consumer import AsyncKafkaConsumer +from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS, Timestamp from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config from karapace.kafka_rest_apis.error_codes import RESTErrorCodes from karapace.karapace import empty_response, KarapaceBase @@ -198,30 +201,31 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ KarapaceBase.r(content_type=content_type, body={"base_uri": consumer_base_uri, "instance_id": consumer_name}) async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name, request_data): - ssl_context = create_client_ssl_context(self.config) for retry in [True, True, False]: try: session_timeout_ms = self.config["session_timeout_ms"] request_timeout_ms = max( session_timeout_ms, - 305000, # Copy of old default from kafka-python's request_timeout_ms (not exposed by aiokafka) + DEFAULT_REQUEST_TIMEOUT_MS, request_data["consumer.request.timeout.ms"], ) - c = AIOKafkaConsumer( + c = AsyncKafkaConsumer( bootstrap_servers=self.config["bootstrap_uri"], + auto_offset_reset=request_data["auto.offset.reset"], client_id=internal_name, - security_protocol=self.config["security_protocol"], - ssl_context=ssl_context, - group_id=group_name, - fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values - fetch_max_bytes=self.config["consumer_request_max_bytes"], - fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms", 500), # Copy aiokafka default 500 ms - # This will cause delay if subscription is changed. - consumer_timeout_ms=self.config.get("consumer_timeout_ms", 200), # Copy aiokafka default 200 ms - request_timeout_ms=request_timeout_ms, enable_auto_commit=request_data["auto.commit.enable"], - auto_offset_reset=request_data["auto.offset.reset"], + fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms"), + fetch_message_max_bytes=self.config["consumer_request_max_bytes"], + fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values + group_id=group_name, + security_protocol=self.config["security_protocol"], session_timeout_ms=session_timeout_ms, + socket_timeout_ms=request_timeout_ms, + ssl_cafile=self.config["ssl_cafile"], + ssl_certfile=self.config["ssl_certfile"], + ssl_crlfile=self.config["ssl_crlfile"], + ssl_keyfile=self.config["ssl_keyfile"], + topic_metadata_refresh_interval_ms=request_data.get("topic.metadata.refresh.interval.ms"), **get_kafka_client_auth_parameters_from_config(self.config), ) await c.start() @@ -255,14 +259,20 @@ async def commit_offsets( self._assert_consumer_exists(internal_name, content_type) if request_data: self._assert_has_key(request_data, "offsets", content_type) - payload = {} + payload = [] for el in request_data.get("offsets", []): for k in ["partition", "offset"]: convert_to_int(el, k, content_type) # If we commit for a partition that does not belong to this consumer, then the internal error raised # is marked as retriable, and thus the commit method will remain blocked in what looks like an infinite loop self._topic_and_partition_valid(cluster_metadata, el, content_type) - payload[TopicPartition(el["topic"], el["partition"])] = el["offset"] + 1 + payload.append( + TopicPartition( + topic=el["topic"], + partition=el["partition"], + offset=el["offset"] + 1, + ), + ) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer @@ -284,7 +294,7 @@ async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, r convert_to_int(el, "partition", content_type) tp = TopicPartition(el["topic"], el["partition"]) try: - offset = await consumer.committed(tp) + [committed_partition] = await consumer.committed([tp]) except GroupAuthorizationFailedError: KarapaceBase.r(body={"message": "Forbidden"}, content_type=content_type, status=HTTPStatus.FORBIDDEN) except KafkaError as ex: @@ -292,14 +302,14 @@ async def get_offsets(self, internal_name: Tuple[str, str], content_type: str, r message=f"Failed to get offsets: {ex}", content_type=content_type, ) - if offset is None: + if committed_partition is None: continue response["offsets"].append( { "topic": tp.topic, "partition": tp.partition, "metadata": "", - "offset": offset, + "offset": committed_partition.offset, } ) KarapaceBase.r(body=response, content_type=content_type) @@ -324,9 +334,9 @@ async def set_subscription(self, internal_name: Tuple[str, str], content_type: s async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer try: - # aiokafka does not verify access to topics subscribed during this call, thus cannot get topic authorzation - # error immediately - consumer.subscribe(topics=topics, pattern=topics_pattern) + # the client does not verify access to topics subscribed during this call, + # thus cannot get topic authorization error immediately + await consumer.subscribe(topics=topics, patterns=[topics_pattern] if topics_pattern is not None else None) empty_response() except IllegalStateError as e: self._illegal_state_fail(str(e), content_type=content_type) @@ -338,17 +348,13 @@ async def get_subscription(self, internal_name: Tuple[str, str], content_type: s self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer - if consumer.subscription() is None: - topics = [] - else: - topics = list(consumer.subscription()) - KarapaceBase.r(content_type=content_type, body={"topics": topics}) + KarapaceBase.r(content_type=content_type, body={"topics": list(consumer.subscription())}) async def delete_subscription(self, internal_name: Tuple[str, str], content_type: str): LOG.info("Deleting subscription for %s", internal_name) self._assert_consumer_exists(internal_name, content_type) async with self.consumer_locks[internal_name]: - self.consumers[internal_name].consumer.unsubscribe() + await self.consumers[internal_name].consumer.unsubscribe() empty_response() # ASSIGNMENTS @@ -364,7 +370,7 @@ async def set_assignments(self, internal_name: Tuple[str, str], content_type: st async with self.consumer_locks[internal_name]: try: consumer = self.consumers[internal_name].consumer - consumer.assign(partitions) + await consumer.assign(partitions) empty_response() except IllegalStateError as e: self._illegal_state_fail(message=str(e), content_type=content_type) @@ -378,7 +384,7 @@ async def get_assignments(self, internal_name: Tuple[str, str], content_type: st consumer = self.consumers[internal_name].consumer KarapaceBase.r( content_type=content_type, - body={"partitions": [{"topic": pd.topic, "partition": pd.partition} for pd in consumer.assignment()]}, + body={"partitions": [{"topic": pd.topic, "partition": pd.partition} for pd in await consumer.assignment()]}, ) # POSITIONS @@ -393,13 +399,13 @@ async def seek_to(self, internal_name: Tuple[str, str], content_type: str, reque self._assert_has_key(el, k, content_type) convert_to_int(el, k, content_type) self._assert_positive_number(el, "offset", content_type) - seeks.append((TopicPartition(topic=el["topic"], partition=el["partition"]), el["offset"])) + seeks.append(TopicPartition(topic=el["topic"], partition=el["partition"], offset=el["offset"])) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer - for part, offset in seeks: + for part in seeks: try: - consumer.seek(part, offset) - except IllegalStateError: + await consumer.seek(part) + except (UnknownTopicOrPartitionError, IllegalStateError): self._illegal_state_fail(f"Partition {part} is unassigned", content_type) empty_response() @@ -415,18 +421,27 @@ async def seek_limit( convert_to_int(el, "partition", content_type) for k in ["topic", "partition"]: self._assert_has_key(el, k, content_type) - resets.append(TopicPartition(topic=el["topic"], partition=el["partition"])) + resets.append( + TopicPartition( + topic=el["topic"], + partition=el["partition"], + offset=OFFSET_BEGINNING if beginning else OFFSET_END, + ) + ) async with self.consumer_locks[internal_name]: consumer = self.consumers[internal_name].consumer try: - if beginning: - await consumer.seek_to_beginning(*resets) - else: - await consumer.seek_to_end(*resets) + await asyncio.gather(*(consumer.seek(topic_partition) for topic_partition in resets)) empty_response() - except AssertionError: + except IllegalStateError: self._illegal_state_fail(f"Trying to reset unassigned partitions to {direction}", content_type) + except UnknownTopicOrPartitionError: + KarapaceBase.not_found( + message="Unknown topic or partition", + content_type=content_type, + sub_code=RESTErrorCodes.UNKNOWN_TOPIC_OR_PARTITION.value, + ) async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats: dict, query_params: dict): LOG.info("Running fetch for name %s with parameters %r and formats %r", internal_name, query_params, formats) @@ -453,7 +468,11 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats # we get to be more in line with the confluent proxy by doing a bunch of fetches each time and # respecting the max fetch request size # pylint: disable=protected-access - max_bytes = int(query_params["max_bytes"]) if "max_bytes" in query_params else consumer._fetch_max_bytes + max_bytes = ( + int(query_params["max_bytes"]) + if "max_bytes" in query_params + else self.config["consumer_request_max_bytes"] + ) except ValueError: KarapaceBase.internal_error(message=f"Invalid request parameters: {query_params}", content_type=content_type) for val in [timeout, max_bytes]: @@ -470,9 +489,8 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats ) read_bytes = 0 start_time = time.monotonic() - poll_data = defaultdict(list) + poll_data = [] message_count = 0 - # Read buffered records with calling getmany() with possibly zero timeout and max_records=1 multiple times read_buffered = True while read_bytes < max_bytes and (start_time + timeout / 1000 > time.monotonic() or read_buffered): read_buffered = False @@ -486,56 +504,68 @@ async def fetch(self, internal_name: Tuple[str, str], content_type: str, formats ) timeout_left = max(0, (start_time - time.monotonic()) * 1000 + timeout) try: - data = await consumer.getmany(timeout_ms=timeout_left, max_records=1) + message = await consumer.poll(timeout=timeout_left / 1000) + if message is None: + continue + if message.error() is not None: + raise translate_from_kafkaerror(message.error()) except (GroupAuthorizationFailedError, TopicAuthorizationFailedError): KarapaceBase.r(body={"message": "Forbidden"}, content_type=content_type, status=HTTPStatus.FORBIDDEN) + except UnknownTopicOrPartitionError: + KarapaceBase.not_found( + message=f"Unknown topic or partition: {message.error()}", + content_type=content_type, + sub_code=RESTErrorCodes.UNKNOWN_TOPIC_OR_PARTITION.value, + ) except KafkaError as ex: KarapaceBase.internal_error( message=f"Failed to fetch: {ex}", content_type=content_type, ) LOG.debug("Successfully polled for messages") - for topic, records in data.items(): - for rec in records: - message_count += 1 - read_bytes += max(0, 0 if rec.key is None else len(rec.key)) + max( - 0, 0 if rec.value is None else len(rec.value) - ) - poll_data[topic].append(rec) - read_buffered = True + message_count += 1 + key_bytes = 0 if message.key() is None else len(message.key()) + value_bytes = 0 if message.value() is None else len(message.value()) + read_bytes += key_bytes + value_bytes + poll_data.append(message) + read_buffered = True LOG.info( "Gathered %d total messages (%d bytes read) in %r", message_count, read_bytes, time.monotonic() - start_time, ) - for tp in poll_data: - for msg in poll_data[tp]: - try: - key = await self.deserialize(msg.key, request_format) if msg.key else None - except DeserializationError as e: - KarapaceBase.unprocessable_entity( - message=f"key deserialization error for format {request_format}: {e}", - sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - content_type=content_type, - ) - try: - value = await self.deserialize(msg.value, request_format) if msg.value else None - except DeserializationError as e: - KarapaceBase.unprocessable_entity( - message=f"value deserialization error for format {request_format}: {e}", - sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, - content_type=content_type, - ) - element = { - "topic": tp.topic, - "partition": tp.partition, - "offset": msg.offset, - "timestamp": msg.timestamp, - "key": key, - "value": value, - } - response.append(element) + for msg in poll_data: + try: + key = await self.deserialize(msg.key(), request_format) if msg.key() else None + except DeserializationError as e: + KarapaceBase.unprocessable_entity( + message=f"key deserialization error for format {request_format}: {e}", + sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, + content_type=content_type, + ) + try: + value = await self.deserialize(msg.value(), request_format) if msg.value() else None + except DeserializationError as e: + KarapaceBase.unprocessable_entity( + message=f"value deserialization error for format {request_format}: {e}", + sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value, + content_type=content_type, + ) + element = { + "topic": msg.topic(), + "partition": msg.partition(), + "offset": msg.offset(), + # `confluent_kafka.Message.timestamp()` returns a tuple where the first component is + # the timestamp type, see `karapace.kafka.types.Timestamp` + # In case of the `NOT_AVAILABLE` type whatever the timestamp may be, it cannot be trusted + # and should be ignored according to the confluent-kafka documentation: + # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/#confluent_kafka.Message + "timestamp": msg.timestamp()[1] if msg.timestamp()[0] != Timestamp.NOT_AVAILABLE else None, + "key": key, + "value": value, + } + response.append(element) KarapaceBase.r(content_type=content_type, body=response) diff --git a/stubs/confluent_kafka/__init__.pyi b/stubs/confluent_kafka/__init__.pyi index 3d26b0393..175569fb4 100644 --- a/stubs/confluent_kafka/__init__.pyi +++ b/stubs/confluent_kafka/__init__.pyi @@ -2,6 +2,8 @@ from ._model import IsolationLevel from .cimpl import ( Consumer, Message, + OFFSET_BEGINNING, + OFFSET_END, Producer, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME, @@ -14,6 +16,8 @@ __all__ = ( "IsolationLevel", "Message", "Producer", + "OFFSET_BEGINNING", + "OFFSET_END", "TIMESTAMP_CREATE_TIME", "TIMESTAMP_LOG_APPEND_TIME", "TIMESTAMP_NOT_AVAILABLE", diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 48d67c155..ac91b00a5 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -1,13 +1,20 @@ from confluent_kafka.admin._metadata import ClusterMetadata from typing import Any, Callable, Final +OFFSET_BEGINNING: Final = ... +OFFSET_END: Final = ... + class KafkaError: _NOENT: int _AUTHENTICATION: int _UNKNOWN_TOPIC: int _UNKNOWN_PARTITION: int + _TIMED_OUT: int + _STATE: int + UNKNOWN_TOPIC_OR_PART: int def code(self) -> int: ... + def str(self) -> str: ... class KafkaException(Exception): def __init__(self, *args: Any, **kwargs: Any) -> None: @@ -38,6 +45,7 @@ class TopicPartition: self.offset: int self.metadata: str | None self.leader_epoch: int | None + self.error: KafkaError | None class Message: def offset(self) -> int: ... @@ -65,7 +73,12 @@ class Producer: def poll(self, timeout: float = -1) -> int: ... class Consumer: - def subscribe(self, topics: list[str]) -> None: ... + def subscribe( + self, + topics: list[str], + on_assign: Callable[[Consumer, list[TopicPartition]], None] | None = None, + on_revoke: Callable[[Consumer, list[TopicPartition]], None] | None = None, + ) -> None: ... def get_watermark_offsets( self, partition: TopicPartition, timeout: float | None = None, cached: bool = False ) -> tuple[int, int] | None: ... @@ -74,6 +87,13 @@ class Consumer: def consume(self, num_messages: int = 1, timeout: float = -1) -> list[Message]: ... def poll(self, timeout: float = -1) -> Message | None: ... def assign(self, partitions: list[TopicPartition]) -> None: ... + def commit( + self, message: Message | None = None, offsets: list[TopicPartition] | None = None, asynchronous: bool = True + ) -> list[TopicPartition] | None: ... + def committed(self, partitions: list[TopicPartition], timeout: float = -1) -> list[TopicPartition]: ... + def unsubscribe(self) -> None: ... + def assignment(self) -> list[TopicPartition]: ... + def seek(self, partition: TopicPartition) -> None: ... TIMESTAMP_CREATE_TIME: Final = ... TIMESTAMP_NOT_AVAILABLE: Final = ... diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 9f7d4a77d..12a400dde 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -129,7 +129,7 @@ def _assert_canonical_key_format( ) -> None: # Consume all records and assert key order is canonical consumer = KafkaConsumer( - schemas_topic, + topic=schemas_topic, group_id="assert-canonical-key-format-consumer", enable_auto_commit=False, bootstrap_servers=bootstrap_servers, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index beba5adbe..c5e7af152 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -14,8 +14,8 @@ from karapace.client import Client from karapace.config import Config, set_config_defaults, write_config from karapace.kafka.admin import KafkaAdminClient -from karapace.kafka.consumer import KafkaConsumer -from karapace.kafka.producer import KafkaProducer +from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer +from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer from karapace.kafka_rest_apis import KafkaRest from pathlib import Path from tests.conftest import KAFKA_VERSION @@ -224,11 +224,42 @@ def fixture_consumer( bootstrap_servers=kafka_servers.bootstrap_servers, auto_offset_reset="earliest", enable_auto_commit=False, + # Speed things up for consumer tests to discover topics, etc. + topic_metadata_refresh_interval_ms=200, ) yield consumer consumer.close() +@pytest.fixture(scope="function", name="asyncproducer") +async def fixture_asyncproducer( + kafka_servers: KafkaServers, + loop: asyncio.AbstractEventLoop, +) -> Iterator[AsyncKafkaProducer]: + asyncproducer = AsyncKafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers, loop=loop) + await asyncproducer.start() + yield asyncproducer + await asyncproducer.stop() + + +@pytest.fixture(scope="function", name="asyncconsumer") +async def fixture_asyncconsumer( + kafka_servers: KafkaServers, + loop: asyncio.AbstractEventLoop, +) -> Iterator[AsyncKafkaConsumer]: + asyncconsumer = AsyncKafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + loop=loop, + auto_offset_reset="earliest", + enable_auto_commit=False, + # Speed things up for consumer tests to discover topics, etc. + topic_metadata_refresh_interval_ms=200, + ) + await asyncconsumer.start() + yield asyncconsumer + await asyncconsumer.stop() + + @pytest.fixture(scope="function", name="rest_async") async def fixture_rest_async( request: SubRequest, diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py index 4fceb1b72..c724c0ed1 100644 --- a/tests/integration/kafka/test_consumer.py +++ b/tests/integration/kafka/test_consumer.py @@ -4,15 +4,22 @@ """ from __future__ import annotations -from confluent_kafka import TopicPartition +from confluent_kafka import OFFSET_BEGINNING, OFFSET_END, TopicPartition from confluent_kafka.admin import NewTopic -from kafka.errors import UnknownTopicOrPartitionError -from karapace.kafka.consumer import KafkaConsumer -from karapace.kafka.producer import KafkaProducer +from confluent_kafka.error import KafkaError +from kafka.errors import IllegalStateError, KafkaTimeoutError, UnknownTopicOrPartitionError +from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer +from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer +from karapace.utils import Expiration from tests.integration.utils.kafka_server import KafkaServers +from tests.utils import new_topic as create_new_topic +from typing import Final import pytest +POLL_TIMEOUT_S: Final = 10 + class TestPartitionsForTopic: def test_partitions_for_returns_empty_for_unknown_topic(self, consumer: KafkaConsumer) -> None: @@ -56,3 +63,306 @@ def test_get_watermark_offsets_topic_with_one_message( assert beginning == 0 assert end == 1 + + +class TestCommit: + def test_commit_message_and_offset_mutual_exclusion( + self, + consumer: KafkaConsumer, + producer: KafkaProducer, + new_topic: NewTopic, + ) -> None: + fut = producer.send(new_topic.topic) + producer.flush() + message = fut.result() + + with pytest.raises(ValueError): + consumer.commit(message=message, offsets=[]) + + def test_commit_message( + self, + producer: KafkaProducer, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + first_fut = producer.send(new_topic.topic) + second_fut = producer.send(new_topic.topic) + producer.flush() + first_fut.result() + second_fut.result() + consumer.poll(timeout=POLL_TIMEOUT_S) + message = consumer.poll(timeout=POLL_TIMEOUT_S) + + [topic_partition] = consumer.commit(message) + [committed_partition] = consumer.committed([TopicPartition(new_topic.topic, partition=0)]) + + assert topic_partition.topic == new_topic.topic + assert topic_partition.partition == 0 + assert topic_partition.offset == 2 + assert committed_partition.topic == new_topic.topic + assert committed_partition.partition == 0 + assert committed_partition.offset == 2 + + def test_commit_offsets( + self, + producer: KafkaProducer, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + first_fut = producer.send(new_topic.topic) + second_fut = producer.send(new_topic.topic) + producer.flush() + first_fut.result() + second_fut.result() + consumer.poll(timeout=POLL_TIMEOUT_S) + message = consumer.poll(timeout=POLL_TIMEOUT_S) + + [topic_partition] = consumer.commit( + offsets=[ + TopicPartition( + new_topic.topic, + partition=0, + offset=message.offset() + 1, + ), + ] + ) + [committed_partition] = consumer.committed([TopicPartition(new_topic.topic, partition=0)]) + + assert topic_partition.topic == new_topic.topic + assert topic_partition.partition == 0 + assert topic_partition.offset == 2 + assert committed_partition.topic == new_topic.topic + assert committed_partition.partition == 0 + assert committed_partition.offset == 2 + + def test_commit_raises_for_unknown_partition( + self, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + with pytest.raises(UnknownTopicOrPartitionError): + consumer.commit(offsets=[TopicPartition(new_topic.topic, partition=99, offset=0)]) + + +class TestCommitted: + def test_committed_raises_on_timeout(self, consumer: KafkaConsumer) -> None: + with pytest.raises(KafkaTimeoutError): + consumer.committed([], timeout=0.00001) + + +class TestSubscribe: + def _poll_until_no_message(self, consumer: KafkaConsumer) -> None: + """Polls until there is no message returned. + + When verifying subscriptions this can be used to wait until the topics + subscribed to are all ready. Until a topic is not ready, a message is + returned with an error indicating that certain topics are not ready to + be consumed yet. + """ + msg = consumer.poll(timeout=POLL_TIMEOUT_S) + while msg is not None: + msg = consumer.poll(timeout=POLL_TIMEOUT_S) + + def test_subscription_is_recorded( + self, + admin_client: KafkaAdminClient, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + prefix = "subscribe" + topics = [create_new_topic(admin_client, prefix=prefix) for _ in range(3)] + + consumer.subscribe(topics=[new_topic.topic], patterns=[f"{prefix}.*"]) + self._poll_until_no_message(consumer) + + assert consumer.subscription() == frozenset(topics + [new_topic.topic]) + + def test_unsubscribe_empties_subscription( + self, + admin_client: KafkaAdminClient, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + prefix = "unsubscribe" + _ = [create_new_topic(admin_client, prefix=prefix) for _ in range(3)] + consumer.subscribe(topics=[new_topic.topic], patterns=[f"{prefix}.*"]) + self._poll_until_no_message(consumer) + + consumer.unsubscribe() + + self._poll_until_no_message(consumer) + + assert consumer.subscription() == frozenset() + + def test_resubscribe_modifies_subscription( + self, + admin_client: KafkaAdminClient, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + prefix = "resubscribe" + _ = [create_new_topic(admin_client, prefix=prefix) for _ in range(3)] + consumer.subscribe(topics=[new_topic.topic], patterns=[f"{prefix}.*"]) + self._poll_until_no_message(consumer) + + consumer.subscribe(topics=[new_topic.topic]) + + self._poll_until_no_message(consumer) + + assert consumer.subscription() == frozenset([new_topic.topic]) + + +class TestAssign: + def test_assign(self, consumer: KafkaConsumer, producer: KafkaProducer, new_topic: NewTopic) -> None: + first_fut = producer.send(new_topic.topic) + second_fut = producer.send(new_topic.topic, value=b"message-value") + producer.flush() + first_fut.result() + second_fut.result() + consumer.assign([TopicPartition(new_topic.topic, partition=0, offset=1)]) + + [assigned_partition] = consumer.assignment() + first_message = consumer.poll(POLL_TIMEOUT_S) + second_message = consumer.poll(POLL_TIMEOUT_S) + + assert first_message.offset() == 1 + assert first_message.topic() == new_topic.topic + assert first_message.partition() == 0 + assert first_message.key() is None + assert first_message.value() == b"message-value" + assert second_message is None + + assert assigned_partition.topic == new_topic.topic + assert assigned_partition.partition == 0 + assert assigned_partition.offset == 1 + + def test_assign_raises_illegal_state_after_subscribe( + self, + consumer: KafkaConsumer, + new_topic: NewTopic, + ) -> None: + consumer.subscribe([new_topic.topic]) + consumer.poll(timeout=POLL_TIMEOUT_S) + + with pytest.raises(IllegalStateError): + consumer.assign([TopicPartition("some-topic", 0)]) + + +class TestSeek: + def test_seek(self, consumer: KafkaConsumer, producer: KafkaProducer, new_topic: NewTopic) -> None: + consumer.subscribe([new_topic.topic]) + fut = producer.send(new_topic.topic, value=b"message-value") + producer.flush() + fut.result() + + message = consumer.poll(timeout=POLL_TIMEOUT_S) + consumer.seek(TopicPartition(new_topic.topic, partition=0, offset=OFFSET_BEGINNING)) + same_message = consumer.poll(timeout=POLL_TIMEOUT_S) + + assert message.offset() == same_message.offset() + assert message.topic() == same_message.topic() + assert message.partition() == same_message.partition() + assert message.key() is None + assert same_message.key() is None + assert message.value() == same_message.value() + + def test_seek_unassigned_partition_raises(self, consumer: KafkaConsumer, new_topic: NewTopic) -> None: + with pytest.raises(UnknownTopicOrPartitionError): + consumer.seek(TopicPartition(new_topic.topic, partition=0, offset=OFFSET_END)) + + +class TestAsyncPoll: + async def test_async_poll( + self, + asyncproducer: AsyncKafkaProducer, + asyncconsumer: AsyncKafkaConsumer, + new_topic: NewTopic, + ) -> None: + await asyncconsumer.subscribe([new_topic.topic]) + aiofut = await asyncproducer.send(new_topic.topic) + await aiofut + + message = await asyncconsumer.poll(timeout=POLL_TIMEOUT_S) + + assert message.offset() == 0 + assert message.topic() == new_topic.topic + assert message.partition() == 0 + assert message.key() is None + assert message.value() is None + + async def test_async_poll_no_message(self, asyncconsumer: AsyncKafkaConsumer, new_topic: NewTopic) -> None: + await asyncconsumer.subscribe([new_topic.topic]) + + message = await asyncconsumer.poll(timeout=1) + + assert message is None + + async def test_async_poll_unknown_topic(self, asyncconsumer: AsyncKafkaConsumer) -> None: + await asyncconsumer.subscribe(["nonexistent"]) + + message = await asyncconsumer.poll(timeout=POLL_TIMEOUT_S) + + assert message.topic() == "nonexistent" + assert message.partition() == 0 + assert message.error() is not None + assert message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART + assert message.offset() is None + assert message.key() is None + assert message.value().decode() == message.error().str() + + async def test_async_poll_existing_topic_and_unknown_topic_pattern( + self, + asyncproducer: AsyncKafkaProducer, + asyncconsumer: AsyncKafkaConsumer, + new_topic: NewTopic, + ) -> None: + await asyncconsumer.subscribe(topics=[new_topic.topic], patterns=["nonexistent.*"]) + aiofut = await asyncproducer.send(new_topic.topic, value="message-value") + sent_message = await aiofut + + message = await asyncconsumer.poll(timeout=POLL_TIMEOUT_S) + while message.error() is not None: + assert message.topic() in ("^nonexistent.*", new_topic.topic) + assert message.partition() == 0 + assert message.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART + assert message.offset() is None + assert message.key() is None + assert message.value().decode() == message.error().str() + message = await asyncconsumer.poll(timeout=POLL_TIMEOUT_S) + + message_none = await asyncconsumer.poll(timeout=POLL_TIMEOUT_S) + + assert message.key() == sent_message.key() + assert message.value() == sent_message.value() + + assert message_none is None + + +async def test_pattern_subscription_async( + admin_client: KafkaAdminClient, + asyncproducer: AsyncKafkaProducer, + asyncconsumer: AsyncKafkaConsumer, +) -> None: + prefix = "patterntest" + number_of_topics = 3 + topics = [create_new_topic(admin_client, prefix=prefix) for _ in range(number_of_topics)] + await asyncconsumer.subscribe(patterns=[f"{prefix}.*"]) + for i, topic in enumerate(topics): + aiofut = await asyncproducer.send(topic, value=f"{i}-value") + await aiofut + + messages = [] + expiration = Expiration.from_timeout(30) + while len(messages) != 3: + expiration.raise_timeout_if_expired( + "Timeout elapsed waiting for messages from topic pattern. Only received {messages}", + messages=messages, + ) + message = await asyncconsumer.poll(timeout=POLL_TIMEOUT_S) + if message is not None and message.error() is None: + messages.append(message) + + assert sorted(message.value().decode() for message in messages) == [f"{i}-value" for i in range(number_of_topics)] diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py index 59c0da0ef..04c27f108 100644 --- a/tests/integration/kafka/test_producer.py +++ b/tests/integration/kafka/test_producer.py @@ -9,10 +9,7 @@ from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer from karapace.kafka.types import Timestamp -from tests.integration.utils.kafka_server import KafkaServers -from typing import Iterator -import asyncio import pytest import time @@ -76,17 +73,6 @@ def test_partitions_for(self, producer: KafkaProducer, new_topic: NewTopic) -> N assert partitions[0].isrs == [1] -@pytest.fixture(scope="function", name="asyncproducer") -async def fixture_asyncproducer( - kafka_servers: KafkaServers, - loop: asyncio.AbstractEventLoop, -) -> Iterator[AsyncKafkaProducer]: - asyncproducer = AsyncKafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers, loop=loop) - await asyncproducer.start() - yield asyncproducer - await asyncproducer.stop() - - class TestAsyncSend: async def test_async_send(self, asyncproducer: AsyncKafkaProducer, new_topic: NewTopic) -> None: key = b"key" diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index e1620d1b2..1539b15f7 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -11,6 +11,7 @@ repeat_until_successful_request, REST_HEADERS, schema_data, + wait_for_topics, ) import base64 @@ -87,7 +88,7 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): topic_name = new_topic(admin_client) instance_id = await new_consumer(rest_async_client, group_name, fmt="binary", trail=trail) sub_path = f"/consumers/{group_name}/instances/{instance_id}/subscription{trail}" - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=5000" res = await rest_async_client.get(sub_path, headers=header) assert res.ok data = res.json() @@ -119,6 +120,7 @@ async def test_subscription(rest_async_client, admin_client, producer, trail): # one pattern sub will get all 3 prefix = f"{hash(random.random())}" pattern_topics = [new_topic(admin_client, prefix=f"{prefix}{i}") for i in range(3)] + await wait_for_topics(rest_async_client, topic_names=pattern_topics, timeout=20, sleep=1) res = await rest_async_client.post(sub_path, json={"topic_pattern": f"{prefix}.*"}, headers=REST_HEADERS["json"]) assert res.ok @@ -175,6 +177,7 @@ async def test_seek(rest_async_client, admin_client, trail): assign_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} res = await rest_async_client.post(assign_path, headers=REST_HEADERS["json"], json=assign_payload) assert res.ok + await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=20, sleep=1) seek_payload = {"offsets": [{"topic": topic_name, "partition": 0, "offset": 10}]} res = await rest_async_client.post(seek_path, json=seek_payload, headers=REST_HEADERS["json"]) assert res.ok, f"Unexpected status for {res}" @@ -262,7 +265,7 @@ async def test_consume(rest_async_client, admin_client, producer, trail): instance_id = await new_consumer(rest_async_client, group_name, fmt=fmt, trail=trail) assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments{trail}" seek_path = f"/consumers/{group_name}/instances/{instance_id}/positions/beginning{trail}" - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=5000" topic_name = new_topic(admin_client) assign_payload = {"partitions": [{"topic": topic_name, "partition": 0}]} res = await rest_async_client.post(assign_path, json=assign_payload, headers=header) @@ -350,7 +353,7 @@ async def test_publish_consume_avro(rest_async_client, admin_client, trail, sche group_name = "e2e_group" instance_id = await new_consumer(rest_async_client, group_name, fmt=schema_type, trail=trail) assign_path = f"/consumers/{group_name}/instances/{instance_id}/assignments{trail}" - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records{trail}?timeout=5000" tn = new_topic(admin_client) assign_payload = {"partitions": [{"topic": tn, "partition": 0}]} res = await rest_async_client.post(assign_path, json=assign_payload, headers=header) @@ -393,7 +396,7 @@ async def test_consume_grafecul_deserialization_error_handling(rest_async_client res = await rest_async_client.post(assign_path, json=assign_payload, headers=headers) assert res.ok - consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=1000" + consume_path = f"/consumers/{group_name}/instances/{instance_id}/records?timeout=5000" resp = await rest_async_client.get(consume_path, headers=headers) if fmt == "binary": assert resp.status_code == 200, f"Expected 200 response: {resp}" diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py index 903d94327..d4c7ca7c4 100644 --- a/tests/integration/test_rest_consumer_protobuf.py +++ b/tests/integration/test_rest_consumer_protobuf.py @@ -239,7 +239,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references( subscribe_path = f"/consumers/{group}/instances/{instance_id}/subscription" - consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=1000" + consume_path = f"/consumers/{group}/instances/{instance_id}/records?timeout=5000" res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"]) assert res.ok diff --git a/tests/utils.py b/tests/utils.py index bf17d9c8b..a48bc55cc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -27,6 +27,7 @@ "consumer.request.timeout.ms": 11000, "fetch.min.bytes": 100000, "auto.commit.enable": "true", + "topic.metadata.refresh.interval.ms": 100, } schema_jsonschema_json = json.dumps( {