Commit
Implement full (stubbed out) fetch and produce request schema (apache…
hachikuji committed Mar 9, 2017
1 parent bc541e9 commit c00e78e
Showing 19 changed files with 389 additions and 67 deletions.
org/apache/kafka/common/protocol/Protocol.java
@@ -31,6 +31,7 @@
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.INT64;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@@ -164,7 +165,21 @@ public class Protocol {
* timestamp.
*/
public static final Schema PRODUCE_REQUEST_V2 = PRODUCE_REQUEST_V1;
public static final Schema PRODUCE_REQUEST_V3 = PRODUCE_REQUEST_V2;

// Produce request V3 adds the transactional id, which is used for authorization when attempting to write
// transactional data. This version also adds support for message format V2.
public static final Schema PRODUCE_REQUEST_V3 = new Schema(
new Field("transactional_id",
NULLABLE_STRING,
"The transactional ID of the producer. This is used to authorize transaction produce requests. " +
"This can be null for non-transactional producers."),
new Field("acks",
INT16,
"The number of acknowledgments the producer requires the leader to have received before " +
"considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader " +
"and -1 for the full ISR."),
new Field("timeout", INT32, "The time to await a response in ms."),
new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));

public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses",
new ArrayOf(new Schema(new Field("topic", STRING),
@@ -502,7 +517,7 @@ public class Protocol {
// Only the version number is incremented to indicate the client support message format V1 which uses
// relative offset and has timestamp.
public static final Schema FETCH_REQUEST_V2 = FETCH_REQUEST_V1;
// FETCH_REQUEST_V3 added top level max_bytes field - the total size of partition data to accumulate in response.
// Fetch Request V3 added top level max_bytes field - the total size of partition data to accumulate in response.
// The partition ordering is now relevant - partitions will be processed in the order they appear in the request.
public static final Schema FETCH_REQUEST_V3 = new Schema(new Field("replica_id",
INT32,
@@ -521,7 +536,34 @@
new Field("topics",
new ArrayOf(FETCH_REQUEST_TOPIC_V0),
"Topics to fetch in the order provided."));
public static final Schema FETCH_REQUEST_V4 = FETCH_REQUEST_V3;

// The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response).
public static final Schema FETCH_REQUEST_V4 = new Schema(
new Field("replica_id",
INT32,
"Broker id of the follower. For normal consumers, use -1."),
new Field("max_wait_time",
INT32,
"Maximum time in ms to wait for the response."),
new Field("min_bytes",
INT32,
"Minimum bytes to accumulate in the response."),
new Field("max_bytes",
INT32,
"Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
"if the first message in the first non-empty partition of the fetch is larger than this " +
"value, the message will still be returned to ensure that progress can be made."),
new Field("isolation_level",
INT8,
"This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
"(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
"non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
"READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
"and enables the inclusion of the list of aborted transactions in the result, which allows " +
"consumers to discard ABORTED transactional records."),
new Field("topics",
new ArrayOf(FETCH_REQUEST_TOPIC_V0),
"Topics to fetch in the order provided."));

public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
INT32,
@@ -552,7 +594,44 @@
// (magic byte 0 and 1). For details, see ByteBufferMessageSet.
public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1;
public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2;
public static final Schema FETCH_RESPONSE_V4 = FETCH_RESPONSE_V3;


// The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
// last stable offset). It also exposes messages with magic v2 (along with older formats).
private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
new Field("first_offset", INT64, "The first offset in the aborted transaction"));

public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
new Field("partition",
INT32,
"Topic partition id."),
new Field("error_code", INT16),
new Field("high_watermark",
INT64,
"Last committed offset."),
new Field("last_stable_offset",
INT64,
"The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
"of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
new Field("aborted_transactions",
ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));

public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
new Field("record_set", RECORDS));

public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
new Field("topic", STRING),
new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));

public static final Schema FETCH_RESPONSE_V4 = new Schema(
new Field("throttle_time_ms",
INT32,
"Duration in milliseconds for which the request was throttled " +
"due to quota violation (zero if the request did not violate any quota).",
0),
new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));

public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4};
public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4};
org/apache/kafka/common/protocol/types/Type.java
@@ -16,14 +16,14 @@
*/
package org.apache.kafka.common.protocol.types;

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;

/**
* A serializable type
*/
@@ -443,7 +443,7 @@ public void write(ByteBuffer buffer, Object o) {
}

@Override
public Object read(ByteBuffer buffer) {
public Records read(ByteBuffer buffer) {
ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer);
return MemoryRecords.readableRecords(recordsBuffer);
}
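The override above narrows the runtime return type of the RECORDS reader. A rough round-trip sketch, assuming the write path accepts MemoryRecords; note the RECORDS constant is declared as Type, so call sites still cast:

MemoryRecords records = MemoryRecords.EMPTY;
ByteBuffer buf = ByteBuffer.allocate(Type.RECORDS.sizeOf(records));
Type.RECORDS.write(buf, records);
buf.flip();
Records roundTripped = (Records) Type.RECORDS.read(buf);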
org/apache/kafka/common/requests/FetchRequest.java
@@ -16,24 +16,25 @@
*/
package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FetchRequest extends AbstractRequest {
public static final int CONSUMER_REPLICA_ID = -1;
private static final String REPLICA_ID_KEY_NAME = "replica_id";
private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
private static final String MIN_BYTES_KEY_NAME = "min_bytes";
private static final String ISOLATION_LEVEL_KEY_NAME = "isolation_level";
private static final String TOPICS_KEY_NAME = "topics";

// request and partition level name
@@ -54,6 +55,7 @@ public class FetchRequest extends AbstractRequest {
private final int maxWait;
private final int minBytes;
private final int maxBytes;
private final IsolationLevel isolationLevel;
private final LinkedHashMap<TopicPartition, PartitionData> fetchData;

public static final class PartitionData {
@@ -159,6 +161,7 @@ private FetchRequest(short version, int replicaId, int maxWait, int minBytes, in
this.minBytes = minBytes;
this.maxBytes = maxBytes;
this.fetchData = fetchData;
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
}

public FetchRequest(Struct struct, short version) {
@@ -170,6 +173,12 @@ public FetchRequest(Struct struct, short version) {
maxBytes = struct.getInt(MAX_BYTES_KEY_NAME);
else
maxBytes = DEFAULT_RESPONSE_MAX_BYTES;

if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
isolationLevel = IsolationLevel.forId(struct.getByte(ISOLATION_LEVEL_KEY_NAME));
else
isolationLevel = IsolationLevel.READ_UNCOMMITTED;

fetchData = new LinkedHashMap<>();
for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
Struct topicResponse = (Struct) topicResponseObj;
@@ -191,8 +200,7 @@ public AbstractResponse getErrorResponse(Throwable e) {

for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e),
FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY);

FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY);
responseData.put(entry.getKey(), partitionResponse);
}
return new FetchResponse(responseData, 0);
@@ -222,6 +230,10 @@ public boolean isFromFollower() {
return replicaId >= 0;
}

public IsolationLevel isolationLevel() {
return isolationLevel;
}

public static FetchRequest parse(ByteBuffer buffer, short version) {
return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
}
@@ -237,6 +249,9 @@ protected Struct toStruct() {
struct.set(MIN_BYTES_KEY_NAME, minBytes);
if (version >= 3)
struct.set(MAX_BYTES_KEY_NAME, maxBytes);
if (version >= 4)
struct.set(ISOLATION_LEVEL_KEY_NAME, IsolationLevel.READ_UNCOMMITTED.id());

List<Struct> topicArray = new ArrayList<>();
for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
Struct topicData = struct.instance(TOPICS_KEY_NAME);
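The IsolationLevel enum referenced above is not part of this diff. Below is a minimal sketch consistent with the forId(byte) and id() calls used here; note that toStruct() deliberately hardcodes READ_UNCOMMITTED for now, in line with the "stubbed out" commit title.

public enum IsolationLevel {
    READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);

    private final byte id;

    IsolationLevel(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static IsolationLevel forId(byte id) {
        switch (id) {
            case 0: return READ_UNCOMMITTED;
            case 1: return READ_COMMITTED;
            default: throw new IllegalArgumentException("Unknown isolation level: " + id);
        }
    }
}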
org/apache/kafka/common/requests/FetchResponse.java
@@ -29,6 +29,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -49,9 +50,18 @@ public class FetchResponse extends AbstractResponse {
private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
private static final String LAST_STABLE_OFFSET_KEY_NAME = "last_stable_offset";
private static final String ABORTED_TRANSACTIONS_KEY_NAME = "aborted_transactions";
private static final String RECORD_SET_KEY_NAME = "record_set";

// aborted transaction field names
private static final String PID_KEY_NAME = "pid";
private static final String FIRST_OFFSET_KEY_NAME = "first_offset";

// Default throttle time
private static final int DEFAULT_THROTTLE_TIME = 0;
public static final long INVALID_HIGHWATERMARK = -1L;
public static final long INVALID_LSO = -1L;

/**
* Possible error codes:
@@ -63,29 +73,47 @@
* UNKNOWN (-1)
*/

private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
private static final String RECORD_SET_KEY_NAME = "record_set";

public static final long INVALID_HIGHWATERMARK = -1L;

private final LinkedHashMap<TopicPartition, PartitionData> responseData;
private final int throttleTimeMs;

public static final class AbortedTransaction {
public final long pid;
public final long firstOffset;

public AbortedTransaction(long pid, long firstOffset) {
this.pid = pid;
this.firstOffset = firstOffset;
}

@Override
public String toString() {
return "(pid=" + pid + ", firstOffset=" + firstOffset + ")";
}
}

public static final class PartitionData {
public final Errors error;
public final long lastStableOffset;
public final long highWatermark;
public final List<AbortedTransaction> abortedTransactions;
public final Records records;

public PartitionData(Errors error, long highWatermark, Records records) {
public PartitionData(Errors error,
long highWatermark,
long lastStableOffset,
List<AbortedTransaction> abortedTransactions,
Records records) {
this.error = error;
this.highWatermark = highWatermark;
this.lastStableOffset = lastStableOffset;
this.abortedTransactions = abortedTransactions;
this.records = records;
}

@Override
public String toString() {
return "(error=" + error.toString() + ", highWaterMark=" + highWatermark +
", records=" + records + ")";
return "(error=" + error + ", highWaterMark=" + highWatermark +
", abortedTransactions=" + abortedTransactions + ", records=" + records + ")";
}
}

@@ -114,8 +142,28 @@ public FetchResponse(Struct struct) {
int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
long lastStableOffset = INVALID_LSO;
if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
lastStableOffset = partitionResponseHeader.getLong(LAST_STABLE_OFFSET_KEY_NAME);

Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
PartitionData partitionData = new PartitionData(error, highWatermark, records);

List<AbortedTransaction> abortedTransactions = Collections.emptyList();
if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
if (abortedTransactionsArray != null) {
abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
for (Object abortedTransactionObj : abortedTransactionsArray) {
Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
long pid = abortedTransactionStruct.getLong(PID_KEY_NAME);
long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME);
abortedTransactions.add(new AbortedTransaction(pid, firstOffset));
}
}
}

PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset,
abortedTransactions, records);
responseData.put(new TopicPartition(topic, partition), partitionData);
}
}
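As a usage sketch, a broker-side response entry for a partition with one aborted transaction might be built with the extended constructor; the pid and offsets are illustrative.

List<FetchResponse.AbortedTransaction> aborted = Collections.singletonList(
    new FetchResponse.AbortedTransaction(1234L /* pid */, 100L /* first_offset */));
FetchResponse.PartitionData data = new FetchResponse.PartitionData(
    Errors.NONE,
    300L,    // high watermark
    250L,    // last stable offset (LSO)
    aborted,
    MemoryRecords.EMPTY);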
Expand Down Expand Up @@ -225,6 +273,24 @@ private static Struct toStruct(short version, LinkedHashMap<TopicPartition, Part
partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey());
partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);

if (version >= 4) {
partitionDataHeader.set(LAST_STABLE_OFFSET_KEY_NAME, fetchPartitionData.lastStableOffset);

if (fetchPartitionData.abortedTransactions == null) {
partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, null);
} else {
List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.pid);
abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset);
abortedTransactionStructs.add(abortedTransactionStruct);
}
partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, abortedTransactionStructs.toArray());
}
}

partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
partitionArray.add(partitionData);