KAFKA-2071: Replace Producer Request/Response with their org.apache.kafka.common.requests equivalents

This PR replaces all occurrences of kafka.api.ProducerRequest/ProducerResponse with their org.apache.kafka.common.requests equivalents.

Author: David Jacot <[email protected]>

Reviewers: Grant Henke <[email protected]>, Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#110 from dajac/KAFKA-2071
dajac authored and ewencp committed Jan 21, 2016
1 parent 959cf09 commit 79cda04
Showing 13 changed files with 208 additions and 170 deletions.
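
Before the hunks: a minimal sketch of how the common classes this PR standardizes on fit together, pieced from the constructor calls visible in the diff below. The topic name, offset, acks, and timeout values are made up, and the PartitionResponse signature is inferred from this era of the codebase rather than quoted from it.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;

public class CommonProduceSketch {
    public static void main(String[] args) {
        // Request side: acks, timeout, and the serialized records per partition.
        Map<TopicPartition, ByteBuffer> records = new HashMap<>();
        records.put(new TopicPartition("my-topic", 0), ByteBuffer.allocate(0));
        ProduceRequest request = new ProduceRequest((short) 1, 3000, records);

        // Response side: per-partition error code and base offset; the v1 form
        // also carries a throttle time (see DEFAULT_THROTTLE_TIME below).
        Map<TopicPartition, ProduceResponse.PartitionResponse> status = new HashMap<>();
        status.put(new TopicPartition("my-topic", 0),
            new ProduceResponse.PartitionResponse(Errors.NONE.code(), 42L));
        ProduceResponse response = new ProduceResponse(status, ProduceResponse.DEFAULT_THROTTLE_TIME);
    }
}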
@@ -103,7 +103,9 @@ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {

switch (versionId) {
case 0:
- return new ProduceResponse(responseMap, 0);
+ return new ProduceResponse(responseMap);
+ case 1:
+ return new ProduceResponse(responseMap, ProduceResponse.DEFAULT_THROTTLE_TIME);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
@@ -122,6 +124,10 @@ public Map<TopicPartition, ByteBuffer> partitionRecords() {
return partitionRecords;
}

+ public void clearPartitionRecords() {
+ partitionRecords.clear();
+ }
+
public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
}
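The new clearPartitionRecords() hook above lets the broker drop its references to the (potentially large) record buffers once they have been handed to the log layer, so they can be garbage-collected while a delayed response is still outstanding. A hypothetical call site, where appendToLog is a stand-in rather than a real API:

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ProduceRequest;

public class ClearRecordsSketch {
    // Stand-in for the real log-append path (hypothetical helper).
    static void appendToLog(Map<TopicPartition, ByteBuffer> records) { /* ... */ }

    static void handle(ProduceRequest request) {
        appendToLog(request.partitionRecords());
        // Release the payload references; a delayed (acks=-1) response no longer needs them.
        request.clearPartitionRecords();
    }
}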
@@ -43,7 +43,7 @@ public class ProduceResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";

public static final long INVALID_OFFSET = -1L;
- private static final int DEFAULT_THROTTLE_TIME = 0;
+ public static final int DEFAULT_THROTTLE_TIME = 0;

/**
* Possible error code:
@@ -31,7 +31,7 @@ public RequestSend(String destination, RequestHeader header, Struct body) {
this.body = body;
}

- private static ByteBuffer serialize(RequestHeader header, Struct body) {
+ public static ByteBuffer serialize(RequestHeader header, Struct body) {
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
header.writeTo(buffer);
body.writeTo(buffer);
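Widening serialize from private to public lets code outside RequestSend build a raw wire buffer from a header plus a body Struct; the rewritten getShutdownReceive in RequestChannel.scala below is exactly such a caller. A sketch, assuming this era's RequestHeader(apiKey, clientId, correlationId) constructor:

import java.nio.ByteBuffer;
import java.util.HashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;

public class SerializeSketch {
    public static void main(String[] args) {
        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, "", 0);
        ProduceRequest empty = new ProduceRequest((short) 0, 0, new HashMap<TopicPartition, ByteBuffer>());
        // Allocates header.sizeOf() + body.sizeOf() bytes: header first, then the body.
        ByteBuffer wire = RequestSend.serialize(header, empty.toStruct());
        System.out.println("serialized " + wire.limit() + " bytes");
    }
}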
@@ -81,7 +81,7 @@ public void testSerialization() throws Exception {
createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
createOffsetFetchResponse(),
createProduceRequest(),
- createProduceRequest().getErrorResponse(0, new UnknownServerException()),
+ createProduceRequest().getErrorResponse(1, new UnknownServerException()),
createProduceResponse(),
createStopReplicaRequest(),
createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
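The test switches to version 1 because getErrorResponse is now version-sensitive: v0 error responses omit the throttle time, while v1 fills in DEFAULT_THROTTLE_TIME (see the first hunk). Roughly, with the cast and values only for illustration:

import java.nio.ByteBuffer;
import java.util.HashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;

public class ErrorResponseSketch {
    public static void main(String[] args) {
        ProduceRequest request = new ProduceRequest((short) 1, 3000, new HashMap<TopicPartition, ByteBuffer>());
        // Same error, two wire formats: v0 has no throttle-time field, v1 does.
        ProduceResponse v0 = (ProduceResponse) request.getErrorResponse(0, new UnknownServerException());
        ProduceResponse v1 = (ProduceResponse) request.getErrorResponse(1, new UnknownServerException());
    }
}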
53 changes: 27 additions & 26 deletions core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -20,12 +20,14 @@ package kafka.coordinator
import java.util.concurrent.locks.ReentrantReadWriteLock

import kafka.utils.CoreUtils._
+ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field}
import org.apache.kafka.common.protocol.types.Type.STRING
import org.apache.kafka.common.protocol.types.Type.INT32
import org.apache.kafka.common.protocol.types.Type.INT64
import org.apache.kafka.common.protocol.types.Type.BYTES
+ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils

import kafka.utils._
@@ -35,7 +37,6 @@ import kafka.log.FileMessageSet
import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import kafka.tools.MessageFormatter
- import kafka.api.ProducerResponseStatus
import kafka.server.ReplicaManager

import scala.collection._
@@ -47,8 +48,8 @@ import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge


- case class DelayedStore(messageSet: Map[TopicAndPartition, MessageSet],
- callback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+ case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+ callback: Map[TopicPartition, PartitionResponse] => Unit)

class GroupMetadataManager(val brokerId: Int,
val config: OffsetConfig,
@@ -171,15 +172,15 @@ class GroupMetadataManager(val brokerId: Int,
bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment)
)

- val groupMetadataPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
+ val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))

val groupMetadataMessageSet = Map(groupMetadataPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))

val generationId = group.generationId

// set the callback function to insert the created group into cache after log append completed
- def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+ def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || ! responseStatus.contains(groupMetadataPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
@@ -190,30 +191,30 @@ class GroupMetadataManager(val brokerId: Int,
val status = responseStatus(groupMetadataPartition)

var responseCode = Errors.NONE.code
- if (status.error != Errors.NONE.code) {
+ if (status.errorCode != Errors.NONE.code) {
debug("Metadata from group %s with generation %d failed when appending to log due to %s"
- .format(group.groupId, generationId, Errors.forCode(status.error).exceptionName))
+ .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))

// transform the log append error code to the corresponding the commit status error code
- responseCode = if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
+ responseCode = if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
- } else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code) {
+ } else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) {
Errors.NOT_COORDINATOR_FOR_GROUP.code
- } else if (status.error == Errors.REQUEST_TIMED_OUT.code) {
+ } else if (status.errorCode == Errors.REQUEST_TIMED_OUT.code) {
Errors.REBALANCE_IN_PROGRESS.code
- } else if (status.error == Errors.MESSAGE_TOO_LARGE.code
- || status.error == Errors.RECORD_LIST_TOO_LARGE.code
- || status.error == Errors.INVALID_FETCH_SIZE.code) {
+ } else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+ || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+ || status.errorCode == Errors.INVALID_FETCH_SIZE.code) {

error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
- .format(group.groupId, generationId, Errors.forCode(status.error).exceptionName))
+ .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))

Errors.UNKNOWN.code
} else {
error("Appending metadata message for group %s generation %d failed due to unexpected error: %s"
- .format(group.groupId, generationId, status.error))
+ .format(group.groupId, generationId, status.errorCode))

- status.error
+ status.errorCode
}
}

@@ -254,13 +255,13 @@ class GroupMetadataManager(val brokerId: Int,
)
}.toSeq

- val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
+ val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))

val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))

// set the callback function to insert offsets into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
@@ -271,26 +272,26 @@ class GroupMetadataManager(val brokerId: Int,
val status = responseStatus(offsetTopicPartition)

val responseCode =
- if (status.error == Errors.NONE.code) {
+ if (status.errorCode == Errors.NONE.code) {
filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
}
Errors.NONE.code
} else {
debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
- .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.error).exceptionName))
+ .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))

// transform the log append error code to the corresponding the commit status error code
- if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
- else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code)
+ else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
Errors.NOT_COORDINATOR_FOR_GROUP.code
- else if (status.error == Errors.MESSAGE_TOO_LARGE.code
- || status.error == Errors.RECORD_LIST_TOO_LARGE.code
- || status.error == Errors.INVALID_FETCH_SIZE.code)
+ else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+ || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+ || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
Errors.INVALID_COMMIT_OFFSET_SIZE.code
else
- status.error
+ status.errorCode
}


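Both putCacheCallback implementations above now read the append outcome through the common PartitionResponse, whose public errorCode field replaces ProducerResponseStatus.error. The translation from log-append errors to offset-commit errors can be factored as below; this is only a restatement of the Scala logic in the hunk, not code from the PR:

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;

public class CommitErrorMappingSketch {
    // Translate a log-append error into the code an offset-commit client should see.
    static short toCommitError(PartitionResponse status) {
        short code = status.errorCode; // public field at this point in the codebase
        if (code == Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
            return Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code();
        if (code == Errors.NOT_LEADER_FOR_PARTITION.code())
            return Errors.NOT_COORDINATOR_FOR_GROUP.code();
        if (code == Errors.MESSAGE_TOO_LARGE.code()
                || code == Errors.RECORD_LIST_TOO_LARGE.code()
                || code == Errors.INVALID_FETCH_SIZE.code())
            return Errors.INVALID_COMMIT_OFFSET_SIZE.code();
        return code;
    }
}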
19 changes: 7 additions & 12 deletions core/src/main/scala/kafka/network/RequestChannel.scala
@@ -19,18 +19,17 @@ package kafka.network

import java.net.InetAddress
import java.nio.ByteBuffer
- import java.security.Principal
+ import java.util.HashMap
import java.util.concurrent._

import com.yammer.metrics.core.Gauge
import kafka.api._
- import kafka.common.TopicAndPartition
- import kafka.message.ByteBufferMessageSet
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{Logging, SystemTime}
+ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
- import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
+ import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.log4j.Logger

@@ -39,12 +38,9 @@ object RequestChannel extends Logging {
val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)

def getShutdownReceive() = {
- val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
- val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
- byteBuffer.putShort(ApiKeys.PRODUCE.id)
- emptyProducerRequest.writeTo(byteBuffer)
- byteBuffer.rewind()
- byteBuffer
+ val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, "", 0)
+ val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, ByteBuffer]())
+ RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct)
}

case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)
@@ -66,8 +62,7 @@ object RequestChannel extends Logging {
// request types should only use the client-side versions which are parsed with
// o.a.k.common.requests.AbstractRequest.getRequest()
private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
- Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
- ApiKeys.FETCH.id -> FetchRequest.readFrom,
+ Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -18,6 +18,7 @@
package kafka.server

import kafka.common.TopicAndPartition
+ import org.apache.kafka.common.TopicPartition

/**
* Keys used for delayed operation metrics recording
@@ -33,6 +34,8 @@ object DelayedOperationKey {
/* used by delayed-produce and delayed-fetch operations */
case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {

+ def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
+
def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)

override def keyLabel = "%s-%d".format(topic, partition)
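The auxiliary constructor added above lets call sites that already hold a common TopicPartition build the key directly, alongside the existing TopicAndPartition constructor, while the migration is in flight. Since Scala auxiliary constructors surface as plain overloads, usage from Java looks roughly like this (output value assumed from the keyLabel format above):

import kafka.server.TopicPartitionOperationKey;
import org.apache.kafka.common.TopicPartition;

public class OperationKeySketch {
    public static void main(String[] args) {
        // The new overload accepts the common TopicPartition directly.
        TopicPartitionOperationKey key =
            new TopicPartitionOperationKey(new TopicPartition("my-topic", 3));
        System.out.println(key.keyLabel()); // "my-topic-3"
    }
}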
30 changes: 15 additions & 15 deletions core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,26 +21,26 @@ package kafka.server
import java.util.concurrent.TimeUnit

import com.yammer.metrics.core.Meter
- import kafka.api.ProducerResponseStatus
- import kafka.common.TopicAndPartition
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Pool
import org.apache.kafka.common.protocol.Errors
+ import org.apache.kafka.common.TopicPartition
+ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse

import scala.collection._

- case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) {
+ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
@volatile var acksPending = false

override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]"
- .format(acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+ .format(acksPending, responseStatus.errorCode, responseStatus.baseOffset, requiredOffset)
}

/**
* The produce metadata maintained by the delayed produce operation
*/
case class ProduceMetadata(produceRequiredAcks: Short,
- produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) {
+ produceStatus: Map[TopicPartition, ProducePartitionStatus]) {

override def toString = "[requiredAcks: %d, partitionStatus: %s]"
.format(produceRequiredAcks, produceStatus)
@@ -53,20 +53,20 @@ case class ProduceMetadata(produceRequiredAcks: Short,
class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
- responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+ responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
extends DelayedOperation(delayMs) {

// first update the acks pending variable according to the error code
- produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
- if (status.responseStatus.error == Errors.NONE.code) {
+ produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
+ if (status.responseStatus.errorCode == Errors.NONE.code) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
- status.responseStatus.error = Errors.REQUEST_TIMED_OUT.code
+ status.responseStatus.errorCode = Errors.REQUEST_TIMED_OUT.code
} else {
status.acksPending = false
}

trace("Initial partition status for %s is %s".format(topicAndPartition, status))
trace("Initial partition status for %s is %s".format(topicPartition, status))
}

/**
@@ -97,11 +97,11 @@ class DelayedProduce(delayMs: Long,
if (errorCode != Errors.NONE.code) {
// Case B.1
status.acksPending = false
- status.responseStatus.error = errorCode
+ status.responseStatus.errorCode = errorCode
} else if (hasEnough) {
// Case B.2
status.acksPending = false
- status.responseStatus.error = Errors.NONE.code
+ status.responseStatus.errorCode = Errors.NONE.code
}
}
}
@@ -134,14 +134,14 @@ object DelayedProduceMetrics extends KafkaMetricsGroup {

private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)

- private val partitionExpirationMeterFactory = (key: TopicAndPartition) =>
+ private val partitionExpirationMeterFactory = (key: TopicPartition) =>
newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
tags = Map("topic" -> key.topic, "partition" -> key.partition.toString))
- private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
+ private val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))

- def recordExpiration(partition: TopicAndPartition) {
+ def recordExpiration(partition: TopicPartition) {
aggregateExpirationMeter.mark()
partitionExpirationMeters.getAndMaybePut(partition).mark()
}
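One behavioral subtlety worth noting: DelayedProduce now pre-marks every pending partition with REQUEST_TIMED_OUT directly on the mutable PartitionResponse and overwrites it with NONE (or a real error) once the required acks arrive. A toy illustration of that assume-timeout-until-acked pattern, assuming the two-argument PartitionResponse(errorCode, baseOffset) constructor of this era:

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;

public class DelayedProduceSketch {
    public static void main(String[] args) {
        PartitionResponse status = new PartitionResponse(Errors.NONE.code(), 123L);
        // Pre-set the timeout error; it is cleared only when enough replicas ack.
        status.errorCode = Errors.REQUEST_TIMED_OUT.code();
        boolean enoughReplicasAcked = true; // stand-in for the ISR size check
        if (enoughReplicasAcked)
            status.errorCode = Errors.NONE.code();
        System.out.println(Errors.forCode(status.errorCode)); // NONE
    }
}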
(The remaining 5 changed files are not shown in this view.)
