
[KAFKA-8522] Streamline tombstone and transaction marker removal #10914

Merged 26 commits on Sep 16, 2021

Commits:
96d128a
[KAKFA-8522] Streamline tombstone and transaction marker removal
tedyu Jan 15, 2021
6c105a0
Merge branch 'trunk' into trunk
ConcurrencyPractitioner Jan 20, 2021
0506bda
Merge branch 'trunk' into trunk
ConcurrencyPractitioner Jan 27, 2021
147803f
Merge branch 'trunk' of github.com:apache/kafka into trunk
mattwong949 Jun 22, 2021
e179f92
missed a couple fixes on the merge
mattwong949 Jun 22, 2021
44c50bd
Merge branch 'trunk' of github.com:apache/kafka into KAKFA-8522
mattwong949 Jul 7, 2021
8403960
fix nit, remove firstTimestamp(), revert checkstyle
mattwong949 Jul 8, 2021
6731c4d
suppress checkstyle
mattwong949 Jul 12, 2021
bb47b16
address comments - use OptionalLong, rename constants
mattwong949 Jul 12, 2021
c293392
improve test for correct timestamps as delta for delete horizon
mattwong949 Jul 12, 2021
798a5f7
address comments
mattwong949 Jul 29, 2021
3f4dc87
wip addressing comments
mattwong949 Aug 3, 2021
7a13fc1
Add tests to MemoryRecordsBuilder for direct deleteHorizon value set
mattwong949 Aug 3, 2021
9bc5cb6
add in latestDeleteHorizon again for review
mattwong949 Aug 4, 2021
6fa401d
Merge branch 'trunk' of github.com:apache/kafka into KAKFA-8522
mattwong949 Aug 18, 2021
cd77df9
Trigger build
mattwong949 Aug 18, 2021
d036c08
Merge branch 'trunk' of github.com:apache/kafka into KAKFA-8522
mattwong949 Sep 1, 2021
43698e0
address Jun's comments, log reason for compaction
mattwong949 Sep 2, 2021
05fea42
addressing comments
mattwong949 Sep 9, 2021
217624a
address Jason's comment on extra var in shouldRetainRecord
mattwong949 Sep 9, 2021
890c355
move legacyDeleteHorizonMs computation to doClean
mattwong949 Sep 9, 2021
9f69673
remove DeleteHorizon cleaning pass and LogCleaningReason
mattwong949 Sep 10, 2021
6f2f2b7
remove logic for delethorizon-triggered cleaning
mattwong949 Sep 10, 2021
2477f34
address last comments on code comments, readability, logging
mattwong949 Sep 10, 2021
b113c47
address Jun's comments
mattwong949 Sep 15, 2021
cf7b0a7
fix comment
mattwong949 Sep 15, 2021
4 changes: 2 additions & 2 deletions checkstyle/checkstyle.xml
@@ -115,7 +115,7 @@
     <module name="MethodLength"/>
     <module name="ParameterNumber">
       <!-- default is 8 -->
-      <property name="max" value="14"/>
+      <property name="max" value="13"/>
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
@@ -133,7 +133,7 @@
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
-      <property name="max" value="18"/>
+      <property name="max" value="16"/>
     </module>
     <module name="JavaNCSS">
       <!-- default is 50 -->
@@ -91,7 +91,7 @@
  * the previous value prior to becoming empty.
  *
  * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
- * the time for which tombstones / transaction markers needs to be removed. If it is true, then the first timestamp is
+ * the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is
  * the delete horizon, otherwise, it is merely the first timestamp of the record batch.
  *
  * The current attributes are given below:
@@ -167,21 +167,9 @@ public void ensureValid() {
      * {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
      */
     public long baseTimestamp() {
-        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
-    }
-
-    /**
-     * Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
-     * timestamp type of the batch is log append time.
-     *
-     * @return The first timestamp if a record has been appended, unless the delete horizon has been set
-     *         {@link RecordBatch#NO_TIMESTAMP} if the batch is empty or if the delete horizon is set
-     */
-    public long firstTimestamp() {
-        final long baseTimestamp = baseTimestamp();
-        if (hasDeleteHorizonMs())
-            return RecordBatch.NO_TIMESTAMP;
-        return baseTimestamp;
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
     }
 
     @Override
@@ -272,9 +260,8 @@ public boolean hasDeleteHorizonMs() {
 
     @Override
     public long deleteHorizonMs() {
-        final long baseTimestamp = baseTimestamp();
         if (hasDeleteHorizonMs())
-            return baseTimestamp;
+            return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
         return RecordBatch.NO_TIMESTAMP;
     }
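The interplay between the delete horizon flag and the base timestamp field can be sketched in isolation. This is a minimal, hypothetical illustration (the class name, constants, and helper are invented for the example; only `RecordBatch.NO_TIMESTAMP` being `-1` and the flag occupying the sixth attribute bit come from the surrounding text): when the flag is set, the base timestamp field carries the delete horizon; otherwise there is no delete horizon.

```java
// DeleteHorizonSketch.java — illustrative only, not Kafka's actual implementation.
public class DeleteHorizonSketch {
    static final long NO_TIMESTAMP = -1L;          // mirrors RecordBatch.NO_TIMESTAMP
    static final int DELETE_HORIZON_FLAG = 1 << 6; // sixth bit of the batch attributes

    // If the delete horizon flag is set, the base timestamp field holds the
    // delete horizon; otherwise the batch has no delete horizon.
    static long deleteHorizonMs(int attributes, long baseTimestamp) {
        return (attributes & DELETE_HORIZON_FLAG) != 0 ? baseTimestamp : NO_TIMESTAMP;
    }

    public static void main(String[] args) {
        // Flag set: the field is reinterpreted as the delete horizon.
        System.out.println(deleteHorizonMs(DELETE_HORIZON_FLAG, 1000L)); // prints 1000
        // Flag clear: no delete horizon.
        System.out.println(deleteHorizonMs(0, 1000L)); // prints -1
    }
}
```

This is also why the PR drops `firstTimestamp()`: once the field can mean two things depending on the flag, callers read the raw `baseTimestamp()` and consult `deleteHorizonMs()` separately.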

@@ -322,9 +309,9 @@ private CloseableIterator<Record> uncompressedIterator() {
         buffer.position(RECORDS_OFFSET);
         return new RecordIterator() {
             @Override
-            protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
+            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                 try {
-                    return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                    return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                 } catch (BufferUnderflowException e) {
                     throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                 }
@@ -590,15 +577,15 @@ public static int decrementSequence(int sequence, int decrement) {
     private abstract class RecordIterator implements CloseableIterator<Record> {
         private final Long logAppendTime;
         private final long baseOffset;
-        private final long firstTimestamp;
+        private final long baseTimestamp;
         private final int baseSequence;
         private final int numRecords;
         private int readRecords = 0;
 
         RecordIterator() {
             this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
             this.baseOffset = baseOffset();
-            this.firstTimestamp = firstTimestamp();
+            this.baseTimestamp = baseTimestamp();
             this.baseSequence = baseSequence();
             int numRecords = count();
             if (numRecords < 0)
@@ -618,7 +605,7 @@ public Record next() {
                 throw new NoSuchElementException();
 
             readRecords++;
-            Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime);
+            Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
             if (readRecords == numRecords) {
                 // Validate that the actual size of the batch is equal to declared size
                 // by checking that after reading declared number of items, there no items left
@@ -629,7 +616,7 @@ public Record next() {
                 return rec;
             }
 
-            protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime);
+            protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
 
             protected abstract boolean ensureNoneRemaining();
 
@@ -651,9 +638,9 @@ private abstract class StreamRecordIterator extends RecordIterator {
         abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;
 
         @Override
-        protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
+        protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
             try {
-                return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                return doReadRecord(baseOffset, baseTimestamp, baseSequence, logAppendTime);
             } catch (EOFException e) {
                 throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
             } catch (IOException e) {
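The reason `readNext` threads a base timestamp down to the record decoder is that the v2 batch format stores each record's timestamp as a delta against the batch's base timestamp. A minimal sketch of that decoding idea (the class and helper names here are invented for illustration; only the delta-encoding scheme itself is taken from the format):

```java
// TimestampDeltaSketch.java — illustrative only: per-record timestamps in a v2
// batch are reconstructed as baseTimestamp + timestampDelta.
public class TimestampDeltaSketch {
    // Hypothetical helper mirroring what a record reader does with the
    // baseTimestamp parameter and the delta stored in each record.
    static long recordTimestamp(long baseTimestamp, long timestampDelta) {
        return baseTimestamp + timestampDelta;
    }

    public static void main(String[] args) {
        // A batch whose base timestamp is 500; a record written 42 ms later.
        System.out.println(recordTimestamp(500L, 42L)); // prints 542
    }
}
```

Renaming the parameter from `firstTimestamp` to `baseTimestamp` keeps this decoding path honest: after this PR the field is not always the first record's timestamp, but the deltas are still relative to it.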
@@ -67,7 +67,7 @@ public void testWriteEmptyHeader() {
         assertEquals(isTransactional, batch.isTransactional());
         assertEquals(timestampType, batch.timestampType());
         assertEquals(timestamp, batch.maxTimestamp());
-        assertEquals(RecordBatch.NO_TIMESTAMP, batch.firstTimestamp());
+        assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp());
         assertEquals(isControlBatch, batch.isControlBatch());
     }
 }
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -1521,7 +1521,7 @@ class LogValidatorTest {
   def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = {
     batch match {
       case b: DefaultRecordBatch =>
-        assertEquals(expected, b.firstTimestamp, s"Unexpected base timestamp of batch $batch")
+        assertEquals(expected, b.baseTimestamp, s"Unexpected base timestamp of batch $batch")
       case _ => // no-op
     }
   }