Improve batch message acking by removing batch message tracker #1424
The change looks good; refactoring that logic into its own class was long overdue. I think a part is missing in the handling of cumulative acks, and there seem to be legitimate test failures.
}

// bitset shared across messages in the same batch.
private final BitSet bitSet;
Since we have many of these objects, we could inherit from `BitSet` to avoid an extra object.
Hmm, then I would have to change how `BatchMessageAckerDisabled` and the like work. I think this change removes the hashmap, which already removes a bunch of allocations per batch message. It might be better to hide the bitset behind the acker and make the batch message acker a recyclable object, so that we can reuse those objects.
Sure, we can do that later
if (batchMessageAckTracker.isEmpty()) {
    return;
}
MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
I don't see how this logic to remove all entries from previous batches (when doing cumulative ack) has been replaced.
👍 Nice improvement!
retest this please
@sijie can you merge with master? It already has the flaky test fixed.
merged latest master
@sijie I think there might be a legitimate test failure: https://builds.apache.org/job/pulsar_precommit_java8/55/testReport/junit/org.apache.pulsar.client.api/SimpleProducerConsumerStatTest/testMessageListener/
@merlimat will take a look
Fixes apache#130

### Motivation

It's a catchup for apache/pulsar#1424

### Modifications

Migrate the `BitSet` implementation from the JDK. Though we have `vector<bool>` or `boost::dynamic_bitset`, to support batch index ACK in the future, it's better to have the same method to convert a bit set to the underlying long array.

Add `BatchedMessageIdImpl` to maintain a `BatchMessageAcker`, which is shared by messages in the same batch. The acker is responsible for recording which messages are acknowledged. Only if all messages in the batch are acknowledged will the message id be acknowledged.

For cumulative ack, use the number of removed batches to update the stats. (TODO: record the batch size in the unacked message tracker in the future.)
* Add BatchedMessageIdImpl to acknowledge batched messages

Fixes #130

### Motivation

It's a catchup for apache/pulsar#1424

### Modifications

Migrate the `BitSet` implementation from the JDK. Though we have `vector<bool>` or `boost::dynamic_bitset`, to support batch index ACK in the future, it's better to have the same method to convert a bit set to the underlying long array.

Add `BatchedMessageIdImpl` to maintain a `BatchMessageAcker`, which is shared by messages in the same batch. The acker is responsible for recording which messages are acknowledged. Only if all messages in the batch are acknowledged will the message id be acknowledged.

The stats for individual ACKs are updated only after the whole batch is acknowledged. Before this PR, the stats increased by one each time a single message was acknowledged.
Motivation
The client maintains a batch message tracker to track batch messages for acknowledgment. If the application uses a `Failover` subscription and doesn't ack the messages at all, the message id objects accumulate in the client.
Modifications
Move the bitset into `BatchMessageIdImpl` and remove the batch message tracker.
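To make the idea concrete, here is a minimal sketch of an acker that owns the bitset and is shared by every message id of one batch. It is an illustration under assumptions, not the code in this PR: the class name `BatchAckerSketch` and the methods `ackIndividual`, `ackCumulative`, and `outstanding` are hypothetical, and the real `BatchMessageAcker` may differ in naming, locking, and behavior.

```java
import java.util.BitSet;

// Hypothetical sketch, not the actual Pulsar class.
// One instance would be shared by all message ids created from the same batch.
final class BatchAckerSketch {
    // One bit per message in the batch; a set bit means "not yet acknowledged".
    private final BitSet pending;

    BatchAckerSketch(int batchSize) {
        this.pending = new BitSet(batchSize);
        this.pending.set(0, batchSize); // mark every message as outstanding
    }

    // Acknowledge a single message; returns true once the whole batch is acked.
    synchronized boolean ackIndividual(int batchIndex) {
        pending.clear(batchIndex);
        return pending.isEmpty();
    }

    // Acknowledge every message up to and including batchIndex;
    // returns true once the whole batch is acked.
    synchronized boolean ackCumulative(int batchIndex) {
        pending.clear(0, batchIndex + 1); // the upper bound of clear() is exclusive
        return pending.isEmpty();
    }

    // Number of messages in the batch that are still unacknowledged.
    synchronized int outstanding() {
        return pending.cardinality();
    }
}
```

Because the state lives in an object referenced by the message ids themselves, it becomes unreachable together with them, so no client-side map has to be cleaned up when the application never acks. In this sketch the caller would send the ack for the batch entry only when a method returns `true`.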