MINOR: deleteHorizonMs update to documentation and DumpLogSegments tool #11694

Merged
merged 3 commits into from
Feb 5, 2022
@@ -52,7 +52,7 @@
  * CRC => Uint32
  * Attributes => Int16
  * LastOffsetDelta => Int32 // also serves as LastSequenceDelta
- * FirstTimestamp => Int64
+ * BaseTimestamp => Int64
  * MaxTimestamp => Int64
  * ProducerId => Int64
  * ProducerEpoch => Int16
@@ -82,19 +82,16 @@
  * are retained only until either a new sequence number is written by the corresponding producer or the producerId
  * is expired from lack of activity.
  *
- * There is no similar need to preserve the timestamp from the original batch after compaction. The FirstTimestamp
- * field therefore always reflects the timestamp of the first record in the batch. If the batch is empty, the
- * FirstTimestamp will be set to -1 (NO_TIMESTAMP).
+ * There is no similar need to preserve the timestamp from the original batch after compaction. The BaseTimestamp
+ * field therefore reflects the timestamp of the first record in the batch in most cases. If the batch is empty, the
+ * BaseTimestamp will be set to -1 (NO_TIMESTAMP). If the delete horizon flag is set to 1, the BaseTimestamp
+ * will be set to the time at which tombstone records and aborted transaction markers in the batch should be removed.
  *
  * Similarly, the MaxTimestamp field reflects the maximum timestamp of the current records if the timestamp type
  * is CREATE_TIME. For LOG_APPEND_TIME, on the other hand, the MaxTimestamp field reflects the timestamp set
  * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
  * the previous value prior to becoming empty.
  *
- * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
- * the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is
- * the delete horizon, otherwise, it is merely the first timestamp of the record batch.
- *
  * The current attributes are given below:
  *
  * ---------------------------------------------------------------------------------------------------------------------------
@@ -288,15 +285,15 @@ private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSuppli

  return new StreamRecordIterator(inputStream) {
  @Override
- protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
- return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+ protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+ return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, baseTimestamp, baseSequence, logAppendTime);
  }
  };
  } else {
  return new StreamRecordIterator(inputStream) {
  @Override
- protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
- return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+ protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+ return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
  }
  };
  }
@@ -470,7 +467,7 @@ public static void writeHeader(ByteBuffer buffer,
  byte magic,
  CompressionType compressionType,
  TimestampType timestampType,
- long firstTimestamp,
+ long baseTimestamp,
  long maxTimestamp,
  long producerId,
  short epoch,
@@ -482,8 +479,8 @@ public static void writeHeader(ByteBuffer buffer,
  int numRecords) {
  if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
  throw new IllegalArgumentException("Invalid magic value " + magic);
- if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
- throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
+ if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
+ throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);

  short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);

@@ -493,7 +490,7 @@
  buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
  buffer.put(position + MAGIC_OFFSET, magic);
  buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
- buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp);
+ buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
  buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
  buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
  buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@@ -519,13 +516,13 @@ public static int sizeInBytes(long baseOffset, Iterable<Record> records) {
  return 0;

  int size = RECORD_BATCH_OVERHEAD;
- Long firstTimestamp = null;
+ Long baseTimestamp = null;
  while (iterator.hasNext()) {
  Record record = iterator.next();
  int offsetDelta = (int) (record.offset() - baseOffset);
- if (firstTimestamp == null)
- firstTimestamp = record.timestamp();
- long timestampDelta = record.timestamp() - firstTimestamp;
+ if (baseTimestamp == null)
+ baseTimestamp = record.timestamp();
+ long timestampDelta = record.timestamp() - baseTimestamp;
  size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
  record.headers());
  }
@@ -539,12 +536,12 @@ public static int sizeInBytes(Iterable<SimpleRecord> records) {

  int size = RECORD_BATCH_OVERHEAD;
  int offsetDelta = 0;
- Long firstTimestamp = null;
+ Long baseTimestamp = null;
  while (iterator.hasNext()) {
  SimpleRecord record = iterator.next();
- if (firstTimestamp == null)
- firstTimestamp = record.timestamp();
- long timestampDelta = record.timestamp() - firstTimestamp;
+ if (baseTimestamp == null)
+ baseTimestamp = record.timestamp();
+ long timestampDelta = record.timestamp() - baseTimestamp;
  size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(),
  record.headers());
  }
@@ -633,7 +630,7 @@ private abstract class StreamRecordIterator extends RecordIterator {
  this.inputStream = inputStream;
  }

- abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;
+ abstract Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException;

  @Override
  protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
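The `sizeInBytes` hunks above derive each record's `timestampDelta` from the timestamp of the first record seen, and `writeHeader` rejects negative base timestamps other than `NO_TIMESTAMP`. A minimal standalone sketch of both rules follows. The class `TimestampDeltaSketch` and its method names are hypothetical and not part of Kafka; only the logic mirrors the diff.

```java
import java.util.Arrays;

// Hypothetical illustration class; it does not exist in Kafka.
final class TimestampDeltaSketch {
    // Sentinel for "no timestamp", matching the -1 (NO_TIMESTAMP) named in the Javadoc.
    static final long NO_TIMESTAMP = -1L;

    // Same check as writeHeader in the diff: negative values are only valid as NO_TIMESTAMP.
    static void validateBaseTimestamp(long baseTimestamp) {
        if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
            throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
    }

    // Computes per-record deltas relative to the first record's timestamp,
    // following the loop structure of sizeInBytes in the diff.
    static long[] timestampDeltas(long[] recordTimestamps) {
        long[] deltas = new long[recordTimestamps.length];
        Long baseTimestamp = null;
        for (int i = 0; i < recordTimestamps.length; i++) {
            if (baseTimestamp == null)
                baseTimestamp = recordTimestamps[i];
            deltas[i] = recordTimestamps[i] - baseTimestamp;
        }
        return deltas;
    }

    public static void main(String[] args) {
        validateBaseTimestamp(1_000L);
        long[] deltas = timestampDeltas(new long[] {1_000L, 1_005L, 1_003L});
        System.out.println(Arrays.toString(deltas)); // prints [0, 5, 3]
    }
}
```

Deltas can be negative when a later record carries an earlier timestamp, which is why the sketch keeps them as signed `long` values.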
@@ -35,7 +35,7 @@ public interface MutableRecordBatch extends RecordBatch {
  /**
  * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual
  * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated
- * by this method, but clients ignore them if the timestamp time is log append time. Note that firstTimestamp is not
+ * by this method, but clients ignore them if the timestamp time is log append time. Note that baseTimestamp is not
  * updated by this method.
  *
  * This typically requires re-computation of the batch's CRC.
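The Javadoc above says that under log append time the batch's max timestamp overrides the per-record timestamps, while the record fields themselves are left untouched. A rough sketch of how a reader might resolve a record's effective timestamp under that rule is shown below. `EffectiveTimestampSketch`, its local `TimestampType` enum, and `effectiveTimestamp` are stand-ins invented for illustration, and the create-time branch assumes the stored delta is relative to the batch's base timestamp.

```java
// Hypothetical illustration class; it does not exist in Kafka.
final class EffectiveTimestampSketch {
    // Local stand-in for the two timestamp types named in the Javadoc.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // With LOG_APPEND_TIME the batch-level maxTimestamp wins and the stored
    // per-record delta is ignored; otherwise the delta is applied to the base.
    static long effectiveTimestamp(TimestampType type, long baseTimestamp,
                                   long timestampDelta, long maxTimestamp) {
        if (type == TimestampType.LOG_APPEND_TIME)
            return maxTimestamp;
        return baseTimestamp + timestampDelta;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTimestamp(TimestampType.CREATE_TIME, 1_000L, 5L, 2_000L));     // prints 1005
        System.out.println(effectiveTimestamp(TimestampType.LOG_APPEND_TIME, 1_000L, 5L, 2_000L)); // prints 2000
    }
}
```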
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -317,7 +317,7 @@ object DumpLogSegments {
  " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
  " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
  " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
- " isControl: " + batch.isControlBatch)
+ " isControl: " + batch.isControlBatch + " deleteHorizonMs: " + batch.deleteHorizonMs)
  else
  print("offset: " + batch.lastOffset)
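The hunk above appends `batch.deleteHorizonMs` to the dumped batch line by plain string concatenation. Assuming that accessor returns a `java.util.OptionalLong` (an assumption, since the diff does not show its type), the appended text would follow `OptionalLong.toString()`. The sketch below only demonstrates that formatting; `DumpLineSketch` and `deleteHorizonSuffix` are hypothetical names.

```java
import java.util.OptionalLong;

// Hypothetical illustration class; it does not exist in Kafka.
final class DumpLineSketch {
    // Mirrors the concatenation added to DumpLogSegments, assuming an OptionalLong value.
    static String deleteHorizonSuffix(OptionalLong deleteHorizonMs) {
        return " deleteHorizonMs: " + deleteHorizonMs;
    }

    public static void main(String[] args) {
        // Batch without a delete horizon.
        System.out.println("isControl: false" + deleteHorizonSuffix(OptionalLong.empty()));
        // Batch whose delete horizon has been set by compaction (value is made up).
        System.out.println("isControl: false" + deleteHorizonSuffix(OptionalLong.of(1644019200000L)));
    }
}
```

Under that assumption, a batch without a delete horizon prints `deleteHorizonMs: OptionalLong.empty`, and one with a horizon prints `deleteHorizonMs: OptionalLong[<millis>]`.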

10 changes: 7 additions & 3 deletions docs/implementation.html
@@ -47,9 +47,10 @@ <h4 class="anchor-heading"><a id="recordbatch" class="anchor-link"></a><a href="
  bit 3: timestampType
  bit 4: isTransactional (0 means not transactional)
  bit 5: isControlBatch (0 means not a control batch)
- bit 6~15: unused
+ bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
+ bit 7~15: unused
  lastOffsetDelta: int32
- firstTimestamp: int64
+ baseTimestamp: int64
  maxTimestamp: int64
  producerId: int64
  producerEpoch: int16
@@ -65,9 +66,12 @@ <h4 class="anchor-heading"><a id="recordbatch" class="anchor-link"></a><a href="
  <p>On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence numbers from the original batch when the log is cleaned. This is required in order to be able to restore the
  producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must
  be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying that the first and last sequence numbers of the incoming batch match the last from that producer). As a result,
- it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the firstTimestamp
+ it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the baseTimestamp
  field is not preserved during compaction, so it will change if the first record in the batch is compacted away.</p>

+ <p>Compaction may also modify the baseTimestamp if the record batch contains records with a null payload or aborted transaction markers. The baseTimestamp will be set to the timestamp of when those records should be deleted
+ with the delete horizon attribute bit also set.</p>
+
  <h5 class="anchor-heading"><a id="controlbatch" class="anchor-link"></a><a href="#controlbatch">5.3.1.1 Control Batches</a></h5>
  <p>A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.</p>
  <p> The key of a control record conforms to the following schema: </p>
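The documentation change above assigns bit 6 of the attributes field to `hasDeleteHorizonMs` and states that, when it is set, `baseTimestamp` holds the delete horizon rather than the first record's timestamp. One way a reader of the raw header might interpret the two fields together is sketched here. `DeleteHorizonSketch`, the mask constant, and both method names are hypothetical; only the bit position and its meaning come from the text.

```java
import java.util.OptionalLong;

// Hypothetical illustration class; it does not exist in Kafka.
final class DeleteHorizonSketch {
    // Bit 6 of the int16 attributes field, per the layout in docs/implementation.html.
    static final int DELETE_HORIZON_FLAG_MASK = 1 << 6;

    static boolean hasDeleteHorizonMs(short attributes) {
        return (attributes & DELETE_HORIZON_FLAG_MASK) != 0;
    }

    // When the flag is set, baseTimestamp is the time at which tombstones and
    // aborted transaction markers in the batch should be removed; otherwise
    // the batch carries no delete horizon.
    static OptionalLong deleteHorizonMs(short attributes, long baseTimestamp) {
        return hasDeleteHorizonMs(attributes) ? OptionalLong.of(baseTimestamp) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        short plain = 0;                              // no flags set
        short withHorizon = (short) (1 << 6);         // only the delete horizon bit set
        System.out.println(deleteHorizonMs(plain, 1_000L).isPresent());       // prints false
        System.out.println(deleteHorizonMs(withHorizon, 5_000L).getAsLong()); // prints 5000
    }
}
```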