From 61ac6da6321508ef7afed7c4f662583f2957a710 Mon Sep 17 00:00:00 2001
From: Matthew Wong
Date: Thu, 16 Sep 2021 09:17:15 -0700
Subject: [PATCH] [KAFKA-8522] Streamline tombstone and transaction marker removal (#10914)

This PR removes tombstones that would otherwise persist indefinitely on low-throughput partitions. Previously, the deleteHorizon was derived from the segment's last modified time. With this PR, the deleteHorizon is tracked in the baseTimestamp of each RecordBatch: the first cleaning pass that finds a record batch containing tombstones recopies the batch with the delete horizon flag set and a new baseTimestamp equal to the deleteHorizonMs, and the records in the batch are rebuilt with relative timestamps computed against that new baseTimestamp. Because each record batch now carries its own delete horizon, later cleaning passes can remove its tombstones at the correct time.

KIP 534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds

Co-authored-by: Ted Yu
Co-authored-by: Richard Yu
---
 checkstyle/suppressions.xml | 4 +-
 .../record/AbstractLegacyRecordBatch.java | 16 +++
 .../common/record/DefaultRecordBatch.java | 77 +++++++----
 .../kafka/common/record/MemoryRecords.java | 129 ++++++++++++++----
 .../common/record/MemoryRecordsBuilder.java | 53 +++++--
 .../kafka/common/record/RecordBatch.java | 7 +
 .../consumer/internals/FetcherTest.java | 6 +-
 .../common/record/DefaultRecordBatchTest.java | 2 +-
 .../record/MemoryRecordsBuilderTest.java | 46 +++++++
 .../common/record/MemoryRecordsTest.java | 96 +++++++++++--
 .../src/main/scala/kafka/log/LogCleaner.scala | 113 ++++++++-------
 .../scala/kafka/log/LogCleanerManager.scala | 9 +-
 .../kafka/log/LogCleanerIntegrationTest.scala | 4 +-
 .../kafka/log/LogCleanerManagerTest.scala | 1 -
 .../scala/unit/kafka/log/LogCleanerTest.scala | 108 +++++++++------
 .../unit/kafka/log/LogValidatorTest.scala | 2 +-
 .../scala/unit/kafka/log/UnifiedLogTest.scala | 16 ++-
 .../kafka/raft/internals/BatchBuilder.java | 1 +
 18 files changed, 499 insertions(+), 191 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index bd53af1c58977..f8bc1bfde2539 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
+ files="MemoryRecordsBuilder.java"/>
@@ -68,7 +68,7 @@
 files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
+ files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 35ea4d36543fd..0f2ccde2cd7c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -35,6 +35,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.OptionalLong;
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
@@ -213,6 +214,11 @@
public boolean isControlBatch() { return false; } + @Override + public OptionalLong deleteHorizonMs() { + return OptionalLong.empty(); + } + /** * Get an iterator for the nested entries contained within this batch. Note that * if the batch is not compressed, then this method will return an iterator over the @@ -464,6 +470,11 @@ public long offset() { return buffer.getLong(OFFSET_OFFSET); } + @Override + public OptionalLong deleteHorizonMs() { + return OptionalLong.empty(); + } + @Override public LegacyRecord outerRecord() { return record; @@ -553,6 +564,11 @@ public long baseOffset() { return loadFullBatch().baseOffset(); } + @Override + public OptionalLong deleteHorizonMs() { + return OptionalLong.empty(); + } + @Override public long lastOffset() { return offset; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 62cab8fafe42c..ec3c7204fe67b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.OptionalLong; import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; @@ -90,11 +91,15 @@ * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains * the previous value prior to becoming empty. * + * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to + * the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is + * the delete horizon, otherwise, it is merely the first timestamp of the record batch. 
+ * * The current attributes are given below: * - * ------------------------------------------------------------------------------------------------- - * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | - * ------------------------------------------------------------------------------------------------- + * --------------------------------------------------------------------------------------------------------------------------- + * | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | + * --------------------------------------------------------------------------------------------------------------------------- */ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch { static final int BASE_OFFSET_OFFSET = 0; @@ -111,9 +116,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe static final int ATTRIBUTE_LENGTH = 2; public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; static final int LAST_OFFSET_DELTA_LENGTH = 4; - static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH; - static final int FIRST_TIMESTAMP_LENGTH = 8; - static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH; + static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH; + static final int BASE_TIMESTAMP_LENGTH = 8; + static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH; static final int MAX_TIMESTAMP_LENGTH = 8; static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH; static final int PRODUCER_ID_LENGTH = 8; @@ -129,6 +134,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe private static final byte COMPRESSION_CODEC_MASK = 0x07; private static final byte TRANSACTIONAL_FLAG_MASK = 0x10; private static final int CONTROL_FLAG_MASK = 0x20; + private static final byte DELETE_HORIZON_FLAG_MASK = 0x40; private static final byte TIMESTAMP_TYPE_MASK = 0x08; private static final int MAX_SKIP_BUFFER_SIZE = 2048; @@ -156,13 +162,12 @@ public void ensureValid() { } /** - * Get the timestamp of the first record in this batch. It is always the create time of the record even if the - * timestamp type of the batch is log append time. - * - * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty + * Gets the base timestamp of the batch which is used to calculate the record timestamps from the deltas. 
+ * + * @return The base timestamp */ - public long firstTimestamp() { - return buffer.getLong(FIRST_TIMESTAMP_OFFSET); + public long baseTimestamp() { + return buffer.getLong(BASE_TIMESTAMP_OFFSET); } @Override @@ -246,6 +251,18 @@ public boolean isTransactional() { return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0; } + private boolean hasDeleteHorizonMs() { + return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0; + } + + @Override + public OptionalLong deleteHorizonMs() { + if (hasDeleteHorizonMs()) + return OptionalLong.of(buffer.getLong(BASE_TIMESTAMP_OFFSET)); + else + return OptionalLong.empty(); + } + @Override public boolean isControlBatch() { return (attributes() & CONTROL_FLAG_MASK) > 0; @@ -290,9 +307,9 @@ private CloseableIterator uncompressedIterator() { buffer.position(RECORDS_OFFSET); return new RecordIterator() { @Override - protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) { + protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) { try { - return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime); + return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime); } catch (BufferUnderflowException e) { throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached"); } @@ -364,7 +381,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) { if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp) return; - byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch()); + byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), hasDeleteHorizonMs()); buffer.putShort(ATTRIBUTES_OFFSET, attributes); buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp); long crc = computeChecksum(); @@ -411,7 +428,7 @@ public int hashCode() { } private static byte computeAttributes(CompressionType type, TimestampType timestampType, - boolean isTransactional, boolean isControl) { + boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) { if (timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " + "format v2 and above"); @@ -423,6 +440,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest attributes |= COMPRESSION_CODEC_MASK & type.id; if (timestampType == TimestampType.LOG_APPEND_TIME) attributes |= TIMESTAMP_TYPE_MASK; + if (isDeleteHorizonSet) + attributes |= DELETE_HORIZON_FLAG_MASK; return attributes; } @@ -440,8 +459,8 @@ public static void writeEmptyHeader(ByteBuffer buffer, boolean isControlRecord) { int offsetDelta = (int) (lastOffset - baseOffset); writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic, - CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId, - producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0); + CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId, + producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0); } public static void writeHeader(ByteBuffer buffer, @@ -458,6 +477,7 @@ public static void writeHeader(ByteBuffer buffer, int sequence, boolean isTransactional, boolean isControlBatch, + boolean isDeleteHorizonSet, int 
partitionLeaderEpoch, int numRecords) { if (magic < RecordBatch.CURRENT_MAGIC_VALUE) @@ -465,7 +485,7 @@ public static void writeHeader(ByteBuffer buffer, if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP) throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp); - short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch); + short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet); int position = buffer.position(); buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset); @@ -473,7 +493,7 @@ public static void writeHeader(ByteBuffer buffer, buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch); buffer.put(position + MAGIC_OFFSET, magic); buffer.putShort(position + ATTRIBUTES_OFFSET, attributes); - buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp); + buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp); buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp); buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta); buffer.putLong(position + PRODUCER_ID_OFFSET, producerId); @@ -555,7 +575,7 @@ public static int decrementSequence(int sequence, int decrement) { private abstract class RecordIterator implements CloseableIterator { private final Long logAppendTime; private final long baseOffset; - private final long firstTimestamp; + private final long baseTimestamp; private final int baseSequence; private final int numRecords; private int readRecords = 0; @@ -563,7 +583,7 @@ private abstract class RecordIterator implements CloseableIterator { RecordIterator() { this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null; this.baseOffset = baseOffset(); - this.firstTimestamp = firstTimestamp(); + this.baseTimestamp = baseTimestamp(); this.baseSequence = baseSequence(); int numRecords = count(); if (numRecords < 0) @@ -583,7 +603,7 @@ public Record next() { throw new NoSuchElementException(); readRecords++; - Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime); + Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime); if (readRecords == numRecords) { // Validate that the actual size of the batch is equal to declared size // by checking that after reading declared number of items, there no items left @@ -594,7 +614,7 @@ public Record next() { return rec; } - protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime); + protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime); protected abstract boolean ensureNoneRemaining(); @@ -616,9 +636,9 @@ private abstract class StreamRecordIterator extends RecordIterator { abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException; @Override - protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) { + protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) { try { - return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime); + return doReadRecord(baseOffset, baseTimestamp, baseSequence, logAppendTime); } catch (EOFException e) { throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached"); } catch (IOException e) { @@ -705,6 +725,11 @@ public boolean 
isTransactional() { return loadBatchHeader().isTransactional(); } + @Override + public OptionalLong deleteHorizonMs() { + return loadBatchHeader().deleteHorizonMs(); + } + @Override public boolean isControlBatch() { return loadBatchHeader().isControlBatch(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index b6311713c3d87..eacc2113b0063 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.message.SnapshotFooterRecord; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; +import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetentionResult; import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.ByteBufferOutputStream; @@ -153,15 +154,20 @@ public FilterResult filterTo(TopicPartition partition, RecordFilter filter, Byte return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier); } + /** + * Note: This method is also used to convert the first timestamp of the batch (which is usually the timestamp of the first record) + * to the delete horizon of the tombstones or txn markers which are present in the batch. + */ private static FilterResult filterTo(TopicPartition partition, Iterable batches, RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize, BufferSupplier decompressionBufferSupplier) { FilterResult filterResult = new FilterResult(destinationBuffer); ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer); - for (MutableRecordBatch batch : batches) { - long maxOffset = -1L; - BatchRetention batchRetention = filter.checkBatchRetention(batch); + final BatchRetentionResult batchRetentionResult = filter.checkBatchRetention(batch); + final boolean containsMarkerForEmptyTxn = batchRetentionResult.containsMarkerForEmptyTxn; + final BatchRetention batchRetention = batchRetentionResult.batchRetention; + filterResult.bytesRead += batch.sizeInBytes(); if (batchRetention == BatchRetention.DELETE) @@ -171,38 +177,33 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords = new ArrayList<>(); - try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { - while (iterator.hasNext()) { - Record record = iterator.next(); - filterResult.messagesRead += 1; - - if (filter.shouldRetainRecord(batch, record)) { - // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite - // the corrupted batch with correct data. 
- if (!record.hasMagic(batchMagic)) - writeOriginalBatch = false; - - if (record.offset() > maxOffset) - maxOffset = record.offset(); - - retainedRecords.add(record); - } else { - writeOriginalBatch = false; - } - } - } + final BatchFilterResult iterationResult = filterBatch(batch, decompressionBufferSupplier, filterResult, filter, + batchMagic, true, retainedRecords); + boolean containsTombstones = iterationResult.containsTombstones; + boolean writeOriginalBatch = iterationResult.writeOriginalBatch; + long maxOffset = iterationResult.maxOffset; if (!retainedRecords.isEmpty()) { - if (writeOriginalBatch) { + // we check if the delete horizon should be set to a new value + // in which case, we need to reset the base timestamp and overwrite the timestamp deltas + // if the batch does not contain tombstones, then we don't need to overwrite batch + boolean needToSetDeleteHorizon = batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && (containsTombstones || containsMarkerForEmptyTxn) + && !batch.deleteHorizonMs().isPresent(); + if (writeOriginalBatch && !needToSetDeleteHorizon) { batch.writeTo(bufferOutputStream); filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false); } else { - MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream); + final MemoryRecordsBuilder builder; + long deleteHorizonMs; + if (needToSetDeleteHorizon) + deleteHorizonMs = filter.currentTime + filter.deleteRetentionMs; + else + deleteHorizonMs = batch.deleteHorizonMs().orElse(RecordBatch.NO_TIMESTAMP); + builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream, deleteHorizonMs); + MemoryRecords records = builder.build(); int filteredBatchSize = records.sizeInBytes(); if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize) @@ -239,9 +240,59 @@ private static FilterResult filterTo(TopicPartition partition, Iterable retainedRecords) { + long maxOffset = -1; + boolean containsTombstones = false; + try (final CloseableIterator iterator = batch.streamingIterator(decompressionBufferSupplier)) { + while (iterator.hasNext()) { + Record record = iterator.next(); + filterResult.messagesRead += 1; + + if (filter.shouldRetainRecord(batch, record)) { + // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite + // the corrupted batch with correct data. 
+ if (!record.hasMagic(batchMagic)) + writeOriginalBatch = false; + + if (record.offset() > maxOffset) + maxOffset = record.offset(); + + retainedRecords.add(record); + + if (!record.hasValue()) { + containsTombstones = true; + } + } else { + writeOriginalBatch = false; + } + } + return new BatchFilterResult(writeOriginalBatch, containsTombstones, maxOffset); + } + } + + private static class BatchFilterResult { + private final boolean writeOriginalBatch; + private final boolean containsTombstones; + private final long maxOffset; + private BatchFilterResult(final boolean writeOriginalBatch, + final boolean containsTombstones, + final long maxOffset) { + this.writeOriginalBatch = writeOriginalBatch; + this.containsTombstones = containsTombstones; + this.maxOffset = maxOffset; + } + } + private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch, List retainedRecords, - ByteBufferOutputStream bufferOutputStream) { + ByteBufferOutputStream bufferOutputStream, + final long deleteHorizonMs) { byte magic = originalBatch.magic(); TimestampType timestampType = originalBatch.timestampType(); long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? @@ -252,7 +303,7 @@ private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch origina MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic, originalBatch.compressionType(), timestampType, baseOffset, logAppendTime, originalBatch.producerId(), originalBatch.producerEpoch(), originalBatch.baseSequence(), originalBatch.isTransactional(), - originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit()); + originalBatch.isControlBatch(), originalBatch.partitionLeaderEpoch(), bufferOutputStream.limit(), deleteHorizonMs); for (Record record : retainedRecords) builder.append(record); @@ -303,6 +354,24 @@ public int hashCode() { } public static abstract class RecordFilter { + public final long currentTime; + public final long deleteRetentionMs; + + public RecordFilter(final long currentTime, final long deleteRetentionMs) { + this.currentTime = currentTime; + this.deleteRetentionMs = deleteRetentionMs; + } + + public static class BatchRetentionResult { + public final BatchRetention batchRetention; + public final boolean containsMarkerForEmptyTxn; + public BatchRetentionResult(final BatchRetention batchRetention, + final boolean containsMarkerForEmptyTxn) { + this.batchRetention = batchRetention; + this.containsMarkerForEmptyTxn = containsMarkerForEmptyTxn; + } + } + public enum BatchRetention { DELETE, // Delete the batch without inspecting records RETAIN_EMPTY, // Retain the batch even if it is empty @@ -313,7 +382,7 @@ public enum BatchRetention { * Check whether the full batch can be discarded (i.e. whether we even need to * check the records individually). */ - protected abstract BatchRetention checkBatchRetention(RecordBatch batch); + protected abstract BatchRetentionResult checkBatchRetention(RecordBatch batch); /** * Check whether a record should be retained in the log. 
Note that {@link #checkBatchRetention(RecordBatch)} diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 48e1ec8f7d9e2..b825a937e084b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -80,9 +80,10 @@ public void write(int b) { private int numRecords = 0; private float actualCompressionRatio = 1; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; + private long deleteHorizonMs; private long offsetOfMaxTimestamp = -1; private Long lastOffset = null; - private Long firstTimestamp = null; + private Long baseTimestamp = null; private MemoryRecords builtRecords; private boolean aborted = false; @@ -99,7 +100,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, boolean isTransactional, boolean isControlBatch, int partitionLeaderEpoch, - int writeLimit) { + int writeLimit, + long deleteHorizonMs) { if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("TimestampType must be set for magic >= 0"); if (magic < RecordBatch.MAGIC_VALUE_V2) { @@ -109,6 +111,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, throw new IllegalArgumentException("Control records are not supported for magic " + magic); if (compressionType == CompressionType.ZSTD) throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic); + if (deleteHorizonMs != RecordBatch.NO_TIMESTAMP) + throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic); } this.magic = magic; @@ -125,6 +129,7 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, this.baseSequence = baseSequence; this.isTransactional = isTransactional; this.isControlBatch = isControlBatch; + this.deleteHorizonMs = deleteHorizonMs; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); @@ -133,6 +138,28 @@ public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); + + if (hasDeleteHorizonMs()) { + this.baseTimestamp = deleteHorizonMs; + } + } + + public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, + byte magic, + CompressionType compressionType, + TimestampType timestampType, + long baseOffset, + long logAppendTime, + long producerId, + short producerEpoch, + int baseSequence, + boolean isTransactional, + boolean isControlBatch, + int partitionLeaderEpoch, + int writeLimit) { + this(bufferStream, magic, compressionType, timestampType, baseOffset, logAppendTime, producerId, + producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, writeLimit, + RecordBatch.NO_TIMESTAMP); } /** @@ -197,6 +224,10 @@ public boolean isTransactional() { return isTransactional; } + public boolean hasDeleteHorizonMs() { + return magic >= RecordBatch.MAGIC_VALUE_V2 && deleteHorizonMs >= 0L; + } + /** * Close this builder and return the resulting buffer. 
* @return The built log buffer @@ -369,8 +400,8 @@ private int writeDefaultBatchHeader() { maxTimestamp = this.maxTimestamp; DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, - firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, - partitionLeaderEpoch, numRecords); + baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, + hasDeleteHorizonMs(), partitionLeaderEpoch, numRecords); buffer.position(pos); return writtenCompressed; @@ -416,8 +447,8 @@ private void appendWithOffset(long offset, boolean isControlRecord, long timesta if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers"); - if (firstTimestamp == null) - firstTimestamp = timestamp; + if (baseTimestamp == null) + baseTimestamp = timestamp; if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); @@ -624,12 +655,12 @@ public void appendUncheckedWithOffset(long offset, SimpleRecord record) throws I if (magic >= RecordBatch.MAGIC_VALUE_V2) { int offsetDelta = (int) (offset - baseOffset); long timestamp = record.timestamp(); - if (firstTimestamp == null) - firstTimestamp = timestamp; + if (baseTimestamp == null) + baseTimestamp = timestamp; int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, - timestamp - firstTimestamp, + timestamp - baseTimestamp, record.key(), record.value(), record.headers()); @@ -683,7 +714,7 @@ private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, By Header[] headers) throws IOException { ensureOpenForRecordAppend(); int offsetDelta = (int) (offset - baseOffset); - long timestampDelta = timestamp - firstTimestamp; + long timestampDelta = timestamp - baseTimestamp; int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); recordWritten(offset, timestamp, sizeInBytes); } @@ -788,7 +819,7 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value); } else { int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1); - long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp; + long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp; recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 1cff7a238906d..7d231c1774367 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.OptionalLong; /** * A record batch is a container for records. In old versions of the record format (versions 0 and 1), @@ -211,6 +212,12 @@ public interface RecordBatch extends Iterable { */ boolean isTransactional(); + /** + * Get the delete horizon, returns OptionalLong.EMPTY if the first timestamp is not the delete horizon + * @return timestamp of the delete horizon + */ + OptionalLong deleteHorizonMs(); + /** * Get the partition leader epoch of this record batch. 
* @return The leader epoch or -1 if it is unknown diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 5e96cbbdcdf5b..7b9f33d8612eb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -3147,10 +3147,10 @@ public void testUpdatePositionWithLastRecordMissingFromBatch() { new SimpleRecord(null, "value".getBytes())); // Remove the last record to simulate compaction - MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter() { + MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter(0, 0) { @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - return BatchRetention.DELETE_EMPTY; + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { + return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index 065f1b51b39aa..0864a2dd9f347 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -67,7 +67,7 @@ public void testWriteEmptyHeader() { assertEquals(isTransactional, batch.isTransactional()); assertEquals(timestampType, batch.timestampType()); assertEquals(timestamp, batch.maxTimestamp()); - assertEquals(RecordBatch.NO_TIMESTAMP, batch.firstTimestamp()); + assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp()); assertEquals(isControlBatch, batch.isControlBatch()); } } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 5dedf66f36d32..4f3f03c3f2d21 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -19,6 +19,9 @@ import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; @@ -35,6 +38,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import java.util.Random; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -755,6 +759,48 @@ else if (iterations > 2 && memUsed < (iterations - 2) * 1024) assertTrue(iterations < 100, "Memory usage too high: " + memUsed); } + @ParameterizedTest + @ArgumentsSource(V2MemoryRecordsBuilderArgumentsProvider.class) + public void testRecordTimestampsWithDeleteHorizon(Args args) { + long deleteHorizon = 100; + int payloadLen = 1024 * 1024; + ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2); + ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(buffer); + 
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(byteBufferOutputStream, args.magic, args.compressionType, + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, 0, deleteHorizon); + + builder.append(50L, "0".getBytes(), "0".getBytes()); + builder.append(100L, "1".getBytes(), null); + builder.append(150L, "2".getBytes(), "2".getBytes()); + + MemoryRecords records = builder.build(); + List batches = TestUtils.toList(records.batches()); + assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs()); + + CloseableIterator recordIterator = batches.get(0).streamingIterator(BufferSupplier.create()); + Record record = recordIterator.next(); + assertEquals(50L, record.timestamp()); + record = recordIterator.next(); + assertEquals(100L, record.timestamp()); + record = recordIterator.next(); + assertEquals(150L, record.timestamp()); + recordIterator.close(); + } + + private static class V2MemoryRecordsBuilderArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + List values = new ArrayList<>(); + for (int bufferOffset : Arrays.asList(0, 15)) + for (CompressionType type: CompressionType.values()) { + values.add(Arguments.of(new Args(bufferOffset, type, MAGIC_VALUE_V2))); + } + return values.stream(); + } + } + private void verifyRecordsProcessingStats(CompressionType compressionType, RecordConversionStats processingStats, int numRecords, int numRecordsConverted, long finalBytes, long preConvertedBytes) { diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 16a51e6e3dd17..3f0195bf5d149 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -21,8 +21,10 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; +import org.apache.kafka.common.record.MemoryRecords.RecordFilter; import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention; import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; @@ -37,6 +39,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Stream; @@ -101,6 +104,18 @@ public Stream provideArguments(ExtensionContext context) { } } + private static class V2MemoryRecordsArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + List arguments = new ArrayList<>(); + for (long firstOffset : asList(0L, 57L)) + for (CompressionType type: CompressionType.values()) { + arguments.add(Arguments.of(new Args(RecordBatch.MAGIC_VALUE_V2, firstOffset, type))); + } + return arguments.stream(); + } + } + private final long logAppendTime = System.currentTimeMillis(); @ParameterizedTest @@ -316,11 +331,11 @@ public void testFilterToEmptyBatchRetention(Args args) { MemoryRecords records = builder.build(); ByteBuffer 
filtered = ByteBuffer.allocate(2048); MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { + new MemoryRecords.RecordFilter(0, 0) { @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { // retain all batches - return BatchRetention.RETAIN_EMPTY; + return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false); } @Override @@ -378,11 +393,11 @@ public void testEmptyBatchRetention() { ByteBuffer filtered = ByteBuffer.allocate(2048); MemoryRecords records = MemoryRecords.readableRecords(buffer); MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { + new MemoryRecords.RecordFilter(0, 0) { @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { // retain all batches - return BatchRetention.RETAIN_EMPTY; + return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false); } @Override @@ -426,10 +441,10 @@ public void testEmptyBatchDeletion() { ByteBuffer filtered = ByteBuffer.allocate(2048); MemoryRecords records = MemoryRecords.readableRecords(buffer); MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0), - new MemoryRecords.RecordFilter() { + new MemoryRecords.RecordFilter(0, 0) { @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - return deleteRetention; + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { + return new BatchRetentionResult(deleteRetention, false); } @Override @@ -483,6 +498,53 @@ public void testBuildEndTxnMarker() { assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch()); } + /** + * This test is used to see if the base timestamp of the batch has been successfully + * converted to a delete horizon for the tombstones / transaction markers of the batch. + * It also verifies that the record timestamps remain correct as a delta relative to the delete horizon. 
+ */ + @ParameterizedTest + @ArgumentsSource(V2MemoryRecordsArgumentsProvider.class) + public void testBaseTimestampToDeleteHorizonConversion(Args args) { + int partitionLeaderEpoch = 998; + ByteBuffer buffer = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, + 0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch); + builder.append(5L, "0".getBytes(), "0".getBytes()); + builder.append(10L, "1".getBytes(), null); + builder.append(15L, "2".getBytes(), "2".getBytes()); + + ByteBuffer filtered = ByteBuffer.allocate(2048); + final long deleteHorizon = Integer.MAX_VALUE / 2; + final RecordFilter recordFilter = new MemoryRecords.RecordFilter(deleteHorizon - 1, 1) { + @Override + protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { + return true; + } + + @Override + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { + return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false); + } + }; + builder.build().filterTo(new TopicPartition("random", 0), recordFilter, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING); + filtered.flip(); + MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); + + List batches = TestUtils.toList(filteredRecords.batches()); + assertEquals(1, batches.size()); + assertEquals(OptionalLong.of(deleteHorizon), batches.get(0).deleteHorizonMs()); + + CloseableIterator recordIterator = batches.get(0).streamingIterator(BufferSupplier.create()); + Record record = recordIterator.next(); + assertEquals(5L, record.timestamp()); + record = recordIterator.next(); + assertEquals(10L, record.timestamp()); + record = recordIterator.next(); + assertEquals(15L, record.timestamp()); + recordIterator.close(); + } + @Test public void testBuildLeaderChangeMessage() { final int leaderId = 5; @@ -554,13 +616,13 @@ public void testFilterToBatchDiscard(Args args) { buffer.flip(); ByteBuffer filtered = ByteBuffer.allocate(2048); - MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() { + MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter(0, 0) { @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { // discard the second and fourth batches if (batch.lastOffset() == 2L || batch.lastOffset() == 6L) - return BatchRetention.DELETE; - return BatchRetention.DELETE_EMPTY; + return new BatchRetentionResult(BatchRetention.DELETE, false); + return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false); } @Override @@ -1012,9 +1074,13 @@ public void testUnsupportedCompress() { } private static class RetainNonNullKeysFilter extends MemoryRecords.RecordFilter { + public RetainNonNullKeysFilter() { + super(0, 0); + } + @Override - protected BatchRetention checkBatchRetention(RecordBatch batch) { - return BatchRetention.DELETE_EMPTY; + protected BatchRetentionResult checkBatchRetention(RecordBatch batch) { + return new BatchRetentionResult(BatchRetention.DELETE_EMPTY, false); } @Override diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 1655dfb794555..baf937d6415eb 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -67,6 +67,8 @@ import scala.util.control.ControlThrowable * The cleaner will only retain 
delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed). * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning. + * This time is tracked by setting the base timestamp of a record batch with delete markers when the batch is recopied in the first cleaning that encounters + * it. The relative timestamps of the records in the batch are also modified when recopied in this cleaning according to the new base timestamp of the batch. * * Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following * are the key points: @@ -342,7 +344,8 @@ class LogCleaner(initialConfig: CleanerConfig, @throws(classOf[LogCleaningException]) private def cleanFilthiestLog(): Boolean = { val preCleanStats = new PreCleanStats() - val cleaned = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) match { + val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats) + val cleaned = ltc match { case None => false case Some(cleanable) => @@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int, * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { + doClean(cleanable, time.milliseconds()) + } + + private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = { + info("Beginning cleaning of log %s".format(cleanable.log.name)) + // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment - val deleteHorizonMs = + // this timestamp is only used on the older message formats older than MAGIC_VALUE_V2 + val legacyDeleteHorizonMs = cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs - } - - doClean(cleanable, deleteHorizonMs) - } - - private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = { - info("Beginning cleaning of log %s.".format(cleanable.log.name)) + } val log = cleanable.log val stats = new CleanerStats() @@ -522,13 +526,13 @@ private[log] class Cleaner(val id: Int, val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups - info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) + info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to upper bound deletion horizon %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs))) val transactionMetadata = new CleanedTransactionMetadata val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset) for (group <- groupedSegments) - cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata) + cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs) // record 
buffer utilization stats.bufferUtilization = offsetMap.utilization @@ -544,17 +548,19 @@ private[log] class Cleaner(val id: Int, * @param log The log being cleaned * @param segments The group of segments being cleaned * @param map The offset map to use for cleaning segments - * @param deleteHorizonMs The time to retain delete tombstones + * @param currentTime The current time in milliseconds * @param stats Collector for cleaning statistics * @param transactionMetadata State of ongoing transactions which is carried between the cleaning * of the grouped segments + * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2 */ private[log] def cleanSegments(log: UnifiedLog, segments: Seq[LogSegment], map: OffsetMap, - deleteHorizonMs: Long, + currentTime: Long, stats: CleanerStats, - transactionMetadata: CleanedTransactionMetadata): Unit = { + transactionMetadata: CleanedTransactionMetadata, + legacyDeleteHorizonMs: Long): Unit = { // create a new segment with a suffix appended to the name of the log and indexes val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset) transactionMetadata.cleanedIndex = Some(cleaned.txnIndex) @@ -574,14 +580,15 @@ private[log] class Cleaner(val id: Int, val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset) transactionMetadata.addAbortedTransactions(abortedTransactions) - val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs + val retainLegacyDeletesAndTxnMarkers = currentSegment.lastModified > legacyDeleteHorizonMs info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " + - s"with deletion horizon $deleteHorizonMs, " + - s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.") + s"with an upper bound deletion horizon $legacyDeleteHorizonMs computed from " + + s"the segment last modified time of ${currentSegment.lastModified}," + + s"${if(retainLegacyDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.") try { - cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize, - transactionMetadata, lastOffsetOfActiveProducers, stats) + cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs, + log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime) } catch { case e: LogSegmentOffsetOverflowException => // Split the current segment. 
It's also safest to abort the current cleaning process, so that we retry from @@ -622,26 +629,35 @@ private[log] class Cleaner(val id: Int, * @param sourceRecords The dirty log segment * @param dest The cleaned log segment * @param map The key=>offset mapping - * @param retainDeletesAndTxnMarkers Should tombstones and markers be retained while cleaning this segment + * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than version 2) and markers be retained while cleaning this segment + * @param deleteRetentionMs Defines how long a tombstone should be kept as defined by log configuration * @param maxLogMessageSize The maximum message size of the corresponding topic * @param stats Collector for cleaning statistics + * @param currentTime The time at which the clean was initiated */ private[log] def cleanInto(topicPartition: TopicPartition, sourceRecords: FileRecords, dest: LogSegment, map: OffsetMap, - retainDeletesAndTxnMarkers: Boolean, + retainLegacyDeletesAndTxnMarkers: Boolean, + deleteRetentionMs: Long, maxLogMessageSize: Int, transactionMetadata: CleanedTransactionMetadata, lastRecordsOfActiveProducers: Map[Long, LastRecord], - stats: CleanerStats): Unit = { - val logCleanerFilter: RecordFilter = new RecordFilter { + stats: CleanerStats, + currentTime: Long): Unit = { + val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) { var discardBatchRecords: Boolean = _ - override def checkBatchRetention(batch: RecordBatch): BatchRetention = { + override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = { // we piggy-back on the tombstone retention logic to delay deletion of transaction markers. // note that we will never delete a marker until all the records from that transaction are removed. - discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletesAndTxnMarkers) + val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata) + + if (batch.isControlBatch) + discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime + else + discardBatchRecords = canDiscardBatch def isBatchLastRecordOfProducer: Boolean = { // We retain the batch in order to preserve the state of active producers. 
There are three cases: @@ -658,12 +674,14 @@ private[log] class Cleaner(val id: Int, } } - if (batch.hasProducerId && isBatchLastRecordOfProducer) - BatchRetention.RETAIN_EMPTY - else if (discardBatchRecords) - BatchRetention.DELETE - else - BatchRetention.DELETE_EMPTY + val batchRetention: BatchRetention = + if (batch.hasProducerId && isBatchLastRecordOfProducer) + BatchRetention.RETAIN_EMPTY + else if (discardBatchRecords) + BatchRetention.DELETE + else + BatchRetention.DELETE_EMPTY + new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch && batch.isControlBatch) } override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = { @@ -671,7 +689,7 @@ private[log] class Cleaner(val id: Int, // The batch is only retained to preserve producer sequence information; the records can be removed false else - Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats) + Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = currentTime) } } @@ -686,6 +704,7 @@ private[log] class Cleaner(val id: Int, val records = MemoryRecords.readableRecords(readBuffer) throttler.maybeThrottle(records.sizeInBytes) val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier) + stats.readMessages(result.messagesRead, result.bytesRead) stats.recopyMessages(result.messagesRetained, result.bytesRetained) @@ -747,22 +766,19 @@ private[log] class Cleaner(val id: Int, } private def shouldDiscardBatch(batch: RecordBatch, - transactionMetadata: CleanedTransactionMetadata, - retainTxnMarkers: Boolean): Boolean = { - if (batch.isControlBatch) { - val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch) - canDiscardControlBatch && !retainTxnMarkers - } else { - val canDiscardBatch = transactionMetadata.onBatchRead(batch) - canDiscardBatch - } + transactionMetadata: CleanedTransactionMetadata): Boolean = { + if (batch.isControlBatch) + transactionMetadata.onControlBatchRead(batch) + else + transactionMetadata.onBatchRead(batch) } private def shouldRetainRecord(map: kafka.log.OffsetMap, - retainDeletes: Boolean, + retainDeletesForLegacyRecords: Boolean, batch: RecordBatch, record: Record, - stats: CleanerStats): Boolean = { + stats: CleanerStats, + currentTime: Long): Boolean = { val pastLatestOffset = record.offset > map.latestOffset if (pastLatestOffset) return true @@ -776,7 +792,14 @@ private[log] class Cleaner(val id: Int, * 2) The message doesn't has value but it can't be deleted now. */ val latestOffsetForKey = record.offset() >= foundOffset - val isRetainedValue = record.hasValue || retainDeletes + val legacyRecord = batch.magic() < RecordBatch.MAGIC_VALUE_V2 + def shouldRetainDeletes = { + if (!legacyRecord) + !batch.deleteHorizonMs().isPresent || currentTime < batch.deleteHorizonMs().getAsLong + else + retainDeletesForLegacyRecords + } + val isRetainedValue = record.hasValue || shouldRetainDeletes latestOffsetForKey && isRetainedValue } else { stats.invalidMessage() @@ -1111,8 +1134,6 @@ private[log] class CleanedTransactionMetadata { case ControlRecordType.ABORT => ongoingAbortedTxns.remove(producerId) match { // Retain the marker until all batches from the transaction have been removed. - // We may retain a record from an aborted transaction if it is the last entry - // written by a given producerId. 
case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined => cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn)) false diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index c0030b12e5382..02d6a30ad2b0a 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -169,11 +169,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val now = time.milliseconds this.timeOfLastRun = now val lastClean = allCleanerCheckpoints + val dirtyLogs = logs.filter { - case (_, log) => log.config.compact // match logs that are marked as compacted + case (_, log) => log.config.compact }.filterNot { case (topicPartition, log) => - // skip any logs already in-progress and uncleanable partitions inProgress.contains(topicPartition) || isUncleanablePartition(log, topicPartition) }.map { case (topicPartition, log) => // create a LogToClean instance for each @@ -198,9 +198,10 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val cleanableLogs = dirtyLogs.filter { ltc => (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio } - if(cleanableLogs.isEmpty) { + + if (cleanableLogs.isEmpty) None - } else { + else { preCleanStats.recordCleanablePartitions(cleanableLogs.size) val filthiest = cleanableLogs.max inProgress.put(filthiest.topicPartition, LogCleaningInProgress) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 5a65fe3c58aa3..5f6a56d21feb0 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,6 +18,7 @@ package kafka.log import java.io.PrintWriter + import com.yammer.metrics.core.{Gauge, MetricName} import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import kafka.utils.{MockTime, TestUtils} @@ -186,7 +187,8 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K } } - private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = { + private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long, + startValue: Int, step: Int): Seq[(Int, Int)] = { var valCounter = startValue for (_ <- 0 until numDups; key <- 0 until numKeys) yield { val curValue = valCounter diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index e20e661431e52..c4a71cc22cc97 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -195,7 +195,6 @@ class LogCleanerManagerTest extends Logging { cleanerManager.setCleaningState(tp2, LogCleaningInProgress) val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time).get - assertEquals(tp1, filthiestLog.topicPartition) assertEquals(tp1, filthiestLog.log.topicPartition) } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 0a95735378008..8f1d241a5d444 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -52,6 
+52,8 @@ class LogCleanerTest { val logConfig = LogConfig(logProps) val time = new MockTime() val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time) + val tombstoneRetentionMs = 86400000 + val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1 @AfterEach def teardown(): Unit = { @@ -84,8 +86,8 @@ class LogCleanerTest { val segments = log.logSegments.take(3).toSeq val stats = new CleanerStats() val expectedBytesRead = segments.map(_.size).sum - cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata) val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_)) + cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1) assertEquals(shouldRemain, LogTestUtils.keysInLog(log)) assertEquals(expectedBytesRead, stats.bytesRead) } @@ -170,7 +172,7 @@ class LogCleanerTest { val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq val stats = new CleanerStats() cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats) - cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata) + cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1) // Validate based on the file name that log segment file is renamed exactly once for async deletion assertEquals(expectedFileName, firstLogFile.file().getPath) @@ -365,7 +367,7 @@ class LogCleanerTest { log.roll() // cannot remove the marker in this pass because there are still valid records - var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertEquals(List(1, 3, 2), LogTestUtils.keysInLog(log)) assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log)) @@ -374,17 +376,17 @@ class LogCleanerTest { log.roll() // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log)) assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log)) - // delete horizon forced to 0 to verify marker is not removed early - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1 + // clean again with same timestamp to verify marker is not removed early + dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log)) assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log)) - // clean again with large delete horizon and verify the marker is removed - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + // clean again with max timestamp to verify the marker is removed + dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1 assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log)) assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log)) } @@ -413,11 +415,12 @@ class LogCleanerTest { 
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator) log.roll() - cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(2), LogTestUtils.keysInLog(log)) assertEquals(List(1, 3, 4), offsetsInLog(log)) - cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + // In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed. + runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(2), LogTestUtils.keysInLog(log)) assertEquals(List(3, 4), offsetsInLog(log)) } @@ -454,14 +457,14 @@ class LogCleanerTest { // first time through the records are removed // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}] - var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertEquals(List(2, 3), LogTestUtils.keysInLog(log)) assertEquals(List(4, 5, 6, 7), offsetsInLog(log)) assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log)) // the empty batch remains if cleaned again because it still holds the last sequence // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}] - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertEquals(List(2, 3), LogTestUtils.keysInLog(log)) assertEquals(List(4, 5, 6, 7), offsetsInLog(log)) assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log)) @@ -475,13 +478,15 @@ class LogCleanerTest { log.roll() // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}] - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + // The deleteHorizon for {Producer2: Commit} is still not set yet. + dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log)) assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log)) assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log)) // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}] - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + // In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed. 
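// Why the tests above can treat largeTimestamp as "now" for the first pass: with
// tombstoneRetentionMs = 86400000 (the default delete.retention.ms of 24 hours) and
// largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1, a horizon stamped on the
// first pass never overflows a Long, and a pass advanced by tombstoneRetentionMs + 1
// (or one run at Long.MaxValue) is guaranteed to be at or past that horizon.
// Self-checking sketch; the constants are copied from the test, nothing else is assumed.
val tombstoneRetentionMs = 86400000L
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
assert(largeTimestamp + tombstoneRetentionMs == Long.MaxValue - 1) // stamped horizon fits in a Long
assert(largeTimestamp + tombstoneRetentionMs + 1 == Long.MaxValue) // second pass is past any such horizon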
+ dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log)) assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log)) assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log)) @@ -506,14 +511,16 @@ class LogCleanerTest { // first time through the control batch is retained as an empty batch // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}] - var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + // In the first pass, the deleteHorizon for the commit marker is set. In the second pass, the commit marker is removed + // but the empty batch is retained for preserving the producer epoch. + var dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(2, 3), LogTestUtils.keysInLog(log)) assertEquals(List(1, 2), offsetsInLog(log)) assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log)) // the empty control batch does not cause an exception when cleaned // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}] - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1 assertEquals(List(2, 3), LogTestUtils.keysInLog(log)) assertEquals(List(1, 2), offsetsInLog(log)) assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log)) @@ -537,7 +544,7 @@ class LogCleanerTest { log.roll() // Both the record and the marker should remain after cleaning - cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(0, 1), offsetsInLog(log)) assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log)) } @@ -562,12 +569,12 @@ class LogCleanerTest { // Both the batch and the marker should remain after cleaning. The batch is retained // because it is the last entry for this producerId. The marker is retained because // there are still batches remaining from this transaction. - cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(1), offsetsInLog(log)) assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log)) // The empty batch and the marker is still retained after a second cleaning. - cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue) assertEquals(List(1), offsetsInLog(log)) assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log)) } @@ -591,13 +598,13 @@ class LogCleanerTest { log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.Coordinator) log.roll() - // delete horizon set to 0 to verify marker is not removed early - val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = 0L)._1 + // Aborted records are removed, but the abort marker is still preserved. 
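// The transactional tests above all follow the same two-pass pattern: the pass that
// still finds live content for a batch only stamps its delete horizon, and a later pass
// whose cleaning time has reached that horizon drops the tombstone or marker.
// Rough sketch of that timeline; stampDeleteHorizon/isRemovable are made-up names,
// not APIs added by this patch.
def stampDeleteHorizon(cleaningTime: Long, deleteRetentionMs: Long): Long =
  cleaningTime + deleteRetentionMs // recorded in the recopied batch on the first pass

def isRemovable(deleteHorizonMs: Long, cleaningTime: Long): Boolean =
  cleaningTime >= deleteHorizonMs // a later pass may now drop the tombstone or marker

// With the defaults used in these tests:
//   pass 1 at t0              -> horizon = t0 + 86400000, marker kept
//   pass 2 at t0 + 86400001   -> isRemovable(horizon, cleaningTime) is true, marker dropped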
+ val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertEquals(List(3), LogTestUtils.keysInLog(log)) assertEquals(List(3, 4, 5), offsetsInLog(log)) - // clean again with large delete horizon and verify the marker is removed - cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + // In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed. + runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(3), LogTestUtils.keysInLog(log)) assertEquals(List(4, 5), offsetsInLog(log)) } @@ -633,12 +640,12 @@ class LogCleanerTest { // Both transactional batches will be cleaned. The last one will remain in the log // as an empty batch in order to preserve the producer sequence number and epoch - cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(1, 3, 4, 5), offsetsInLog(log)) assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) - // On the second round of cleaning, the marker from the first transaction should be removed. - cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue) + // In the first pass, the delete horizon for the first marker is set. In the second pass, the first marker is removed. + runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(3, 4, 5), offsetsInLog(log)) assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) } @@ -670,14 +677,14 @@ class LogCleanerTest { assertAbortedTransactionIndexed() // first time through the records are removed - var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertAbortedTransactionIndexed() assertEquals(List(), LogTestUtils.keysInLog(log)) assertEquals(List(2), offsetsInLog(log)) // abort marker is retained assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained // the empty batch remains if cleaned again because it still holds the last sequence - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertAbortedTransactionIndexed() assertEquals(List(), LogTestUtils.keysInLog(log)) assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained @@ -687,13 +694,14 @@ class LogCleanerTest { appendProducer(Seq(1)) log.roll() - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1 assertAbortedTransactionIndexed() assertEquals(List(1), LogTestUtils.keysInLog(log)) assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do 
not preserve the empty batch - dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), deleteHorizonMs = Long.MaxValue)._1 + // In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed. + dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp) assertEquals(List(1), LogTestUtils.keysInLog(log)) assertEquals(List(3), offsetsInLog(log)) // abort marker is gone assertEquals(List(3), lastOffsetsPerBatchInLog(log)) @@ -728,7 +736,7 @@ class LogCleanerTest { // clean the log val stats = new CleanerStats() - cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata) + cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata, -1) val shouldRemain = LogTestUtils.keysInLog(log).filter(!keys.contains(_)) assertEquals(shouldRemain, LogTestUtils.keysInLog(log)) } @@ -741,7 +749,7 @@ class LogCleanerTest { val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024) val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) - cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata) + cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1) val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString)) assertEquals(shouldRemain, LogTestUtils.keysInLog(log)) } @@ -760,7 +768,7 @@ class LogCleanerTest { val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) assertThrows(classOf[CorruptRecordException], () => - cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata) + cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1) ) } @@ -777,7 +785,7 @@ class LogCleanerTest { val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) assertThrows(classOf[CorruptRecordException], () => - cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata) + cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1) ) } @@ -1112,7 +1120,7 @@ class LogCleanerTest { keys.foreach(k => map.put(key(k), Long.MaxValue)) assertThrows(classOf[LogCleaningAbortedException], () => cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) ) } @@ -1372,7 +1380,7 @@ class LogCleanerTest { // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort. assertThrows(classOf[LogCleaningAbortedException], () => cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) ) assertEquals(numSegmentsInitial + 1, log.logSegments.size) assertEquals(allKeys, LogTestUtils.keysInLog(log)) @@ -1381,7 +1389,7 @@ class LogCleanerTest { // Clean each segment now that split is complete. 
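// Hedged note on the call sites above and below: every cleanSegments call now passes an
// extra trailing argument (-1 in these tests). Based on the retainLegacyDeletesAndTxnMarkers
// rename earlier in this patch, that argument is read here as the delete horizon applied to
// legacy (pre-v2) batches, which cannot carry a horizon in their own header; -1 disables
// legacy tombstone expiry in tests that do not exercise it. The parameter name below is an
// assumption, not taken from this patch.
cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
  new CleanedTransactionMetadata, -1 /* assumed: legacy delete horizon, unused in this test */)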
for (segmentToClean <- log.logSegments) cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log)) assertFalse(LogTestUtils.hasOffsetOverflow(log)) log.close() @@ -1422,7 +1430,7 @@ class LogCleanerTest { // clean the log cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) // clear scheduler so that async deletes don't run time.scheduler.clear() var cleanedKeys = LogTestUtils.keysInLog(log) @@ -1438,7 +1446,7 @@ class LogCleanerTest { // clean again cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTestUtils.keysInLog(log) @@ -1455,7 +1463,7 @@ class LogCleanerTest { // clean again cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTestUtils.keysInLog(log) @@ -1477,7 +1485,7 @@ class LogCleanerTest { for (k <- 1 until messageCount by 2) offsetMap.put(key(k), Long.MaxValue) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTestUtils.keysInLog(log) @@ -1495,7 +1503,7 @@ class LogCleanerTest { for (k <- 1 until messageCount by 2) offsetMap.put(key(k), Long.MaxValue) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTestUtils.keysInLog(log) @@ -1513,7 +1521,7 @@ class LogCleanerTest { for (k <- 1 until messageCount by 2) offsetMap.put(key(k), Long.MaxValue) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), - new CleanedTransactionMetadata) + new CleanedTransactionMetadata, -1) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTestUtils.keysInLog(log) @@ -1654,6 +1662,7 @@ class LogCleanerTest { key = "0".getBytes, timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), leaderEpoch = 0) log.roll() + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset)) assertEquals(1, log.logSegments.head.log.batches.iterator.next().lastOffset, "The tombstone should be retained.") @@ -1854,6 +1863,19 @@ class LogCleanerTest { private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = { LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler) } + + /** + * We need to run a two-pass clean to perform the following steps to simulate a proper clean: + * 1. On the first run, set the delete horizon in the batches with tombstones or markers with empty txn records. + * 2.
For the second pass, we will advance the current time by tombstoneRetentionMs, which will cause the + * tombstones to expire, leading to their prompt removal from the log. + * Returns the first dirty offset in the log as a result of the second cleaning. + */ + private def runTwoPassClean(cleaner: Cleaner, logToClean: LogToClean, currentTime: Long, + tombstoneRetentionMs: Long = 86400000) : Long = { + cleaner.doClean(logToClean, currentTime) + cleaner.doClean(logToClean, currentTime + tombstoneRetentionMs + 1)._1 + } } class FakeOffsetMap(val slots: Int) extends OffsetMap { diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index af585bfd49605..4275684230736 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -1521,7 +1521,7 @@ class LogValidatorTest { def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = { batch match { case b: DefaultRecordBatch => - assertEquals(expected, b.firstTimestamp, s"Unexpected base timestamp of batch $batch") + assertEquals(expected, b.baseTimestamp, s"Unexpected base timestamp of batch $batch") case _ => // no-op } } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 79671341acc6c..be63413316b76 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -33,7 +33,6 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.MemoryRecords.RecordFilter -import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} @@ -749,8 +748,9 @@ class UnifiedLogTest { records.batches.forEach(_.setPartitionLeaderEpoch(0)) val filtered = ByteBuffer.allocate(2048) - records.filterTo(new TopicPartition("foo", 0), new RecordFilter { - override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY + records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) { + override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = + new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false) override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) filtered.flip() @@ -790,8 +790,9 @@ class UnifiedLogTest { records.batches.forEach(_.setPartitionLeaderEpoch(0)) val filtered = ByteBuffer.allocate(2048) - records.filterTo(new TopicPartition("foo", 0), new RecordFilter { - override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.RETAIN_EMPTY + records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) { + override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = + new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.RETAIN_EMPTY, true) override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = false }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) filtered.flip() @@ -833,8 +834,9 
@@ class UnifiedLogTest { records.batches.forEach(_.setPartitionLeaderEpoch(0)) val filtered = ByteBuffer.allocate(2048) - records.filterTo(new TopicPartition("foo", 0), new RecordFilter { - override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY + records.filterTo(new TopicPartition("foo", 0), new RecordFilter(0, 0) { + override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult = + new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false) override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) filtered.flip() diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java index d83ad4459b601..982040b84ee51 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java @@ -256,6 +256,7 @@ private void writeDefaultBatchHeader() { RecordBatch.NO_SEQUENCE, false, isControlBatch, + false, leaderEpoch, numRecords() );
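// For callers outside the cleaner, the UnifiedLogTest changes above show the shape of the
// new RecordFilter contract: the filter is constructed with two long arguments (read here
// as the current time and the delete-retention interval, following the cleaner changes in
// this patch; the tests simply pass 0, 0), and checkBatchRetention now returns a
// BatchRetentionResult whose second field marks a control batch left over from an otherwise
// empty transaction. Minimal caller-side sketch; keepKeyedRecords is an illustrative helper,
// not part of this patch.
import java.nio.ByteBuffer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.utils.BufferSupplier

def keepKeyedRecords(records: MemoryRecords, tp: TopicPartition): ByteBuffer = {
  val filtered = ByteBuffer.allocate(records.sizeInBytes)
  records.filterTo(tp, new RecordFilter(0, 0) {
    override def checkBatchRetention(batch: RecordBatch): RecordFilter.BatchRetentionResult =
      // second argument: whether this batch is a marker for an otherwise empty transaction
      new RecordFilter.BatchRetentionResult(RecordFilter.BatchRetention.DELETE_EMPTY, false)
    override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean =
      record.hasKey // drop unkeyed records, keep everything else
  }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
  filtered.flip()
  filtered
}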