Skip to content

Commit

Permalink
Merge pull request #543 from aiven/giuseppelillo/restore-backup-messa…
Browse files Browse the repository at this point in the history
…ge-error-handling

Handle errors while producing backup restoration messages
  • Loading branch information
tvainika authored Feb 15, 2023
2 parents c3d8120 + a543250 commit 29b65dc
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from enum import Enum
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient
from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.errors import KafkaError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.structs import PartitionMetadata
from karapace import constants
from karapace.anonymize_schemas import anonymize_avro
Expand Down Expand Up @@ -197,6 +197,7 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
self.topic_name = topic_option or self.config["topic_name"]
self.admin_client = None
self.timeout_ms = 1000
self.timeout_kafka_producer = 5

# Schema key formatter
self.key_formatter = None
Expand Down Expand Up @@ -281,8 +282,15 @@ def restore_backup(self) -> None:
def _handle_restore_message(self, producer: KafkaProducer, item: Tuple[str, str]) -> None:
key = self.encode_key(item[0])
value = encode_value(item[1])
producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO)
LOG.debug("Sent kafka msg key: %r, value: %r", key, value)
LOG.debug("Trying to send kafka msg key: %r, value: %r", key, value)
try:
msg = producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO)
producer.flush(timeout=self.timeout_kafka_producer)
metadata = msg.get(timeout=self.timeout_kafka_producer)
except KafkaError as ex:
raise BackupError("Error while producing restored message") from ex
else:
LOG.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, metadata.offset)

def _restore_backup_version_1_single_array(self, producer: KafkaProducer, fp: IO) -> None:
raw_msg = fp.read()
Expand Down

0 comments on commit 29b65dc

Please sign in to comment.