Add basic support for control messages (apache#132)
hachikuji committed Mar 16, 2017
1 parent 722e407 commit 36c10fa
Showing 10 changed files with 186 additions and 39 deletions.
checkstyle/import-control.xml (2 additions, 0 deletions)
@@ -95,6 +95,8 @@
     <allow pkg="net.jpountz" />
     <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.network" />
+    <allow pkg="org.apache.kafka.common.protocol" />
+    <allow pkg="org.apache.kafka.common.protocol.types" />
     <allow pkg="org.apache.kafka.common.errors" />
 </subpackage>

@@ -771,12 +771,13 @@ private PartitionRecords<K, V> parseCompletedFetch(CompletedFetch completedFetch
             boolean skippedRecords = false;
             for (LogEntry entry : partition.records.entries()) {
                 for (LogRecord record : entry) {
-                    // Skip the messages earlier than current position.
-                    if (record.offset() >= position) {
+                    // control records should not be returned to the user. also skip anything out of range
+                    if (record.isControlRecord() || record.offset() < position) {
+                        skippedRecords = true;
+                    } else {
                         parsed.add(parseRecord(tp, entry, record));
                         bytes += record.sizeInBytes();
-                    } else
-                        skippedRecords = true;
+                    }
                 }
             }

@@ -0,0 +1,68 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
package org.apache.kafka.common.record;

import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

import java.nio.ByteBuffer;

public enum ControlRecordType {
    COMMIT((short) 0),
    ABORT((short) 1),

    // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
    UNKNOWN((short) -1);

    private static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
    private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
            new Field("version", Type.INT16),
            new Field("type", Type.INT16));

    private final short type;

    ControlRecordType(short type) {
        this.type = type;
    }

    public Struct recordKey() {
        if (this == UNKNOWN)
            throw new IllegalArgumentException("Cannot serialize UNKNOWN control record type");

        Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
        struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
        struct.set("type", type);
        return struct;
    }

    public static ControlRecordType parse(ByteBuffer key) {
        short version = key.getShort(0);
        if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
            throw new IllegalArgumentException("Cannot parse control record key schema with version " + version);
        short type = key.getShort(2);
        switch (type) {
            case 0:
                return COMMIT;
            case 1:
                return ABORT;
            default:
                return UNKNOWN;
        }
    }
}
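
The v0 control record key defined above is four bytes on the wire: an int16 version followed by an int16 type. A minimal round-trip sketch, not part of the commit, using only the recordKey() and parse() methods from this file:

    // Serialize the COMMIT key: bytes 00 00 00 00 (version 0, type 0).
    Struct key = ControlRecordType.COMMIT.recordKey();
    ByteBuffer buffer = ByteBuffer.allocate(key.sizeOf());
    key.writeTo(buffer);
    buffer.flip();

    // parse() reads the version and type at absolute positions 0 and 2,
    // so the buffer's current position is irrelevant here.
    ControlRecordType parsed = ControlRecordType.parse(buffer);
    assert parsed == ControlRecordType.COMMIT;  // an unrecognized type code yields UNKNOWN rather than failing
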
@@ -51,6 +51,7 @@ public class EosLogRecord implements LogRecord {
     private static final int MAX_RECORD_OVERHEAD = 21;
     private static final int NULL_KEY_MASK = 0x01;
     private static final int NULL_VALUE_MASK = 0x02;
+    private static final int CONTROL_FLAG_MASK = 0x04;
 
     private final int sizeInBytes;
     private final byte attributes;
@@ -149,14 +150,15 @@ public ByteBuffer value() {
     }
 
     public static long writeTo(DataOutputStream out,
+                               boolean isControlRecord,
                                int offsetDelta,
                                long timestampDelta,
                                ByteBuffer key,
                                ByteBuffer value) throws IOException {
         int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value);
         ByteUtils.writeVarint(sizeInBytes, out);
 
-        byte attributes = computeAttributes(key, value);
+        byte attributes = computeAttributes(isControlRecord, key, value);
         out.write(attributes);
 
         ByteUtils.writeVarlong(timestampDelta, out);
@@ -175,12 +177,13 @@ public static long writeTo(DataOutputStream out,
     }
 
     public static long writeTo(ByteBuffer out,
+                               boolean isControlRecord,
                                int offsetDelta,
                                long timestampDelta,
                                ByteBuffer key,
                                ByteBuffer value) {
         try {
-            return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta,
+            return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), isControlRecord, offsetDelta,
                     timestampDelta, key, value);
         } catch (IOException e) {
             // cannot actually be raised by ByteBufferOutputStream
@@ -224,6 +227,11 @@ public boolean hasTimestampType(TimestampType timestampType) {
         return false;
     }
 
+    @Override
+    public boolean isControlRecord() {
+        return (attributes & CONTROL_FLAG_MASK) != 0;
+    }
+
     public static EosLogRecord readFrom(DataInputStream input,
                                         long baseOffset,
                                         long baseTimestamp,
@@ -280,8 +288,8 @@ private static EosLogRecord readFrom(ByteBuffer buffer,
         return new EosLogRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value);
     }
 
-    private static byte computeAttributes(ByteBuffer key, ByteBuffer value) {
-        byte attributes = 0;
+    private static byte computeAttributes(boolean isControlRecord, ByteBuffer key, ByteBuffer value) {
+        byte attributes = isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
         if (key == null)
             attributes |= NULL_KEY_MASK;
         if (value == null)
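
Since each attribute bit is an independent mask, the flags can be tested separately on the read side; for example, a control record with a non-null key and a null value carries attributes 0x04 | 0x02 = 0x06. An illustrative decoding sketch (not from the commit) using the same mask values defined in this class:

    byte attributes = 0x04 | 0x02;                 // CONTROL_FLAG_MASK | NULL_VALUE_MASK = 0x06
    boolean isControl = (attributes & 0x04) != 0;  // true:  control bit set
    boolean nullKey   = (attributes & 0x01) != 0;  // false: NULL_KEY_MASK clear
    boolean nullValue = (attributes & 0x02) != 0;  // true:  NULL_VALUE_MASK set
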
@@ -119,12 +119,20 @@ public interface LogRecord {
     boolean isCompressed();
 
     /**
-     * For versions prior to 2, the message contained a timestamp type attribute. This method can be
+     * For versions prior to 2, the record contained a timestamp type attribute. This method can be
      * used to check whether the value of that attribute matches a particular timestamp type. For versions
      * 2 and above, this will always be false.
      * @param timestampType the timestamp type to compare
      * @return true if the version is lower than 2 and the timestamp type matches
      */
     boolean hasTimestampType(TimestampType timestampType);
 
+    /**
+     * Check whether this is a control record (i.e. whether the control bit is set in the record attributes).
+     * For magic versions prior to 2, this is always false.
+     *
+     * @return Whether this is a control record
+     */
+    boolean isControlRecord();
+
 }
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
 import java.io.DataOutputStream;
@@ -240,19 +241,22 @@ private void writeOldEntryCompressedWrapperHeader() {
      * @param value The record value
      * @return crc of the record
      */
-    public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
+    public long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value) {
         try {
             if (lastOffset >= 0 && offset <= lastOffset)
                 throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
 
             if (timestamp < 0 && timestamp != LogEntry.NO_TIMESTAMP)
                 throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
 
+            if (isControlRecord && magic < LogEntry.MAGIC_VALUE_V2)
+                throw new IllegalArgumentException("Magic v" + magic + " does not support control records");
+
             if (baseTimestamp == null)
                 baseTimestamp = timestamp;
 
             if (magic > LogEntry.MAGIC_VALUE_V1)
-                return appendEosLogRecord(offset, timestamp, key, value);
+                return appendEosLogRecord(offset, isControlRecord, timestamp, key, value);
             else
                 return appendOldLogRecord(offset, timestamp, key, value);
         } catch (IOException e) {
@@ -261,13 +265,14 @@ public long appendWithOffset(long offset, long timestamp, byte[] key, ByteBu
     }
 
     public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
-        return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value));
+        return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value));
     }
 
-    private long appendEosLogRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
+    private long appendEosLogRecord(long offset, boolean isControlRecord, long timestamp,
+                                    ByteBuffer key, ByteBuffer value) throws IOException {
         int offsetDelta = (int) (offset - baseOffset);
         long timestampDelta = timestamp - baseTimestamp;
-        long crc = EosLogRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value);
+        long crc = EosLogRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value);
         recordWritten(offset, timestamp, EosLogRecord.sizeInBytes(offsetDelta, timestamp, key, value));
         return crc;
     }
Expand All @@ -287,7 +292,11 @@ private long appendOldLogRecord(long offset, long timestamp, ByteBuffer key, Byt
}

public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value);
return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value);
}

private long nextSequentialOffset() {
return lastOffset < 0 ? baseOffset : lastOffset + 1;
}

/**
@@ -306,8 +315,16 @@ public long append(KafkaRecord record) {
         return append(record.timestamp(), record.key(), record.value());
     }
 
+    public long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+        Struct keyStruct = type.recordKey();
+        ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
+        keyStruct.writeTo(key);
+        key.flip();
+        return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value);
+    }
+
     public long appendWithOffset(long offset, KafkaRecord record) {
-        return appendWithOffset(offset, record.timestamp(), record.key(), record.value());
+        return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value());
     }

/**
@@ -334,7 +351,7 @@ public void appendUncheckedWithOffset(long offset, Record record) {
      * @param record
      */
     public void append(LogRecord record) {
-        appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
+        appendWithOffset(record.offset(), record.isControlRecord(), record.timestamp(), record.key(), record.value());
     }
 
     /**
@@ -343,7 +360,7 @@ public void append(LogRecord record) {
      * @param record
      */
     public void appendWithOffset(long offset, LogRecord record) {
-        appendWithOffset(offset, record.timestamp(), record.key(), record.value());
+        appendWithOffset(offset, record.isControlRecord(), record.timestamp(), record.key(), record.value());
    }
 
     /**
@@ -366,7 +383,7 @@ public void appendWithOffset(long offset, Record record) {
      * @param record The record to add
      */
     public void append(Record record) {
-        appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record);
+        appendWithOffset(nextSequentialOffset(), record);
     }
 
     private long toInnerOffset(long offset) {
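
For illustration, a hedged sketch of how a caller might use the new appendControlRecord method; this is not code from the commit, and `builder` is assumed to be a MemoryRecordsBuilder configured with magic v2 or above (lower magic versions are rejected by the check added in appendWithOffset):

    // Assumes `builder` was created with magic >= LogEntry.MAGIC_VALUE_V2.
    // A null value is allowed: the record then carries only the typed control key.
    long crc = builder.appendControlRecord(System.currentTimeMillis(),
                                           ControlRecordType.COMMIT,
                                           null);
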
@@ -170,6 +170,11 @@ public int lastSequence() {
         return LogEntry.NO_SEQUENCE;
     }
 
+    @Override
+    public boolean isControlRecord() {
+        return false;
+    }
+
     /**
      * Get an iterator for the nested entries contained within this log entry. Note that
      * if the entry is not compressed, then this method will return an iterator over the
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -34,24 +35,27 @@ public void testBasicSerde() {
             new KafkaRecord(null, null)
         };
 
-        for (KafkaRecord record : records) {
-            int baseSequence = 723;
-            long baseOffset = 37;
-            int offsetDelta = 10;
-            long baseTimestamp = System.currentTimeMillis();
-            long timestampDelta = 323;
+        for (boolean isControlRecord : Arrays.asList(true, false)) {
+            for (KafkaRecord record : records) {
+                int baseSequence = 723;
+                long baseOffset = 37;
+                int offsetDelta = 10;
+                long baseTimestamp = System.currentTimeMillis();
+                long timestampDelta = 323;
 
-            ByteBuffer buffer = ByteBuffer.allocate(1024);
-            EosLogRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value());
-            buffer.flip();
+                ByteBuffer buffer = ByteBuffer.allocate(1024);
+                EosLogRecord.writeTo(buffer, isControlRecord, offsetDelta, timestampDelta, record.key(), record.value());
+                buffer.flip();
 
-            EosLogRecord logRecord = EosLogRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
-            assertNotNull(logRecord);
-            assertEquals(baseOffset + offsetDelta, logRecord.offset());
-            assertEquals(baseSequence + offsetDelta, logRecord.sequence());
-            assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
-            assertEquals(record.key(), logRecord.key());
-            assertEquals(record.value(), logRecord.value());
+                EosLogRecord logRecord = EosLogRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
+                assertNotNull(logRecord);
+                assertEquals(baseOffset + offsetDelta, logRecord.offset());
+                assertEquals(baseSequence + offsetDelta, logRecord.sequence());
+                assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
+                assertEquals(record.key(), logRecord.key());
+                assertEquals(record.value(), logRecord.value());
+                assertEquals(isControlRecord, logRecord.isControlRecord());
+            }
         }
     }

@@ -65,7 +69,7 @@ public void testSerdeNoSequence() {
         long timestampDelta = 323;
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        EosLogRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value);
+        EosLogRecord.writeTo(buffer, false, offsetDelta, timestampDelta, key, value);
         buffer.flip();
 
         EosLogRecord record = EosLogRecord.readFrom(buffer, baseOffset, baseTimestamp, LogEntry.NO_SEQUENCE, null);
