diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index bd53af1c58977..f8bc1bfde2539 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
+ files="MemoryRecordsBuilder.java"/>
@@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
+ files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 35ea4d36543fd..0f2ccde2cd7c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.OptionalLong;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
@@ -213,6 +214,11 @@ public boolean isControlBatch() {
return false;
}
+ @Override
+ public OptionalLong deleteHorizonMs() {
+ return OptionalLong.empty();
+ }
+
/**
* Get an iterator for the nested entries contained within this batch. Note that
* if the batch is not compressed, then this method will return an iterator over the
@@ -464,6 +470,11 @@ public long offset() {
return buffer.getLong(OFFSET_OFFSET);
}
+ @Override
+ public OptionalLong deleteHorizonMs() {
+ return OptionalLong.empty();
+ }
+
@Override
public LegacyRecord outerRecord() {
return record;
@@ -553,6 +564,11 @@ public long baseOffset() {
return loadFullBatch().baseOffset();
}
+ @Override
+ public OptionalLong deleteHorizonMs() {
+ return OptionalLong.empty();
+ }
+
@Override
public long lastOffset() {
return offset;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 62cab8fafe42c..ec3c7204fe67b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -37,6 +37,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.OptionalLong;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@@ -90,11 +91,15 @@
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
+ * The delete horizon flag (bit 6) indicates whether the base timestamp of the batch has been set to the time at which
+ * the tombstones / transaction markers in the batch should be removed. If the flag is set, the base timestamp is the
+ * delete horizon; otherwise, it is simply the timestamp of the first record in the batch.
+ *
* The current attributes are given below:
*
- * -------------------------------------------------------------------------------------------------
- * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
- * -------------------------------------------------------------------------------------------------
+ * ---------------------------------------------------------------------------------------------------------------------------
+ * | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
+ * ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
@@ -111,9 +116,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
static final int ATTRIBUTE_LENGTH = 2;
public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
static final int LAST_OFFSET_DELTA_LENGTH = 4;
- static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
- static final int FIRST_TIMESTAMP_LENGTH = 8;
- static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH;
+ static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+ static final int BASE_TIMESTAMP_LENGTH = 8;
+ static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
static final int MAX_TIMESTAMP_LENGTH = 8;
static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
static final int PRODUCER_ID_LENGTH = 8;
@@ -129,6 +134,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
+ private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;
private static final int MAX_SKIP_BUFFER_SIZE = 2048;
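For orientation, here is a standalone sketch (editorial, not part of the patch; the class name is hypothetical) of how the attribute masks declared above combine into the v2 attributes field, mirroring computeAttributes() and hasDeleteHorizonMs() further down in this file.

```java
// Editorial sketch of the v2 attributes bit layout, using the mask values declared above.
public class AttributesBitSketch {
    static final byte COMPRESSION_CODEC_MASK = 0x07;    // bits 0-2
    static final byte TIMESTAMP_TYPE_MASK = 0x08;       // bit 3
    static final byte TRANSACTIONAL_FLAG_MASK = 0x10;   // bit 4
    static final int CONTROL_FLAG_MASK = 0x20;          // bit 5
    static final byte DELETE_HORIZON_FLAG_MASK = 0x40;  // bit 6, introduced by this patch

    static short pack(int compressionId, boolean logAppendTime, boolean transactional,
                      boolean control, boolean deleteHorizonSet) {
        short attributes = (short) (compressionId & COMPRESSION_CODEC_MASK);
        if (logAppendTime) attributes |= TIMESTAMP_TYPE_MASK;
        if (transactional) attributes |= TRANSACTIONAL_FLAG_MASK;
        if (control) attributes |= CONTROL_FLAG_MASK;
        if (deleteHorizonSet) attributes |= DELETE_HORIZON_FLAG_MASK;
        return attributes;
    }

    public static void main(String[] args) {
        short attributes = pack(2 /* compression codec id */, false, true, false, true);
        System.out.println((attributes & DELETE_HORIZON_FLAG_MASK) > 0); // true
    }
}
```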
@@ -156,13 +162,12 @@ public void ensureValid() {
}
/**
- * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
- * timestamp type of the batch is log append time.
- *
- * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
+ * Gets the base timestamp of the batch, which is used to calculate the record timestamps from the per-record deltas.
+ *
+ * @return The base timestamp
*/
- public long firstTimestamp() {
- return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
+ public long baseTimestamp() {
+ return buffer.getLong(BASE_TIMESTAMP_OFFSET);
}
@Override
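The rename from firstTimestamp to baseTimestamp matters because record timestamps in a v2 batch are stored as per-record deltas against this field, so readers reconstruct them the same way whether or not the field has been repurposed as the delete horizon. A tiny illustrative sketch (not part of the patch; values are made up, and the real decoding lives in DefaultRecord.readFrom()):

```java
// Illustrative only: how a reader recovers a record timestamp from the batch's base timestamp.
public class BaseTimestampReadSketch {
    public static void main(String[] args) {
        long baseTimestamp = 1_650_000_000_000L; // value returned by baseTimestamp()
        long timestampDelta = -9_000L;           // per-record delta; may be negative
        long recordTimestamp = baseTimestamp + timestampDelta;
        System.out.println(recordTimestamp);     // 1649999991000
    }
}
```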
@@ -246,6 +251,18 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}
+ private boolean hasDeleteHorizonMs() {
+ return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
+ }
+
+ @Override
+ public OptionalLong deleteHorizonMs() {
+ if (hasDeleteHorizonMs())
+ return OptionalLong.of(buffer.getLong(BASE_TIMESTAMP_OFFSET));
+ else
+ return OptionalLong.empty();
+ }
+
@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
@@ -290,9 +307,9 @@ private CloseableIterator<Record> uncompressedIterator() {
buffer.position(RECORDS_OFFSET);
return new RecordIterator() {
@Override
- protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
+ protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
- return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+ return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (BufferUnderflowException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
}
@@ -364,7 +381,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;
- byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
+ byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), hasDeleteHorizonMs());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
@@ -411,7 +428,7 @@ public int hashCode() {
}
private static byte computeAttributes(CompressionType type, TimestampType timestampType,
- boolean isTransactional, boolean isControl) {
+ boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
@@ -423,6 +440,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
+ if (isDeleteHorizonSet)
+ attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}
@@ -440,8 +459,8 @@ public static void writeEmptyHeader(ByteBuffer buffer,
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
- CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
- producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
+ CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
+ producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}
public static void writeHeader(ByteBuffer buffer,
@@ -458,6 +477,7 @@ public static void writeHeader(ByteBuffer buffer,
int sequence,
boolean isTransactional,
boolean isControlBatch,
+ boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
@@ -465,7 +485,7 @@ public static void writeHeader(ByteBuffer buffer,
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
- short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
+ short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
@@ -473,7 +493,7 @@ public static void writeHeader(ByteBuffer buffer,
buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
buffer.put(position + MAGIC_OFFSET, magic);
buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
- buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
+ buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@@ -555,7 +575,7 @@ public static int decrementSequence(int sequence, int decrement) {
private abstract class RecordIterator implements CloseableIterator<Record> {
private final Long logAppendTime;
private final long baseOffset;
- private final long firstTimestamp;
+ private final long baseTimestamp;
private final int baseSequence;
private final int numRecords;
private int readRecords = 0;
@@ -563,7 +583,7 @@ private abstract class RecordIterator implements CloseableIterator {
RecordIterator() {
this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
this.baseOffset = baseOffset();
- this.firstTimestamp = firstTimestamp();
+ this.baseTimestamp = baseTimestamp();
this.baseSequence = baseSequence();
int numRecords = count();
if (numRecords < 0)
@@ -583,7 +603,7 @@ public Record next() {
throw new NoSuchElementException();
readRecords++;
- Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime);
+ Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
if (readRecords == numRecords) {
// Validate that the actual size of the batch is equal to declared size
// by checking that after reading declared number of items, there no items left
@@ -594,7 +614,7 @@ public Record next() {
return rec;
}
- protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime);
+ protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
protected abstract boolean ensureNoneRemaining();
@@ -616,9 +636,9 @@ private abstract class StreamRecordIterator extends RecordIterator {
abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;
@Override
- protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
+ protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
- return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime);
+ return doReadRecord(baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (EOFException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
} catch (IOException e) {
@@ -705,6 +725,11 @@ public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}
+ @Override
+ public OptionalLong deleteHorizonMs() {
+ return loadBatchHeader().deleteHorizonMs();
+ }
+
@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index b6311713c3d87..eacc2113b0063 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
@@ -153,15 +154,20 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte
return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
}
+ /**
+ * Note: This method is also used to convert the base timestamp of the batch (which is usually the timestamp of the
+ * first record) into the delete horizon for the tombstones or transaction markers present in the batch.
+ */
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
BufferSupplier decompressionBufferSupplier) {
FilterResult filterResult = new FilterResult(destinationBuffer);
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
-
for (MutableRecordBatch batch : batches) {
- long maxOffset = -1L;
- BatchRetention batchRetention = filter.checkBatchRetention(batch);
+ final BatchRetentionResult batchRetentionResult = filter.checkBatchRetention(batch);
+ final boolean containsMarkerForEmptyTxn = batchRetentionResult.containsMarkerForEmptyTxn;
+ final BatchRetention batchRetention = batchRetentionResult.batchRetention;
+
filterResult.bytesRead += batch.sizeInBytes();
if (batchRetention == BatchRetention.DELETE)
@@ -171,38 +177,33 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
List<Record> retainedRecords = new ArrayList<>();
- try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
- while (iterator.hasNext()) {
- Record record = iterator.next();
- filterResult.messagesRead += 1;
-
- if (filter.shouldRetainRecord(batch, record)) {
- // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
- // the corrupted batch with correct data.
- if (!record.hasMagic(batchMagic))
- writeOriginalBatch = false;
-
- if (record.offset() > maxOffset)
- maxOffset = record.offset();
-
- retainedRecords.add(record);
- } else {
- writeOriginalBatch = false;
- }
- }
- }
+ final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter,
+ batchMagic, true, retainedRecords);
+ boolean containsTombstones = iterationResult.containsTombstones;
+ boolean writeOriginalBatch = iterationResult.writeOriginalBatch;
+ long maxOffset = iterationResult.maxOffset;
if (!retainedRecords.isEmpty()) {
- if (writeOriginalBatch) {
+ // we check whether the delete horizon should be set to a new value, in which case the base timestamp
+ // must be reset and the timestamp deltas rewritten; if the batch contains neither tombstones nor an
+ // empty-transaction marker, the batch does not need to be rewritten
+ boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn)
+ && !batch.deleteHorizonMs().isPresent();
+ if (writeOriginalBatch && !needToSetDeleteHorizon) {
batch.writeTo(bufferOutputStream);
filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
} else {
- MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+ final MemoryRecordsBuilder builder;
+ long deleteHorizonMs;
+ if (needToSetDeleteHorizon)
+ deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs;
+ else
+ deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP);
+ builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs);
+
MemoryRecords records = builder.build();
int filteredBatchSize = records.sizeInBytes();
if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
@@ -239,9 +240,59 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
+ private static BatchFilterResult filterBatch(RecordBatch batch, BufferSupplier decompressionBufferSupplier, FilterResult filterResult, RecordFilter filter, byte batchMagic, boolean writeOriginalBatch, List<Record> retainedRecords) {
+ long maxOffset = -1;
+ boolean containsTombstones = false;
+ try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ filterResult.messagesRead += 1;
+
+ if (filter.shouldRetainRecord(batch, record)) {
+ // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
+ // the corrupted batch with correct data.
+ if (!record.hasMagic(batchMagic))
+ writeOriginalBatch = false;
+
+ if (record.offset() > maxOffset)
+ maxOffset = record.offset();
+
+ retainedRecords.add(record);
+
+ if (!record.hasValue()) {
+ containsTombstones = true;
+ }
+ } else {
+ writeOriginalBatch = false;
+ }
+ }
+ return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset);
+ }
+ }
+
+ private static class BatchFilterResult {
+ private final boolean writeOriginalBatch;
+ private final boolean containsTombstones;
+ private final long maxOffset;
+ private BatchFilterResult(final boolean writeOriginalBatch,
+ final boolean containsTombstones,
+ final long maxOffset) {
+ this.writeOriginalBatch = writeOriginalBatch;
+ this.containsTombstones = containsTombstones;
+ this.maxOffset = maxOffset;
+ }
+ }
+
private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
List<Record> retainedRecords,
- ByteBufferOutputStream bufferOutputStream) {
+ ByteBufferOutputStream bufferOutputStream,
+ final long deleteHorizonMs) {
byte magic = originalBatch.magic();
TimestampType timestampType = originalBatch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ?
@@ -252,7 +303,7 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(),
originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(),
- originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit());
+ originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs);
for (Record record : retainedRecords)
builder.append(record);
@@ -303,6 +354,24 @@ public int hashCode() {
}
public static abstract class RecordFilter {
+ public final long currentTime;
+ public final long deleteRetentionMs;
+
+ public RecordFilter(final long currentTime, final long deleteRetentionMs) {
+ this.currentTime = currentTime;
+ this.deleteRetentionMs = deleteRetentionMs;
+ }
+
+ public static class BatchRetentionResult {
+ public final BatchRetention batchRetention;
+ public final boolean containsMarkerForEmptyTxn;
+ public BatchRetentionResult(final BatchRetention batchRetention,
+ final boolean containsMarkerForEmptyTxn) {
+ this.batchRetention = batchRetention;
+ this.containsMarkerForEmptyTxn = containsMarkerForEmptyTxn;
+ }
+ }
+
public enum BatchRetention {
DELETE, // Delete the batch without inspecting records
RETAIN_EMPTY, // Retain the batch even if it is empty
@@ -313,7 +382,7 @@ public enum BatchRetention {
* Check whether the full batch can be discarded (i.e. whether we even need to
* check the records individually).
*/
- protected abstract BatchRetention checkBatchRetention(RecordBatch batch);
+ protected abstract BatchRetentionResult checkBatchRetention(RecordBatch batch);
/**
* Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)}
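For reference, a hedged sketch (editorial, not part of the patch; the subclass name and retention policy are invented) of what a caller-side filter looks like against the revised API: the base class now takes the current time and the delete retention period, and checkBatchRetention returns a BatchRetentionResult.

```java
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

// Illustrative subclass of the revised RecordFilter API; not the log cleaner's actual filter.
class RetainValuedRecordsFilter extends MemoryRecords.RecordFilter {
    RetainValuedRecordsFilter(long currentTimeMs, long deleteRetentionMs) {
        super(currentTimeMs, deleteRetentionMs); // now required by the base class
    }

    @Override
    protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
        // Keep the batch shell even if every record is filtered out; this filter never
        // reports an empty-transaction marker, hence the `false`.
        return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
    }

    @Override
    protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
        return record.hasValue(); // drop tombstones (illustrative policy only)
    }
}
```

filterTo() then consults filter.currentTime + filter.deleteRetentionMs whenever it needs to stamp a delete horizon onto a rewritten batch, as shown in the hunk above.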
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 48e1ec8f7d9e2..b825a937e084b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -80,9 +80,10 @@ public void write(int b) {
private int numRecords = 0;
private float actualCompressionRatio = 1;
private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+ private long deleteHorizonMs;
private long offsetOfMaxTimestamp = -1;
private Long lastOffset = null;
- private Long firstTimestamp = null;
+ private Long baseTimestamp = null;
private MemoryRecords builtRecords;
private boolean aborted = false;
@@ -99,7 +100,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
- int writeLimit) {
+ int writeLimit,
+ long deleteHorizonMs) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
if (magic < RecordBatch.MAGIC_VALUE_V2) {
@@ -109,6 +111,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
throw new IllegalArgumentException("Control records are not supported for magic " + magic);
if (compressionType == CompressionType.ZSTD)
throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
+ if (deleteHorizonMs != RecordBatch.NO_TIMESTAMP)
+ throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic);
}
this.magic = magic;
@@ -125,6 +129,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
this.isControlBatch = isControlBatch;
+ this.deleteHorizonMs = deleteHorizonMs;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
@@ -133,6 +138,28 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
this.bufferStream = bufferStream;
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
+
+ if (hasDeleteHorizonMs()) {
+ this.baseTimestamp = deleteHorizonMs;
+ }
+ }
+
+ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime,
+ long producerId,
+ short producerEpoch,
+ int baseSequence,
+ boolean isTransactional,
+ boolean isControlBatch,
+ int partitionLeaderEpoch,
+ int writeLimit) {
+ this(bufferStream, magic, compressionType, timestampType, baseOffset, logAppendTime, producerId,
+ producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit,
+ RecordBatch.NO_TIMESTAMP);
}
/**
@@ -197,6 +224,10 @@ public boolean isTransactional() {
return isTransactional;
}
+ public boolean hasDeleteHorizonMs() {
+ return magic >= RecordBatch.MAGIC_VALUE_V2 && deleteHorizonMs >= 0L;
+ }
+
/**
* Close this builder and return the resulting buffer.
* @return The built log buffer
@@ -369,8 +400,8 @@ private int writeDefaultBatchHeader() {
maxTimestamp = this.maxTimestamp;
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
- firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
- partitionLeaderEpoch, numRecords);
+ baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
+ hasDeleteHorizonMs(), partitionLeaderEpoch, numRecords);
buffer.position(pos);
return writtenCompressed;
@@ -416,8 +447,8 @@ private void appendWithOffset(long offset, boolean isControlRecord, long timesta
if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
- if (firstTimestamp == null)
- firstTimestamp = timestamp;
+ if (baseTimestamp == null)
+ baseTimestamp = timestamp;
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
@@ -624,12 +655,12 @@ public void appendUncheckedWithOffset(long offset, SimpleRecord record) throws I
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
int offsetDelta = (int) (offset - baseOffset);
long timestamp = record.timestamp();
- if (firstTimestamp == null)
- firstTimestamp = timestamp;
+ if (baseTimestamp == null)
+ baseTimestamp = timestamp;
int sizeInBytes = DefaultRecord.writeTo(appendStream,
offsetDelta,
- timestamp - firstTimestamp,
+ timestamp - baseTimestamp,
record.key(),
record.value(),
record.headers());
@@ -683,7 +714,7 @@ private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, By
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
- long timestampDelta = timestamp - firstTimestamp;
+ long timestampDelta = timestamp - baseTimestamp;
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
@@ -788,7 +819,7 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head
recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
} else {
int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
- long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
+ long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
}
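When the builder is seeded with a delete horizon, baseTimestamp holds the horizon rather than the first record's create time, so the per-record deltas computed above are typically negative; the default record format stores the delta as a signed varlong, so the original timestamps still round-trip. A small arithmetic sketch with the same numbers as the new builder test (editorial, not part of the patch):

```java
// Illustrative arithmetic only; mirrors timestampDelta = timestamp - baseTimestamp above.
public class DeleteHorizonDeltaSketch {
    public static void main(String[] args) {
        long deleteHorizonMs = 100L;   // seeds baseTimestamp in the builder
        long recordTimestamp = 50L;    // create time of an appended record
        long timestampDelta = recordTimestamp - deleteHorizonMs; // -50, written per record
        long readBack = deleteHorizonMs + timestampDelta;        // reader recovers 50
        System.out.println(timestampDelta + " -> " + readBack);  // -50 -> 50
    }
}
```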
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 1cff7a238906d..7d231c1774367 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.OptionalLong;
/**
* A record batch is a container for records. In old versions of the record format (versions 0 and 1),
@@ -211,6 +212,12 @@ public interface RecordBatch extends Iterable<Record> {
*/
boolean isTransactional();
+ /**
+ * Get the delete horizon. Returns an empty {@code OptionalLong} if the base timestamp is not the delete horizon.
+ *
+ * @return the timestamp of the delete horizon, if it has been set
+ */
+ OptionalLong deleteHorizonMs();
+
/**
* Get the partition leader epoch of this record batch.
* @return The leader epoch or -1 if it is unknown
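A hedged usage sketch for the new accessor (editorial, not part of the patch; the helper class is hypothetical): the OptionalLong is empty unless the batch was rewritten with the delete-horizon flag set, which is how the cleaner changes later in this patch decide whether a batch's tombstones or markers have expired.

```java
import java.util.OptionalLong;
import org.apache.kafka.common.record.RecordBatch;

// Illustrative helper; mirrors the check the log cleaner performs further down in this patch.
final class DeleteHorizonChecks {
    static boolean horizonPassed(RecordBatch batch, long nowMs) {
        OptionalLong horizon = batch.deleteHorizonMs();
        return horizon.isPresent() && horizon.getAsLong() <= nowMs;
    }
}
```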
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5e96cbbdcdf5b..7b9f33d8612eb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -3147,10 +3147,10 @@ public void testUpdatePositionWithLastRecordMissingFromBatch() {
new SimpleRecord(null, "value".getBytes()));
// Remove the last record to simulate compaction
- MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter() {
+ MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
- return BatchRetention.DELETE_EMPTY;
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+ return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 065f1b51b39aa..0864a2dd9f347 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -67,7 +67,7 @@ public void testWriteEmptyHeader() {
assertEquals(isTransactional, batch.isTransactional());
assertEquals(timestampType, batch.timestampType());
assertEquals(timestamp, batch.maxTimestamp());
- assertEquals(RecordBatch.NO_TIMESTAMP, batch.firstTimestamp());
+ assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp());
assertEquals(isControlBatch, batch.isControlBatch());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 5dedf66f36d32..4f3f03c3f2d21 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -19,6 +19,9 @@
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@@ -35,6 +38,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.OptionalLong;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@@ -755,6 +759,48 @@ else if (iterations > 2 && memUsed < (iterations - 2) * 1024)
assertTrue(iterations < 100, "Memory usage too high: " + memUsed);
}
+ @ParameterizedTest
+ @ArgumentsSource(V2MemoryRecordsBuilderArgumentsProvider.class)
+ public void testRecordTimestampsWithDeleteHorizon(Args args) {
+ long deleteHorizon = 100;
+ int payloadLen = 1024 * 1024;
+ ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
+ ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(buffer);
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(byteBufferOutputStream, args.magic, args.compressionType,
+ TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH, 0, deleteHorizon);
+
+ builder.append(50L, "0".getBytes(), "0".getBytes());
+ builder.append(100L, "1".getBytes(), null);
+ builder.append(150L, "2".getBytes(), "2".getBytes());
+
+ MemoryRecords records = builder.build();
+ List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+ assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs());
+
+ CloseableIterator<Record> recordIterator = batches.get(0).streamingIterator(BufferSupplier.create());
+ Record record = recordIterator.next();
+ assertEquals(50L, record.timestamp());
+ record = recordIterator.next();
+ assertEquals(100L, record.timestamp());
+ record = recordIterator.next();
+ assertEquals(150L, record.timestamp());
+ recordIterator.close();
+ }
+
+ private static class V2MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider {
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+ List<Arguments> values = new ArrayList<>();
+ for (int bufferOffset : Arrays.asList(0, 15))
+ for (CompressionType type: CompressionType.values()) {
+ values.add(Arguments.of(new Args(bufferOffset, type, MAGIC_VALUE_V2)));
+ }
+ return values.stream();
+ }
+ }
+
private void verifyRecordsProcessingStats(CompressionType compressionType, RecordConversionStats processingStats,
int numRecords, int numRecordsConverted, long finalBytes,
long preConvertedBytes) {
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 16a51e6e3dd17..3f0195bf5d149 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -21,8 +21,10 @@
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -37,6 +39,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.OptionalLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -101,6 +104,18 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
}
}
+ private static class V2MemoryRecordsArgumentsProvider implements ArgumentsProvider {
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+ List<Arguments> arguments = new ArrayList<>();
+ for (long firstOffset : asList(0L, 57L))
+ for (CompressionType type: CompressionType.values()) {
+ arguments.add(Arguments.of(new Args(RecordBatch.MAGIC_VALUE_V2, firstOffset, type)));
+ }
+ return arguments.stream();
+ }
+ }
+
private final long logAppendTime = System.currentTimeMillis();
@ParameterizedTest
@@ -316,11 +331,11 @@ public void testFilterToEmptyBatchRetention(Args args) {
MemoryRecords records = builder.build();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
- new MemoryRecords.RecordFilter() {
+ new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// retain all batches
- return BatchRetention.RETAIN_EMPTY;
+ return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
}
@Override
@@ -378,11 +393,11 @@ public void testEmptyBatchRetention() {
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords records = MemoryRecords.readableRecords(buffer);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
- new MemoryRecords.RecordFilter() {
+ new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// retain all batches
- return BatchRetention.RETAIN_EMPTY;
+ return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
}
@Override
@@ -426,10 +441,10 @@ public void testEmptyBatchDeletion() {
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords records = MemoryRecords.readableRecords(buffer);
MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
- new MemoryRecords.RecordFilter() {
+ new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
- return deleteRetention;
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+ return new BatchRetentionResult(deleteRetention, false);
}
@Override
@@ -483,6 +498,53 @@ public void testBuildEndTxnMarker() {
assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
}
+ /**
+ * This test verifies that the base timestamp of the batch is correctly converted into a delete horizon
+ * for the tombstones / transaction markers in the batch, and that the record timestamps remain correct
+ * when stored as deltas relative to that delete horizon.
+ */
+ @ParameterizedTest
+ @ArgumentsSource(V2MemoryRecordsArgumentsProvider.class)
+ public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+ int partitionLeaderEpoch = 998;
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME,
+ 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+ builder.append(5L, "0".getBytes(), "0".getBytes());
+ builder.append(10L, "1".getBytes(), null);
+ builder.append(15L, "2".getBytes(), "2".getBytes());
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ final long deleteHorizon = Integer.MAX_VALUE / 2;
+ final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {
+ @Override
+ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+ return true;
+ }
+
+ @Override
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+ return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
+ }
+ };
+ builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+ filtered.flip();
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+ List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+ assertEquals(1, batches.size());
+ assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs());
+
+ CloseableIterator<Record> recordIterator = batches.get(0).streamingIterator(BufferSupplier.create());
+ Record record = recordIterator.next();
+ assertEquals(5L, record.timestamp());
+ record = recordIterator.next();
+ assertEquals(10L, record.timestamp());
+ record = recordIterator.next();
+ assertEquals(15L, record.timestamp());
+ recordIterator.close();
+ }
+
@Test
public void testBuildLeaderChangeMessage() {
final int leaderId = 5;
@@ -554,13 +616,13 @@ public void testFilterToBatchDiscard(Args args) {
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
+ MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter(0, 0) {
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
// discard the second and fourth batches
if (batch.lastOffset() == 2L || batch.lastOffset() == 6L)
- return BatchRetention.DELETE;
- return BatchRetention.DELETE_EMPTY;
+ return new BatchRetentionResult(BatchRetention.DELETE, false);
+ return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override
@@ -1012,9 +1074,13 @@ public void testUnsupportedCompress() {
}
private static class RetainNonNullKeysFilter extends MemoryRecords.RecordFilter {
+ public RetainNonNullKeysFilter() {
+ super(0, 0);
+ }
+
@Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
- return BatchRetention.DELETE_EMPTY;
+ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
+ return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false);
}
@Override
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 1655dfb794555..baf937d6415eb 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -67,6 +67,8 @@ import scala.util.control.ControlThrowable
* The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
* basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
* Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
+ * This time is tracked by setting the base timestamp of a record batch containing delete markers when that batch is recopied in the first cleaning that
+ * encounters it. The relative timestamps of the records in the batch are adjusted against the new base timestamp during the same recopy.
*
* Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following
* are the key points:
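The tombstone-retention rule described in the class comment above boils down to a simple timeline. A sketch with hypothetical values (editorial, not part of the patch; Java is used here for consistency with the other examples even though the cleaner itself is Scala):

```java
// Illustrative timeline only. The first cleaning that recopies a batch containing tombstones
// stamps deleteHorizonMs = cleaningTime + deleteRetentionMs; a later cleaning may drop those
// tombstones only once its current time has reached that horizon.
public class TombstoneRetentionTimeline {
    public static void main(String[] args) {
        long firstCleaningTime = 1_000_000L;
        long deleteRetentionMs = 86_400_000L;                      // example: 24h retention
        long deleteHorizonMs = firstCleaningTime + deleteRetentionMs;

        long secondCleaningTime = firstCleaningTime + 3_600_000L;  // one hour later
        System.out.println(secondCleaningTime >= deleteHorizonMs); // false: keep tombstones
    }
}
```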
@@ -342,7 +344,8 @@ class LogCleaner(initialConfig: CleanerConfig,
@throws(classOf[LogCleaningException])
private def cleanFilthiestLog(): Boolean = {
val preCleanStats = new PreCleanStats()
- val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match {
+ val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats)
+ val cleaned = ltc match {
case None =>
false
case Some(cleanable) =>
@@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
* @return The first offset not cleaned and the statistics for this round of cleaning
*/
private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+ doClean(cleanable, time.milliseconds())
+ }
+
+ private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
+ info("Beginning cleaning of log %s".format(cleanable.log.name))
+
// figure out the timestamp below which it is safe to remove delete tombstones
// this position is defined to be a configurable time beneath the last modified time of the last clean segment
- val deleteHorizonMs =
+ // this timestamp is only used for message formats older than MAGIC_VALUE_V2
+ val legacyDeleteHorizonMs =
cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
- }
-
- doClean(cleanable, deleteHorizonMs)
- }
-
- private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
- info("Beginning cleaning of log %s.".format(cleanable.log.name))
+ }
val log = cleanable.log
val stats = new CleanerStats()
@@ -522,13 +526,13 @@ private[log] class Cleaner(val id: Int,
val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
// group the segments and clean the groups
- info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
+ info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to upper bound deletion horizon %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs)))
val transactionMetadata = new CleanedTransactionMetadata
val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
log.config.maxIndexSize, cleanable.firstUncleanableOffset)
for (group <- groupedSegments)
- cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
+ cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs)
// record buffer utilization
stats.bufferUtilization = offsetMap.utilization
@@ -544,17 +548,19 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
- * @param deleteHorizonMs The time to retain delete tombstones
+ * @param currentTime The current time in milliseconds
* @param stats Collector for cleaning statistics
* @param transactionMetadata State of ongoing transactions which is carried between the cleaning
* of the grouped segments
+ * @param legacyDeleteHorizonMs The delete horizon used for tombstones in batches with a message format older than version 2
*/
private[log] def cleanSegments(log: UnifiedLog,
segments: Seq[LogSegment],
map: OffsetMap,
- deleteHorizonMs: Long,
+ currentTime: Long,
stats: CleanerStats,
- transactionMetadata: CleanedTransactionMetadata): Unit = {
+ transactionMetadata: CleanedTransactionMetadata,
+ legacyDeleteHorizonMs: Long): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
@@ -574,14 +580,15 @@ private[log] class Cleaner(val id: Int,
val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
transactionMetadata.addAbortedTransactions(abortedTransactions)
- val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
+ val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs
info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
- s"with deletion horizon $deleteHorizonMs, " +
- s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
+ s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed from " +
+ s"the segment last modified time of ${currentSegment.lastModified}," +
+ s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")
try {
- cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
- transactionMetadata, lastOffsetOfActiveProducers, stats)
+ cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
+ log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
} catch {
case e: LogSegmentOffsetOverflowException =>
// Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@@ -622,26 +629,35 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
- * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment
+ * @param retainLegacyDeletesAndTxnMarkers Should tombstones (in batches older than message format version 2) and markers be retained while cleaning this segment
+ * @param deleteRetentionMs How long tombstones should be retained, as defined by the log configuration
* @param maxLogMessageSize The maximum message size of the corresponding topic
* @param stats Collector for cleaning statistics
+ * @param currentTime The time at which the clean was initiated
*/
private[log] def cleanInto(topicPartition: TopicPartition,
sourceRecords: FileRecords,
dest: LogSegment,
map: OffsetMap,
- retainDeletesAndTxnMarkers: Boolean,
+ retainLegacyDeletesAndTxnMarkers: Boolean,
+ deleteRetentionMs: Long,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
lastRecordsOfActiveProducers: Map[Long, LastRecord],
- stats: CleanerStats): Unit = {
- val logCleanerFilter: RecordFilter = new RecordFilter {
+ stats: CleanerStats,
+ currentTime: Long): Unit = {
+ val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
var discardBatchRecords: Boolean = _
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+ override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = {
// we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
// note that we will never delete a marker until all the records from that transaction are removed.
- discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers)
+ val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+ if (batch.isControlBatch)
+ discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
+ else
+ discardBatchRecords = canDiscardBatch
def isBatchLastRecordOfProducer: Boolean = {
// We retain the batch in order to preserve the state of active producers. There are three cases:
@@ -658,12 +674,14 @@ private[log] class Cleaner(val id: Int,
}
}
- if (batch.hasProducerId && isBatchLastRecordOfProducer)
- BatchRetention.RETAIN_EMPTY
- else if (discardBatchRecords)
- BatchRetention.DELETE
- else
- BatchRetention.DELETE_EMPTY
+ val batchRetention: BatchRetention =
+ if (batch.hasProducerId && isBatchLastRecordOfProducer)
+ BatchRetention.RETAIN_EMPTY
+ else if (discardBatchRecords)
+ BatchRetention.DELETE
+ else
+ BatchRetention.DELETE_EMPTY
+ new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch && batch.isControlBatch)
}
override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
@@ -671,7 +689,7 @@ private[log] class Cleaner(val id: Int,
// The batch is only retained to preserve producer sequence information; the records can be removed
false
else
- Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats)
+ Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = currentTime)
}
}
@@ -686,6 +704,7 @@ private[log] class Cleaner(val id: Int,
val records = MemoryRecords.readableRecords(readBuffer)
throttler.maybeThrottle(records.sizeInBytes)
val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
+
stats.readMessages(result.messagesRead, result.bytesRead)
stats.recopyMessages(result.messagesRetained, result.bytesRetained)
@@ -747,22 +766,19 @@ private[log] class Cleaner(val id: Int,
}
private def shouldDiscardBatch(batch: RecordBatch,
- transactionMetadata: CleanedTransactionMetadata,
- retainTxnMarkers: Boolean): Boolean = {
- if (batch.isControlBatch) {
- val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch)
- canDiscardControlBatch && !retainTxnMarkers
- } else {
- val canDiscardBatch = transactionMetadata.onBatchRead(batch)
- canDiscardBatch
- }
+ transactionMetadata: CleanedTransactionMetadata): Boolean = {
+ if (batch.isControlBatch)
+ transactionMetadata.onControlBatchRead(batch)
+ else
+ transactionMetadata.onBatchRead(batch)
}
private def shouldRetainRecord(map: kafka.log.OffsetMap,
- retainDeletes: Boolean,
+ retainDeletesForLegacyRecords: Boolean,
batch: RecordBatch,
record: Record,
- stats: CleanerStats): Boolean = {
+ stats: CleanerStats,
+ currentTime: Long): Boolean = {
val pastLatestOffset = record.offset > map.latestOffset
if (pastLatestOffset)
return true
@@ -776,7 +792,14 @@ private[log] class Cleaner(val id: Int,
* 2) The message doesn't have a value but it can't be deleted now.
*/
val latestOffsetForKey = record.offset() >= foundOffset
- val isRetainedValue = record.hasValue || retainDeletes
+ val legacyRecord = batch.magic() < RecordBatch.MAGIC_VALUE_V2
+ def shouldRetainDeletes = {
+ if (!legacyRecord)
+ !batch.deleteHorizonMs().isPresent || currentTime < batch.deleteHorizonMs().getAsLong
+ else
+ retainDeletesForLegacyRecords
+ }
+ val isRetainedValue = record.hasValue || shouldRetainDeletes
latestOffsetForKey && isRetainedValue
} else {
stats.invalidMessage()
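A hedged Java rendering of the decision above (editorial, not the cleaner itself; the class name is invented), making the v2 versus legacy split explicit:

```java
import java.util.OptionalLong;

// Illustrative restatement of shouldRetainDeletes / isRetainedValue above; not production code.
final class TombstoneRetentionDecision {
    static boolean retainTombstone(byte batchMagic, OptionalLong deleteHorizonMs,
                                   boolean retainDeletesForLegacyRecords, long currentTime) {
        if (batchMagic >= 2) // RecordBatch.MAGIC_VALUE_V2
            return !deleteHorizonMs.isPresent() || currentTime < deleteHorizonMs.getAsLong();
        return retainDeletesForLegacyRecords; // pre-v2 batches fall back to the segment-based horizon
    }
}
```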
@@ -1111,8 +1134,6 @@ private[log] class CleanedTransactionMetadata {
case ControlRecordType.ABORT =>
ongoingAbortedTxns.remove(producerId) match {
// Retain the marker until all batches from the transaction have been removed.
- // We may retain a record from an aborted transaction if it is the last entry
- // written by a given producerId.
case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
false
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index c0030b12e5382..02d6a30ad2b0a 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -169,11 +169,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val now = time.milliseconds
this.timeOfLastRun = now
val lastClean = allCleanerCheckpoints
+
val dirtyLogs = logs.filter {
- case (_, log) => log.config.compact // match logs that are marked as compacted
+ case (_, log) => log.config.compact
}.filterNot {
case (topicPartition, log) =>
- // skip any logs already in-progress and uncleanable partitions
inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition)
}.map {
case (topicPartition, log) => // create a LogToClean instance for each
@@ -198,9 +198,10 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val cleanableLogs = dirtyLogs.filter { ltc =>
(ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio
}
- if(cleanableLogs.isEmpty) {
+
+ if (cleanableLogs.isEmpty)
None
- } else {
+ else {
preCleanStats.recordCleanablePartitions(cleanableLogs.size)
val filthiest = cleanableLogs.max
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5a65fe3c58aa3..5f6a56d21feb0 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,6 +18,7 @@
package kafka.log
import java.io.PrintWriter
+
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import kafka.utils.{MockTime, TestUtils}
@@ -186,7 +187,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
}
}
- private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
+ private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long,
+ startValue: Int, step: Int): Seq[(Int, Int)] = {
var valCounter = startValue
for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val curValue = valCounter
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index e20e661431e52..c4a71cc22cc97 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -195,7 +195,6 @@ class LogCleanerManagerTest extends Logging {
cleanerManager.setCleaningState(tp2, LogCleaningInProgress)
val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get
-
assertEquals(tp1, filthiestLog.topicPartition)
assertEquals(tp1, filthiestLog.log.topicPartition)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0a95735378008..8f1d241a5d444 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -52,6 +52,8 @@ class LogCleanerTest {
val logConfig = LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
+ val tombstoneRetentionMs = 86400000
+ val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
@AfterEach
def teardown(): Unit = {
@@ -84,8 +86,8 @@ class LogCleanerTest {
val segments = log.logSegments.take(3).toSeq
val stats = new CleanerStats()
val expectedBytesRead = segments.map(_.size).sum
- cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata)
val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
+ cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
}
@@ -170,7 +172,7 @@ class LogCleanerTest {
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats()
cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
- cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata)
+ cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1)
// Validate based on the file name that log segment file is renamed exactly once for async deletion
assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -365,7 +367,7 @@ class LogCleanerTest {
log.roll()
// cannot remove the marker in this pass because there are still valid records
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(1, 3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
@@ -374,17 +376,17 @@ class LogCleanerTest {
log.roll()
// the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
- // delete horizon forced to 0 to verify marker is not removed early
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+ // clean again with the same timestamp to verify the marker is not removed early
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
- // clean again with large delete horizon and verify the marker is removed
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ // clean again with max timestamp to verify the marker is removed
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
}
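
A note on the timestamps these tests use: largeTimestamp is chosen so that a delete horizon derived from it (assumed here to be currentTime + tombstoneRetentionMs) still fits in a Long, while a follow-up clean at Long.MaxValue is guaranteed to land past that horizon. A quick standalone check of that arithmetic:

```scala
object LargeTimestampCheck extends App {
  val tombstoneRetentionMs = 86400000L                        // 24 hours, mirroring the test constant
  val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1

  // Assumed horizon: the time of the first clean plus the retention period.
  val assumedHorizon = largeTimestamp + tombstoneRetentionMs  // = Long.MaxValue - 1, no overflow
  assert(assumedHorizon == Long.MaxValue - 1)
  assert(Long.MaxValue > assumedHorizon)      // a later clean at Long.MaxValue is past the horizon
  assert(largeTimestamp <= assumedHorizon)    // a second clean at the same time is not
}
```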
@@ -413,11 +415,12 @@ class LogCleanerTest {
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
log.roll()
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(1, 3, 4), offsetsInLog(log))
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ // In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
+ runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
}
@@ -454,14 +457,14 @@ class LogCleanerTest {
// first time through the records are removed
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
@@ -475,13 +478,15 @@ class LogCleanerTest {
log.roll()
// Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ // The deleteHorizon for {Producer2: Commit} has not been set yet.
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ // In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
+ dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
@@ -506,14 +511,16 @@ class LogCleanerTest {
// first time through the control batch is retained as an empty batch
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ // In the first pass, the deleteHorizon for the commit marker is set. In the second pass, the commit marker is removed,
+ // but the empty batch is retained to preserve the producer epoch.
+ var dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
// the empty control batch does not cause an exception when cleaned
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
@@ -537,7 +544,7 @@ class LogCleanerTest {
log.roll()
// Both the record and the marker should remain after cleaning
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(0, 1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@@ -562,12 +569,12 @@ class LogCleanerTest {
// Both the batch and the marker should remain after cleaning. The batch is retained
// because it is the last entry for this producerId. The marker is retained because
// there are still batches remaining from this transaction.
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// The empty batch and the marker is still retained after a second cleaning.
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@@ -591,13 +598,13 @@ class LogCleanerTest {
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator)
log.roll()
- // delete horizon set to 0 to verify marker is not removed early
- val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1
+ // Aborted records are removed, but the abort marker is still preserved.
+ val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
- // clean again with large delete horizon and verify the marker is removed
- cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ // In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
+ runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5), offsetsInLog(log))
}
@@ -633,12 +640,12 @@ class LogCleanerTest {
// Both transactional batches will be cleaned. The last one will remain in the log
// as an empty batch in order to preserve the producer sequence number and epoch
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
- // On the second round of cleaning, the marker from the first transaction should be removed.
- cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)
+ // In the first pass, the delete horizon for the first marker is set. In the second pass, the first marker is removed.
+ runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(3, 4, 5), offsetsInLog(log))
assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
}
@@ -670,14 +677,14 @@ class LogCleanerTest {
assertAbortedTransactionIndexed()
// first time through the records are removed
- var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertAbortedTransactionIndexed()
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
// the empty batch remains if cleaned again because it still holds the last sequence
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertAbortedTransactionIndexed()
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
@@ -687,13 +694,14 @@ class LogCleanerTest {
appendProducer(Seq(1))
log.roll()
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertAbortedTransactionIndexed()
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
- dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1
+ // In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
+ dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
assertEquals(List(3), lastOffsetsPerBatchInLog(log))
@@ -728,7 +736,7 @@ class LogCleanerTest {
// clean the log
val stats = new CleanerStats()
- cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata)
+ cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata, -1)
val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -741,7 +749,7 @@ class LogCleanerTest {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
- cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
+ cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -760,7 +768,7 @@ class LogCleanerTest {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
+ cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
)
}
@@ -777,7 +785,7 @@ class LogCleanerTest {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
+ cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
)
}
@@ -1112,7 +1120,7 @@ class LogCleanerTest {
keys.foreach(k => map.put(key(k), Long.MaxValue))
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
)
}
@@ -1372,7 +1380,7 @@ class LogCleanerTest {
// Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
assertEquals(allKeys, LogTestUtils.keysInLog(log))
@@ -1381,7 +1389,7 @@ class LogCleanerTest {
// Clean each segment now that split is complete.
for (segmentToClean <- log.logSegments)
cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
log.close()
@@ -1422,7 +1430,7 @@ class LogCleanerTest {
// clean the log
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
var cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1438,7 +1446,7 @@ class LogCleanerTest {
// clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1455,7 +1463,7 @@ class LogCleanerTest {
// clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1477,7 +1485,7 @@ class LogCleanerTest {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1495,7 +1503,7 @@ class LogCleanerTest {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1513,7 +1521,7 @@ class LogCleanerTest {
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata)
+ new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
time.scheduler.clear()
cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1654,6 +1662,7 @@ class LogCleanerTest {
key = "0".getBytes,
timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), leaderEpoch = 0)
log.roll()
+
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
assertEquals(1, log.logSegments.head.log.batches.iterator.next().lastOffset,
"The tombstone should be retained.")
@@ -1854,6 +1863,19 @@ class LogCleanerTest {
private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
}
+
+ /**
+ * Runs a two-pass clean to simulate a full cleaning cycle:
+ * 1. The first pass sets the delete horizon in batches containing tombstones or markers for emptied transactions.
+ * 2. The second pass advances the current time past tombstoneRetentionMs, causing those tombstones to expire
+ * and be promptly removed from the log.
+ * @return the first dirty offset in the log after the second cleaning.
+ */
+ private def runTwoPassClean(cleaner: Cleaner, logToClean: LogToClean, currentTime: Long,
+ tombstoneRetentionMs: Long = 86400000): Long = {
+ cleaner.doClean(logToClean, currentTime)
+ cleaner.doClean(logToClean, currentTime + tombstoneRetentionMs + 1)._1
+ }
}
class FakeOffsetMap(val slots: Int) extends OffsetMap {
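
The runTwoPassClean helper above encodes the intended lifecycle of a tombstone or transaction marker under the new scheme. As a rough mental model (an assumption about the cleaner's behaviour, not its implementation): the first pass that encounters such a batch stamps it with a delete horizon, and only a later pass whose current time is past that horizon actually drops it.

```scala
object TwoPassCleanModel {
  // Simplified stand-in for a tombstone/marker batch; only the horizon matters here.
  final case class Tombstone(deleteHorizonMs: Option[Long])

  def cleanOnce(t: Tombstone, currentTimeMs: Long, retentionMs: Long): Option[Tombstone] =
    t.deleteHorizonMs match {
      case None =>
        Some(t.copy(deleteHorizonMs = Some(currentTimeMs + retentionMs)))  // first pass: stamp the horizon
      case Some(horizon) if currentTimeMs > horizon =>
        None                                                               // past the horizon: drop it
      case _ =>
        Some(t)                                                            // horizon set but not reached: keep
    }
}
```

With retentionMs = 86400000, cleaning at time T and again at T + retentionMs + 1 removes the batch, which is exactly the schedule runTwoPassClean drives.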
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index af585bfd49605..4275684230736 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -1521,7 +1521,7 @@ class LogValidatorTest {
def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = {
batch match {
case b: DefaultRecordBatch =>
- assertEquals(expected, b.firstTimestamp, s"Unexpected base timestamp of batch $batch")
+ assertEquals(expected, b.baseTimestamp, s"Unexpected base timestamp of batch $batch")
case _ => // no-op
}
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 79671341acc6c..be63413316b76 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -33,7 +33,6 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
-import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
@@ -749,8 +748,9 @@ class UnifiedLogTest {
records.batches.forEach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
+ records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) {
+ override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
+ new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)
override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
}, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
filtered.flip()
@@ -790,8 +790,9 @@ class UnifiedLogTest {
records.batches.forEach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.RETAIN_EMPTY
+ records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) {
+ override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
+ new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.RETAIN_EMPTY, true)
override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = false
}, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
filtered.flip()
@@ -833,8 +834,9 @@ class UnifiedLogTest {
records.batches.forEach(_.setPartitionLeaderEpoch(0))
val filtered = ByteBuffer.allocate(2048)
- records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
- override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
+ records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) {
+ override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
+ new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)
override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
}, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
filtered.flip()
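
These hunks exercise the reworked RecordFilter: it now takes two constructor arguments (taken here to be a current time and a delete-retention interval; the parameter names are assumptions), and checkBatchRetention returns a BatchRetentionResult pairing the retention decision with a flag that appears to mark a control batch for an already-emptied transaction. A self-contained sketch of the same usage, with the hypothetical helper name filterOutKeyedRecords:

```scala
import java.nio.ByteBuffer

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.utils.BufferSupplier

object RecordFilterSketch {
  // Copies `records` into `output`, dropping keyed records and deleting batches that end up empty.
  def filterOutKeyedRecords(tp: TopicPartition, records: MemoryRecords, output: ByteBuffer): Unit = {
    val filter = new RecordFilter(0L, 0L) {
      override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
        // `false`: this batch is not treated as a marker for an emptied transaction (assumed meaning)
        new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)

      override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = !record.hasKey
    }
    records.filterTo(tp, filter, output, Int.MaxValue, BufferSupplier.NO_CACHING)
  }
}
```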
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
index d83ad4459b601..982040b84ee51 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
@@ -256,6 +256,7 @@ private void writeDefaultBatchHeader() {
RecordBatch.NO_SEQUENCE,
false,
isControlBatch,
+ false,
leaderEpoch,
numRecords()
);