MINOR: Update documentation and DumpLogSegments tool for addition of `deleteHorizonMs` in batch format (#11694)
This PR updates the documentation and tooling to match #10914, which added support for encoding `deleteHorizonMs` in the record batch schema. The changes include adding the new attribute and updating field names. We have also updated stale references to the old `FirstTimestamp` field in the code and comments. Finally, in the `DumpLogSegments` tool, when record batch information is printed, it will also include the value of `deleteHorizonMs` (e.g. `OptionalLong.empty` or `OptionalLong[123456]`).

Reviewers: Vincent Jiang <[email protected]>, Kvicii <[email protected]>, Jason Gustafson <[email protected]>
mattwong949 authored Feb 5, 2022
1 parent d4fb388 commit 17dcb80
Showing 4 changed files with 31 additions and 30 deletions.
@@ -52,7 +52,7 @@
  * CRC => Uint32
  * Attributes => Int16
  * LastOffsetDelta => Int32 // also serves as LastSequenceDelta
- * FirstTimestamp => Int64
+ * BaseTimestamp => Int64
  * MaxTimestamp => Int64
  * ProducerId => Int64
  * ProducerEpoch => Int16
@@ -82,19 +82,16 @@
  * are retained only until either a new sequence number is written by the corresponding producer or the producerId
  * is expired from lack of activity.
  *
- * There is no similar need to preserve the timestamp from the original batch after compaction. The FirstTimestamp
- * field therefore always reflects the timestamp of the first record in the batch. If the batch is empty, the
- * FirstTimestamp will be set to -1 (NO_TIMESTAMP).
+ * There is no similar need to preserve the timestamp from the original batch after compaction. The BaseTimestamp
+ * field therefore reflects the timestamp of the first record in the batch in most cases. If the batch is empty, the
+ * BaseTimestamp will be set to -1 (NO_TIMESTAMP). If the delete horizon flag is set to 1, the BaseTimestamp
+ * will be set to the time at which tombstone records and aborted transaction markers in the batch should be removed.
  *
  * Similarly, the MaxTimestamp field reflects the maximum timestamp of the current records if the timestamp type
  * is CREATE_TIME. For LOG_APPEND_TIME, on the other hand, the MaxTimestamp field reflects the timestamp set
  * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
  * the previous value prior to becoming empty.
  *
- * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to
- * the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is
- * the delete horizon, otherwise, it is merely the first timestamp of the record batch.
- *
  * The current attributes are given below:
  *
  * ---------------------------------------------------------------------------------------------------------------------------
@@ -288,15 +285,15 @@ private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSuppli
 
             return new StreamRecordIterator(inputStream) {
                 @Override
-                protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
-                    return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+                    return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                 }
             };
         } else {
             return new StreamRecordIterator(inputStream) {
                 @Override
-                protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
-                    return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+                    return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                 }
             };
         }
@@ -470,7 +467,7 @@ public static void writeHeader(ByteBuffer buffer,
                                    byte magic,
                                    CompressionType compressionType,
                                    TimestampType timestampType,
-                                   long firstTimestamp,
+                                   long baseTimestamp,
                                    long maxTimestamp,
                                    long producerId,
                                    short epoch,
@@ -482,8 +479,8 @@
                                    int numRecords) {
         if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
             throw new IllegalArgumentException("Invalid magic value " + magic);
-        if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
-            throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
+        if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
+            throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
 
         short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
 
@@ -493,7 +490,7 @@
         buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
         buffer.put(position + MAGIC_OFFSET, magic);
         buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
-        buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp);
+        buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
         buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
         buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
         buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@@ -519,13 +516,13 @@ public static int sizeInBytes(long baseOffset, Iterable<Record> records) {
             return 0;
 
         int size = RECORD_BATCH_OVERHEAD;
-        Long firstTimestamp = null;
+        Long baseTimestamp = null;
         while (iterator.hasNext()) {
             Record record = iterator.next();
             int offsetDelta = (int) (record.offset() - baseOffset);
-            if (firstTimestamp == null)
-                firstTimestamp = record.timestamp();
-            long timestampDelta = record.timestamp() - firstTimestamp;
+            if (baseTimestamp == null)
+                baseTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - baseTimestamp;
             size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
                 record.headers());
         }
@@ -539,12 +536,12 @@ public static int sizeInBytes(Iterable<SimpleRecord> records) {
 
         int size = RECORD_BATCH_OVERHEAD;
         int offsetDelta = 0;
-        Long firstTimestamp = null;
+        Long baseTimestamp = null;
         while (iterator.hasNext()) {
             SimpleRecord record = iterator.next();
-            if (firstTimestamp == null)
-                firstTimestamp = record.timestamp();
-            long timestampDelta = record.timestamp() - firstTimestamp;
+            if (baseTimestamp == null)
+                baseTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - baseTimestamp;
             size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(),
                 record.headers());
         }
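
Both sizeInBytes overloads above rely on the same delta encoding: each record stores its offset and timestamp relative to the batch's base values, so only the batch header carries absolute numbers. An illustrative computation (all values invented):

    // Illustrative values only: how per-record deltas relate to the base values.
    long baseOffset = 100L;
    long baseTimestamp = 1_644_000_000_000L;      // timestamp of the first record

    long recordOffset = 103L;
    long recordTimestamp = 1_644_000_000_250L;

    int offsetDelta = (int) (recordOffset - baseOffset);    // 3
    long timestampDelta = recordTimestamp - baseTimestamp;  // 250
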
@@ -633,7 +630,7 @@ private abstract class StreamRecordIterator extends RecordIterator {
             this.inputStream = inputStream;
         }
 
-        abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;
+        abstract Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException;
 
         @Override
         protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
@@ -35,7 +35,7 @@ public interface MutableRecordBatch extends RecordBatch {
     /**
      * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual
      * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated
-     * by this method, but clients ignore them if the timestamp time is log append time. Note that firstTimestamp is not
+     * by this method, but clients ignore them if the timestamp time is log append time. Note that baseTimestamp is not
      * updated by this method.
      *
      * This typically requires re-computation of the batch's CRC.
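
The comment above describes the log-append-time shortcut: the broker rewrites only the batch-level timestamp, and readers ignore the per-record values. A hedged sketch of the read-side rule (TimestampType with these constants is Kafka's public enum; the method itself is illustrative, not the actual client code):

    import org.apache.kafka.common.record.TimestampType;

    // Sketch of the rule described above: for LOG_APPEND_TIME the batch-level
    // timestamp wins; for CREATE_TIME the per-record timestamp is used as written.
    static long effectiveTimestamp(TimestampType type, long batchMaxTimestamp, long recordTimestamp) {
        return type == TimestampType.LOG_APPEND_TIME ? batchMaxTimestamp : recordTimestamp;
    }
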
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -317,7 +317,7 @@ object DumpLogSegments {
           " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
           " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
           " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
-          " isControl: " + batch.isControlBatch)
+          " isControl: " + batch.isControlBatch + " deleteHorizonMs: " + batch.deleteHorizonMs)
         else
           print("offset: " + batch.lastOffset)
 
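
For reference, a sketch of what the extended batch line looks like when dumping a segment with bin/kafka-dump-log.sh (values illustrative; only the fields visible in this hunk are shown, and the preceding fields of the line are elided):

    $ bin/kafka-dump-log.sh --files /tmp/kafka-logs/test-0/00000000000000000000.log
    ... baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1
        partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty
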
10 changes: 7 additions & 3 deletions docs/implementation.html
@@ -47,9 +47,10 @@ <h4 class="anchor-heading"><a id="recordbatch" class="anchor-link"></a><a href="
     bit 3: timestampType
     bit 4: isTransactional (0 means not transactional)
     bit 5: isControlBatch (0 means not a control batch)
-    bit 6~15: unused
+    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
+    bit 7~15: unused
 lastOffsetDelta: int32
-firstTimestamp: int64
+baseTimestamp: int64
 maxTimestamp: int64
 producerId: int64
 producerEpoch: int16
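
The flag bits listed above can be tested with simple masks. A sketch using illustrative constants that mirror the documented bit positions (bits 0~2, the compression codec, sit outside this hunk and are omitted):

    // Masks mirror the documented bit positions; these are illustrative,
    // not Kafka's internal constants.
    static final int TIMESTAMP_TYPE_MASK     = 1 << 3;  // 0 = CreateTime, 1 = LogAppendTime
    static final int TRANSACTIONAL_FLAG_MASK = 1 << 4;
    static final int CONTROL_FLAG_MASK       = 1 << 5;
    static final int DELETE_HORIZON_MASK     = 1 << 6;

    static boolean isTransactional(short attributes)    { return (attributes & TRANSACTIONAL_FLAG_MASK) != 0; }
    static boolean isControlBatch(short attributes)     { return (attributes & CONTROL_FLAG_MASK) != 0; }
    static boolean hasDeleteHorizonMs(short attributes) { return (attributes & DELETE_HORIZON_MASK) != 0; }
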
@@ -65,9 +66,12 @@
 <p>On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence numbers from the original batch when the log is cleaned. This is required in order to be able to restore the
 producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must
 be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying that the first and last sequence numbers of the incoming batch match the last from that producer). As a result,
-it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the firstTimestamp
+it is possible to have empty batches in the log when all the records in the batch are cleaned but batch is still retained in order to preserve a producer's last sequence number. One oddity here is that the baseTimestamp
 field is not preserved during compaction, so it will change if the first record in the batch is compacted away.</p>
+
+<p>Compaction may also modify the baseTimestamp if the record batch contains records with a null payload or aborted transaction markers. The baseTimestamp will be set to the timestamp of when those records should be deleted
+with the delete horizon attribute bit also set.</p>
 
 <h5 class="anchor-heading"><a id="controlbatch" class="anchor-link"></a><a href="#controlbatch">5.3.1.1 Control Batches</a></h5>
 <p>A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.</p>
 <p> The key of a control record conforms to the following schema: </p>
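
The paragraph added in the hunk above leaves the horizon computation implicit. Per the change referenced here (#10914), the cleaner derives it from the topic's delete.retention.ms configuration; a hedged sketch of that lifecycle, with simplified names rather than the LogCleaner's actual code:

    import java.util.OptionalLong;

    // Hedged sketch: deleteRetentionMs corresponds to the delete.retention.ms
    // topic configuration.
    final class DeleteHorizonLifecycleSketch {
        // A cleaning pass that still retains a tombstone or marker stamps a
        // horizon into baseTimestamp and sets the delete horizon attribute bit.
        static long stampDeleteHorizon(long cleaningTimeMs, long deleteRetentionMs) {
            return cleaningTimeMs + deleteRetentionMs;
        }

        // Later passes may drop the record once the horizon has passed.
        static boolean canDiscard(OptionalLong deleteHorizonMs, long nowMs) {
            return deleteHorizonMs.isPresent() && nowMs >= deleteHorizonMs.getAsLong();
        }
    }
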
