diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 0581f8400ed13..a91524733ae77 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -103,7 +103,9 @@ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
 
         switch (versionId) {
             case 0:
-                return new ProduceResponse(responseMap, 0);
+                return new ProduceResponse(responseMap);
+            case 1:
+                return new ProduceResponse(responseMap, ProduceResponse.DEFAULT_THROTTLE_TIME);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
@@ -122,6 +124,10 @@ public Map<TopicPartition, ByteBuffer> partitionRecords() {
         return partitionRecords;
     }
 
+    public void clearPartitionRecords() {
+        partitionRecords.clear();
+    }
+
     public static ProduceRequest parse(ByteBuffer buffer, int versionId) {
         return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer));
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index fc413079b7861..c213332079df7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -43,7 +43,7 @@ public class ProduceResponse extends AbstractRequestResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     public static final long INVALID_OFFSET = -1L;
-    private static final int DEFAULT_THROTTLE_TIME = 0;
+    public static final int DEFAULT_THROTTLE_TIME = 0;
 
     /**
      * Possible error code:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
index 3fec60beda761..02cac80cd973f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
@@ -31,7 +31,7 @@ public RequestSend(String destination, RequestHeader header, Struct body) {
         this.body = body;
     }
 
-    private static ByteBuffer serialize(RequestHeader header, Struct body) {
+    public static ByteBuffer serialize(RequestHeader header, Struct body) {
         ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
         header.writeTo(buffer);
         body.writeTo(buffer);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69431a59563be..789cca79f5eb2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -81,7 +81,7 @@ public void testSerialization() throws Exception {
                 createOffsetFetchRequest().getErrorResponse(0, new UnknownServerException()),
                 createOffsetFetchResponse(),
                 createProduceRequest(),
-                createProduceRequest().getErrorResponse(0, new UnknownServerException()),
+                createProduceRequest().getErrorResponse(1, new UnknownServerException()),
                 createProduceResponse(),
                 createStopReplicaRequest(),
                 createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index beb5a6f2ab63a..48818c3edff8e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -20,12 +20,14 @@ package kafka.coordinator
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.utils.CoreUtils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field}
 import org.apache.kafka.common.protocol.types.Type.STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
 import org.apache.kafka.common.protocol.types.Type.BYTES
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 
 import kafka.utils._
@@ -35,7 +37,6 @@ import kafka.log.FileMessageSet
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.tools.MessageFormatter
-import kafka.api.ProducerResponseStatus
 import kafka.server.ReplicaManager
 
 import scala.collection._
@@ -47,8 +48,8 @@ import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 
-case class DelayedStore(messageSet: Map[TopicAndPartition, MessageSet],
-                        callback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+                        callback: Map[TopicPartition, PartitionResponse] => Unit)
 
 class GroupMetadataManager(val brokerId: Int,
                            val config: OffsetConfig,
@@ -171,7 +172,7 @@ class GroupMetadataManager(val brokerId: Int,
       bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment)
     )
 
-    val groupMetadataPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
+    val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
 
     val groupMetadataMessageSet = Map(groupMetadataPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
@@ -179,7 +180,7 @@ class GroupMetadataManager(val brokerId: Int,
     val generationId = group.generationId
 
     // set the callback function to insert the created group into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
       // the append response should only contain the topics partition
       if (responseStatus.size != 1 || ! responseStatus.contains(groupMetadataPartition))
         throw new IllegalStateException("Append status %s should only have one partition %s"
@@ -190,30 +191,30 @@ class GroupMetadataManager(val brokerId: Int,
       val status = responseStatus(groupMetadataPartition)
 
       var responseCode = Errors.NONE.code
-      if (status.error != Errors.NONE.code) {
+      if (status.errorCode != Errors.NONE.code) {
         debug("Metadata from group %s with generation %d failed when appending to log due to %s"
-          .format(group.groupId, generationId, Errors.forCode(status.error).exceptionName))
+          .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
 
         // transform the log append error code to the corresponding the commit status error code
-        responseCode = if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
+        responseCode = if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
           Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-        } else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code) {
+        } else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) {
           Errors.NOT_COORDINATOR_FOR_GROUP.code
-        } else if (status.error == Errors.REQUEST_TIMED_OUT.code) {
+        } else if (status.errorCode == Errors.REQUEST_TIMED_OUT.code) {
           Errors.REBALANCE_IN_PROGRESS.code
-        } else if (status.error == Errors.MESSAGE_TOO_LARGE.code
-          || status.error == Errors.RECORD_LIST_TOO_LARGE.code
-          || status.error == Errors.INVALID_FETCH_SIZE.code) {
+        } else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+          || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+          || status.errorCode == Errors.INVALID_FETCH_SIZE.code) {
 
           error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
-            .format(group.groupId, generationId, Errors.forCode(status.error).exceptionName))
+            .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
 
           Errors.UNKNOWN.code
         } else {
 
           error("Appending metadata message for group %s generation %d failed due to unexpected error: %s"
-            .format(group.groupId, generationId, status.error))
+            .format(group.groupId, generationId, status.errorCode))
 
-          status.error
+          status.errorCode
         }
       }
@@ -254,13 +255,13 @@ class GroupMetadataManager(val brokerId: Int,
       )
     }.toSeq
 
-    val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
+    val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
 
     // set the callback function to insert offsets into cache after log append completed
-    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
       // the append response should only contain the topics partition
       if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
         throw new IllegalStateException("Append status %s should only have one partition %s"
@@ -271,26 +272,26 @@ class GroupMetadataManager(val brokerId: Int,
       val status = responseStatus(offsetTopicPartition)
 
       val responseCode =
-        if (status.error == Errors.NONE.code) {
+        if (status.errorCode == Errors.NONE.code) {
           filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
             putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
           }
           Errors.NONE.code
         } else {
           debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
-            .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.error).exceptionName))
+            .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
 
           // transform the log append error code to the corresponding the commit status error code
-          if (status.error == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+          if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
             Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
-          else if (status.error == Errors.NOT_LEADER_FOR_PARTITION.code)
+          else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
             Errors.NOT_COORDINATOR_FOR_GROUP.code
-          else if (status.error == Errors.MESSAGE_TOO_LARGE.code
-            || status.error == Errors.RECORD_LIST_TOO_LARGE.code
-            || status.error == Errors.INVALID_FETCH_SIZE.code)
+          else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+            || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+            || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
             Errors.INVALID_COMMIT_OFFSET_SIZE.code
           else
-            status.error
+            status.errorCode
         }
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 998f51ad74213..f0d599ddc8525 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -19,18 +19,17 @@ package kafka.network
 
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.security.Principal
+import java.util.HashMap
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{Logging, SystemTime}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
+import org.apache.kafka.common.requests.{RequestSend, ProduceRequest, AbstractRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.log4j.Logger
@@ -39,12 +38,9 @@ object RequestChannel extends Logging {
   val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
-    val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
-    byteBuffer.putShort(ApiKeys.PRODUCE.id)
-    emptyProducerRequest.writeTo(byteBuffer)
-    byteBuffer.rewind()
-    byteBuffer
+    val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, "", 0)
+    val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, ByteBuffer]())
+    RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct)
   }
 
   case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)
@@ -66,8 +62,7 @@ object RequestChannel extends Logging {
   // request types should only use the client-side versions which are parsed with
   // o.a.k.common.requests.AbstractRequest.getRequest()
   private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
-    Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
-      ApiKeys.FETCH.id -> FetchRequest.readFrom,
+    Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
       ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
       ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
       ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index f005019b9a21e..072a658ea25b7 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
 
 /**
  * Keys used for delayed operation metrics recording
@@ -33,6 +34,8 @@ object DelayedOperationKey {
 
 /* used by delayed-produce and delayed-fetch operations */
 case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
 
+  def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
+
   def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
 
   override def keyLabel = "%s-%d".format(topic, partition)
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index c228807258b26..be1be4faf97c7 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,26 +21,26 @@ package kafka.server
 import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Meter
-import kafka.api.ProducerResponseStatus
-import kafka.common.TopicAndPartition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 
 import scala.collection._
 
-case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) {
+case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
   @volatile var acksPending = false
 
   override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]"
-    .format(acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+    .format(acksPending, responseStatus.errorCode, responseStatus.baseOffset, requiredOffset)
 }
 
 /**
  * The produce metadata maintained by the delayed produce operation
 */
 case class ProduceMetadata(produceRequiredAcks: Short,
-                           produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) {
+                           produceStatus: Map[TopicPartition, ProducePartitionStatus]) {
 
   override def toString = "[requiredAcks: %d, partitionStatus: %s]"
     .format(produceRequiredAcks, produceStatus)
@@ -53,20 +53,20 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
   extends DelayedOperation(delayMs) {
 
   // first update the acks pending variable according to the error code
-  produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
-    if (status.responseStatus.error == Errors.NONE.code) {
+  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
+    if (status.responseStatus.errorCode == Errors.NONE.code) {
       // Timeout error state will be cleared when required acks are received
       status.acksPending = true
-      status.responseStatus.error = Errors.REQUEST_TIMED_OUT.code
+      status.responseStatus.errorCode = Errors.REQUEST_TIMED_OUT.code
     } else {
       status.acksPending = false
    }
 
-    trace("Initial partition status for %s is %s".format(topicAndPartition, status))
+    trace("Initial partition status for %s is %s".format(topicPartition, status))
  }
@@ -97,11 +97,11 @@ class DelayedProduce(delayMs: Long,
         if (errorCode != Errors.NONE.code) {
           // Case B.1
           status.acksPending = false
-          status.responseStatus.error = errorCode
+          status.responseStatus.errorCode = errorCode
         } else if (hasEnough) {
           // Case B.2
           status.acksPending = false
-          status.responseStatus.error = Errors.NONE.code
+          status.responseStatus.errorCode = Errors.NONE.code
         }
       }
     }
@@ -134,14 +134,14 @@ object DelayedProduceMetrics extends KafkaMetricsGroup {
 
   private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
 
-  private val partitionExpirationMeterFactory = (key: TopicAndPartition) =>
+  private val partitionExpirationMeterFactory = (key: TopicPartition) =>
     newMeter("ExpiresPerSec",
              "requests",
             TimeUnit.SECONDS,
             tags = Map("topic" -> key.topic, "partition" -> key.partition.toString))
-  private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
+  private val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
 
-  def recordExpiration(partition: TopicAndPartition) {
+  def recordExpiration(partition: TopicPartition) {
     aggregateExpirationMeter.mark()
     partitionExpirationMeters.getAndMaybePut(partition).mark()
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f7d6be911eec6..e48df90db6796 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
 import kafka.log._
-import kafka.message.MessageSet
+import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
@@ -35,11 +35,12 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, ClusterAuthorizationException}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ProtoUtils, ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse}
+StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse}
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
@@ -109,7 +110,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         error("Error when handling request %s".format(request.body), e)
       }
-
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
@@ -315,44 +315,43 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle a produce request
    */
   def handleProducerRequest(request: RequestChannel.Request) {
-    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
-    val numBytesAppended = produceRequest.sizeInBytes
+    val produceRequest = request.body.asInstanceOf[ProduceRequest]
+    val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
 
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition {
-      case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
+      case (topicPartition, _) => authorize(request.session, Write, new Resource(Topic, topicPartition.topic))
     }
 
     // the callback for sending a produce response
-    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
+      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1))
 
       var errorInResponse = false
-      mergedResponseStatus.foreach { case (topicAndPartition, status) =>
-        if (status.error != Errors.NONE.code) {
+
+      mergedResponseStatus.foreach { case (topicPartition, status) =>
+        if (status.errorCode != Errors.NONE.code) {
           errorInResponse = true
           debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
-            produceRequest.correlationId,
-            produceRequest.clientId,
-            topicAndPartition,
-            Errors.forCode(status.error).exceptionName))
+            request.header.correlationId,
+            request.header.clientId,
+            topicPartition,
+            Errors.forCode(status.errorCode).exceptionName))
         }
       }
 
       def produceResponseCallback(delayTimeMs: Int) {
-
-        if (produceRequest.requiredAcks == 0) {
+        if (produceRequest.acks == 0) {
          // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
          // the request, since no response is expected by the producer, the server will close socket server so that
          // the producer client will know that some error has happened and will refresh its metadata
          if (errorInResponse) {
-            val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
-              topicAndPartition -> Errors.forCode(status.error).exceptionName
+            val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
+              topicPartition -> Errors.forCode(status.errorCode).exceptionName
            }.mkString(", ")
            info(
-              s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
-                s"from client id ${produceRequest.clientId} with ack=0\n" +
+              s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
+                s"from client id ${request.header.clientId} with ack=0\n" +
                s"Topic and partition to exceptions: $exceptionsSummary"
            )
            requestChannel.closeConnection(request.processor, request)
@@ -360,41 +359,51 @@ class KafkaApis(val requestChannel: RequestChannel,
            requestChannel.noOperation(request.processor, request)
          }
        } else {
-          val response = ProducerResponse(produceRequest.correlationId,
-                                          mergedResponseStatus,
-                                          produceRequest.versionId,
-                                          delayTimeMs)
-          requestChannel.sendResponse(new RequestChannel.Response(request,
-                                                                  new RequestOrResponseSend(request.connectionId,
-                                                                                            response)))
+          val respHeader = new ResponseHeader(request.header.correlationId)
+          val respBody = request.header.apiVersion match {
+            case 0 => new ProduceResponse(mergedResponseStatus.asJava)
+            case 1 => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs)
+            // This case shouldn't happen unless a new version of ProducerRequest is added without
+            // updating this part of the code to handle it properly.
+            case _ => throw new IllegalArgumentException("Version %d of ProducerRequest is not handled. Code must be updated."
+              .format(request.header.apiVersion))
+          }
+
+          requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
        }
      }
 
      // When this callback is triggered, the remote API call has completed
      request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
-      quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(produceRequest.clientId,
-                                                               numBytesAppended,
-                                                               produceResponseCallback)
+      quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(
+        request.header.clientId,
+        numBytesAppended,
+        produceResponseCallback)
    }
 
    if (authorizedRequestInfo.isEmpty)
      sendResponseCallback(Map.empty)
    else {
-      val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
+      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
+
+      // Convert ByteBuffer to ByteBufferMessageSet
+      val authorizedMessagesPerPartition = authorizedRequestInfo.map {
+        case (topicPartition, buffer) => (topicPartition, new ByteBufferMessageSet(buffer))
+      }
 
      // call the replica manager to append messages to the replicas
      replicaManager.appendMessages(
-        produceRequest.ackTimeoutMs.toLong,
-        produceRequest.requiredAcks,
+        produceRequest.timeout.toLong,
+        produceRequest.acks,
        internalTopicsAllowed,
-        authorizedRequestInfo,
+        authorizedMessagesPerPartition,
        sendResponseCallback)
 
      // if the request is put into the purgatory, it will have a held reference
      // and hence cannot be garbage collected; hence we clear its data here in
      // order to let GC re-claim its memory since it is already appended to log
-      produceRequest.emptyData()
+      produceRequest.clearPartitionRecords()
    }
  }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d1e549d47da58..0ffb0e39d3e5c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.requests.StopReplicaRequest
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
 
 import scala.collection._
@@ -320,19 +321,19 @@ class ReplicaManager(val config: KafkaConfig,
   def appendMessages(timeout: Long,
                      requiredAcks: Short,
                      internalTopicsAllowed: Boolean,
-                     messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
+                     messagesPerPartition: Map[TopicPartition, MessageSet],
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
 
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = SystemTime.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
-      val produceStatus = localProduceResults.map { case (topicAndPartition, result) =>
-        topicAndPartition ->
+      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
+        topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
+                  new PartitionResponse(result.errorCode, result.info.firstOffset)) // response status
       }
 
       if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
@@ -359,7 +360,7 @@ class ReplicaManager(val config: KafkaConfig,
       val responseStatus = messagesPerPartition.map {
         case (topicAndPartition, messageSet) =>
           (topicAndPartition ->
-                  ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code,
+                  new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
                     LogAppendInfo.UnknownLogAppendInfo.firstOffset))
       }
       responseCallback(responseStatus)
@@ -371,8 +372,8 @@ class ReplicaManager(val config: KafkaConfig,
   //  1. required acks = -1
   //  2. there is data to append
   //  3. at least one partition append was successful (fewer errors than partitions)
-  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                                     localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = {
+  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicPartition, MessageSet],
+                                     localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
     requiredAcks == -1 &&
     messagesPerPartition.size > 0 &&
     localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
@@ -386,26 +387,26 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Append the messages to the local replica logs
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
-                               messagesPerPartition: Map[TopicAndPartition, MessageSet],
-                               requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = {
+                               messagesPerPartition: Map[TopicPartition, MessageSet],
+                               requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
     trace("Append [%s] to local log ".format(messagesPerPartition))
-    messagesPerPartition.map { case (topicAndPartition, messages) =>
-      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).totalProduceRequestRate.mark()
+    messagesPerPartition.map { case (topicPartition, messages) =>
+      BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
-      if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
-        (topicAndPartition, LogAppendResult(
+      if (Topic.InternalTopics.contains(topicPartition.topic) && !internalTopicsAllowed) {
+        (topicPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
-          Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)))))
+          Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))
       } else {
         try {
-          val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition)
+          val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition)
           val info = partitionOpt match {
             case Some(partition) =>
               partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
             case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
-              .format(topicAndPartition, localBrokerId))
+              .format(topicPartition, localBrokerId))
           }
 
           val numAppendedMessages =
@@ -415,36 +416,36 @@ class ReplicaManager(val config: KafkaConfig,
               info.lastOffset - info.firstOffset + 1
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
-          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(messages.sizeInBytes)
           BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
-          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
+          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
           BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
 
           trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
-            .format(messages.sizeInBytes, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
-          (topicAndPartition, LogAppendResult(info))
+            .format(messages.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
+          (topicPartition, LogAppendResult(info))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
           case e: KafkaStorageException =>
             fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
             Runtime.getRuntime.halt(1)
-            (topicAndPartition, null)
+            (topicPartition, null)
           case utpe: UnknownTopicOrPartitionException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
          case nle: NotLeaderForPartitionException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
          case mtle: RecordTooLargeException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
          case mstle: RecordBatchTooLargeException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
          case imse: CorruptRecordException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
          case t: Throwable =>
-            BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+            BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
            BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-            error("Error processing append operation on partition %s".format(topicAndPartition), t)
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
+            error("Error processing append operation on partition %s".format(topicPartition), t)
+            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
        }
      }
    }
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 0f702a0f24c55..7e6e7656be6c0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,16 +17,16 @@
 package kafka.coordinator
 
-
 import org.junit.Assert._
-import kafka.api.ProducerResponseStatus
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.message.MessageSet
 import kafka.server.{ReplicaManager, KafkaConfig}
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, IAnswer, EasyMock}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -824,16 +824,16 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                               assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
-    val capturedArgument: Capture[Map[TopicAndPartition, ProducerResponseStatus] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicAndPartition, MessageSet]],
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
-          new ProducerResponseStatus(Errors.NONE.code, 0L)
+        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+          new PartitionResponse(Errors.NONE.code, 0L)
         )
       )})
     EasyMock.replay(replicaManager)
@@ -900,16 +900,16 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                            offsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicAndPartition, ProducerResponseStatus] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
       EasyMock.anyShort(),
       EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicAndPartition, MessageSet]],
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
-          new ProducerResponseStatus(Errors.NONE.code, 0L)
+        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+          new PartitionResponse(Errors.NONE.code, 0L)
         )
       )})
     EasyMock.replay(replicaManager)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 01f198e212b47..b4ba027747e5c 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -21,24 +21,26 @@ package kafka.network;
 
 import java.net._
 import javax.net.ssl._
 import java.io._
+import java.util.HashMap
+import java.util.Random
+import java.nio.ByteBuffer
 
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.NetworkSend
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
 import org.apache.kafka.common.utils.SystemTime
-import org.junit.Assert._
-import org.junit._
-import org.scalatest.junit.JUnitSuite
-import java.util.Random
+
 import kafka.producer.SyncProducerConfig
-import kafka.api.ProducerRequest
-import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import org.junit.Assert._
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+
 import scala.collection.Map
 
 class SocketServerTest extends JUnitSuite {
@@ -56,10 +58,15 @@ class SocketServerTest extends JUnitSuite {
   val server = new SocketServer(config, metrics, new SystemTime)
   server.startup()
 
-  def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
+  def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) {
     val outgoing = new DataOutputStream(socket.getOutputStream)
-    outgoing.writeInt(request.length + 2)
-    outgoing.writeShort(id)
+    id match {
+      case Some(id) =>
+        outgoing.writeInt(request.length + 2)
+        outgoing.writeShort(id)
+      case None =>
+        outgoing.writeInt(request.length)
+    }
     outgoing.write(request)
     outgoing.flush()
   }
@@ -75,9 +82,11 @@ class SocketServerTest extends JUnitSuite {
   /* A simple request handler that just echos back the response */
   def processRequest(channel: RequestChannel) {
     val request = channel.receiveRequest
-    val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes)
-    request.requestObj.writeTo(byteBuffer)
+    val byteBuffer = ByteBuffer.allocate(request.header.sizeOf + request.body.sizeOf)
+    request.header.writeTo(byteBuffer)
+    request.body.writeTo(byteBuffer)
     byteBuffer.rewind()
+
     val send = new NetworkSend(request.connectionId, byteBuffer)
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
@@ -92,14 +101,17 @@ class SocketServerTest extends JUnitSuite {
   }
 
   private def producerRequestBytes: Array[Byte] = {
+    val apiKey: Short = 0
     val correlationId = -1
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
 
-    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+    val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
+    val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
+
+    val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf + emptyRequest.sizeOf)
+    emptyHeader.writeTo(byteBuffer)
     emptyRequest.writeTo(byteBuffer)
     byteBuffer.rewind()
 
     val serializedBytes = new Array[Byte](byteBuffer.remaining)
@@ -114,12 +126,12 @@ class SocketServerTest extends JUnitSuite {
     val serializedBytes = producerRequestBytes
 
     // Test PLAINTEXT socket
-    sendRequest(plainSocket, 0, serializedBytes)
+    sendRequest(plainSocket, serializedBytes)
     processRequest(server.requestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
 
     // Test TRACE socket
-    sendRequest(traceSocket, 0, serializedBytes)
+    sendRequest(traceSocket, serializedBytes)
     processRequest(server.requestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq)
   }
@@ -129,7 +141,7 @@ class SocketServerTest extends JUnitSuite {
     val tooManyBytes = new Array[Byte](server.config.socketRequestMaxBytes + 1)
     new Random().nextBytes(tooManyBytes)
     val socket = connect()
-    sendRequest(socket, 0, tooManyBytes)
+    sendRequest(socket, tooManyBytes, Some(0))
     try {
       receiveResponse(socket)
     } catch {
@@ -144,8 +156,8 @@ class SocketServerTest extends JUnitSuite {
     val traceSocket = connect(protocol = SecurityProtocol.TRACE)
     val bytes = new Array[Byte](40)
     // send a request first to make sure the connection has been picked up by the socket server
-    sendRequest(plainSocket, 0, bytes)
-    sendRequest(traceSocket, 0, bytes)
+    sendRequest(plainSocket, bytes, Some(0))
+    sendRequest(traceSocket, bytes, Some(0))
     processRequest(server.requestChannel)
 
     // make sure the sockets are open
@@ -157,14 +169,14 @@ class SocketServerTest extends JUnitSuite {
     // doing a subsequent send should throw an exception as the connection should be closed.
     // send a large chunk of bytes to trigger a socket flush
     try {
-      sendRequest(plainSocket, 0, largeChunkOfBytes)
+      sendRequest(plainSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed plain socket")
     } catch {
       case e: IOException => // expected
     }
     try {
-      sendRequest(traceSocket, 0, largeChunkOfBytes)
+      sendRequest(traceSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed trace socket")
     } catch {
       case e: IOException => // expected
@@ -188,7 +200,7 @@ class SocketServerTest extends JUnitSuite {
       "Failed to decrement connection count after close")
     val conn2 = connect()
     val serializedBytes = producerRequestBytes
-    sendRequest(conn2, 0, serializedBytes)
+    sendRequest(conn2, serializedBytes)
     val request = server.requestChannel.receiveRequest(2000)
     assertNotNull(request)
     conn2.close()
@@ -235,20 +247,22 @@ class SocketServerTest extends JUnitSuite {
     val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket]
     sslSocket.setNeedClientAuth(false)
 
+    val apiKey = ApiKeys.PRODUCE.id
     val correlationId = -1
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
+    val emptyHeader = new RequestHeader(apiKey, clientId, correlationId)
+    val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]())
 
-    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+    val byteBuffer = ByteBuffer.allocate(emptyHeader.sizeOf() + emptyRequest.sizeOf())
+    emptyHeader.writeTo(byteBuffer)
     emptyRequest.writeTo(byteBuffer)
     byteBuffer.rewind()
 
     val serializedBytes = new Array[Byte](byteBuffer.remaining)
     byteBuffer.get(serializedBytes)
 
-    sendRequest(sslSocket, 0, serializedBytes)
+    sendRequest(sslSocket, serializedBytes)
     processRequest(overrideServer.requestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
     sslSocket.close()
@@ -262,7 +276,7 @@ class SocketServerTest extends JUnitSuite {
   def testSessionPrincipal(): Unit = {
     val socket = connect()
     val bytes = new Array[Byte](40)
-    sendRequest(socket, 0, bytes)
+    sendRequest(socket, bytes, Some(0))
     assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest().session.principal)
     socket.close()
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 724d4aca4d4e3..32085f6ae0d7e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -17,21 +17,26 @@
 
 package kafka.server
 
-import kafka.api.{ProducerResponseStatus, SerializationTestUtils, ProducerRequest}
-import kafka.common.TopicAndPartition
+
+import kafka.api.SerializationTestUtils
+import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.{ZkUtils, MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.requests.ProduceRequest
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
 
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.junit.Test
 
 import scala.collection.Map
+import scala.collection.JavaConverters._
 
 class ReplicaManagerTest {
@@ -97,11 +102,15 @@ class ReplicaManagerTest {
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), Option(this.getClass.getName))
     try {
-      val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
-      def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
-        assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
+      def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
+        assert(responseStatus.values.head.errorCode == Errors.INVALID_REQUIRED_ACKS.code)
       }
 
-      rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
+      rm.appendMessages(
+        timeout = 0,
+        requiredAcks = 3,
+        internalTopicsAllowed = false,
+        messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
+        responseCallback = callback)
     } finally {
       rm.shutdown(false)
      metrics.close()
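
Note on the new response path: produce response v1 adds a throttle_time_ms field, so the broker
now selects the ProduceResponse constructor from the request header's api version instead of
always emitting the throttled form (see handleProducerRequest above). A minimal Scala sketch of
that dispatch, using the real o.a.k.common.requests classes touched by this patch; the helper
name produceResponseFor and its parameters are illustrative, not part of the patch:

    import java.util.{Map => JMap}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.ProduceResponse
    import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse

    // Hypothetical helper mirroring the version match in KafkaApis.handleProducerRequest.
    def produceResponseFor(apiVersion: Short,
                           status: JMap[TopicPartition, PartitionResponse],
                           throttleTimeMs: Int): ProduceResponse =
      apiVersion match {
        case 0 => new ProduceResponse(status)                  // v0: no throttle_time_ms on the wire
        case 1 => new ProduceResponse(status, throttleTimeMs)  // v1: carries the quota delay
        case v => throw new IllegalArgumentException(s"Unhandled produce request version $v")
      }

This is also why RequestSend.serialize becomes public and DEFAULT_THROTTLE_TIME loses its private
modifier: RequestChannel.getShutdownReceive now frames an empty ProduceRequest through the
client-side serialization path, and ProduceRequest.getErrorResponse needs the default throttle
time to build a v1 error response.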