[KAFKA-8522] Streamline tombstone and transaction marker removal #7884
Conversation
@junrao This PR is ready for review. :)
Retest this please.
@ConcurrencyPractitioner : Thanks for the new patch. Overall, the logic still seems a bit over-complicated to me. Left a few more comments below.
@@ -334,7 +334,8 @@ class LogCleaner(initialConfig: CleanerConfig,
  @throws(classOf[LogCleaningException])
  private def cleanFilthiestLog(): Boolean = {
    val preCleanStats = new PreCleanStats()
    val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
    val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats)
ltc => logToClean ? Also, do we need to use another local val since ltc is only used once?
@@ -623,6 +625,7 @@ private[log] class Cleaner(val id: Int,
 * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
 * @param maxLogMessageSize The maximum message size of the corresponding topic
 * @param stats Collector for cleaning statistics
 * @param tombstoneRetentionMs Defines how long a tombstone should be kept as defined by log configuration
We should make clear the difference between retainDeletesAndTxnMarkers and tombstoneRetentionMs. Also, it's probably better to put them as adjacent params.
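For illustration, one way the two parameters could be documented side by side (the wording below is only a suggestion, not the PR's final javadoc; it assumes tombstoneRetentionMs carries the topic's delete.retention.ms value):

```scala
/**
 * @param retainDeletesAndTxnMarkers Whether tombstones and transaction markers in this segment must
 *                                   still be kept at all during this cleaning pass
 * @param tombstoneRetentionMs       How long a tombstone or marker survives once it becomes eligible
 *                                   for removal, i.e. the topic's delete.retention.ms, used to compute
 *                                   the batch's delete horizon
 */
```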
// we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
// note that we will never delete a marker until all the records from that transaction are removed.
discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
isControlBatchEmpty = canDiscardBatch
Hmm, isControlBatchEmpty is a bit misleading since batch is not always a control batch.
override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
override def checkBatchRetention(batch: RecordBatch): BatchRetention = checkBatchRetention(batch, batch.deleteHorizonMs())

override def shouldRetainRecord(batch: RecordBatch, record: Record, newDeleteHorizonMs: Long): Boolean = {
It's probably better to have the logic that determines whether deleteHorizonMs should be set here instead of in MemoryRecords, since it's log-cleaner-specific logic. I was thinking that we could extend checkBatchRetention() to return (Boolean, shouldSetHorizon).
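A rough sketch of the suggested shape (illustrative only: BatchRetentionResult is a hypothetical result holder, since the Java RecordFilter could not return a Scala tuple; shouldDiscardBatch, transactionMetadata and retainDeletesAndTxnMarkers are from the surrounding cleaner code):

```scala
// The cleaner's filter reports both whether the batch survives and whether a delete
// horizon still needs to be stamped onto it, keeping that decision out of MemoryRecords.
override def checkBatchRetention(batch: RecordBatch): BatchRetentionResult = {
  val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata,
    retainTxnMarkers = retainDeletesAndTxnMarkers)
  val retention = if (canDiscardBatch) BatchRetention.DELETE else BatchRetention.DELETE_EMPTY
  // only V2+ batches carry a delete horizon in the header, and it is stamped at most once
  val shouldSetHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && !batch.deleteHorizonSet()
  new BatchRetentionResult(retention, shouldSetHorizon)
}
```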
var shouldRetainDeletes = true
if (isLatestVersion)
  shouldRetainDeletes = (batch.deleteHorizonSet() && currentTime < batch.deleteHorizonMs()) ||
    (!batch.deleteHorizonSet() && currentTime < newBatchDeleteHorizonMs)
Hmm, if deleteHorizonSet is not set, we shouldn't be deleting the tombstone. So, not sure what newBatchDeleteHorizonMs is intended for.
Oh, this is used as a means to help the tests in LogCleanerTest.scala pass. LogCleanerTest usually wants the tombstones removed in a single pass (but that pass is normally spent setting the delete horizon ms, which means that without doing the above we would be unable to remove tombstones). Therefore, by adding the newBatchDeleteHorizonMs argument (which is passed in by MemoryRecords), whenever LogCleaner cleans a log with the current time set to Long.MAX_VALUE, we are able to remove the tombstones / control records in one pass.
}
final BatchIterationResult iterationResult = iterateOverBatch(batch, decompressionBufferSupplier, filterResult, filter,
    batchMagic, writeOriginalBatch, maxOffset, retainedRecords,
    containsTombstonesOrMarker, deleteHorizonMs);
Hmm, why are we passing in containsTombstonesOrMarker, which is always false?
Oh, I can remove that.
long deleteHorizonMs = filter.retrieveDeleteHorizon(batch);
final BatchRetention batchRetention;
if (!batch.deleteHorizonSet())
    batchRetention = filter.checkBatchRetention(batch, deleteHorizonMs);
Since deleteHorizonMs can be obtained from batch, it's not clear why we need to pass that in as a param.
Oh, see the comment above. This delete horizon is used for the case where we want to remove the tombstones in a single pass. On the first iteration of the log cleaner, we are unable to remove the tombstone because no delete horizon has been set yet. Therefore, when we compute the delete horizon, we need to pass it back into checkBatchRetention so that tombstones can be removed in one iteration.
On second thought, I think we don't need to add an extra parameter to the checkBatchRetention method. Such logic only needs to live in LogCleaner, i.e. we can store the delete horizon in another variable in the RecordFilter we implemented in LogCleaner.
Hmm, I am still not sure why we need to remove a tombstone in one pass. If a tombstone's delete horizon is not set, it can't be removed in this round of cleaning.
Alright, acknowledged. I think that's a good point.
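For concreteness, a minimal sketch of the record-level check once the single-pass shortcut is dropped (illustrative only; the names mirror the surrounding diff rather than the final code):

```scala
// A tombstone in a V2+ batch is removable only after its batch-level delete horizon
// has been stamped and has passed; if no horizon is set yet, the tombstone is kept.
val isLatestVersion = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
val shouldRetainDeletes =
  if (isLatestVersion)
    !batch.deleteHorizonSet() || currentTime < batch.deleteHorizonMs()
  else
    retainDeletes
val isRetainedValue = record.hasValue || shouldRetainDeletes
```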
@@ -204,7 +205,35 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
  (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
if (cleanableLogs.isEmpty) {
  None
  // in this case, we are probably in a low throughput situation
  // therefore, we should take advantage of this fact and remove tombstones if we can
I am not sure about this. A round of cleaning can be expensive since we need to read in all existing cleaned segments. That's why by default, we only trigger a round of cleaning if the dirty portion of the log is as large as the cleaned portion. Not sure if it's worth doing cleaning more aggressively just to remove the tombstone. So, perhaps we can leave it outside of this PR for now.
@junrao I did some thinking about this. The integration test I added does not pass without this part. What happens is that, in logs with tombstones, there is the possibility that without further throughput the set of cleanable logs will always be empty. Therefore, as I mentioned in the comment, since we are in a low-throughput situation the LogCleaner's workload is relatively light anyway; in that case we can clean tombstones since we don't have much else to do.
There is a way to figure out whether the log cleaner has a heavy workload or not. If the set of cleanable logs has remained empty for a long period of time (a set threshold), then we can safely say that the log cleaner thread isn't busy, since there are no logs to clean. After that threshold has passed, we can start processing logs with tombstones and removing them.
This should tell us exactly when we can go back and remove tombstones.
Perhaps, we can keep track of the largest deleteHorizonMs in the cleaned portion. We can then trigger a round of cleaning when the current time has passed the largest deleteHorizonMs.
Yeah, I think this approach is probably a lot better.
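A minimal sketch of what that trigger could look like (the latestDeleteHorizon field is an assumption about how the tracked value might be exposed per log, not necessarily what the PR ends up doing):

```scala
// A log whose cleaned portion has a recorded delete horizon becomes eligible for
// another cleaning pass once the clock passes that horizon, even when its dirty
// ratio alone would not justify one.
def tombstonesReadyForRemoval(ltc: LogToClean, now: Long): Boolean =
  ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP &&
    now >= ltc.log.latestDeleteHorizon
```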
Hi @junrao, thanks for the comments you left! Overall, I managed to simplify the code somewhat and removed a couple of methods that were probably not necessary. Notably, there are not as many calls involving
@ConcurrencyPractitioner : Thanks for addressing the comments. A few more comments below.
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param stats Collector for cleaning statistics
* @param tombstoneRetentionMs Defines how long a tombstone should be kept as defined by log configuration
tombstoneRetentionMs is duplicated in the javadoc.
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: Map[Long, LastRecord],
stats: CleanerStats): Unit = {
stats: CleanerStats,
currentTime: Long = RecordBatch.NO_TIMESTAMP): Boolean = {
Could we add currentTime to the javadoc?
/**
 * Checks if the control batch (if it is one) can be removed (making sure that it is empty)
 */
protected boolean isControlBatchEmpty(RecordBatch recordBatch) {
This seems never used?
Yeah, will get rid of that.
@junrao I've mostly resolved your comments. I'm working on how we could trigger a call for a clean when the latest delete horizon has been passed. Other than that, feel free to add anything else. :)
@junrao Do you want to take another look?
@junrao Pinging.
@ConcurrencyPractitioner : Thanks for the updated PR. Will take another look.
@ConcurrencyPractitioner : Thanks for the updated PR. Made another pass of the non-testing files. A few comments below.
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param stats Collector for cleaning statistics
* @param tombstoneRetentionMs Defines how long a tombstone should be kept as defined by log configuration
Could we move this up to below retainDeletesAndTxnMarkers?
        writeOriginalBatch = false;
    }
}
return new BatchIterationResult(writeOriginalBatch, containsTombstonesOrMarker, maxOffset);
It's probably better to rename writeOriginalBatch here to something like recordsFiltered, since we combine other information to determine writeOriginalBatch later on.
// if the batch does not contain tombstones, then we don't need to overwrite batch
boolean canControlBatchBeRemoved = batch.isControlBatch() && deleteHorizonMs > RecordBatch.NO_TIMESTAMP;
if (writeOriginalBatch && (deleteHorizonMs == RecordBatch.NO_TIMESTAMP || deleteHorizonMs == batch.deleteHorizonMs()
        || (!containsTombstonesOrMarker && !canControlBatchBeRemoved))) {
It seems that the logic can be simplified a bit: we can take this branch if writeOriginalBatch is true and needToSetDeleteHorizon is false (needToSetDeleteHorizon = batch magic >= V2 && containsTombstonesOrMarker && batch's deleteHorizon not set).
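Spelled out, the simplified guard would look roughly like the following (a Scala-style sketch of the filterTo branch, using the variable names from the diff above):

```scala
// Rewrite the batch only when a delete horizon still has to be stamped: a V2+ batch
// that contains tombstones or a marker and has no horizon set yet. Otherwise, if all
// records are retained, the original batch can be appended unchanged.
val needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 &&
  containsTombstonesOrMarker && !batch.deleteHorizonSet()
if (writeOriginalBatch && !needToSetDeleteHorizon) {
  // append the original batch as-is
}
```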
Oh, sure, that's fine. But we still need to account for the control batch and check whether or not it is empty yet.
For a control batch, it's only removed at the batch level. So, if the batch can be deleted at the batch level, we won't get in here. If the batch can't be deleted at the batch level, the record within the batch will always be retained.
@junrao Is this always the case? If I remember correctly from the KIP, a control batch, if it contains only tombstones, will be persisted in the log for a set period of time, i.e. we need to remove the tombstones at some point before the control batch can be deleted. Therefore, I think it is very much possible that we need to check isControlBatchEmpty here.
@ConcurrencyPractitioner : A control batch has only a single marker record (either a commit or abort). When all records before the control batch are removed, we set the deleteHorizon for the control batch. When the time passes the deleteHorizon, the control batch is removed. A control batch never contains a tombstone.
if (batch.deleteHorizonSet()) {
  if (batch.deleteHorizonMs() > latestDeleteHorizon) {
    latestDeleteHorizon = batch.deleteHorizonMs()
This may not be the best place to track latestDeleteHorizon. Perhaps we can return the largest deleteHorizon in MemoryRecords.filterTo() and keep track of latestDeleteHorizon in the while loop in line 713. If we do that, I am not sure if we need retrieveDeleteHorizon() since MemoryRecords.filterTo() can obtain whether deleteHorizon is set from the batch and calculate the new deleteHorizon if needed.
Well, I think there are multiple problems we might need to think about:
- We don't know what the current time is, since MemoryRecords doesn't have access to a Time instance.
- For control batches, retrieveDeleteHorizon serves a critical function: we call controlBatch.onTransactionRead there to determine whether we can set a delete horizon for the batch.
In summary, I think there are multiple dependencies (located in LogCleaner) which would have to be called from MemoryRecords#filterTo. It would be more of a hassle, I think, to figure out how to call all of these methods from filterTo as well.
Good point on #2. My concern is that the batch could be filtered after retrieveDeleteHorizon() is called. Then, the latestDeleteHorizon maintained here won't be very accurate.
@@ -774,7 +818,14 @@ private[log] class Cleaner(val id: Int,
 * 2) The message doesn't have a value but it can't be deleted now.
 */
val latestOffsetForKey = record.offset() >= foundOffset
val isRetainedValue = record.hasValue || retainDeletes
val isLatestVersion = batch.magic() >= RecordBatch.MAGIC_VALUE_V2
isLatestVersion => supportDeleteHorizon?
// therefore, we should take advantage of this fact and remove tombstones if we can
// under the condition that the log's latest delete horizon is less than the current time tracked
val logsContainingTombstones = logs.filter {
Could we put the common logic into a shared method to avoid duplicating most of the code below?
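One possible shape for the shared helper (a sketch only; the helper name and signature are illustrative, while LogToClean and cleanableRatio come from LogCleanerManager):

```scala
// Both the ratio-based path and the tombstone path select dirty logs by a predicate
// and then pick the best candidate, so the selection can live in one place.
private def selectCleanableLog(dirtyLogs: Iterable[LogToClean])
                              (eligible: LogToClean => Boolean): Option[LogToClean] = {
  val candidates = dirtyLogs.filter(eligible)
  if (candidates.isEmpty) None else Some(candidates.maxBy(_.cleanableRatio))
}
```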
    producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}

public static void writeEmptyHeader(ByteBuffer buffer,
This method seems unused?
    producerEpoch, baseSequence, isTransactional, isControlRecord, isDeleteHorizonSet, partitionLeaderEpoch, 0);
}

static void writeHeader(ByteBuffer buffer,
This method seems unused?
@hachikuji All comments addressed. See if there is anything else that we might need to account for.
Pinging @hachikuji.
@hachikuji Pinging for review.
@ConcurrencyPractitioner : We now have https://github.com/apache/kafka/blob/trunk/.asf.yaml. You can add yourself to Jenkins's whitelist by following https://cwiki.apache.org/confluence/display/INFRA/.asf.yaml+features+for+git+repositories#id-.asf.yamlfeaturesforgitrepositories-JenkinsPRWhitelisting.
ok to test
ok to test
@junrao Cool. It's just that should I edit the
@ConcurrencyPractitioner : You can just submit a separate PR to add yourself in .asf.yml.
@junrao Alright, got it done.
ok to test
test this please
@junrao I don't think the .asf.yaml worked. Tried to trigger a few test rounds, but Jenkins didn't respond.
@ConcurrencyPractitioner Could you try "retest this please"? If it still doesn't work, you can file an Apache infra jira for help.
@junrao Did try on another PR. Looks like it didn't work. I will file a JIRA.
Reported in JIRA here: https://issues.apache.org/jira/browse/INFRA-20182
@hachikuji Do you have time to review? Just give me a heads-up if there are some comments left unaddressed.
Hi @ConcurrencyPractitioner. What is the status of this PR? We are also experiencing https://issues.apache.org/jira/browse/KAFKA-8522. Thanks!
@wushujames : This PR is mostly ready. It's just waiting for another committer more familiar with the transactional logic to take another look. @ConcurrencyPractitioner : Would you be able to rebase this PR? Thanks.
@ConcurrencyPractitioner @junrao this PR has been stale since April 2020. When would it be ready to merge? We are hitting this issue and it causes insanely long startup times in our applications as they need to read all the tombstones that are not being removed.
@akamensky @wushujames @junrao Migrating to a new PR. You can find it here: #9915.
The objective of this PR is to prevent tombstones from persisting in logs under low throughput conditions.