Skip to content

Commit

Permalink
[KAFKA-8522] Streamline tombstone and transaction marker removal (#10914
Browse files Browse the repository at this point in the history
)

This PR aims to remove tombstones that persist indefinitely due to low throughput. Previously, deleteHorizon was calculated from the segment's last modified time.

In this PR, the deleteHorizon will now be tracked in the baseTimestamp of RecordBatches. After the first cleaning pass that finds a record batch with tombstones, the record batch is recopied with deleteHorizon flag and a new baseTimestamp that is the deleteHorizonMs. The records in the batch are rebuilt with relative timestamps based on the deleteHorizonMs that is recorded. Later cleaning passes will be able to remove tombstones more accurately on their deleteHorizon due to the individual time tracking on record batches.

KIP 534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds

Co-authored-by: Ted Yu <[email protected]>
Co-authored-by: Richard Yu <[email protected]>
  • Loading branch information
3 people authored Sep 16, 2021
1 parent 7de8a93 commit 6c80643
Show file tree
Hide file tree
Showing 18 changed files with 499 additions and 191 deletions.
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
<suppress checks="ParameterNumber"
files="DefaultRecordBatch.java"/>
<suppress checks="ParameterNumber"
files="Sender.java"/>
files="MemoryRecordsBuilder.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
Expand All @@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalLong;

import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
Expand Down Expand Up @@ -213,6 +214,11 @@ public boolean isControlBatch() {
return false;
}

@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}

/**
* Get an iterator for the nested entries contained within this batch. Note that
* if the batch is not compressed, then this method will return an iterator over the
Expand Down Expand Up @@ -464,6 +470,11 @@ public long offset() {
return buffer.getLong(OFFSET_OFFSET);
}

@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}

@Override
public LegacyRecord outerRecord() {
return record;
Expand Down Expand Up @@ -553,6 +564,11 @@ public long baseOffset() {
return loadFullBatch().baseOffset();
}

@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}

@Override
public long lastOffset() {
return offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalLong;

import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;

Expand Down Expand Up @@ -90,11 +91,15 @@
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
* The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
* the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is
* the delete horizon, otherwise, it is merely the first timestamp of the record batch.
*
* The current attributes are given below:
*
* -------------------------------------------------------------------------------------------------
* | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -------------------------------------------------------------------------------------------------
* ---------------------------------------------------------------------------------------------------------------------------
* | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
Expand All @@ -111,9 +116,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
static final int ATTRIBUTE_LENGTH = 2;
public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
static final int LAST_OFFSET_DELTA_LENGTH = 4;
static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
static final int FIRST_TIMESTAMP_LENGTH = 8;
static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH;
static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
static final int BASE_TIMESTAMP_LENGTH = 8;
static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
static final int MAX_TIMESTAMP_LENGTH = 8;
static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
static final int PRODUCER_ID_LENGTH = 8;
Expand All @@ -129,6 +134,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;

private static final int MAX_SKIP_BUFFER_SIZE = 2048;
Expand Down Expand Up @@ -156,13 +162,12 @@ public void ensureValid() {
}

/**
* Get the timestamp of the first record in this batch. It is always the create time of the record even if the
* timestamp type of the batch is log append time.
*
* @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
* Gets the base timestamp of the batch which is used to calculate the record timestamps from the deltas.
*
* @return The base timestamp
*/
public long firstTimestamp() {
return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
public long baseTimestamp() {
return buffer.getLong(BASE_TIMESTAMP_OFFSET);
}

@Override
Expand Down Expand Up @@ -246,6 +251,18 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}

private boolean hasDeleteHorizonMs() {
return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
}

@Override
public OptionalLong deleteHorizonMs() {
if (hasDeleteHorizonMs())
return OptionalLong.of(buffer.getLong(BASE_TIMESTAMP_OFFSET));
else
return OptionalLong.empty();
}

@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
Expand Down Expand Up @@ -290,9 +307,9 @@ private CloseableIterator<Record> uncompressedIterator() {
buffer.position(RECORDS_OFFSET);
return new RecordIterator() {
@Override
protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (BufferUnderflowException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
}
Expand Down Expand Up @@ -364,7 +381,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;

byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), hasDeleteHorizonMs());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
Expand Down Expand Up @@ -411,7 +428,7 @@ public int hashCode() {
}

private static byte computeAttributes(CompressionType type, TimestampType timestampType,
boolean isTransactional, boolean isControl) {
boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
Expand All @@ -423,6 +440,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
if (isDeleteHorizonSet)
attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}

Expand All @@ -440,8 +459,8 @@ public static void writeEmptyHeader(ByteBuffer buffer,
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}

public static void writeHeader(ByteBuffer buffer,
Expand All @@ -458,22 +477,23 @@ public static void writeHeader(ByteBuffer buffer,
int sequence,
boolean isTransactional,
boolean isControlBatch,
boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
throw new IllegalArgumentException("Invalid magic value " + magic);
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);

short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);

int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD);
buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
buffer.put(position + MAGIC_OFFSET, magic);
buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
Expand Down Expand Up @@ -555,15 +575,15 @@ public static int decrementSequence(int sequence, int decrement) {
private abstract class RecordIterator implements CloseableIterator<Record> {
private final Long logAppendTime;
private final long baseOffset;
private final long firstTimestamp;
private final long baseTimestamp;
private final int baseSequence;
private final int numRecords;
private int readRecords = 0;

RecordIterator() {
this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
this.baseOffset = baseOffset();
this.firstTimestamp = firstTimestamp();
this.baseTimestamp = baseTimestamp();
this.baseSequence = baseSequence();
int numRecords = count();
if (numRecords < 0)
Expand All @@ -583,7 +603,7 @@ public Record next() {
throw new NoSuchElementException();

readRecords++;
Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime);
Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
if (readRecords == numRecords) {
// Validate that the actual size of the batch is equal to declared size
// by checking that after reading declared number of items, there no items left
Expand All @@ -594,7 +614,7 @@ public Record next() {
return rec;
}

protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime);
protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);

protected abstract boolean ensureNoneRemaining();

Expand All @@ -616,9 +636,9 @@ private abstract class StreamRecordIterator extends RecordIterator {
abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;

@Override
protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime);
return doReadRecord(baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (EOFException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
} catch (IOException e) {
Expand Down Expand Up @@ -705,6 +725,11 @@ public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}

@Override
public OptionalLong deleteHorizonMs() {
return loadBatchHeader().deleteHorizonMs();
}

@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();
Expand Down
Loading

0 comments on commit 6c80643

Please sign in to comment.