[KAFKA-8522] Streamline tombstone and transaction marker removal #10914

Merged — 26 commits, merged Sep 16, 2021
Changes from 16 commits

Commits (26)
96d128a
[KAKFA-8522] Streamline tombstone and transaction marker removal
tedyu Jan 15, 2021
6c105a0
Merge branch 'trunk' into trunk
ConcurrencyPractitioner Jan 20, 2021
0506bda
Merge branch 'trunk' into trunk
ConcurrencyPractitioner Jan 27, 2021
147803f
Merge branch 'trunk' of github.com:apache/kafka into trunk
mattwong949 Jun 22, 2021
e179f92
missed a couple fixes on the merge
mattwong949 Jun 22, 2021
44c50bd
Merge branch 'trunk' of github.com:apache/kafka into KAKFA-8522
mattwong949 Jul 7, 2021
8403960
fix nit, remove firstTimestamp(), revert checkstyle
mattwong949 Jul 8, 2021
6731c4d
suppress checkstyle
mattwong949 Jul 12, 2021
bb47b16
address comments - use OptionalLong, rename constants
mattwong949 Jul 12, 2021
c293392
improve test for correct timestamps as delta for delete horizon
mattwong949 Jul 12, 2021
798a5f7
address comments
mattwong949 Jul 29, 2021
3f4dc87
wip addressing comments
mattwong949 Aug 3, 2021
7a13fc1
Add tests to MemoryRecordsBuilder for direct deleteHorizon value set
mattwong949 Aug 3, 2021
9bc5cb6
add in latestDeleteHorizon again for review
mattwong949 Aug 4, 2021
6fa401d
Merge branch 'trunk' of github.com:apache/kafka into KAKFA-8522
mattwong949 Aug 18, 2021
cd77df9
Trigger build
mattwong949 Aug 18, 2021
d036c08
Merge branch 'trunk' of github.com:apache/kafka into KAKFA-8522
mattwong949 Sep 1, 2021
43698e0
address Jun's comments, log reason for compaction
mattwong949 Sep 2, 2021
05fea42
addressing comments
mattwong949 Sep 9, 2021
217624a
address Jason's comment on extra var in shouldRetainRecord
mattwong949 Sep 9, 2021
890c355
move legacyDeleteHorizonMs computation to doClean
mattwong949 Sep 9, 2021
9f69673
remove DeleteHorizon cleaning pass and LogCleaningReason
mattwong949 Sep 10, 2021
6f2f2b7
remove logic for delethorizon-triggered cleaning
mattwong949 Sep 10, 2021
2477f34
address last comments on code comments, readability, logging
mattwong949 Sep 10, 2021
b113c47
address Jun's comments
mattwong949 Sep 15, 2021
cf7b0a7
fix comment
mattwong949 Sep 15, 2021
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
<suppress checks="ParameterNumber"
files="DefaultRecordBatch.java"/>
<suppress checks="ParameterNumber"
files="Sender.java"/>
Contributor (Author) comment: the same suppression is on L54

files="MemoryRecordsBuilder.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
@@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator).java"/>
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>

@@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalLong;

import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
@@ -213,6 +214,11 @@ public boolean isControlBatch() {
return false;
}

@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}

/**
* Get an iterator for the nested entries contained within this batch. Note that
* if the batch is not compressed, then this method will return an iterator over the
@@ -464,6 +470,11 @@ public long offset() {
return buffer.getLong(OFFSET_OFFSET);
}

@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}

@Override
public LegacyRecord outerRecord() {
return record;
@@ -553,6 +564,11 @@ public long baseOffset() {
return loadFullBatch().baseOffset();
}

@Override
public OptionalLong deleteHorizonMs() {
return OptionalLong.empty();
}

@Override
public long lastOffset() {
return offset;

@@ -37,6 +37,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalLong;

import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;

@@ -90,11 +91,15 @@
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
* The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
* the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is
* the delete horizon, otherwise, it is merely the first timestamp of the record batch.
*
* The current attributes are given below:
*
* -------------------------------------------------------------------------------------------------
* | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -------------------------------------------------------------------------------------------------
* ---------------------------------------------------------------------------------------------------------------------------
* | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
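To make the bit layout documented in the class comment above concrete, the sketch below decodes the attributes word the way that comment describes it. It is illustrative only: the mask values mirror the DefaultRecordBatch constants that appear later in this diff, but the helper class itself is hypothetical and not part of the patch.

```java
// Illustrative decoding of the v2 batch attributes word (not part of this PR).
// Mask values mirror the DefaultRecordBatch constants shown further down in the diff.
final class BatchAttributesSketch {
    static final byte COMPRESSION_CODEC_MASK   = 0x07; // bits 0-2
    static final byte TIMESTAMP_TYPE_MASK      = 0x08; // bit 3
    static final byte TRANSACTIONAL_FLAG_MASK  = 0x10; // bit 4
    static final int  CONTROL_FLAG_MASK        = 0x20; // bit 5
    static final byte DELETE_HORIZON_FLAG_MASK = 0x40; // bit 6 (new in this PR)

    // When the delete-horizon bit is set, the batch's base timestamp field holds the
    // delete horizon rather than the timestamp of the first record.
    static boolean hasDeleteHorizon(short attributes) {
        return (attributes & DELETE_HORIZON_FLAG_MASK) > 0;
    }

    static int compressionCodecId(short attributes) {
        return attributes & COMPRESSION_CODEC_MASK;
    }

    public static void main(String[] args) {
        short attributes = 0x40;                          // only the delete-horizon bit set
        System.out.println(hasDeleteHorizon(attributes)); // true
        System.out.println(compressionCodecId(attributes)); // 0 (no compression)
    }
}
```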
@@ -111,9 +116,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
static final int ATTRIBUTE_LENGTH = 2;
public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
static final int LAST_OFFSET_DELTA_LENGTH = 4;
static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
static final int FIRST_TIMESTAMP_LENGTH = 8;
static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH;
static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
static final int BASE_TIMESTAMP_LENGTH = 8;
static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
static final int MAX_TIMESTAMP_LENGTH = 8;
static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
static final int PRODUCER_ID_LENGTH = 8;
@@ -129,6 +134,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;

private static final int MAX_SKIP_BUFFER_SIZE = 2048;
@@ -156,13 +162,13 @@ public void ensureValid() {
}

/**
* Get the timestamp of the first record in this batch. It is always the create time of the record even if the
* timestamp type of the batch is log append time.
*
* @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
* Gets the base timestamp of the batch which is used to calculate the timestamp deltas.
*
* @return The base timestamp or
* {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
*/
public long firstTimestamp() {
return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
public long baseTimestamp() {
return buffer.getLong(BASE_TIMESTAMP_OFFSET);
}
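The rename from firstTimestamp to baseTimestamp reflects how the field is used: per-record timestamps are stored as deltas against this value, and once the delete-horizon flag is set the field no longer equals the first record's create time. A minimal arithmetic sketch with made-up values (not code from the patch):

```java
// Sketch of how a record timestamp is reconstructed from the value read at
// BASE_TIMESTAMP_OFFSET plus the per-record delta. All values here are hypothetical.
public class BaseTimestampSketch {
    public static void main(String[] args) {
        long baseTimestamp = 1_630_000_000_000L; // value stored in the batch header
        long timestampDelta = 250L;              // delta stored with an individual record
        long recordTimestamp = baseTimestamp + timestampDelta;
        System.out.println(recordTimestamp);     // 1630000000250
    }
}
```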

@Override
@@ -246,6 +252,18 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}

private boolean hasDeleteHorizonMs() {
return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
}

@Override
public OptionalLong deleteHorizonMs() {
if (hasDeleteHorizonMs())
return OptionalLong.of(buffer.getLong(BASE_TIMESTAMP_OFFSET));
else
return OptionalLong.empty();
}

@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
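The new deleteHorizonMs() accessor returns an empty OptionalLong for batches that were never rewritten with a delete horizon, and the stored base timestamp when the flag is set. A minimal sketch of how a caller might consume that contract (hypothetical helper, not the LogCleaner's actual logic):

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating the deleteHorizonMs() contract: tombstones and
// transaction markers become candidates for removal only once the horizon has passed.
final class DeleteHorizonSketch {
    static boolean pastDeleteHorizon(OptionalLong deleteHorizonMs, long nowMs) {
        // Empty means the batch carries no delete horizon, so nothing is eligible yet.
        return deleteHorizonMs.isPresent() && nowMs > deleteHorizonMs.getAsLong();
    }

    public static void main(String[] args) {
        System.out.println(pastDeleteHorizon(OptionalLong.empty(), 100L)); // false
        System.out.println(pastDeleteHorizon(OptionalLong.of(50L), 100L)); // true
    }
}
```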
@@ -290,9 +308,9 @@ private CloseableIterator<Record> uncompressedIterator() {
buffer.position(RECORDS_OFFSET);
return new RecordIterator() {
@Override
protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (BufferUnderflowException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
}
@@ -364,7 +382,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;

byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), hasDeleteHorizonMs());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
@@ -411,7 +429,7 @@ public int hashCode() {
}

private static byte computeAttributes(CompressionType type, TimestampType timestampType,
boolean isTransactional, boolean isControl) {
boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
@@ -423,6 +441,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
if (isDeleteHorizonSet)
attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}

@@ -440,8 +460,8 @@ public static void writeEmptyHeader(ByteBuffer buffer,
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}

public static void writeHeader(ByteBuffer buffer,
@@ -458,22 +478,23 @@ public static void writeHeader(ByteBuffer buffer,
int sequence,
boolean isTransactional,
boolean isControlBatch,
boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
throw new IllegalArgumentException("Invalid magic value " + magic);
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);

short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);

int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD);
buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
buffer.put(position + MAGIC_OFFSET, magic);
buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp);
buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@@ -555,15 +576,15 @@ public static int decrementSequence(int sequence, int decrement) {
private abstract class RecordIterator implements CloseableIterator<Record> {
private final Long logAppendTime;
private final long baseOffset;
private final long firstTimestamp;
private final long baseTimestamp;
private final int baseSequence;
private final int numRecords;
private int readRecords = 0;

RecordIterator() {
this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
this.baseOffset = baseOffset();
this.firstTimestamp = firstTimestamp();
this.baseTimestamp = baseTimestamp();
this.baseSequence = baseSequence();
int numRecords = count();
if (numRecords < 0)
@@ -583,7 +604,7 @@ public Record next() {
throw new NoSuchElementException();

readRecords++;
Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime);
Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
if (readRecords == numRecords) {
// Validate that the actual size of the batch is equal to declared size
// by checking that after reading declared number of items, there no items left
@@ -594,7 +615,7 @@ public Record next() {
return rec;
}

protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime);
protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);

protected abstract boolean ensureNoneRemaining();

@@ -616,9 +637,9 @@ private abstract class StreamRecordIterator extends RecordIterator {
abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;

@Override
protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime);
return doReadRecord(baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (EOFException e) {
throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
} catch (IOException e) {
@@ -705,6 +726,11 @@ public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}

@Override
public OptionalLong deleteHorizonMs() {
return loadBatchHeader().deleteHorizonMs();
}

@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();