Skip to content

Commit

Permalink
Allow to commit offset in case of serialization errors
Browse files Browse the repository at this point in the history
  • Loading branch information
giamo committed Jul 16, 2023
1 parent f6867ac commit f5c52a7
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import jakarta.inject.Singleton;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micronaut.core.annotation.NonNull;
import java.util.Collections;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -43,6 +45,7 @@ public class DefaultKafkaListenerExceptionHandler implements KafkaListenerExcept
private static final Pattern SERIALIZATION_EXCEPTION_MESSAGE_PATTERN = Pattern.compile(".+ for partition (.+)-(\\d+) at offset (\\d+)\\..+");

private boolean skipRecordOnDeserializationFailure = true;
private boolean commitRecordOnDeserializationFailure = false;

@Override
public void handle(KafkaListenerException exception) {
Expand Down Expand Up @@ -76,7 +79,15 @@ public void setSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializ
}

/**
* Seeks past a serialization exception if an error occurs.
* Sets whether to commit the offset of past records that are not deserializable and are skipped.
* @param commitRecordOnDeserializationFailure True if the offset for records that are not deserializable should be committed after being skipped.
*/
public void setCommitRecordOnDeserializationFailure(boolean commitRecordOnDeserializationFailure) {
this.commitRecordOnDeserializationFailure = commitRecordOnDeserializationFailure;
}

/**
* Seeks past a serialization exception if an error occurs. Additionally commits the offset if commitRecordOnDeserializationFailure is set
* @param cause The cause
* @param consumerBean The consumer bean
* @param kafkaConsumer The kafka consumer
Expand All @@ -95,6 +106,14 @@ protected void seekPastDeserializationError(
TopicPartition tp = new TopicPartition(topic, partition);
LOG.debug("Seeking past unserializable consumer record for partition {}-{} and offset {}", topic, partition, offset);
kafkaConsumer.seek(tp, offset + 1);
if (this.commitRecordOnDeserializationFailure) {
try {
LOG.debug("Permanently skipping unserializable consumer record by committing offset {} for partition {}-{}", offset, topic, partition);
kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset + 1)));
} catch (Throwable e) {
LOG.error("Kafka consumer [{}] failed to commit offset of unserializable value: {}", consumerBean, e.getMessage(), e);
}
}
}
} catch (Throwable e) {
LOG.error("Kafka consumer [{}] failed to seek past unserializable value: {}", consumerBean, e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package io.micronaut.configuration.kafka.exceptions

import io.micronaut.context.annotation.Property
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.ProducerConfig
import io.micronaut.configuration.kafka.AbstractEmbeddedServerSpec
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.apache.kafka.common.TopicPartition

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED

class DefaultKafkaListenerExceptionHandlerSpec extends AbstractEmbeddedServerSpec {

private static final String TEST_TOPIC = "uuids"

void "test seek past record on deserialization error by default"() {
given:
StringProducer stringProducer = context.getBean(StringProducer)
DefaultBehaviorOnDeserializationErrorConsumer consumer = context.getBean(DefaultBehaviorOnDeserializationErrorConsumer)

when: "A producer sends a message with wrong serialization"
stringProducer.sendMessage("not-a-uuid")

then: "The message is skipped with a seek() but not committed"
conditions.eventually {
consumer.currentPosition == 1
consumer.committedOffset == 0
}
}

void "test commit record on deserialization error"() {
given:
StringProducer stringProducer = context.getBean(StringProducer)
CommitOnDeserializationErrorConsumer consumer = context.getBean(CommitOnDeserializationErrorConsumer)

when: "A producer sends a message with wrong serialization"
stringProducer.sendMessage("not-a-uuid")

then: "The message is skipped and committed"
conditions.eventually {
consumer.currentPosition == 1
consumer.committedOffset == 1
}
}

void "test do nothing on deserialization error"() {
given:
StringProducer stringProducer = context.getBean(StringProducer)
DoNothingOnDeserializationErrorConsumer consumer = context.getBean(DoNothingOnDeserializationErrorConsumer)

when: "A producer sends a message with wrong serialization"
stringProducer.sendMessage("not-a-uuid")

then: "The message is neither skipped nor committed"
conditions.eventually {
consumer.currentPosition == 0
consumer.committedOffset == 0
}
}

@Requires(property = 'spec.name', value = 'DefaultKafkaListenerExceptionHandlerSpec')
@KafkaListener(
groupId = "DefaultBehaviorOnDeserializationErrorConsumer",
offsetReset = EARLIEST,
offsetStrategy = DISABLED,
properties = [@Property(
name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
value = "org.apache.kafka.common.serialization.UUIDSerializer"
)]
)
static class DefaultBehaviorOnDeserializationErrorConsumer implements KafkaListenerExceptionHandler {
Long currentPosition = -1
Long committedOffset = -1
DefaultKafkaListenerExceptionHandler errorHandler = new DefaultKafkaListenerExceptionHandler()

@Topic(TEST_TOPIC)
void receive(UUID uuid) {
}

@Override
void handle(KafkaListenerException exception) {
errorHandler.handle(exception)
TopicPartition tp = new TopicPartition(TEST_TOPIC, 0)
currentPosition = exception.kafkaConsumer.position(tp)
OffsetAndMetadata committedOffsetAndMetadata = exception.kafkaConsumer.committed(tp)
if (committedOffsetAndMetadata != null) {
committedOffset = committedOffsetAndMetadata.offset()
} else {
committedOffset = 0
}
}
}

@KafkaListener(
groupId = "CommitOnDeserializationErrorConsumer",
offsetReset = EARLIEST,
offsetStrategy = DISABLED,
properties = [@Property(
name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
value = "org.apache.kafka.common.serialization.UUIDSerializer"
)]
)
static class CommitOnDeserializationErrorConsumer implements KafkaListenerExceptionHandler {
Long currentPosition = -1
Long committedOffset = -1
DefaultKafkaListenerExceptionHandler errorHandler = new DefaultKafkaListenerExceptionHandler()

CommitOnDeserializationErrorConsumer() {
errorHandler.setSkipRecordOnDeserializationFailure(true)
errorHandler.setCommitRecordOnDeserializationFailure(true)
}

@Topic(TEST_TOPIC)
void receive(UUID uuid) {
}

@Override
void handle(KafkaListenerException exception) {
errorHandler.handle(exception)
TopicPartition tp = new TopicPartition(TEST_TOPIC, 0)
currentPosition = exception.kafkaConsumer.position(tp)
OffsetAndMetadata committedOffsetAndMetadata = exception.kafkaConsumer.committed(tp)
if (committedOffsetAndMetadata != null) {
committedOffset = committedOffsetAndMetadata.offset()
} else {
committedOffset = 0
}
}
}

@KafkaListener(
groupId = "DoNothingOnDeserializationErrorConsumer",
offsetReset = EARLIEST,
offsetStrategy = DISABLED,
properties = [@Property(
name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
value = "org.apache.kafka.common.serialization.UUIDSerializer"
)]
)
static class DoNothingOnDeserializationErrorConsumer implements KafkaListenerExceptionHandler {
Long currentPosition = -1
Long committedOffset = -1
DefaultKafkaListenerExceptionHandler errorHandler = new DefaultKafkaListenerExceptionHandler()

DoNothingOnDeserializationErrorConsumer() {
errorHandler.setSkipRecordOnDeserializationFailure(false)
}

@Topic(TEST_TOPIC)
void receive(UUID uuid) {
}

@Override
void handle(KafkaListenerException exception) {
errorHandler.handle(exception)
TopicPartition tp = new TopicPartition(TEST_TOPIC, 0)
currentPosition = exception.kafkaConsumer.position(tp)
OffsetAndMetadata committedOffsetAndMetadata = exception.kafkaConsumer.committed(tp)
if (committedOffsetAndMetadata != null) {
committedOffset = committedOffsetAndMetadata.offset()
} else {
committedOffset = 0
}
}
}

@Requires(property = 'spec.name', value = 'DefaultKafkaListenerExceptionHandlerSpec')
@KafkaClient
static interface StringProducer {
@Topic(TEST_TOPIC)
void sendMessage(String message)
}
}

0 comments on commit f5c52a7

Please sign in to comment.