-
Notifications
You must be signed in to change notification settings - Fork 67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add BatchedMessageIdImpl to acknowledge batched messages #132
Add BatchedMessageIdImpl to acknowledge batched messages #132
Conversation
c12340a
to
1066111
Compare
Fixes apache#130 ### Motivation It's a catchup for apache/pulsar#1424 ### Modifications Migrate `BitSet` implementation from JDK. Though we have `vector<bool>` or `boost::dynamic_bitset`, to support batch index ACK in future, it's better to have the same method to convert a bit set to the underlying long array. Add `BatchedMessageIdImpl` to maintain a `BatchMessageAcker`, which is shared by messages in the same batch. The acker is responsible to record which messages are acknowledged. Only if all messages in the batch are acknowledged will the message id be acknowledged. The stats for individual ACKs only updates after the whole batch are acknowledged. Before this PR, each time the single message is acknowledged, the stats increase by one.
1066111
to
5df2b47
Compare
auto batchedMessageIdImpl = std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl); | ||
|
||
auto batchSize = messageId.batchSize(); | ||
if (!batchedMessageIdImpl || batchedMessageIdImpl->ackIndividual(messageId.batchIndex())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When messageId is not BatchedMessageIdImpl
, the !batchedMessageIdImpl
condition is true, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. In this case batchedMessageIdImpl
will be a nullptr
(0x0), see https://en.cppreference.com/w/cpp/memory/shared_ptr/pointer_cast
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/LGTM
### Motivation The serialization and deserialization of `MessageId` became wrong after apache#132. 1. The batch size is not serialized. 2. `BatchedMessageIdImpl` could never be deserialized. The wrong behaviors could lead to a result that all MessageId objects created from deserialization does not have a batch size, which might make `ReaderTest.testReaderOnSpecificMessageWithBatches` fail when the cmake build type is `Debug`. What's worse is that a MessageId created from deserialization is always treated as a `MessageIdImpl`, on which the acknowledgment will have wrong behavior. ### Modifications Serialize the batch size if it's valid. In deserialization, create a `BatchedMessageIdImpl` when the batch index and the batch size are valid as a batched message. There is a problem that if a `MessageId` is created from deserialization, it cannot share a `BatchMessageAcker` with other `MessageId` objects. In this case, create a fake `BatchMessageAcker` that returns false for both `ackIndividual` and `ackCumulative` methods. It will make acknowledgment always fail but will fall back to batch index ACK if batch index ACK is enabled. Add the `-DCMAKE_BUILD_TYPE=Debug` for tests to enable assertions.
* [fix] Fix MessageId serialization when it's a batched message ### Motivation The serialization and deserialization of `MessageId` became wrong after #132. 1. The batch size is not serialized. 2. `BatchedMessageIdImpl` could never be deserialized. The wrong behaviors could lead to a result that all MessageId objects created from deserialization does not have a batch size, which might make `ReaderTest.testReaderOnSpecificMessageWithBatches` fail when the cmake build type is `Debug`. What's worse is that a MessageId created from deserialization is always treated as a `MessageIdImpl`, on which the acknowledgment will have wrong behavior. ### Modifications Serialize the batch size if it's valid. In deserialization, create a `BatchedMessageIdImpl` when the batch index and the batch size are valid as a batched message. There is a problem that if a `MessageId` is created from deserialization, it cannot share a `BatchMessageAcker` with other `MessageId` objects. In this case, create a fake `BatchMessageAcker` that returns false for both `ackIndividual` and `ackCumulative` methods. It will make acknowledgment always fail but will fall back to batch index ACK if batch index ACK is enabled. Add the `-DCMAKE_BUILD_TYPE=Debug` for tests to enable assertions. * Add virtual destructor
Fixes #130
Motivation
It's a catchup for apache/pulsar#1424
Modifications
Migrate
BitSet
implementation from JDK. Though we havevector<bool>
orboost::dynamic_bitset
, to support batch index ACK in future, it's better to have the same method to convert a bit set to the underlying long array.Add
BatchedMessageIdImpl
to maintain aBatchMessageAcker
, which is shared by messages in the same batch. The acker is responsible to record which messages are acknowledged. Only if all messages in the batch are acknowledged will the message id be acknowledged.The stats for individual ACKs only updates after the whole batch are acknowledged. Before this PR, each time the single message is acknowledged, the stats increase by one.