[KAFKA-8522] Streamline tombstone and transaction marker removal #10914
Conversation
@junrao @hachikuji Could you help take a review pass? I know Jun has reviewed before, but since we've rebased several times I think it would be helpful to look over again.
@@ -57,7 +57,7 @@
    <suppress checks="ParameterNumber"
              files="DefaultRecordBatch.java"/>
    <suppress checks="ParameterNumber"
              files="Sender.java"/>
the same suppression is on L54
@mattwong949 : Thanks for the PR. Just a few minor comments below.
@@ -522,13 +523,13 @@ private[log] class Cleaner(val id: Int,
    val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)

    // group the segments and clean the groups
-   info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
+   info("Cleaning log %s (cleaning prior to %s, discarding legacy tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs)))
Might not be very clear what a "legacy tombstone" means. Would it be fair to call this an upper bound on the deletion horizon?
// therefore, we should take advantage of this fact and remove tombstones if we can
// under the condition that the log's latest delete horizon is less than the current time
// tracked
ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds()
When the broker is initialized, log.latestDeleteHorizon will be NO_TIMESTAMP. We need at least one run to trigger before we can initialize the value. Is there another condition we can rely on in order to ensure that the cleaning still occurs?
Related to this, I am a bit concerned about the extra cleaning due to this. If we have just one tombstone record, this can force a round of cleaning on idle partitions. An alternative would be to count the total number of surviving records and tombstone records during cleaning, and only trigger a cleaning if #tombstones/#totalRecords > minCleanableRatio. @hachikuji What do you think?
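As an illustration, a minimal sketch of that ratio-based trigger, assuming hypothetical tombstoneCount/totalRecords stats gathered during a previous cleaning pass (none of these names exist in the current code):

```scala
// Hypothetical sketch: only schedule a tombstone-driven cleaning when
// tombstones make up a large enough fraction of the log, mirroring the
// existing minCleanableRatio semantics. tombstoneCount and totalRecords
// would need to be recorded during an earlier cleaning pass.
def shouldCleanForTombstones(tombstoneCount: Long,
                             totalRecords: Long,
                             minCleanableRatio: Double): Boolean =
  totalRecords > 0 && tombstoneCount.toDouble / totalRecords > minCleanableRatio
```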
@junrao Yeah, that's an interesting idea. Do you think it would be possible to make it a size-based comparison?
Yes, ideally we want to do a size-based estimate. I'm just not sure how accurately we can estimate size given batching and compression.
It seems like, whether we track the delete horizon or the # of tombstones, we will need to checkpoint some state. Otherwise we will be forced to perform a pass after every broker restart. Could we track the delete horizon upon each log append, when we clean the log, and when we have to recover the log?
I'm not sure where a checkpoint should be stored given our current checkpoint file formats and the need to support downgrades.
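A minimal sketch of what tracking at those three points might look like, assuming a hypothetical per-log field (the class and method names here are invented, not from the PR):

```scala
import org.apache.kafka.common.record.RecordBatch

// Hypothetical sketch: keep a per-log latestDeleteHorizon current at the
// three points mentioned above (append, clean, recover), so a full cleaning
// pass would not be required after every broker restart.
class DeleteHorizonTracker {
  @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP

  // Called with a batch's delete horizon whenever the batch is appended,
  // cleaned, or recovered; NO_TIMESTAMP means the batch carries no horizon.
  def observe(batchDeleteHorizonMs: Long): Unit =
    if (batchDeleteHorizonMs != RecordBatch.NO_TIMESTAMP)
      latestDeleteHorizon = math.max(latestDeleteHorizon, batchDeleteHorizonMs)
}
```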
We could store some additional tombstone-related stats in the log cleaner checkpoint file. It seems that to support downgrade, we can't change the version number, since the existing code expects the version in the file to match that in the code.
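As a rough illustration of that constraint, a hypothetical cleaner-checkpoint entry extended with tombstone stats (the extra fields and names are invented for illustration, not the actual file format):

```scala
// Hypothetical sketch: append tombstone stats to each cleaner-checkpoint
// entry while leaving the version number alone. Older brokers parse entries
// strictly, so extra fields would still be a downgrade hazard, which is the
// open question in this thread.
case class CleanerCheckpointEntry(topic: String,
                                  partition: Int,
                                  firstDirtyOffset: Long,
                                  tombstoneCount: Long, // hypothetical extra field
                                  totalRecords: Long)   // hypothetical extra field

def formatEntry(e: CleanerCheckpointEntry): String =
  s"${e.topic} ${e.partition} ${e.firstDirtyOffset} ${e.tombstoneCount} ${e.totalRecords}"
```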
@junrao @hachikuji @lbradstreet I've removed the logic for tracking the latestDeleteHorizon and the deleteHorizon-triggered cleaning in grabFilthiestCompactedLog, since this part of the PR is not part of KIP-534.
@mattwong949 : Thanks for the updated PR. A couple more comments.
@mattwong949 : Thanks for the updated PR. A few more comments below.
if (batch.isControlBatch)
  discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
else
  discardBatchRecords = canDiscardBatch
This is an existing issue. The following comment on line 1136 seems out of place, since the code that does that check is inside isBatchLastRecordOfProducer() below.
// We may retain a record from an aborted transaction if it is the last entry
// written by a given producerId.
Makes sense. I've removed that comment on line 1136 since the case is mentioned in isBatchLastRecordOfProducer.
  BatchRetention.DELETE
else
  BatchRetention.DELETE_EMPTY
new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)
It seems that containsMarkerForEmptyTxn should only be set to canDiscardBatch if this batch is a control batch?
hmm yeah I think you are right. I'll change to canDiscardBatch && batch.isControlBatch
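For clarity, a sketch of the resulting code after that change (names as in the snippet above):

```scala
// Only a discardable control batch can contain a marker for an empty
// transaction; a plain data batch should never set this flag.
val containsMarkerForEmptyTxn = canDiscardBatch && batch.isControlBatch
new RecordFilter.BatchRetentionResult(batchRetention, containsMarkerForEmptyTxn)
```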
@mattwong949 : Thanks for the updated PR. Just a minor comment below.
@hachikuji : Do you have any more comments?
@mattwong949 : Thanks for the updated PR. LGTM
…che#10914) This PR aims to remove tombstones that persist indefinitely due to low throughput. Previously, deleteHorizon was calculated from the segment's last modified time. In this PR, the deleteHorizon is tracked in the baseTimestamp of RecordBatches. After the first cleaning pass finds a record batch with tombstones, the record batch is recopied with the deleteHorizon flag set and a new baseTimestamp equal to the deleteHorizonMs. The records in the batch are rebuilt with relative timestamps based on the recorded deleteHorizonMs. Later cleaning passes can then remove tombstones more accurately, since each record batch tracks its own deleteHorizon. KIP-534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds Co-authored-by: Ted Yu <[email protected]> Co-authored-by: Richard Yu <[email protected]>
…`deleteHorizonMs` in batch format (#11694) This PR updates the documentation and tooling to match #10914, which added support for encoding `deleteHorizonMs` in the record batch schema. The changes include adding the new attribute and updating field names. We have also updated stale references to the old `FirstTimestamp` field in the code and comments. Finally, in the `DumpLogSegments` tool, when record batch information is printed, it will also include the value of `deleteHorizonMs` (e.g. `OptionalLong.empty` or `OptionalLong[123456]`). Reviewers: Vincent Jiang <[email protected]>, Kvicii <[email protected]>, Jason Gustafson <[email protected]>
This is a rebased PR for #7884 and #9915.
This PR aims to remove tombstones that persist indefinitely due to low throughput. Previously, deleteHorizon was calculated from the segment's last modified time.
In this PR, the deleteHorizon is tracked in the baseTimestamp of RecordBatches. After the first cleaning pass finds a record batch with tombstones, the record batch is recopied with the deleteHorizon flag set and a new baseTimestamp equal to the deleteHorizonMs. The records in the batch are rebuilt with relative timestamps based on the recorded deleteHorizonMs. Later cleaning passes can then remove tombstones more accurately, since each record batch tracks its own deleteHorizon.
KIP-534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds
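For illustration, a sketch of the timestamp arithmetic this implies (the variable names are invented and this is not the actual DefaultRecordBatch code; it assumes, per KIP-534, that the horizon is the cleaning time plus delete.retention.ms):

```scala
val cleaningTimeMs    = 1650000000000L // time of the cleaning pass that first sees the tombstone
val deleteRetentionMs = 86400000L      // delete.retention.ms (1 day)
val recordTimestampMs = 1649999000000L // a record's original timestamp

// With the deleteHorizon flag set, baseTimestamp holds the horizon rather
// than the first record's timestamp, and each record's timestamp is stored
// as a delta relative to it.
val deleteHorizonMs = cleaningTimeMs + deleteRetentionMs
val baseTimestamp   = deleteHorizonMs
val timestampDelta  = recordTimestampMs - baseTimestamp // may be negative

// The original timestamp round-trips exactly.
assert(baseTimestamp + timestampDelta == recordTimestampMs)

// A later cleaning pass can drop the batch's tombstones once the horizon passes.
def tombstonesRemovable(nowMs: Long): Boolean = nowMs >= deleteHorizonMs
```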
Co-author: @ConcurrencyPractitioner