-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug] Deserialized BatchMessageIdImpl cannot be used for acknowledgment #19030
Open
2 tasks done
BewareMyPower opened this issue
Dec 22, 2022
· 2 comments
· May be fixed by BewareMyPower/pulsar#15 or #19031
Open
2 tasks done
[Bug] Deserialized BatchMessageIdImpl cannot be used for acknowledgment #19030
BewareMyPower opened this issue
Dec 22, 2022
· 2 comments
· May be fixed by BewareMyPower/pulsar#15 or #19031
Comments
The deserialization logic in #9855 is meaningless. pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java Lines 99 to 105 in f85d591
Creating a independent |
2 tasks
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this issue
Dec 22, 2022
Fixes apache#19030 ### Motivation When a `BatchMessageIdImpl` is created from a deserialization, the `BatchMessageAcker` object cannot be shared by all instances in the same batch, which leads to an acknowledgment failure when batch index ACK is disabled (by default). ### Modifications Maintain a map from the `(ledger id, entry id)` pair to the `BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl` doesn't carry a valid `BatchMessageAcker`, create and cache a `BatchMessageAcker` instance and remove it when all messages in the batch are acknowledged. It requires a change in `MessageIdImpl#fromByteArray` that a `BatchMessageAckerDisabled` will be created to indicate there is no shared acker. To avoid making code more complicated, this patch refactors the existing code that many logics about consumer are moved from the ACK tracker to the consumer. It also removes the `AckType` parameter when acknowledging a list of messages.
This was referenced Dec 22, 2022
Open
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this issue
Dec 22, 2022
Fixes apache#19030 ### Motivation When a `BatchMessageIdImpl` is created from a deserialization, the `BatchMessageAcker` object cannot be shared by all instances in the same batch, which leads to an acknowledgment failure when batch index ACK is disabled (by default). ### Modifications Maintain a map from the `(ledger id, entry id)` pair to the `BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl` doesn't carry a valid `BatchMessageAcker`, create and cache a `BatchMessageAcker` instance and remove it when all messages in the batch are acknowledged. It requires a change in `MessageIdImpl#fromByteArray` that a `BatchMessageAckerDisabled` will be created to indicate there is no shared acker. To avoid making code more complicated, this patch refactors the existing code that many logics about consumer are moved from the ACK tracker to the consumer. It also removes the `AckType` parameter when acknowledging a list of messages.
BewareMyPower
added a commit
to BewareMyPower/pulsar
that referenced
this issue
Dec 23, 2022
Fixes apache#19030 ### Motivation When a `BatchMessageIdImpl` is created from a deserialization, the `BatchMessageAcker` object cannot be shared by all instances in the same batch, which leads to an acknowledgment failure when batch index ACK is disabled (by default). ### Modifications Maintain a map from the `(ledger id, entry id)` pair to the `BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl` doesn't carry a valid `BatchMessageAcker`, create and cache a `BatchMessageAcker` instance and remove it when all messages in the batch are acknowledged. It requires a change in `MessageIdImpl#fromByteArray` that a `BatchMessageAckerDisabled` will be created to indicate there is no shared acker. To avoid making code more complicated, this patch refactors the existing code that many logics about consumer are moved from the ACK tracker to the consumer. It also removes the `AckType` parameter when acknowledging a list of messages.
The issue had no activity for 30 days, mark with Stale label. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Search before asking
Version
OS: Ubuntu 20.04
Pulsar: master (41edd2e)
Minimal reproduce step
Add a unit test that does these things:
MessageId
objects, which share the same ledger id, entry id, batch size, only the batch indexes are different.MessageId
objects by a serialization (MessageId#toByteArray
) and a deserialization (MessageId#fromByteArray
).MessageId
objects.What did you expect to see?
The restarted consumer should receive nothing.
What did you see instead?
The restarted consumer received the 1st message.
Anything else?
The root cause is from #1424, which make all
MessageId
instances in the same batch share the sameBatchMessageAcker
object. However, whenMessageId
instances are created from a deserialization. It's impossible to make them share the sameBatchMessageAcker
.Are you willing to submit a PR?
The text was updated successfully, but these errors were encountered: