-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker][client] PIP-192 PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. #18195
Conversation
41d03d6
to
4faed76
Compare
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/CompactionReaderImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
Outdated
Show resolved
Hide resolved
2880ab0
to
a53b13c
Compare
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
Outdated
Show resolved
Hide resolved
addToCompactedLedger(lh, message, topic, outstanding) | ||
.whenComplete((res, exception2) -> { | ||
outstanding.release(); | ||
if (exception2 != null) { | ||
promise.completeExceptionally(exception2); | ||
return; | ||
} | ||
}); | ||
phaseTwoLoop(topic, reader, lh, outstanding, promise); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addToCompactedLedger(lh, message, topic, outstanding) | |
.whenComplete((res, exception2) -> { | |
outstanding.release(); | |
if (exception2 != null) { | |
promise.completeExceptionally(exception2); | |
return; | |
} | |
}); | |
phaseTwoLoop(topic, reader, lh, outstanding, promise); | |
addToCompactedLedger(lh, message, topic, outstanding) | |
.whenComplete((res, exception2) -> { | |
outstanding.release(); | |
if (exception2 != null) { | |
promise.completeExceptionally(exception2); | |
return; | |
} | |
phaseTwoLoop(topic, reader, lh, outstanding, promise); | |
}); | |
If I understand correctly, the logic should be like this, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. We don't want to wait for each addToCompactedLedger
call to complete for high concurrency (this concurrency is limited by the semaphore, outstanding
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.
b4ee527
to
b27bdfb
Compare
Codecov Report
@@ Coverage Diff @@
## master #18195 +/- ##
============================================
+ Coverage 45.92% 46.69% +0.76%
- Complexity 10104 10533 +429
============================================
Files 680 709 +29
Lines 66758 69358 +2600
Branches 7147 7441 +294
============================================
+ Hits 30660 32388 +1728
- Misses 32680 33334 +654
- Partials 3418 3636 +218
Flags with carried forward coverage won't be shown. Click here to find out more.
|
this.cryptoKeyReader = cryptoKeyReader; | ||
} | ||
|
||
public ByteBuf toByteBuf() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some comments to describe which exceptions will throw in this method? Since this method is public, some others develop might use it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack.
I will add the comments when resolving comments from others.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
* batched into single batch message: | ||
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] | ||
*/ | ||
public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this is just a need for a bulk message container based on the maximum number of batches, and I feel that extending the BatchMessageContainerImpl
will become too heavy(There are various producer-related properties and operations in the BatchMessageContainer implementation), and it also adds to the complexity of the original BatchMessageContainer.
Maybe a simple batch container could be re-implemented. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to reuse the batching and payload serialization logic from the parent class.
I do not think this PR adds significant complexity to the parent class.
It just checks if the producer is null or not.
We could pass a mock producer if adding the producer null-check is not desirable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, sorry, I miss some code.
We could pass a mock producer if adding the producer null-check is not desirable.
I'm leaning towards this.
other, I wonder why batch processing is needed here? Is it because it is known that there is a performance bottleneck here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I am not fully convinced this mock producer is better than the null check. Can you explain why this could be better? The base class sets producer later too, it also has potential that producer could be null.
Obviously, We dont want to create ledger entry per message. especially this message payload is very small.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern is that developers will miss this later when they modify it. Using mock producer is transfer behavior to RawBatchMessageContainerImpl
.
I revisited the code, and the current implementation is not quite capable of the mock producer.
like this:
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
Lines 116 to 117 in 4faed76
producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize() | |
+ batchAllocatedSizeBytes); |
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
Lines 136 to 138 in 4faed76
if (producer != null) { | |
ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId())); | |
} |
BWT: In the original implementation, BatchContainer
coupled with the producer I didn't think was very good. We should make BatchContiner
independent, Leave things like releaseMemory
to the producer. (This is off topic and does not affect this PR)
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
Show resolved
Hide resolved
pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
Show resolved
Hide resolved
pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
Show resolved
Hide resolved
Rebased this pip-215 branch on top of the master. |
Master Issue: #18099
Motivation
This PR supports
TopicCompactionStrategy
inStrategicTwoPhaseCompactor
andTableView
for the internal system topics, as proposed in PIP-215.This PR does not expose StrategicTwoPhaseCompactor for customer topics yet. We want to expose this feature to users after proven to be stable on the new system topic in the new broker load balancer(in PIP-192) first.
Modifications
This PR adds the following classes to implement `StrategicTwoPhaseCompactor.
StrategicTwoPhaseCompactor
that extendsTwoPhaseCompactor
.TopicCompactionStrategy
interface.TopicCompactionStrategy
logic inStrategicTwoPhaseCompactor
andTableViewImpl
.CompactionReaderImpl
to scan topic messages in StrategicTwoPhaseCompactor. It cumulatively acknowledges the read messages at the end of StrategicTwoPhaseCompactor.SubscriptionMode
andSubscriptionInitialPosition
parameters inReaderConfigurationData
since theCompactionReaderImpl
reader's subscription needs to be durable and needs to read from the earliest position.RawBatchMessageContainerImpl
to batch and serialize messages inStrategicTwoPhaseCompactor
.This PR updates the
TableViewImpl
to useTopicCompactionStrategy
in its data K,V map update logic.topicCompactionStrategy
member variable inTableViewConfigurationData
listen()
interface inTableView
to provide an option to call the listener actions only for the tail messages.TableViewImpl
and itshandleMessage()
function to considerTopicCompactionStrategy
when updating the data K,V map.This PR updated the compaction test classes to reuse the test cases.
This PR updated the modifiers of the parent classes of the added classes to access the member variables and member functions.
Verifying this change
This change added tests and can be verified as follows:
StrategicTwoPhaseCompactor
.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
This PR does not enable strategic compaction for the customer topics. We will update the doc when enabling this compaction for the customer topics.
Matching PR in forked repository
PR in forked repository: heesung-sn#12