Skip to content

Commit

Permalink
Allow to commit offset in case of deserialization errors (#761)
Browse files Browse the repository at this point in the history
* Allow to commit offset in case of serialization errors

* Disable test kafka listeners unless running their specific test

* Add documentation

* Update test to reflect current behavior

Previous behavior was buggy. It was fixed by:
#771

* Create config classes for the default kafka listener exception handler

* Update documentation

* Add an empty, `@Deprecated` constructor

---------

Co-authored-by: Guillermo Calvo <[email protected]>
Co-authored-by: Guillermo Calvo <[email protected]>
  • Loading branch information
3 people authored Aug 18, 2023
1 parent 0e575af commit 1281d47
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.config;

import io.micronaut.core.util.Toggleable;

/**
 * Configuration contract for the default Kafka listener exception handler.
 *
 * <p>Exposes the two flags that control how a record that cannot be deserialized is treated:
 * whether it is skipped, and whether its offset is additionally committed.</p>
 *
 * @since 5.1.0
 */
public interface DefaultKafkaListenerExceptionHandlerConfiguration extends Toggleable {

/**
 * Indicates whether a record that fails deserialization should be skipped.
 *
 * @return Whether to skip record on deserialization failure.
 */
boolean isSkipRecordOnDeserializationFailure();

/**
 * Indicates whether the offset of a record that fails deserialization should be committed.
 *
 * @return Whether to commit record on deserialization failure.
 */
boolean isCommitRecordOnDeserializationFailure();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.config;

import io.micronaut.context.annotation.ConfigurationProperties;

/**
 * {@link ConfigurationProperties}-backed implementation of
 * {@link DefaultKafkaListenerExceptionHandlerConfiguration}, bound under {@link #PREFIX}.
 *
 * @since 5.1.0
 */
@ConfigurationProperties(DefaultKafkaListenerExceptionHandlerConfigurationProperties.PREFIX)
public class DefaultKafkaListenerExceptionHandlerConfigurationProperties implements DefaultKafkaListenerExceptionHandlerConfiguration {

    /**
     * Configuration prefix of the default Kafka listener exception handler settings.
     */
    public static final String PREFIX = AbstractKafkaConfiguration.PREFIX + ".default-listener-exception-handler";

    /**
     * Value used for {@code skipRecordOnDeserializationFailure} when none is configured.
     */
    @SuppressWarnings("WeakerAccess")
    public static final boolean DEFAULT_SKIP_RECORD_ON_DESERIALIZATION_FAILURE = true;

    /**
     * Value used for {@code commitRecordOnDeserializationFailure} when none is configured.
     */
    @SuppressWarnings("WeakerAccess")
    public static final boolean DEFAULT_COMMIT_RECORD_ON_DESERIALIZATION_FAILURE = false;

    // Both properties start at their documented defaults and are only changed through the setters.
    private boolean skipRecordOnDeserializationFailure = DEFAULT_SKIP_RECORD_ON_DESERIALIZATION_FAILURE;
    private boolean commitRecordOnDeserializationFailure = DEFAULT_COMMIT_RECORD_ON_DESERIALIZATION_FAILURE;

    @Override
    public boolean isSkipRecordOnDeserializationFailure() {
        return this.skipRecordOnDeserializationFailure;
    }

    /**
     * Whether to skip record on deserialization failure. Default value {@value #DEFAULT_SKIP_RECORD_ON_DESERIALIZATION_FAILURE}
     *
     * @param skipRecordOnDeserializationFailure Whether to skip record on deserialization failure.
     */
    public void setSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializationFailure) {
        this.skipRecordOnDeserializationFailure = skipRecordOnDeserializationFailure;
    }

    @Override
    public boolean isCommitRecordOnDeserializationFailure() {
        return this.commitRecordOnDeserializationFailure;
    }

    /**
     * Whether to commit record on deserialization failure. Default value {@value #DEFAULT_COMMIT_RECORD_ON_DESERIALIZATION_FAILURE}
     *
     * @param commitRecordOnDeserializationFailure Whether to commit record on deserialization failure.
     */
    public void setCommitRecordOnDeserializationFailure(boolean commitRecordOnDeserializationFailure) {
        this.commitRecordOnDeserializationFailure = commitRecordOnDeserializationFailure;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
*/
package io.micronaut.configuration.kafka.exceptions;

import io.micronaut.configuration.kafka.config.DefaultKafkaListenerExceptionHandlerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaListenerExceptionHandlerConfigurationProperties;
import io.micronaut.context.annotation.Primary;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micronaut.core.annotation.NonNull;
import java.util.Collections;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -42,7 +47,27 @@ public class DefaultKafkaListenerExceptionHandler implements KafkaListenerExcept
private static final Logger LOG = LoggerFactory.getLogger(KafkaListenerExceptionHandler.class);
private static final Pattern SERIALIZATION_EXCEPTION_MESSAGE_PATTERN = Pattern.compile(".+ for partition (.+)-(\\d+) at offset (\\d+)\\..+");

// Behaviour flags. Their initial values come from the configuration passed to the constructor
// and can be changed afterwards through the corresponding setters.
private boolean skipRecordOnDeserializationFailure;
private boolean commitRecordOnDeserializationFailure;

/**
 * Builds the handler, copying its initial behaviour flags from the given configuration.
 *
 * @param config The default Kafka listener exception handler configuration
 */
@Inject
public DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration config) {
    this.skipRecordOnDeserializationFailure = config.isSkipRecordOnDeserializationFailure();
    this.commitRecordOnDeserializationFailure = config.isCommitRecordOnDeserializationFailure();
}

/**
 * Creates a new instance configured from a freshly constructed
 * {@link DefaultKafkaListenerExceptionHandlerConfigurationProperties}, i.e. with that class's
 * declared default values.
 *
 * @deprecated Use {@link DefaultKafkaListenerExceptionHandler#DefaultKafkaListenerExceptionHandler(DefaultKafkaListenerExceptionHandlerConfiguration)}
 */
@Deprecated(since = "5.1.0", forRemoval = true)
public DefaultKafkaListenerExceptionHandler() {
this(new DefaultKafkaListenerExceptionHandlerConfigurationProperties());
}

@Override
public void handle(KafkaListenerException exception) {
Expand Down Expand Up @@ -76,7 +101,15 @@ public void setSkipRecordOnDeserializationFailure(boolean skipRecordOnDeserializ
}

/**
 * Sets whether to commit the offset of past records that are not deserializable and are skipped.
 *
 * @param commitRecordOnDeserializationFailure True if the offset for records that are not deserializable should be committed after being skipped.
 */
public void setCommitRecordOnDeserializationFailure(boolean commitRecordOnDeserializationFailure) {
this.commitRecordOnDeserializationFailure = commitRecordOnDeserializationFailure;
}

/**
* Seeks past a serialization exception if an error occurs. Additionally commits the offset if commitRecordOnDeserializationFailure is set
* @param cause The cause
* @param consumerBean The consumer bean
* @param kafkaConsumer The kafka consumer
Expand All @@ -87,17 +120,32 @@ protected void seekPastDeserializationError(
@NonNull Consumer<?, ?> kafkaConsumer) {
try {
    // The failing record's coordinates are only available as text inside the exception message,
    // so they are recovered with SERIALIZATION_EXCEPTION_MESSAGE_PATTERN (topic, partition, offset).
    final String message = cause.getMessage();
    LOG.debug("Extracting unserializable consumer record topic, partition and offset from error message: {}", message);
    final Matcher matcher = SERIALIZATION_EXCEPTION_MESSAGE_PATTERN.matcher(message);
    if (matcher.find()) {
        final String topic = matcher.group(1);
        final int partition = Integer.parseInt(matcher.group(2));
        // Kafka offsets are 64-bit: parsing them as int fails for offsets above Integer.MAX_VALUE.
        final long offset = Long.parseLong(matcher.group(3));
        final long nextOffset = offset + 1;
        final TopicPartition tp = new TopicPartition(topic, partition);
        LOG.debug("Seeking past unserializable consumer record for partition {}-{} and offset {}", topic, partition, offset);

        // Seek and commit are guarded separately so that a failed seek is reported as such
        // and does not prevent the commit attempt below.
        try {
            kafkaConsumer.seek(tp, nextOffset);
        } catch (Throwable e) {
            LOG.error("Kafka consumer [{}] failed to seek past unserializable value: {}", consumerBean, e.getMessage(), e);
        }

        if (this.commitRecordOnDeserializationFailure) {
            try {
                LOG.debug("Permanently skipping unserializable consumer record by committing offset {} for partition {}-{}", offset, topic, partition);
                kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(nextOffset)));
            } catch (Throwable e) {
                LOG.error("Kafka consumer [{}] failed to commit offset of unserializable value: {}", consumerBean, e.getMessage(), e);
            }
        }
    }
} catch (Throwable e) {
    // Only reached when the message itself could not be read or parsed (e.g. null message, malformed number).
    LOG.error("Failed to extract topic, partition and offset from serialization error message: {}", cause.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package io.micronaut.configuration.kafka.exceptions

import io.micronaut.configuration.kafka.config.DefaultKafkaListenerExceptionHandlerConfiguration
import io.micronaut.configuration.kafka.config.DefaultKafkaListenerExceptionHandlerConfigurationProperties
import io.micronaut.context.annotation.Property
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.ProducerConfig
import io.micronaut.configuration.kafka.AbstractEmbeddedServerSpec
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.apache.kafka.common.TopicPartition

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED

/**
 * Exercises {@link DefaultKafkaListenerExceptionHandler} with records that cannot be deserialized.
 *
 * Each feature sends the string "not-a-uuid" to a topic whose listener method takes a {@code UUID},
 * then polls the consumer position and committed offset that the listener recorded in {@code handle}.
 */
class DefaultKafkaListenerExceptionHandlerSpec extends AbstractEmbeddedServerSpec {

// One topic per scenario; each listener below subscribes to exactly one of them.
private static final String TOPIC_SEEK = "on-deserialization-error-seek"
private static final String TOPIC_COMMIT = "on-deserialization-error-commit"
private static final String TOPIC_NOTHING = "on-deserialization-error-do-nothing"

// Handler built from default configuration properties, with no setter calls on the listener.
void "test seek past record on deserialization error by default"() {
given:
StringProducer stringProducer = context.getBean(StringProducer)
DefaultBehaviorOnDeserializationErrorConsumer consumer = context.getBean(DefaultBehaviorOnDeserializationErrorConsumer)

when: "A producer sends a message with wrong serialization"
stringProducer.sendToSeekTopic("not-a-uuid")

then: "The message is skipped with a seek() but not committed"
conditions.eventually {
consumer.currentPosition == 1
// 0 is also the value handle() stores when the consumer reports no committed offset.
consumer.committedOffset == 0
}
}

// Handler with both skip and commit switched on by the listener's constructor.
void "test commit record on deserialization error"() {
given:
StringProducer stringProducer = context.getBean(StringProducer)
CommitOnDeserializationErrorConsumer consumer = context.getBean(CommitOnDeserializationErrorConsumer)

when: "A producer sends a message with wrong serialization"
stringProducer.sendToCommitTopic("not-a-uuid")

then: "The message is skipped and committed"
conditions.eventually {
consumer.currentPosition == 1
consumer.committedOffset == 1
}
}

// Handler with skipping switched off by the listener's constructor.
// NOTE(review): skipping is disabled here, yet the position is still expected to move past 0 —
// presumably the Kafka consumer itself advances in this case; confirm against the handler/Kafka behaviour.
void "test do nothing on deserialization error"() {
given:
StringProducer stringProducer = context.getBean(StringProducer)
DoNothingOnDeserializationErrorConsumer consumer = context.getBean(DoNothingOnDeserializationErrorConsumer)

when: "A producer sends a message with wrong serialization"
stringProducer.sendToDoNothingTopic("not-a-uuid")

then: "The message is skipped, but not committed"
conditions.eventually {
consumer.currentPosition > 0
consumer.committedOffset == 0
}
}

/**
 * Base class for the test listeners. It acts as the listener's own exception handler: it delegates
 * to a {@link DefaultKafkaListenerExceptionHandler} and then records the resulting consumer state
 * so the features above can assert on it.
 */
static abstract class AbstractOnDeserializationErrorConsumer implements KafkaListenerExceptionHandler {
// Both start at -1, so a non-negative value means handle() has run at least once.
Long currentPosition = -1
Long committedOffset = -1
// The delegate is built from default configuration properties; subclasses adjust it through its setters.
DefaultKafkaListenerExceptionHandlerConfiguration config = new DefaultKafkaListenerExceptionHandlerConfigurationProperties()
DefaultKafkaListenerExceptionHandler errorHandler = new DefaultKafkaListenerExceptionHandler(config)
String topic

AbstractOnDeserializationErrorConsumer(String topic) {
this.topic = topic
}

/**
 * Delegates to the default handler, then captures the position and committed offset of
 * partition 0 of this listener's topic.
 */
@Override
void handle(KafkaListenerException exception) {
errorHandler.handle(exception)
TopicPartition tp = new TopicPartition(topic, 0)
currentPosition = exception.kafkaConsumer.position(tp)
OffsetAndMetadata committedOffsetAndMetadata = exception.kafkaConsumer.committed(tp)
if (committedOffsetAndMetadata != null) {
committedOffset = committedOffsetAndMetadata.offset()
} else {
// No committed offset reported: recorded as 0.
committedOffset = 0
}
}
}

// Listener for the default-behaviour scenario; leaves the delegate handler untouched.
// NOTE(review): the property below is a producer-side serializer key set on a listener —
// presumably the UUID deserializer is derived from the receive(UUID) parameter instead; confirm.
// NOTE(review): offsetStrategy = DISABLED, presumably so that only the exception handler can commit — confirm.
@Requires(property = 'spec.name', value = 'DefaultKafkaListenerExceptionHandlerSpec')
@KafkaListener(
offsetReset = EARLIEST,
offsetStrategy = DISABLED,
properties = [
@Property(name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
value = "org.apache.kafka.common.serialization.UUIDSerializer")
]
)
static class DefaultBehaviorOnDeserializationErrorConsumer extends AbstractOnDeserializationErrorConsumer {
DefaultBehaviorOnDeserializationErrorConsumer() {
super(TOPIC_SEEK)
}

@Topic(TOPIC_SEEK)
void receive(UUID uuid) {
}
}

// Listener for the commit scenario; enables both skip and commit on the delegate handler.
@Requires(property = "spec.name", value = "DefaultKafkaListenerExceptionHandlerSpec")
@KafkaListener(
offsetReset = EARLIEST,
offsetStrategy = DISABLED,
properties = [
@Property(name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
value = "org.apache.kafka.common.serialization.UUIDSerializer")
]
)
static class CommitOnDeserializationErrorConsumer extends AbstractOnDeserializationErrorConsumer {
CommitOnDeserializationErrorConsumer() {
super(TOPIC_COMMIT)
errorHandler.setSkipRecordOnDeserializationFailure(true)
errorHandler.setCommitRecordOnDeserializationFailure(true)
}

@Topic(TOPIC_COMMIT)
void receive(UUID uuid) {
}
}

// Listener for the do-nothing scenario; disables skipping on the delegate handler.
@Requires(property = "spec.name", value = "DefaultKafkaListenerExceptionHandlerSpec")
@KafkaListener(
offsetReset = EARLIEST,
offsetStrategy = DISABLED,
properties = [
@Property(name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
value = "org.apache.kafka.common.serialization.UUIDSerializer")
]
)
static class DoNothingOnDeserializationErrorConsumer extends AbstractOnDeserializationErrorConsumer {
DoNothingOnDeserializationErrorConsumer() {
super(TOPIC_NOTHING)
errorHandler.setSkipRecordOnDeserializationFailure(false)
}

@Topic(TOPIC_NOTHING)
void receive(UUID uuid) {
}
}

// Client that publishes plain String payloads to each of the three scenario topics.
@Requires(property = 'spec.name', value = 'DefaultKafkaListenerExceptionHandlerSpec')
@KafkaClient
static interface StringProducer {
@Topic(TOPIC_SEEK)
void sendToSeekTopic(String message)

@Topic(TOPIC_COMMIT)
void sendToCommitTopic(String message)

@Topic(TOPIC_NOTHING)
void sendToDoNothingTopic(String message)
}
}
4 changes: 4 additions & 0 deletions src/main/docs/guide/kafkaListener/kafkaErrors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ NOTE: Specify exception to retry apply only for RETRY_ON_ERROR error strategy.

When an exception occurs in a ann:configuration.kafka.annotation.KafkaListener[] method by default the exception is simply logged. This is handled by api:configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler[].

The following options are available to configure the default Kafka listener exception handler:

include::{includedir}configurationProperties/io.micronaut.configuration.kafka.config.DefaultKafkaListenerExceptionHandlerConfigurationProperties.adoc[]

If you wish to replace this default exception handling with another implementation, you can use Micronaut's <<replaces, Bean Replacement>> feature to define a bean that replaces it: `@Replaces(DefaultKafkaListenerExceptionHandler.class)`.

You can also define per bean exception handling logic by implementing the api:configuration.kafka.exceptions.KafkaListenerExceptionHandler[] interface in your ann:configuration.kafka.annotation.KafkaListener[] class.
Expand Down

0 comments on commit 1281d47

Please sign in to comment.