Skip to content

Commit

Permalink
KAFKA-8305; Support default partitions & replication factor in AdminC…
Browse files Browse the repository at this point in the history
…lient#createTopic (KIP-464) (#6728)

This commit makes three changes:
- Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>)
which allows users to specify Optional.empty() for numPartitions or
replicationFactor in order to use the broker default.
- Changes AdminManager to accept -1 as valid options for replication
factor and numPartitions (resolving to broker defaults).
- Makes --partitions and --replication-factor optional arguments when creating
topics using kafka-topics.sh.
- Adds a dependency on scalaJava8Compat library to make it simpler to
convert Scala Option to Java Optional

Reviewers: Ismael Juma <[email protected]>, Ryanne Dolan <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
agavra authored and Jason Gustafson committed Jun 5, 2019
1 parent b6d9e15 commit 8e16158
Show file tree
Hide file tree
Showing 18 changed files with 248 additions and 78 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,8 @@ project(':clients') {
compile libs.lz4
compile libs.snappy
compile libs.slf4jApi
compile libs.scalaJava8Compat

compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes

Expand Down
34 changes: 24 additions & 10 deletions clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.clients.admin;

import java.util.Optional;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
Expand All @@ -31,16 +32,29 @@
* A new topic to be created via {@link AdminClient#createTopics(Collection)}.
*/
public class NewTopic {

private static final int NO_PARTITIONS = -1;
private static final short NO_REPLICATION_FACTOR = -1;

private final String name;
private final int numPartitions;
private final short replicationFactor;
private final Optional<Integer> numPartitions;
private final Optional<Short> replicationFactor;
private final Map<Integer, List<Integer>> replicasAssignments;
private Map<String, String> configs = null;

/**
* A new topic with the specified replication factor and number of partitions.
*/
public NewTopic(String name, int numPartitions, short replicationFactor) {
this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
}

/**
* A new topic that optionally defaults {@code numPartitions} and {@code replicationFactor} to
* the broker configurations for {@code num.partitions} and {@code default.replication.factor}
* respectively.
*/
public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
this.name = name;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
Expand All @@ -56,8 +70,8 @@ public NewTopic(String name, int numPartitions, short replicationFactor) {
*/
public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
this.name = name;
this.numPartitions = -1;
this.replicationFactor = -1;
this.numPartitions = Optional.empty();
this.replicationFactor = Optional.empty();
this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments);
}

Expand All @@ -72,14 +86,14 @@ public String name() {
* The number of partitions for the new topic or -1 if a replica assignment has been specified.
*/
public int numPartitions() {
return numPartitions;
return numPartitions.orElse(NO_PARTITIONS);
}

/**
* The replication factor for the new topic or -1 if a replica assignment has been specified.
*/
public short replicationFactor() {
return replicationFactor;
return replicationFactor.orElse(NO_REPLICATION_FACTOR);
}

/**
Expand Down Expand Up @@ -111,8 +125,8 @@ public Map<String, String> configs() {
CreatableTopic convertToCreatableTopic() {
CreatableTopic creatableTopic = new CreatableTopic().
setName(name).
setNumPartitions(numPartitions).
setReplicationFactor(replicationFactor);
setNumPartitions(numPartitions.orElse(NO_PARTITIONS)).
setReplicationFactor(replicationFactor.orElse(NO_REPLICATION_FACTOR));
if (replicasAssignments != null) {
for (Entry<Integer, List<Integer>> entry : replicasAssignments.entrySet()) {
creatableTopic.assignments().add(
Expand All @@ -136,8 +150,8 @@ CreatableTopic convertToCreatableTopic() {
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(name=").append(name).
append(", numPartitions=").append(numPartitions).
append(", replicationFactor=").append(replicationFactor).
append(", numPartitions=").append(numPartitions.map(String::valueOf).orElse("default")).
append(", replicationFactor=").append(replicationFactor.map(String::valueOf).orElse("default")).
append(", replicasAssignments=").append(replicasAssignments).
append(", configs=").append(configs).
append(")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
Expand All @@ -24,8 +27,6 @@
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;

public class CreateTopicsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<CreateTopicsRequest> {
private final CreateTopicsRequestData data;
Expand All @@ -40,6 +41,23 @@ public CreateTopicsRequest build(short version) {
if (data.validateOnly() && version == 0)
throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
"CreateTopicsRequest");

final List<String> topicsWithDefaults = data.topics()
.stream()
.filter(topic -> topic.assignments().isEmpty())
.filter(topic ->
topic.numPartitions() == CreateTopicsRequest.NO_NUM_PARTITIONS
|| topic.replicationFactor() == CreateTopicsRequest.NO_REPLICATION_FACTOR)
.map(CreatableTopic::name)
.collect(Collectors.toList());

if (!topicsWithDefaults.isEmpty() && version < 4) {
throw new UnsupportedVersionException("Creating topics with default "
+ "partitions/replication factor are only supported in CreateTopicRequest "
+ "version 4+. The following topics need values for partitions and replicas: "
+ topicsWithDefaults);
}

return new CreateTopicsRequest(data, version);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
"type": "request",
"name": "CreateTopicsRequest",
// Version 1 adds validateOnly.
"validVersions": "0-3",
// Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464)
"validVersions": "0-4",
"fields": [
{ "name": "Topics", "type": "[]CreatableTopic", "versions": "0+",
"about": "The topics to create.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "NumPartitions", "type": "int32", "versions": "0+",
"about": "The number of partitions to create in the topic, or -1 if we are specifying a manual partition assignment." },
"about": "The number of partitions to create in the topic, or -1 if we are either specifying a manual partition assignment or using the default partitions." },
{ "name": "ReplicationFactor", "type": "int16", "versions": "0+",
"about": "The number of replicas to create for each partition in the topic, or -1 if we are specifying a manual partition assignment." },
"about": "The number of replicas to create for each partition in the topic, or -1 if we are either specifying a manual partition assignment or using the default replication factor." },
{ "name": "Assignments", "type": "[]CreatableReplicaAssignment", "versions": "0+",
"about": "The manual partition assignment, or the empty array if we are using automatic assignment.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
// Version 1 adds a per-topic error message string.
// Version 2 adds the throttle time.
// Starting in version 3, on quota violation, brokers send out responses before throttling.
"validVersions": "0-3",
// Version 4 makes partitions/replicationFactor optional even when assignments are not present (KIP-464)
"validVersions": "0-4",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
import org.apache.kafka.common.requests.CreateTopicsRequest.Builder;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
Expand Down Expand Up @@ -120,6 +121,7 @@
import static org.apache.kafka.test.TestUtils.toBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -644,6 +646,28 @@ public void testCreateTopicRequestV0FailsIfValidateOnly() {
createCreateTopicRequest(0, true);
}

@Test
public void testCreateTopicRequestV3FailsIfNoPartitionsOrReplicas() {
final UnsupportedVersionException exception = assertThrows(
UnsupportedVersionException.class, () -> {
CreateTopicsRequestData data = new CreateTopicsRequestData()
.setTimeoutMs(123)
.setValidateOnly(false);
data.topics().add(new CreatableTopic().
setName("foo").
setNumPartitions(CreateTopicsRequest.NO_NUM_PARTITIONS).
setReplicationFactor((short) 1));
data.topics().add(new CreatableTopic().
setName("bar").
setNumPartitions(1).
setReplicationFactor(CreateTopicsRequest.NO_REPLICATION_FACTOR));

new Builder(data).build((short) 3);
});
assertTrue(exception.getMessage().contains("supported in CreateTopicRequest version 4+"));
assertTrue(exception.getMessage().contains("[foo, bar]"));
}

@Test
public void testFetchRequestMaxBytesOldVersions() throws Exception {
final short version = 1;
Expand Down
28 changes: 18 additions & 10 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{ListTopicsOptions, NewPartitions, NewTopic, AdminClient => JAdminClient}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.security.JaasUtils
Expand All @@ -40,6 +40,7 @@ import org.apache.zookeeper.KeeperException.NodeExistsException

import scala.collection.JavaConverters._
import scala.collection._
import scala.compat.java8.OptionConverters._
import scala.io.StdIn

object TopicCommand extends Logging {
Expand Down Expand Up @@ -82,7 +83,7 @@ object TopicCommand extends Logging {
class CommandTopicPartition(opts: TopicCommandOptions) {
val name: String = opts.topic.get
val partitions: Option[Integer] = opts.partitions
val replicationFactor: Integer = opts.replicationFactor.getOrElse(-1)
val replicationFactor: Option[Integer] = opts.replicationFactor
val replicaAssignment: Option[Map[Int, List[Int]]] = opts.replicaAssignment
val configsToAdd: Properties = parseTopicConfigsToBeAdded(opts)
val configsToDelete: Seq[String] = parseTopicConfigsToBeDeleted(opts)
Expand Down Expand Up @@ -172,14 +173,21 @@ object TopicCommand extends Logging {
case class AdminClientTopicService private (adminClient: JAdminClient) extends TopicService {

override def createTopic(topic: CommandTopicPartition): Unit = {
if (topic.replicationFactor > Short.MaxValue)
throw new IllegalArgumentException(s"The replication factor's maximum value must be smaller or equal to ${Short.MaxValue}")
if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
if (topic.partitions.exists(partitions => partitions < 1))
throw new IllegalArgumentException(s"The partitions must be greater than 0")

if (!adminClient.listTopics().names().get().contains(topic.name)) {
val newTopic = if (topic.hasReplicaAssignment)
new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
else
new NewTopic(topic.name, topic.partitions.get, topic.replicationFactor.shortValue())
else {
new NewTopic(
topic.name,
topic.partitions.asJava,
topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
}

val configsMap = topic.configsToAdd.stringPropertyNames()
.asScala
.map(name => name -> topic.configsToAdd.getProperty(name))
Expand Down Expand Up @@ -289,7 +297,7 @@ object TopicCommand extends Logging {
if (topic.hasReplicaAssignment)
adminZkClient.createTopicWithAssignment(topic.name, topic.configsToAdd, topic.replicaAssignment.get)
else
adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor, topic.configsToAdd, topic.rackAwareMode)
adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor.get, topic.configsToAdd, topic.rackAwareMode)
println(s"Created topic ${topic.name}.")
} catch {
case e: TopicExistsException => if (!topic.ifTopicDoesntExist()) throw e
Expand Down Expand Up @@ -538,11 +546,11 @@ object TopicCommand extends Logging {
.describedAs("name")
.ofType(classOf[String])
private val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.")
.withRequiredArg
.describedAs("# of partitions")
.ofType(classOf[java.lang.Integer])
private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.")
.withRequiredArg
.describedAs("replication factor")
.ofType(classOf[java.lang.Integer])
Expand Down Expand Up @@ -633,7 +641,7 @@ object TopicCommand extends Logging {
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (!has(listOpt) && !has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (has(createOpt) && !has(replicaAssignmentOpt))
if (has(createOpt) && !has(replicaAssignmentOpt) && has(zkConnectOpt))
CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt, replicationFactorOpt)
if (has(bootstrapServerOpt) && has(alterOpt)) {
CommandLineUtils.checkInvalidArgsSet(parser, options, Set(bootstrapServerOpt, configOpt), Set(alterOpt))
Expand Down
16 changes: 13 additions & 3 deletions core/src/main/scala/kafka/server/AdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class AdminManager(val config: KafkaConfig,

def hasDelayedTopicOperations = topicPurgatory.numDelayed != 0

private val defaultNumPartitions = config.numPartitions.intValue()
private val defaultReplicationFactor = config.defaultReplicationFactor.shortValue()

/**
* Try to complete delayed topic operations with the request key
*/
Expand Down Expand Up @@ -95,8 +98,15 @@ class AdminManager(val config: KafkaConfig,
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
"Both cannot be used at the same time.")
}

val resolvedNumPartitions = if (topic.numPartitions == NO_NUM_PARTITIONS)
defaultNumPartitions else topic.numPartitions
val resolvedReplicationFactor = if (topic.replicationFactor == NO_REPLICATION_FACTOR)
defaultReplicationFactor else topic.replicationFactor

val assignments = if (topic.assignments().isEmpty) {
AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor)
AdminUtils.assignReplicasToBrokers(
brokers, resolvedNumPartitions, resolvedReplicationFactor)
} else {
val assignments = new mutable.HashMap[Int, Seq[Int]]
// Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
Expand All @@ -115,9 +125,9 @@ class AdminManager(val config: KafkaConfig,

// Use `null` for unset fields in the public API
val numPartitions: java.lang.Integer =
if (topic.numPartitions == NO_NUM_PARTITIONS) null else topic.numPartitions
if (topic.assignments().isEmpty) resolvedNumPartitions else null
val replicationFactor: java.lang.Short =
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor
if (topic.assignments().isEmpty) resolvedReplicationFactor else null
val javaAssignments = if (topic.assignments().isEmpty) {
null
} else {
Expand Down
Loading

0 comments on commit 8e16158

Please sign in to comment.