Skip to content

Commit

Permalink
Add support for auto topic creation in KIP-500 (apache#486)
Browse files Browse the repository at this point in the history
  • Loading branch information
rondagostino authored Jan 19, 2021
1 parent 25716aa commit e80ab26
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ForwardingManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import scala.compat.java8.OptionConverters._
import scala.concurrent.TimeoutException

class ForwardingManager(channelManager: BrokerToControllerChannelManager,
class ForwardingManager(val channelManager: BrokerToControllerChannelManager,
time: Time,
retryTimeoutMs: Long,
logContext: LogContext) extends Logging {
Expand Down
40 changes: 31 additions & 9 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.util
import java.util.{Collections, Optional, Properties}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger

import kafka.admin.{AdminUtils, RackAwareMode}
Expand All @@ -47,9 +47,9 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection, CreateableTopicConfig, CreateableTopicConfigCollection}
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsRequestData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
Expand All @@ -62,7 +62,6 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition
import org.apache.kafka.common.message.ListOffsetResponseData.{ListOffsetPartitionResponse, ListOffsetTopicResponse}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
Expand Down Expand Up @@ -91,6 +90,7 @@ import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Success, Try}
import kafka.coordinator.group.GroupOverview
import kafka.server.metadata.ConfigRepository
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource

Expand Down Expand Up @@ -1189,12 +1189,34 @@ class KafkaApis(val requestChannel: RequestChannel,
properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
try {
if (adminZkClient == null) {
throw new UnsupportedOperationException("Auto topic creation is not yet supported in KIP-500 mode")
if (replicationFactor < 1 || replicationFactor > Short.MaxValue) {
throw new IllegalArgumentException(s"Illegal replication factor: $replicationFactor")
}
val topics = new CreatableTopicCollection()
val configs = new CreateableTopicConfigCollection()
properties.forEach { (k, v) => configs.add(new CreateableTopicConfig().setName(k.toString).setValue(v.toString)) }
topics.add(new CreatableTopic().setName(topic).setNumPartitions(numPartitions).setReplicationFactor(replicationFactor.toShort).setConfigs(configs))
// send the request and block waiting for it to complete
val topicCreateFuture = new CompletableFuture[ClientResponse]()
forwardingManager.channelManager.sendRequest(
new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(topics)),
(clientResponse: ClientResponse) => {topicCreateFuture.complete(clientResponse)})
val clientResponse = topicCreateFuture.get()
val createTopicResponse = clientResponse.responseBody().asInstanceOf[CreateTopicsResponse]
val topicResult = createTopicResponse.data().topics().find(topic)
val errorCode = topicResult.errorCode()
val errorToReturn = if (errorCode == Errors.NONE.code() || errorCode == Errors.TOPIC_ALREADY_EXISTS.code()) {
Errors.LEADER_NOT_AVAILABLE // this is what we return upon either our or somebody else's success
} else {
Errors.forCode(errorCode) // failure for some reason; return that back
}
metadataResponseTopic(errorToReturn, topic, isInternal(topic), util.Collections.emptyList())
} else {
adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
}
adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
} catch {
case _: TopicExistsException => // let it go, possibly another broker created this topic
metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
Expand Down

0 comments on commit e80ab26

Please sign in to comment.