-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-2806 : Receiving an empty list when using RecordFilterStrategy on batch messages #3216
GH-2806 : Receiving an empty list when using RecordFilterStrategy on batch messages #3216
Conversation
spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java
Outdated
Show resolved
Hide resolved
...ng-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
Outdated
Show resolved
Hide resolved
...ng-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
Outdated
Show resolved
Hide resolved
I see! i reverted all and make new commits. |
...in/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...in/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
I think the problem comes from the
where we have:
and that leads to the:
So, the logic in that Maybe
|
Thank you for your analysis a lot 🙇♂️🙇♂️🙇♂️🙇♂️! |
I make new commit to apply your reviews. default boolean ignoreEmptyBatch() {
return false;
} I added This way, those who want to be as it is can do so without modifying their codes. Meanwhile, it provides a choice for users who have considered this to be an issue until now. What do you think? |
...in/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need some tests for this new feature. And, yes, I also think that this should go to 3.3
already . So, please, fix Javadoc respectively. And add some doc, too.
9e349c2
to
c714551
Compare
@artembilan , thanks for your comments 🙇♂️
I added a couple of test cases to test new public API. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The review is a bit thorough, but there is no rush with the fixes.
We have like a month yet until with start a new 3.3
version.
Thanks
spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/filtering.adoc
Outdated
Show resolved
Hide resolved
spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/filtering.adoc
Outdated
Show resolved
Hide resolved
spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/filtering.adoc
Outdated
Show resolved
Hide resolved
spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/filtering.adoc
Outdated
Show resolved
Hide resolved
spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/filtering.adoc
Outdated
Show resolved
Hide resolved
spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java
Outdated
Show resolved
Hide resolved
...ng-kafka/src/test/java/org/springframework/kafka/listener/adapter/FilteringAdapterTests.java
Outdated
Show resolved
Hide resolved
...ng-kafka/src/test/java/org/springframework/kafka/listener/adapter/FilteringAdapterTests.java
Outdated
Show resolved
Hide resolved
...ng-kafka/src/test/java/org/springframework/kafka/listener/adapter/FilteringAdapterTests.java
Outdated
Show resolved
Hide resolved
...ng-kafka/src/test/java/org/springframework/kafka/listener/adapter/FilteringAdapterTests.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like your changes have slipped somehow from our sight.
@chickenchickenlove ,
let us know if there is anything else to do in this PR.
I can only suggest to rebase your branch to the latest main
and add a short section into the whats-new.adoc
with a link to your comprehensive explanation in the filtering.adoc
.
The first milestone for 3.3
is due next Monday.
Thank you!
d883afd
to
23c048d
Compare
Hi, @artembilan ! long time no see 😄. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulling your changes locally for final review and merge...
Merged as f91f8a9. thank you very much for the contribution (again)! A couple notes:
|
Well, merged, so closing |
Thanks for your time and looking this PR! |
* Determine whether {@link FilteringBatchMessageListenerAdapter} should invoke | ||
* the {@link BatchMessageListener} when all {@link ConsumerRecord}s in a batch have been filtered out | ||
* resulting in empty list. By default, do invoke the {@link BatchMessageListener} (return false). | ||
* @return true for {@link FilteringBatchMessageListenerAdapter} to {@link BatchMessageListener} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{@link FilteringBatchMessageListenerAdapter} to {@link BatchMessageListener}
-> {@link FilteringBatchMessageListenerAdapter} to not invoke {@link BatchMessageListener}
not invoke
is correct doc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed: 5c34122.
Thank you!
Motivation:
batch
mode, even if theRecordFilterStrategy
filters all records resulting in anEmpty List
being returned, theKafkaListener
is still invoked. In contrast, insingle record
mode, ifrecord
are filtered, theKafkaListener
is not called. This difference in behavior between the two modes can cause confusion for users.Modifications:
isAnyManualAck()
toAcknowledgment
to verify thatmanualAck
is needed onFilteringBatchMessageListenerAdapter
.FilteringBatchMessageListenerAdapter
.consumerAware
asfinal
(IMHO, we don't need to calculate it every single callonMessage()
.empty list
andmanual Ack == true
,KafkaListener
will be invoked. ifempty list
andmanual Ack == false
,KafkaListener
will not be invoked even iflistener
is kind ofConsumerAware
. In detail, See Discussion section below.)Result:
RecordFilterStrategy
filters all records and returns an Empty List, theKafkaListener
is invoked only if it is in manualACK
mode.Discussion
ConsumerAware
Listener, commits can be made usingConsumer.commitSync()
andConsumer.commitAsync()
. However, when using aConsumerAwareAckListener
, it seems possible that commits using theConsumer
and commits usingAck
could be processed simultaneously. That situation seems quite ambiguous.