diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala index 1ae98f1e03805..bc91961b34666 100644 --- a/core/src/main/scala/kafka/server/ForwardingManager.scala +++ b/core/src/main/scala/kafka/server/ForwardingManager.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.{LogContext, Time} import scala.compat.java8.OptionConverters._ import scala.concurrent.TimeoutException -class ForwardingManager(channelManager: BrokerToControllerChannelManager, +class ForwardingManager(val channelManager: BrokerToControllerChannelManager, time: Time, retryTimeoutMs: Long, logContext: LogContext) extends Logging { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b679e0fe3bf64..c4a8d7ddb3584 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -21,7 +21,7 @@ import java.lang.{Long => JLong} import java.nio.ByteBuffer import java.util import java.util.{Collections, Optional, Properties} -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicInteger import kafka.admin.{AdminUtils, RackAwareMode} @@ -47,9 +47,9 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.{FatalExitError, Topic} import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} -import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic +import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection, CreateableTopicConfig, CreateableTopicConfigCollection} import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} +import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsRequestData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse} @@ -62,7 +62,6 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition import org.apache.kafka.common.message.ListOffsetResponseData.{ListOffsetPartitionResponse, ListOffsetTopicResponse} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic -import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} @@ -91,6 +90,7 @@ import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.util.{Failure, Success, Try} import kafka.coordinator.group.GroupOverview import kafka.server.metadata.ConfigRepository +import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource @@ -1189,12 +1189,34 @@ class KafkaApis(val requestChannel: RequestChannel, properties: util.Properties = new util.Properties()): MetadataResponseTopic = { try { if (adminZkClient == null) { - throw new UnsupportedOperationException("Auto topic creation is not yet supported in KIP-500 mode") + if (replicationFactor < 1 || replicationFactor > Short.MaxValue) { + throw new IllegalArgumentException(s"Illegal replication factor: $replicationFactor") + } + val topics = new CreatableTopicCollection() + val configs = new CreateableTopicConfigCollection() + properties.forEach { (k, v) => configs.add(new CreateableTopicConfig().setName(k.toString).setValue(v.toString)) } + topics.add(new CreatableTopic().setName(topic).setNumPartitions(numPartitions).setReplicationFactor(replicationFactor.toShort).setConfigs(configs)) + // send the request and block waiting for it to complete + val topicCreateFuture = new CompletableFuture[ClientResponse]() + forwardingManager.channelManager.sendRequest( + new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(topics)), + (clientResponse: ClientResponse) => {topicCreateFuture.complete(clientResponse)}) + val clientResponse = topicCreateFuture.get() + val createTopicResponse = clientResponse.responseBody().asInstanceOf[CreateTopicsResponse] + val topicResult = createTopicResponse.data().topics().find(topic) + val errorCode = topicResult.errorCode() + val errorToReturn = if (errorCode == Errors.NONE.code() || errorCode == Errors.TOPIC_ALREADY_EXISTS.code()) { + Errors.LEADER_NOT_AVAILABLE // this is what we return upon either our or somebody else's success + } else { + Errors.forCode(errorCode) // failure for some reason; return that back + } + metadataResponseTopic(errorToReturn, topic, isInternal(topic), util.Collections.emptyList()) + } else { + adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) + info("Auto creation of topic %s with %d partitions and replication factor %d is successful" + .format(topic, numPartitions, replicationFactor)) + metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList()) } - adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) - info("Auto creation of topic %s with %d partitions and replication factor %d is successful" - .format(topic, numPartitions, replicationFactor)) - metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList()) } catch { case _: TopicExistsException => // let it go, possibly another broker created this topic metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())