[KAFKA-8522] Streamline tombstone and transaction marker removal #7884
@@ -195,11 +195,12 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             // in which case, we need to reset the base timestamp and overwrite the timestamp deltas
             // if the batch does not contain tombstones, then we don't need to overwrite batch
             boolean canControlBatchBeRemoved = batch.isControlBatch() && deleteHorizonMs > RecordBatch.NO_TIMESTAMP;
-            if (writeOriginalBatch && (batch.deleteHorizonSet() || (!containsTombstonesOrMarker && !canControlBatchBeRemoved))) {
+            if (writeOriginalBatch && (deleteHorizonMs == RecordBatch.NO_TIMESTAMP || deleteHorizonMs == batch.deleteHorizonMs()
+                    || (!containsTombstonesOrMarker && !canControlBatchBeRemoved))) {
                 batch.writeTo(bufferOutputStream);
                 filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
             } else {
-                final MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+                MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
                 MemoryRecords records = builder.build();
                 int filteredBatchSize = records.sizeInBytes();
                 if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)

Review comment (on the buildRetainedRecordsInto call): Hmm, it seems that we only want to pass in deleteHorizonMs if …

Review comment: By current logic, this would actually break the code. Since we don't pass a …

Review comment: Yes, it's just that in this PR, retrieveDeleteHorizon() returns deleteHorizonMs > 0 even for batches where deleteHorizonMs doesn't need to be set. Then, we will be setting deleteHorizonMs for those batches unnecessarily.
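To make the dense boolean above easier to read, here is a standalone paraphrase of the new condition. It is a sketch only; the helper name shouldWriteOriginalBatch and its parameter list are illustrative and not part of the PR:

    import org.apache.kafka.common.record.RecordBatch;

    // Sketch paraphrasing the condition above (not the PR's actual code).
    final class FilterConditionSketch {
        // The original batch is copied through unchanged only when no record was dropped
        // AND rewriting the batch would not change its delete horizon.
        static boolean shouldWriteOriginalBatch(boolean writeOriginalBatch,
                                                boolean containsTombstonesOrMarker,
                                                boolean canControlBatchBeRemoved,
                                                long deleteHorizonMs,
                                                long batchDeleteHorizonMs) {
            boolean horizonUnchanged = deleteHorizonMs == RecordBatch.NO_TIMESTAMP
                    || deleteHorizonMs == batchDeleteHorizonMs;
            boolean nothingToStamp = !containsTombstonesOrMarker && !canControlBatchBeRemoved;
            return writeOriginalBatch && (horizonUnchanged || nothingToStamp);
        }
    }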
@@ -241,7 +242,7 @@ private static BatchIterationResult iterateOverBatch(RecordBatch batch,
                                                          FilterResult filterResult,
                                                          RecordFilter filter,
                                                          byte batchMagic,
-                                                         boolean recordsFiltered,
+                                                         boolean writeOriginalBatch,
                                                          long maxOffset,
                                                          List<Record> retainedRecords) {
         boolean containsTombstonesOrMarker = false;

@@ -254,7 +255,7 @@ private static BatchIterationResult iterateOverBatch(RecordBatch batch,
             // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
             // the corrupted batch with correct data.
             if (!record.hasMagic(batchMagic))
-                recordsFiltered = false;
+                writeOriginalBatch = false;

             if (record.offset() > maxOffset)
                 maxOffset = record.offset();
@@ -265,10 +266,10 @@ private static BatchIterationResult iterateOverBatch(RecordBatch batch,
                     containsTombstonesOrMarker = true;
                 }
             } else {
-                recordsFiltered = false;
+                writeOriginalBatch = false;
             }
         }
-        return new BatchIterationResult(recordsFiltered, containsTombstonesOrMarker, maxOffset);
+        return new BatchIterationResult(writeOriginalBatch, containsTombstonesOrMarker, maxOffset);
    }
}

Review comment (on the return statement): It's probably better to rename writeOriginalBatch here to sth like recordsFiltered since we combine other information to determine writeOriginalBatch later on.
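As a rough illustration of that naming suggestion (the class shape below is assumed for illustration and not taken from the PR), the per-batch result would carry the raw iteration outcome and leave the final write decision to the caller:

    // Hypothetical sketch; the exact fields of BatchIterationResult are assumptions.
    final class BatchIterationResult {
        final boolean recordsFiltered;            // raw outcome of iterating the batch
        final boolean containsTombstonesOrMarker; // whether a tombstone or transaction marker was seen
        final long maxOffset;                     // highest offset observed in the batch

        BatchIterationResult(boolean recordsFiltered, boolean containsTombstonesOrMarker, long maxOffset) {
            this.recordsFiltered = recordsFiltered;
            this.containsTombstonesOrMarker = containsTombstonesOrMarker;
            this.maxOffset = maxOffset;
        }
    }

The caller would then combine recordsFiltered with the delete-horizon checks shown earlier to decide whether the original batch can be written through unchanged.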
@@ -208,9 +208,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K

-    // We sleep a little bit, so that log cleaner has already gone through
-    // some iterations, ensures that delete horizons has been updated correctly
-    Thread.sleep(400L)
     assertEquals(log.latestDeleteHorizon, T0 + tombstoneRetentionMs)

+    Thread.sleep(300L)

     time.sleep(tombstoneRetentionMs + 1)

     val latestOffset: Long = log.latestEpoch match {

Review comment (on Thread.sleep(300L)): This seems unnecessary since we are waiting in cleaner.awaitCleaned() already later.

Review comment: Yeah, it definitely is inconsistent with other tests in that there is a Thread.sleep(). The problem is that this test seems prone to be somewhat flaky; without the sleep, at the present state, it definitely fails.

Review comment (on val latestOffset): This seems to be a complicated way of getting latestOffset. We could just do log.logEndOffset.
@@ -231,10 +229,10 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
     cleaner.awaitCleaned(new TopicPartition("log-partition", 0),
       latestOffset + 1, maxWaitMs = tombstoneRetentionMs)

+    assertEquals(log.latestDeleteHorizon, RecordBatch.NO_TIMESTAMP)
     for (segment <- log.logSegments; record <- segment.log.records.asScala) {
       fail ("The log should not contain record " + record + ", tombstone has expired its lifetime.")
     }
-    assertEquals(log.latestDeleteHorizon, -1L)
   }

   private def readFromLog(log: Log): Iterable[(Int, Int)] = {

Review comment (on cleaner.awaitCleaned): This is unnecessary given the waitUntilTrue() below.

Review comment (on maxWaitMs = tombstoneRetentionMs): To avoid transient failures, we probably want to give a long enough maxWaitMs, sth like 5 secs.
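The common thread in these comments is that fixed sleeps make the test slow and flaky, while polling a condition with a generous timeout is more robust. Below is a minimal sketch of that pattern in Java; the helper name waitUntilTrue is only borrowed for illustration and this is not Kafka's Scala test utility:

    import java.util.function.BooleanSupplier;

    // Hypothetical sketch: poll a condition up to a deadline instead of a blind sleep.
    final class WaitUtil {
        static void waitUntilTrue(BooleanSupplier condition, String failureMessage, long maxWaitMs)
                throws InterruptedException {
            long deadline = System.currentTimeMillis() + maxWaitMs;
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() > deadline)
                    throw new AssertionError(failureMessage);
                Thread.sleep(50L); // short poll interval; total wait is bounded by maxWaitMs
            }
        }
    }

With a pattern like this the test can wait, say, up to a few seconds for the cleaner to pass the tombstone's delete horizon instead of relying on a fixed Thread.sleep(300L).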
Review comment (on the write-original-batch condition): It seems that the logic can be simplified a bit. We can take this branch if writeOriginalBatch is true and needToSetDeleteHorizon is false, where needToSetDeleteHorizon = (batch magic >= V2 && containsTombstonesOrMarker && batch's deleteHorizon not set).

Reply: Oh, sure, that's fine. But we also still need to account for the control batch and check whether or not it is empty yet.

Review comment: For a control batch, it's only removed at the batch level. So, if the batch can be deleted at the batch level, we won't get in here. If the batch can't be deleted at the batch level, the record within the batch will always be retained.

Reply: @junrao Is this always the case? If I remember correctly, in the KIP, control batches, if they contain only tombstones, will be persisted in the logs for a set period of time, i.e. we need to at some point remove the tombstones first before the control batches can be deleted. Therefore, I think it would be very much possible that we need to check for isControlBatchEmpty here.

Review comment: @ConcurrencyPractitioner: A control batch has only a single marker record (either a commit or abort). When all records before the control batch are removed, we set the deleteHorizon for the control batch. When the time passes the deleteHorizon, the control batch is removed. A control batch never contains a tombstone.
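A minimal sketch of the simplification proposed in this thread, assuming the names visible in the diff keep their meaning and that deleteHorizonMs() returns RecordBatch.NO_TIMESTAMP when no horizon has been set; needToSetDeleteHorizon itself is only a proposal here, not committed code:

    import org.apache.kafka.common.record.RecordBatch;

    // Hypothetical sketch of the proposed condition (not the PR's committed code).
    final class DeleteHorizonSketch {
        // A batch needs its delete horizon stamped only if it is a v2+ batch that contains
        // tombstones or a transaction marker and does not already carry a delete horizon.
        static boolean needToSetDeleteHorizon(RecordBatch batch, boolean containsTombstonesOrMarker) {
            return batch.magic() >= RecordBatch.MAGIC_VALUE_V2
                    && containsTombstonesOrMarker
                    && batch.deleteHorizonMs() == RecordBatch.NO_TIMESTAMP;
        }
    }

Under this proposal the write-through branch would become writeOriginalBatch && !needToSetDeleteHorizon(batch, containsTombstonesOrMarker); per the last reply, a control batch carries a single transaction marker rather than tombstones, so the containsTombstonesOrMarker flag would cover it without a separate isControlBatchEmpty check.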