From c01b025dd23606a490a41e84f02c66095d355ab4 Mon Sep 17 00:00:00 2001 From: Jonas Keeling Date: Mon, 30 Sep 2024 15:13:49 +0200 Subject: [PATCH] fix: unknown Kafka errors with _RESOLVE error code Also don't log KafkaUnavailableError as unexpected error --- src/karapace/kafka/common.py | 11 ++++++++++- src/karapace/schema_reader.py | 3 +++ stubs/confluent_kafka/cimpl.pyi | 1 + 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/karapace/kafka/common.py b/src/karapace/kafka/common.py index 5df9838a2..add44eea3 100644 --- a/src/karapace/kafka/common.py +++ b/src/karapace/kafka/common.py @@ -6,7 +6,14 @@ from __future__ import annotations from aiokafka.client import UnknownTopicOrPartitionError -from aiokafka.errors import AuthenticationFailedError, for_code, IllegalStateError, KafkaTimeoutError, NoBrokersAvailable +from aiokafka.errors import ( + AuthenticationFailedError, + for_code, + IllegalStateError, + KafkaTimeoutError, + KafkaUnavailableError, + NoBrokersAvailable, +) from collections.abc import Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException @@ -52,6 +59,8 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception: return KafkaTimeoutError() if code == KafkaError._STATE: # pylint: disable=protected-access return IllegalStateError() + if code == KafkaError._RESOLVE: # pylint: disable=protected-access + return KafkaUnavailableError() return for_code(code) diff --git a/src/karapace/schema_reader.py b/src/karapace/schema_reader.py index 01f07f379..cd04944dc 100644 --- a/src/karapace/schema_reader.py +++ b/src/karapace/schema_reader.py @@ -11,6 +11,7 @@ InvalidReplicationFactorError, KafkaConfigurationError, KafkaTimeoutError, + KafkaUnavailableError, LeaderNotAvailableError, NoBrokersAvailable, NodeNotReadyError, @@ -250,6 +251,8 @@ def run(self) -> None: except ShutdownException: self._stop_schema_reader.set() shutdown() + except KafkaUnavailableError: + LOG.warning("Kafka cluster is unavailable or broker can't be resolved.") except Exception as e: # pylint: disable=broad-except self.stats.unexpected_exception(ex=e, where="schema_reader_loop") LOG.warning("Unexpected exception in schema reader loop - %s", e) diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 74760897c..6936d10f0 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -11,6 +11,7 @@ class KafkaError: _UNKNOWN_PARTITION: int _TIMED_OUT: int _STATE: int + _RESOLVE: int UNKNOWN_TOPIC_OR_PART: int def code(self) -> int: ...