-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow to commit offset in case of deserialization errors #761
Allow to commit offset in case of deserialization errors #761
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would need a test if we wanted to go with this approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logically I think this option makes sense overall. I'd potentially like to see more fine-grained error handling as mentioned, and as @sdelamo said, it needs tests.
...n/java/io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandler.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except for the lack of tests.
As an altenative approach, we could have merged those two boolean flags into a single configurable property onDeserializationFailure
or deserializationFailureStrategy
that could be populated with an enum type like this:
public enum DeserializationFailureStrategy {
/** Does nothing. */
DO_NOTHING,
/** Seeks past the offending record. */
SKIP,
/** Skips and commits offending record. */
COMMIT
}
But I admit the current approach is more backward compatible.
Then enum would make sense but I suppose it’s better to be backward compatible and not change the interface and the behavior for people already using the handler. I will try to add the test one of these days! |
f5c52a7
to
4d7688a
Compare
@sdelamo @jeremyg484 @guillermocalvo added a test and a separate try/catch for the commit operation |
Yes, we cannot introduce a breaking change. |
4d7688a
to
d5881ac
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will people discover this new setting? I think we have to add docs.
.../io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandlerSpec.groovy
Show resolved
Hide resolved
.../io/micronaut/configuration/kafka/exceptions/DefaultKafkaListenerExceptionHandlerSpec.groovy
Show resolved
Hide resolved
Previous behavior was buggy. It was fixed by: micronaut-projects#771
|
||
then: "The message is skipped, but not committed" | ||
conditions.eventually { | ||
consumer.currentPosition > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guillermocalvo it's strange that this test passes, the DoNothingOnDeserializationErrorConsumer
explicitly sets the skipRecordOnDeserializationFailure
to false so I'd expect the offset not to be advanced... or am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problem
When a consumed message cannnot be deserialized (e.g. when using the Avro deserializer and receiving a JSON) the
DefaultKafkaListenerExceptionHandler
has an option to skip the message with a seek() and keep consuming the next message, but it doesn’t commit the message offset. This means that until a “good” message is processed and committed, the consumer group will show a lag and it will try reprocessing the bad message in case of restart or group rebalancing. This issue especially impacts low-traffic topics.Proposed solution
Add a flag
commitRecordOnDeserializationFailure
to indicate if the default error handler should also commit the offset in addition to doing the seek(). The flag is set to false by default, so the handler will continue with the current behavior unless the flag is set explicitly by the user.