diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 80747e14b528d..3475062105099 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -95,6 +95,8 @@
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3da0937a62f00..ada85371d8ee9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -771,12 +771,13 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch
boolean skippedRecords = false;
for (LogEntry entry : partition.records.entries()) {
for (LogRecord record : entry) {
- // Skip the messages earlier than current position.
- if (record.offset() >= position) {
+ // control records should not be returned to the user. also skip anything out of range
+ if (record.isControlRecord() || record.offset() < position) {
+ skippedRecords = true;
+ } else {
parsed.add(parseRecord(tp, entry, record));
bytes += record.sizeInBytes();
- } else
- skippedRecords = true;
+ }
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
new file mode 100644
index 0000000000000..2b3d5237a091a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
+import java.nio.ByteBuffer;
+
+public enum ControlRecordType {
+ COMMIT((short) 0),
+ ABORT((short) 1),
+
+ // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+ UNKNOWN((short) -1);
+
+ private static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
+ private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
+ new Field("version", Type.INT16),
+ new Field("type", Type.INT16));
+
+ private final short type;
+
+ ControlRecordType(short type) {
+ this.type = type;
+ }
+
+ public Struct recordKey() {
+ if (this == UNKNOWN)
+ throw new IllegalArgumentException("Cannot serialize UNKNOWN control record type");
+
+ Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
+ struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
+ struct.set("type", type);
+ return struct;
+ }
+
+ public static ControlRecordType parse(ByteBuffer key) {
+ short version = key.getShort(0);
+ if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
+ throw new IllegalArgumentException("Cannot parse control record key schema with version " + version);
+ short type = key.getShort(2);
+ switch (type) {
+ case 0:
+ return COMMIT;
+ case 1:
+ return ABORT;
+ default:
+ return UNKNOWN;
+ }
+ }
+}
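
For context, a minimal sketch of how the new control record key round-trips through the v0 key schema (an int16 version followed by an int16 type), assuming the ControlRecordType API introduced above; this mirrors what MemoryRecordsBuilder.appendControlRecord and the SimpleRecordTest changes below do:

```java
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.ControlRecordType;

import java.nio.ByteBuffer;

public class ControlRecordKeyExample {
    public static void main(String[] args) {
        // Serialize the COMMIT key with the v0 schema: 2 bytes of version, then 2 bytes of type
        Struct keyStruct = ControlRecordType.COMMIT.recordKey();
        ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
        keyStruct.writeTo(key);
        key.flip();

        // parse() recovers the type; unrecognized type ids map to UNKNOWN rather than failing
        System.out.println(ControlRecordType.parse(key)); // COMMIT
    }
}
```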
diff --git a/clients/src/main/java/org/apache/kafka/common/record/EosLogRecord.java b/clients/src/main/java/org/apache/kafka/common/record/EosLogRecord.java
index a0ea1a530beb2..c28924281cdcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/EosLogRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/EosLogRecord.java
@@ -51,6 +51,7 @@ public class EosLogRecord implements LogRecord {
private static final int MAX_RECORD_OVERHEAD = 21;
private static final int NULL_KEY_MASK = 0x01;
private static final int NULL_VALUE_MASK = 0x02;
+ private static final int CONTROL_FLAG_MASK = 0x04;
private final int sizeInBytes;
private final byte attributes;
@@ -149,6 +150,7 @@ public ByteBuffer value() {
}
public static long writeTo(DataOutputStream out,
+ boolean isControlRecord,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
@@ -156,7 +158,7 @@ public static long writeTo(DataOutputStream out,
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value);
ByteUtils.writeVarint(sizeInBytes, out);
- byte attributes = computeAttributes(key, value);
+ byte attributes = computeAttributes(isControlRecord, key, value);
out.write(attributes);
ByteUtils.writeVarlong(timestampDelta, out);
@@ -175,12 +177,13 @@ public static long writeTo(DataOutputStream out,
}
public static long writeTo(ByteBuffer out,
+ boolean isControlRecord,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value) {
try {
- return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta,
+ return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), isControlRecord, offsetDelta,
timestampDelta, key, value);
} catch (IOException e) {
// cannot actually be raised by ByteBufferOutputStream
@@ -224,6 +227,11 @@ public boolean hasTimestampType(TimestampType timestampType) {
return false;
}
+ @Override
+ public boolean isControlRecord() {
+ return (attributes & CONTROL_FLAG_MASK) != 0;
+ }
+
public static EosLogRecord readFrom(DataInputStream input,
long baseOffset,
long baseTimestamp,
@@ -280,8 +288,8 @@ private static EosLogRecord readFrom(ByteBuffer buffer,
return new EosLogRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value);
}
- private static byte computeAttributes(ByteBuffer key, ByteBuffer value) {
- byte attributes = 0;
+ private static byte computeAttributes(boolean isControlRecord, ByteBuffer key, ByteBuffer value) {
+ byte attributes = isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
if (key == null)
attributes |= NULL_KEY_MASK;
if (value == null)
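
To illustrate the attribute byte layout implied by the masks above (a sketch only; the real computation is the private computeAttributes shown in this hunk): bit 0 marks a null key, bit 1 a null value, and the new bit 2 marks a control record.

```java
public class EosAttributesSketch {
    // Same values as the private EosLogRecord constants, copied here for illustration
    private static final int NULL_KEY_MASK = 0x01;
    private static final int NULL_VALUE_MASK = 0x02;
    private static final int CONTROL_FLAG_MASK = 0x04;

    static byte computeAttributes(boolean isControlRecord, byte[] key, byte[] value) {
        byte attributes = isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
        if (key == null)
            attributes |= NULL_KEY_MASK;
        if (value == null)
            attributes |= NULL_VALUE_MASK;
        return attributes;
    }

    public static void main(String[] args) {
        // Control record with a non-null key and a null value -> 0x04 | 0x02 = 0x06
        byte attributes = computeAttributes(true, new byte[] {1}, null);
        System.out.println((attributes & CONTROL_FLAG_MASK) != 0); // true, as isControlRecord() reports
    }
}
```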
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LogRecord.java
index 3162a0fa9f9e9..cdf8ce382e802 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogRecord.java
@@ -119,7 +119,7 @@ public interface LogRecord {
boolean isCompressed();
/**
- * For versions prior to 2, the message contained a timestamp type attribute. This method can be
+ * For versions prior to 2, the record contained a timestamp type attribute. This method can be
* used to check whether the value of that attribute matches a particular timestamp type. For versions
* 2 and above, this will always be false.
* @param timestampType the timestamp type to compare
@@ -127,4 +127,12 @@ public interface LogRecord {
*/
boolean hasTimestampType(TimestampType timestampType);
+ /**
+ * Check whether this is a control record (i.e. whether the control bit is set in the record attributes).
+ * For magic versions prior to 2, this is always false.
+ *
+ * @return Whether this is a control record
+ */
+ boolean isControlRecord();
+
}
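
A small sketch of how a caller of this interface is expected to use the new flag: control records are filtered out of what is handed to applications, exactly as the Fetcher change above does. It assumes only the per-entry iteration shown in that hunk (LogEntry iterating over its LogRecords).

```java
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.LogRecord;

import java.util.ArrayList;
import java.util.List;

public class SkipControlRecordsSketch {
    // Given the entries of a fetched record set, return only the records visible to users
    static List<LogRecord> userRecords(Iterable<? extends LogEntry> entries) {
        List<LogRecord> result = new ArrayList<>();
        for (LogEntry entry : entries) {
            for (LogRecord record : entry) {
                // control records (e.g. COMMIT/ABORT markers) are never surfaced to applications
                if (!record.isControlRecord())
                    result.add(record);
            }
        }
        return result;
    }
}
```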
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 44f595cc01ab5..ba053000b25bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import java.io.DataOutputStream;
@@ -240,7 +241,7 @@ private void writeOldEntryCompressedWrapperHeader() {
* @param value The record value
* @return crc of the record
*/
- public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
+ public long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value) {
try {
if (lastOffset >= 0 && offset <= lastOffset)
throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
@@ -248,11 +249,14 @@ public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBu
if (timestamp < 0 && timestamp != LogEntry.NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
+ if (isControlRecord && magic < LogEntry.MAGIC_VALUE_V2)
+ throw new IllegalArgumentException("Magic v" + magic + " does not support control records");
+
if (baseTimestamp == null)
baseTimestamp = timestamp;
if (magic > LogEntry.MAGIC_VALUE_V1)
- return appendEosLogRecord(offset, timestamp, key, value);
+ return appendEosLogRecord(offset, isControlRecord, timestamp, key, value);
else
return appendOldLogRecord(offset, timestamp, key, value);
} catch (IOException e) {
@@ -261,13 +265,14 @@ public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBu
}
public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
- return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value));
+ return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value));
}
- private long appendEosLogRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
+ private long appendEosLogRecord(long offset, boolean isControlRecord, long timestamp,
+ ByteBuffer key, ByteBuffer value) throws IOException {
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
- long crc = EosLogRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value);
+ long crc = EosLogRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value);
recordWritten(offset, timestamp, EosLogRecord.sizeInBytes(offsetDelta, timestamp, key, value));
return crc;
}
@@ -287,7 +292,11 @@ private long appendOldLogRecord(long offset, long timestamp, ByteBuffer key, Byt
}
public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
- return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value);
+ return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value);
+ }
+
+ private long nextSequentialOffset() {
+ return lastOffset < 0 ? baseOffset : lastOffset + 1;
}
/**
@@ -306,8 +315,16 @@ public long append(KafkaRecord record) {
return append(record.timestamp(), record.key(), record.value());
}
+ public long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+ Struct keyStruct = type.recordKey();
+ ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
+ keyStruct.writeTo(key);
+ key.flip();
+ return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value);
+ }
+
public long appendWithOffset(long offset, KafkaRecord record) {
- return appendWithOffset(offset, record.timestamp(), record.key(), record.value());
+ return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value());
}
/**
@@ -334,7 +351,7 @@ public void appendUncheckedWithOffset(long offset, Record record) {
* @param record
*/
public void append(LogRecord record) {
- appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
+ appendWithOffset(record.offset(), record.isControlRecord(), record.timestamp(), record.key(), record.value());
}
/**
@@ -343,7 +360,7 @@ public void append(LogRecord record) {
* @param record
*/
public void appendWithOffset(long offset, LogRecord record) {
- appendWithOffset(offset, record.timestamp(), record.key(), record.value());
+ appendWithOffset(offset, record.isControlRecord(), record.timestamp(), record.key(), record.value());
}
/**
@@ -366,7 +383,7 @@ public void appendWithOffset(long offset, Record record) {
* @param record The record to add
*/
public void append(Record record) {
- appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record);
+ appendWithOffset(nextSequentialOffset(), record);
}
private long toInnerOffset(long offset) {
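
A short usage sketch for the new appendControlRecord API, mixing a data record with a COMMIT marker (assumptions: the magic v2 builder factory used elsewhere in this patch; the SimpleRecordTest change below exercises the same path):

```java
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;

import java.nio.ByteBuffer;

public class AppendControlRecordSketch {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, LogEntry.MAGIC_VALUE_V2,
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L);

        // A regular data record at offset 0, then a COMMIT marker at the next sequential offset;
        // only magic v2 accepts control records (earlier magics throw IllegalArgumentException)
        builder.appendWithOffset(0L, System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
        builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.COMMIT, null);

        MemoryRecords records = builder.build();
        System.out.println(records.sizeInBytes());
    }
}
```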
diff --git a/clients/src/main/java/org/apache/kafka/common/record/OldLogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/OldLogEntry.java
index 32736ebe71ac6..04aa69464db8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/OldLogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/OldLogEntry.java
@@ -170,6 +170,11 @@ public int lastSequence() {
return LogEntry.NO_SEQUENCE;
}
+ @Override
+ public boolean isControlRecord() {
+ return false;
+ }
+
/**
* Get an iterator for the nested entries contained within this log entry. Note that
* if the entry is not compressed, then this method will return an iterator over the
diff --git a/clients/src/test/java/org/apache/kafka/common/record/EosLogRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/EosLogRecordTest.java
index f4c1ec859eb03..5b1834b62a937 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/EosLogRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/EosLogRecordTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -34,24 +35,27 @@ public void testBasicSerde() {
new KafkaRecord(null, null)
};
- for (KafkaRecord record : records) {
- int baseSequence = 723;
- long baseOffset = 37;
- int offsetDelta = 10;
- long baseTimestamp = System.currentTimeMillis();
- long timestampDelta = 323;
+ for (boolean isControlRecord : Arrays.asList(true, false)) {
+ for (KafkaRecord record : records) {
+ int baseSequence = 723;
+ long baseOffset = 37;
+ int offsetDelta = 10;
+ long baseTimestamp = System.currentTimeMillis();
+ long timestampDelta = 323;
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- EosLogRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value());
- buffer.flip();
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ EosLogRecord.writeTo(buffer, isControlRecord, offsetDelta, timestampDelta, record.key(), record.value());
+ buffer.flip();
- EosLogRecord logRecord = EosLogRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
- assertNotNull(logRecord);
- assertEquals(baseOffset + offsetDelta, logRecord.offset());
- assertEquals(baseSequence + offsetDelta, logRecord.sequence());
- assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
- assertEquals(record.key(), logRecord.key());
- assertEquals(record.value(), logRecord.value());
+ EosLogRecord logRecord = EosLogRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
+ assertNotNull(logRecord);
+ assertEquals(baseOffset + offsetDelta, logRecord.offset());
+ assertEquals(baseSequence + offsetDelta, logRecord.sequence());
+ assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
+ assertEquals(record.key(), logRecord.key());
+ assertEquals(record.value(), logRecord.value());
+ assertEquals(isControlRecord, logRecord.isControlRecord());
+ }
}
}
@@ -65,7 +69,7 @@ public void testSerdeNoSequence() {
long timestampDelta = 323;
ByteBuffer buffer = ByteBuffer.allocate(1024);
- EosLogRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value);
+ EosLogRecord.writeTo(buffer, false, offsetDelta, timestampDelta, key, value);
buffer.flip();
EosLogRecord record = EosLogRecord.readFrom(buffer, baseOffset, baseTimestamp, LogEntry.NO_SEQUENCE, null);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index d440eda9af64f..36c0451e838a3 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -18,11 +18,13 @@
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -146,7 +148,8 @@ public void testConvertFromV1ToV0() {
public void buildEosRecord() {
ByteBuffer buffer = ByteBuffer.allocate(2048);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, LogEntry.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.CREATE_TIME, 1234567L);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, LogEntry.MAGIC_VALUE_V2, CompressionType.NONE,
+ TimestampType.CREATE_TIME, 1234567L);
builder.appendWithOffset(1234567, System.currentTimeMillis(), "a".getBytes(), "v".getBytes());
builder.appendWithOffset(1234568, System.currentTimeMillis(), "b".getBytes(), "v".getBytes());
@@ -162,4 +165,26 @@ public void buildEosRecord() {
}
}
+ @Test
+ public void appendControlRecord() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, LogEntry.MAGIC_VALUE_V2, CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+
+ builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.COMMIT, null);
+ builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.ABORT, null);
+ MemoryRecords records = builder.build();
+
+        List<LogRecord> logRecords = TestUtils.toList(records.records());
+ assertEquals(2, logRecords.size());
+
+ LogRecord commitRecord = logRecords.get(0);
+ assertTrue(commitRecord.isControlRecord());
+ assertEquals(ControlRecordType.COMMIT, ControlRecordType.parse(commitRecord.key()));
+
+ LogRecord abortRecord = logRecords.get(1);
+ assertTrue(abortRecord.isControlRecord());
+ assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key()));
+ }
+
}
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index fab71650af771..4c8011938f1ab 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -98,6 +98,7 @@ private[kafka] object LogValidator extends Logging {
for (entry <- records.entries.asScala) {
for (record <- entry.asScala) {
+ ensureNotControlRecord(record)
validateKey(record, compactedTopic)
validateTimestamp(entry, record, now, timestampType, messageTimestampDiffMaxMs)
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
@@ -127,10 +128,10 @@ private[kafka] object LogValidator extends Logging {
val baseOffset = offsetCounter.value
for (record <- entry.asScala) {
record.ensureValid()
-
+ ensureNotControlRecord(record)
validateKey(record, compactedTopic)
- val offset = offsetCounter.getAndIncrement()
+ val offset = offsetCounter.getAndIncrement()
if (entry.magic > LogEntry.MAGIC_VALUE_V0) {
validateTimestamp(entry, record, currentTimestamp, timestampType, timestampDiffMaxMs)
@@ -202,6 +203,7 @@ private[kafka] object LogValidator extends Logging {
throw new InvalidRecordException(s"Log record magic does not match outer magic ${entry.magic}")
record.ensureValid()
+ ensureNotControlRecord(record)
validateKey(record, compactedTopic)
if (!record.hasMagic(LogEntry.MAGIC_VALUE_V0) && messageFormatVersion > LogEntry.MAGIC_VALUE_V0) {
@@ -278,6 +280,13 @@ private[kafka] object LogValidator extends Logging {
messageSizeMaybeChanged = true)
}
+
+ private def ensureNotControlRecord(record: LogRecord) {
+ // Until we have implemented transaction support, we do not permit control records to be written
+ if (record.isControlRecord)
+ throw new InvalidRecordException("Control messages are not currently supported")
+ }
+
private def validateKey(record: LogRecord, compactedTopic: Boolean) {
if (compactedTopic && !record.hasKey)
throw new InvalidMessageException("Compacted topic cannot accept message without key.")