-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-8305: support default partitions & replication factor in AdminClient#createTopic #6728
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking forward to this!
clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, I had a quick look and left some comments.
clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
Outdated
Show resolved
Hide resolved
val replicationFactor: java.lang.Short = | ||
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor | ||
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else resolvedReplicationFactor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation for CreateTopicPolicy
says:
@param numPartitions the number of partitions to create or null if replicasAssignments is set.
@param replicationFactor the replication factor for the topic or null if replicaAssignments is set.
It seems like we may end up with a non null numPartitions
and replicationFactor
even if replicaAssignment
is set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the scenario that causes this? If replicaAssignments
is set, then we expect the partitions/replication factor to equal NO_PARTITIONS
and NO_REPLICATION_FACTOR
respectively, otherwise it fails here:
if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
&& !topic.assignments().isEmpty) {
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
"Both cannot be used at the same time.")
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. But then why do we need this change at all? It seems like we never end up with a different result than the previous code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference is that if the value sent was -1
(i.e. NO_REPLICATION_FACTOR
) and there are no assignments, it will instead be set the resolvedReplicationFactor
, which is the cluster default (i.e. default.replication.factor
)
(you can see tests for the changed behavior)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. I think Ismael's point is that if topic.replicationFactor != NO_REPLICATION_FACTOR
, then resolvedReplicationFactor == topic.replicationFactor
, and this is equivalent to what we already had. It seems like this is what we're trying to do:
val replicationFactor: java.lang.Short =
if (topic.assignments().nonEmpty) null else resolvedReplicationFactor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤦♂ I see what you're saying now. I think @hachikuji's code snippet is the easiest to read so I'll change it to that. @ijuma my original line of thought was that it's easier to reason about the code if we only ever use resolvedX
everywhere, hence the motivation for this change. That way I would never accidentally use the value of -1
in business logic code and it's easy to confirm by just looking at usages of topic.numPartitions
and topic.replicationFactor
.
Pinging +1'ers @colinhicks @omkreddy @gwenshap @rhauch 😄 thanks in advance! (cc @ijuma) |
@agavra I may be missing something, but if we are not bumping the |
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
Outdated
Show resolved
Hide resolved
@hachikuji - thanks for the review!
You are not missing something, this is something that I overlooked (not all errors are the same)! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. Left a few comments.
val replicationFactor: java.lang.Short = | ||
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor | ||
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else resolvedReplicationFactor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. I think Ismael's point is that if topic.replicationFactor != NO_REPLICATION_FACTOR
, then resolvedReplicationFactor == topic.replicationFactor
, and this is equivalent to what we already had. It seems like this is what we're trying to do:
val replicationFactor: java.lang.Short =
if (topic.assignments().nonEmpty) null else resolvedReplicationFactor
@@ -289,7 +294,7 @@ object TopicCommand extends Logging { | |||
if (topic.hasReplicaAssignment) | |||
adminZkClient.createTopicWithAssignment(topic.name, topic.configsToAdd, topic.replicaAssignment.get) | |||
else | |||
adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor, topic.configsToAdd, topic.rackAwareMode) | |||
adminZkClient.createTopic(topic.name, topic.partitions.get, topic.replicationFactor.get, topic.configsToAdd, topic.rackAwareMode) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error may be a bit obscure if no --partitions
or --replication-factor
option was provided. Could we detect this case in the argument checks we have below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure if it was worth adding bloat for the deprecated code, but I'm happy adding it back (I had it then removed it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looks good. Just a few more small comments.
clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one more comment, but LGTM overall.
createAndWaitTopic(new TopicCommandOptions( | ||
Array("--topic", testTopicName))) | ||
|
||
adminClient.listTopics().names().get().contains(testTopicName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add default assertions here. Maybe we can just use describeTopic
like the other tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops i missed this comment! Addressing that now.
"retest this please" |
…lient#createTopic. This commit makes three changes: - Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>) which allows users to specify Optional.empty() for numPartitions or replicationFactor in order to use the broker default. - Changes AdminManager to accept -1 as valid options for replication factor and numPartitions (resolving to broker defaults). - Adds a dependency on scalaJava8Compat library to make it simpler to convert Scala Option to Java Optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, LGTM. Merging to trunk!
…lient#createTopic (KIP-464) (apache#6728) This commit makes three changes: - Adds a constructor for NewTopic(String, Optional<Integer>, Optional<Short>) which allows users to specify Optional.empty() for numPartitions or replicationFactor in order to use the broker default. - Changes AdminManager to accept -1 as valid options for replication factor and numPartitions (resolving to broker defaults). - Makes --partitions and --replication-factor optional arguments when creating topics using kafka-topics.sh. - Adds a dependency on scalaJava8Compat library to make it simpler to convert Scala Option to Java Optional Reviewers: Ismael Juma <[email protected]>, Ryanne Dolan <[email protected]>, Jason Gustafson <[email protected]>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: #6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via #10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <[email protected]>, Ismael Juma <[email protected]>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: #6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via #10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <[email protected]>, Ismael Juma <[email protected]>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: #6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via #10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <[email protected]>, Ismael Juma <[email protected]>
[KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: apache/kafka#6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via apache/kafka#10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <[email protected]>, Ismael Juma <[email protected]>
…he#11429) [KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: apache#6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via apache#10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <[email protected]>, Ismael Juma <[email protected]>
…he#11429) [KIP-464](https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic) (PR: apache#6728) made it possible to create topics without passing partition count and/or replica factor when using the admin client. We incorrectly disallowed this via apache#10457 while trying to ensure validation was consistent between ZK and the admin client (in this case the inconsistency was intentional). Fix this regression and add tests for the command lines in quick start (i.e. create topic and describe topic) to make sure it won't be broken in the future. Reviewers: Lee Dongjin <[email protected]>, Ismael Juma <[email protected]>
See: KIP-464 for more information.
Description
This change makes the two required changes to support creating topics using the cluster defaults for replication and partitions:
NewTopic(String)
constructor to theNewTopic
APIAdminManager
to accept-1
as valid options for replication factor and partitions. If this is the case, it will resolve it using the default configuration.Option
toOptional
.Testing
Committer Checklist (excluded from commit message)