[KAFKA-8522] Streamline tombstone and transaction marker removal #7884

Closed
ConcurrencyPractitioner wants to merge 55 commits into trunk from KAFKA-8522
The changes shown below are from 5 of the 55 commits.

55 commits
fdf7095
[KAFKA-8522] Streamline tombstone and transaction marker removal
ConcurrencyPractitioner Jan 2, 2020
f7dcaac
Fixing stuff
ConcurrencyPractitioner Jan 2, 2020
2ab16af
Fixing stuff
ConcurrencyPractitioner Jan 3, 2020
d36f776
Resolving some comments
ConcurrencyPractitioner Jan 15, 2020
3b2193b
Resolving remaining comments
ConcurrencyPractitioner Jan 15, 2020
dcc2f65
Adding two pass modification
ConcurrencyPractitioner Jan 26, 2020
a9d8c4d
Adding some last changes
ConcurrencyPractitioner Jan 29, 2020
dd9ca28
Adding stuff
ConcurrencyPractitioner Feb 12, 2020
1587462
Getting modified
ConcurrencyPractitioner Feb 12, 2020
a25187b
Delete total.out
ConcurrencyPractitioner Feb 12, 2020
f469515
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
15d9d8a
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
a3bc996
Delete LogCleaner.scala.orig
ConcurrencyPractitioner Feb 12, 2020
e00f37c
Delete diff.out
ConcurrencyPractitioner Feb 12, 2020
331ebad
Delete LogCleanerManager.scala.orig
ConcurrencyPractitioner Feb 12, 2020
4b12ebb
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
2fc90d3
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
041e867
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
9df0e70
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
613d17d
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
69b5240
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
ad4ff10
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
8c9b50d
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
de5d0a1
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
5b43e43
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
f665275
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
e570f6c
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
3c96d55
Delete .gitignore
ConcurrencyPractitioner Feb 12, 2020
a78c563
Delete org.apache.kafka.connect.rest.ConnectRestExtension
ConcurrencyPractitioner Feb 12, 2020
e287b49
Getting modified
ConcurrencyPractitioner Feb 12, 2020
ee67247
Last renaming
ConcurrencyPractitioner Feb 12, 2020
5bedf9c
Addressing some other comments
ConcurrencyPractitioner Feb 19, 2020
bd3e18f
Removing dead code
ConcurrencyPractitioner Feb 19, 2020
4515e7d
Resolving last comments
ConcurrencyPractitioner Feb 19, 2020
60d72d0
One liner
ConcurrencyPractitioner Feb 19, 2020
92530fe
Fixing broken test
ConcurrencyPractitioner Feb 19, 2020
8baa416
Adding changed files
ConcurrencyPractitioner Feb 21, 2020
6d011ed
Cleaning up messy code
ConcurrencyPractitioner Feb 26, 2020
6cc19fe
Adding some test modifications
ConcurrencyPractitioner Feb 27, 2020
8e6f9a2
Adding some further test modifications
ConcurrencyPractitioner Feb 28, 2020
d6dd028
Making some changes to test
ConcurrencyPractitioner Mar 1, 2020
9335c36
Making one liner
ConcurrencyPractitioner Mar 1, 2020
3541ea2
Adding last comments
ConcurrencyPractitioner Mar 7, 2020
a88dc20
Bumping checkstyle
ConcurrencyPractitioner Mar 7, 2020
bc3f867
Adding some spotbug fixes
ConcurrencyPractitioner Mar 8, 2020
6a1b3da
Adding some temporary log statements
ConcurrencyPractitioner Mar 9, 2020
d7491c1
Rolling back changes
ConcurrencyPractitioner Mar 10, 2020
06d9ff5
Addressing most comments
ConcurrencyPractitioner Mar 17, 2020
bed40ab
Addressing last comments
ConcurrencyPractitioner Mar 19, 2020
54cb56b
Create .asf.yml
ConcurrencyPractitioner Apr 20, 2020
81ba24c
Merge remote-tracking branch 'upstream/trunk' into KAFKA-8522
ConcurrencyPractitioner Apr 20, 2020
cadee72
Adding github whitelist
ConcurrencyPractitioner Apr 20, 2020
90877fc
Merge branch 'KAFKA-8522' of https://github.com/ConcurrencyPractition…
ConcurrencyPractitioner Apr 20, 2020
e694f13
Removing .yml file
ConcurrencyPractitioner Apr 20, 2020
a9316ad
Reverting unnecessary change
ConcurrencyPractitioner Apr 20, 2020
4 changes: 2 additions & 2 deletions checkstyle/checkstyle.xml
@@ -107,7 +107,7 @@
<module name="MethodLength"/>
<module name="ParameterNumber">
<!-- default is 8 -->
<property name="max" value="13"/>
<property name="max" value="14"/>
</module>
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
@@ -124,7 +124,7 @@
</module>
<module name="CyclomaticComplexity">
<!-- default is 10-->
<property name="max" value="16"/>
<property name="max" value="17"/>
</module>
<module name="JavaNCSS">
<!-- default is 50 -->
@@ -431,6 +431,16 @@ public LegacyRecord outerRecord() {
return record;
}

@Override
public long deleteHorizonMs() {
return RecordBatch.NO_TIMESTAMP;
}

Review comment: Why don't we move these into AbstractLegacyRecordBatch?
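A minimal sketch of what the reviewer's suggestion could look like, assuming the no-op behaviour stays the same as in this patch (this is not code from the PR): the legacy v0/v1 formats never carry a delete horizon, so the two overrides could live once in the shared base class rather than in each concrete inner batch class.

```java
// Hypothetical placement only: AbstractLegacyRecordBatch supplies the defaults,
// and the concrete inner batch classes drop their duplicated overrides.
public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch implements Record {

    @Override
    public long deleteHorizonMs() {
        // Legacy message formats have no delete horizon.
        return RecordBatch.NO_TIMESTAMP;
    }

    @Override
    public boolean deleteHorizonSet() {
        return false;
    }

    // ... existing abstract and concrete members unchanged ...
}
```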

@Override
public boolean deleteHorizonSet() {
return false;
}

Review comment: nit: can we use hasDeleteHorizonMs. Another option would be to make deleteHorizonMs return an optional long.
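As a rough illustration of the second half of the nit, here is what an OptionalLong-based accessor might look like; the interface and class names below are invented for the sketch and do not appear in the patch.

```java
import java.util.OptionalLong;

// Hypothetical single-accessor variant: an empty OptionalLong replaces the
// separate deleteHorizonSet() flag check.
interface DeleteHorizonAccessor {
    OptionalLong deleteHorizonMs();
}

class LegacyBatchExample implements DeleteHorizonAccessor {
    @Override
    public OptionalLong deleteHorizonMs() {
        return OptionalLong.empty(); // legacy batches never have a delete horizon
    }
}

class DefaultBatchExample implements DeleteHorizonAccessor {
    private final boolean deleteHorizonFlag;   // stand-in for the attributes bit
    private final long firstTimestamp;         // stand-in for the stored first timestamp

    DefaultBatchExample(boolean deleteHorizonFlag, long firstTimestamp) {
        this.deleteHorizonFlag = deleteHorizonFlag;
        this.firstTimestamp = firstTimestamp;
    }

    @Override
    public OptionalLong deleteHorizonMs() {
        return deleteHorizonFlag ? OptionalLong.of(firstTimestamp) : OptionalLong.empty();
    }
}
```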

@Override
public boolean equals(Object o) {
if (this == o)
@@ -468,6 +478,16 @@ public long offset() {
return buffer.getLong(OFFSET_OFFSET);
}

@Override
public long deleteHorizonMs() {
return RecordBatch.NO_TIMESTAMP;
}

@Override
public boolean deleteHorizonSet() {
return false;
}

@Override
public LegacyRecord outerRecord() {
return record;
@@ -557,6 +577,16 @@ public long baseOffset() {
return loadFullBatch().baseOffset();
}

@Override
public long deleteHorizonMs() {
return RecordBatch.NO_TIMESTAMP;
}

@Override
public boolean deleteHorizonSet() {
return false;
}

@Override
public long lastOffset() {
return offset;
@@ -89,11 +89,15 @@
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
* The delete horizon flag in the sixth bit indicates whether the first timestamp of the batch has been set to
* the time at which tombstones / transaction markers need to be removed. If the flag is set, the first timestamp is
* the delete horizon; otherwise, it is merely the first timestamp of the record batch.
*
* The current attributes are given below:
*
* -------------------------------------------------------------------------------------------------
* | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -------------------------------------------------------------------------------------------------
* ---------------------------------------------------------------------------------------------------------------------------
* | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
@@ -128,6 +132,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;

private static final int MAX_SKIP_BUFFER_SIZE = 2048;
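The new DELETE_HORIZON_FLAG_MASK (0x40, bit 6) matches the attribute layout documented in the class Javadoc above. Below is a small standalone sketch of reading the flags out of an attributes value; the mask values are copied from the constants above, while the demo class itself is illustrative only.

```java
// Standalone illustration of the v2 attributes bit layout described above.
public class AttributesBitDemo {
    // Masks mirroring DefaultRecordBatch's constants: bits 0-2, 3, 4, 5 and 6.
    static final int COMPRESSION_CODEC_MASK   = 0x07;
    static final int TIMESTAMP_TYPE_MASK      = 0x08;
    static final int TRANSACTIONAL_FLAG_MASK  = 0x10;
    static final int CONTROL_FLAG_MASK        = 0x20;
    static final int DELETE_HORIZON_FLAG_MASK = 0x40;

    public static void main(String[] args) {
        short attributes = 0x42; // example: delete horizon flag set, compression codec id 2
        System.out.println("compression codec id: " + (attributes & COMPRESSION_CODEC_MASK));
        System.out.println("log append time:      " + ((attributes & TIMESTAMP_TYPE_MASK) != 0));
        System.out.println("transactional:        " + ((attributes & TRANSACTIONAL_FLAG_MASK) != 0));
        System.out.println("control batch:        " + ((attributes & CONTROL_FLAG_MASK) != 0));
        System.out.println("delete horizon set:   " + ((attributes & DELETE_HORIZON_FLAG_MASK) != 0));
    }
}
```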
@@ -155,10 +160,15 @@ public void ensureValid() {
}

/**
* Get the timestamp of the first record in this batch. It is always the create time of the record even if the
* Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
* timestamp type of the batch is log append time.
*
* @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
* There is the possibility that the first timestamp has been set to the delete horizon of the batch,
* in which case the delete horizon will be returned instead.
*
* @return The first timestamp if the batch's delete horizon has not been set
* The delete horizon if the batch's delete horizon has been set
* {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
*/
public long firstTimestamp() {
return buffer.getLong(FIRST_TIMESTAMP_OFFSET);

Review comment: Hmm.. It seems a bit brittle to rely on documentation for this. I'm considering if we should change names to better reflect this. For example, maybe we should call this baseTimestamp and add a new method for firstRecordTimestamp or something like that.
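A sketch of the naming idea in the comment above, using an invented class for illustration (this is not the API the patch defines): expose the raw stored value as baseTimestamp() and make the delete horizon an explicit query, so callers do not have to rely on the Javadoc to know which meaning they are getting.

```java
import java.nio.ByteBuffer;

// Hypothetical view over a v2 batch header; the offsets shown (attributes at byte 21,
// first timestamp at byte 27) follow the v2 record batch header layout.
class BatchHeaderView {
    private static final int ATTRIBUTES_OFFSET = 21;
    private static final int FIRST_TIMESTAMP_OFFSET = 27;
    private static final int DELETE_HORIZON_FLAG_MASK = 0x40;

    private final ByteBuffer buffer;

    BatchHeaderView(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** Raw stored value: the first record's create time, or the delete horizon when the flag is set. */
    long baseTimestamp() {
        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
    }

    boolean deleteHorizonSet() {
        return (buffer.getShort(ATTRIBUTES_OFFSET) & DELETE_HORIZON_FLAG_MASK) != 0;
    }
}
```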
@@ -245,6 +255,18 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}

@Override
public boolean deleteHorizonSet() {
return (attributes() & DELETE_HORIZON_FLAG_MASK) > 0;
}

@Override
public long deleteHorizonMs() {
if (deleteHorizonSet())
return firstTimestamp();
return RecordBatch.NO_TIMESTAMP;
}
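To connect these accessors back to the PR's goal of streamlining tombstone and transaction marker removal, here is a hypothetical cleaner-side check, assuming the new methods are exposed on the RecordBatch interface as the @Override annotations suggest; it is not code from the patch.

```java
import org.apache.kafka.common.record.RecordBatch;

// Illustrative helper only: tombstones and control markers in a batch become
// candidates for removal once the batch's delete horizon has passed.
final class DeleteHorizonCheck {
    private DeleteHorizonCheck() { }

    static boolean pastDeleteHorizon(RecordBatch batch, long nowMs) {
        return batch.deleteHorizonSet() && nowMs > batch.deleteHorizonMs();
    }
}
```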

@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
@@ -360,7 +382,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;

byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), deleteHorizonSet());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
@@ -407,7 +429,7 @@ public int hashCode() {
}

private static byte computeAttributes(CompressionType type, TimestampType timestampType,
boolean isTransactional, boolean isControl) {
boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
@@ -419,6 +441,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
if (isDeleteHorizonSet)
attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}

@@ -435,9 +459,49 @@ public static void writeEmptyHeader(ByteBuffer buffer,
boolean isTransactional,
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}

Review comment: This method seems unused?

public static void writeEmptyHeader(ByteBuffer buffer,
byte magic,
long producerId,
short producerEpoch,
int baseSequence,
long baseOffset,
long lastOffset,
int partitionLeaderEpoch,
TimestampType timestampType,
long timestamp,
boolean isTransactional,
boolean isControlRecord,
boolean isDeleteHorizonSet) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
producerEpoch, baseSequence, isTransactional, isControlRecord, isDeleteHorizonSet, partitionLeaderEpoch, 0);
}

Review comment: This method seems unused?

static void writeHeader(ByteBuffer buffer,
long baseOffset,
int lastOffsetDelta,
int sizeInBytes,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long firstTimestamp,
long maxTimestamp,
long producerId,
short epoch,
int sequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int numRecords) {
writeHeader(buffer, baseOffset, lastOffsetDelta, sizeInBytes, magic, compressionType,
timestampType, firstTimestamp, maxTimestamp, producerId, epoch, sequence,
isTransactional, isControlBatch, false, partitionLeaderEpoch, numRecords);
}

static void writeHeader(ByteBuffer buffer,
@@ -454,14 +518,15 @@ static void writeHeader(ByteBuffer buffer,
int sequence,
boolean isTransactional,
boolean isControlBatch,
boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
throw new IllegalArgumentException("Invalid magic value " + magic);
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);

short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);

int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
@@ -699,6 +764,18 @@ public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}

@Override
public boolean deleteHorizonSet() {
return loadBatchHeader().deleteHorizonSet();
}

@Override
public long deleteHorizonMs() {
if (deleteHorizonSet())
return super.loadBatchHeader().deleteHorizonMs();
return RecordBatch.NO_TIMESTAMP;
}

@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();