diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java deleted file mode 100644 index 7ca19beea8c1..000000000000 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java +++ /dev/null @@ -1,1164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.tools; - -import kafka.admin.RackAwareTest; -import kafka.server.ControllerServer; -import kafka.server.KafkaBroker; -import kafka.server.KafkaConfig; -import kafka.utils.Logging; -import kafka.utils.TestUtils; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClientTestUtils; -import org.apache.kafka.clients.admin.Config; -import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult; -import org.apache.kafka.clients.admin.NewPartitionReassignment; -import org.apache.kafka.clients.admin.PartitionReassignment; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.TopicPartitionInfo; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.errors.ClusterAuthorizationException; -import org.apache.kafka.common.errors.TopicExistsException; -import org.apache.kafka.common.internals.Topic; -import org.apache.kafka.common.message.MetadataResponseData; -import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.common.security.auth.SecurityProtocol; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import scala.collection.JavaConverters; -import scala.collection.Seq; -import scala.collection.mutable.Buffer; - -import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - -@Tag("integration") -@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters -public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest { - private final short defaultReplicationFactor = 1; - private final int defaultNumPartitions = 1; - private TopicCommand.TopicService topicService; - private Admin adminClient; - private String bootstrapServer; - private String testTopicName; - private Buffer scalaBrokers; - private Seq scalaControllers; - - /** - * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every - * test and should not reuse previous configurations unless they select their ports randomly when servers are started. - * - * Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication for test - * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`. - */ - @Override - public scala.collection.Seq generateConfigs() { - Map rackInfo = new HashMap<>(); - rackInfo.put(0, "rack1"); - rackInfo.put(1, "rack2"); - rackInfo.put(2, "rack2"); - rackInfo.put(3, "rack1"); - rackInfo.put(4, "rack3"); - rackInfo.put(5, "rack3"); - - List brokerConfigs = ToolsTestUtils - .createBrokerProperties(6, zkConnectOrNull(), rackInfo, defaultNumPartitions, defaultReplicationFactor); - - List configs = new ArrayList<>(); - for (Properties props : brokerConfigs) { - props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); - configs.add(KafkaConfig.fromProps(props)); - } - return JavaConverters.asScalaBuffer(configs).toSeq(); - } - - private TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap(String... opts) { - String[] finalOptions = Stream.concat(Arrays.stream(opts), - Stream.of("--bootstrap-server", bootstrapServer) - ).toArray(String[]::new); - return new TopicCommand.TopicCommandOptions(finalOptions); - } - - @BeforeEach - public void setUp(TestInfo info) { - super.setUp(info); - // create adminClient - Properties props = new Properties(); - bootstrapServer = bootstrapServers(listenerName()); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - adminClient = Admin.create(props); - topicService = new TopicCommand.TopicService(props, Optional.of(bootstrapServer)); - testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), org.apache.kafka.test.TestUtils.randomString(10)); - scalaBrokers = brokers(); - scalaControllers = controllerServers(); - } - - @AfterEach - public void close() throws Exception { - if (topicService != null) - topicService.close(); - if (adminClient != null) - adminClient.close(); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreate(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 1, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - assertTrue(adminClient.listTopics().names().get().contains(testTopicName), - "Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithDefaults(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - List partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); - assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithDefaultReplication(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - List partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - assertEquals(2, partitions.size(), "Unequal partition size: " + partitions.size()); - assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithDefaultPartitions(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, 2, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - List partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - - assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); - assertEquals(2, (short) partitions.get(0).replicas().size(), "Partitions not replicated: " + partitions.get(0).replicas().size()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithConfigs(String quorum) throws Exception { - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000"); - - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 2, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); - - Config configs = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); - assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()), - "Config not set correctly: " + configs.get("delete.retention.ms").value()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWhenAlreadyExists(String quorum) { - // create the topic - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap( - "--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1", - "--topic", testTopicName); - - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - // try to re-create the topic - assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts), - "Expected TopicExistsException to throw"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) throws Exception { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TopicCommand.TopicCommandOptions createOpts = - buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--if-not-exists"); - topicService.createTopic(createOpts); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithReplicaAssignment(String quorum) throws Exception { - scala.collection.mutable.HashMap> replicaAssignmentMap = new scala.collection.mutable.HashMap<>(); - - replicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 5, (Object) 4)).asScala().toSeq()); - replicaAssignmentMap.put(1, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 3, (Object) 2)).asScala().toSeq()); - replicaAssignmentMap.put(2, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 0)).asScala().toSeq()); - - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, - defaultReplicationFactor, replicaAssignmentMap, new Properties() - ); - - List partitions = adminClient - .describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames() - .get() - .get(testTopicName) - .partitions(); - - assertEquals(3, partitions.size(), - "Unequal partition size: " + partitions.size()); - assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0), - "Unexpected replica assignment: " + getPartitionReplicas(partitions, 0)); - assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1), - "Unexpected replica assignment: " + getPartitionReplicas(partitions, 1)); - assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2), - "Unexpected replica assignment: " + getPartitionReplicas(partitions, 2)); - } - - private List getPartitionReplicas(List partitions, int partitionNumber) { - return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithInvalidReplicationFactor(String quorum) { - TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1), - "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithNegativeReplicationFactor(String quorum) { - TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithNegativePartitionCount(String quorum) { - TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testInvalidTopicLevelConfig(String quorum) { - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, - "--config", "message.timestamp.type=boom"); - assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), "Expected ConfigException to throw"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListTopics(String quorum) { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list")); - assertTrue(output.contains(testTopicName), "Expected topic name to be present in output: " + output); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListTopicsWithIncludeList(String quorum) { - String topic1 = "kafka.testTopic1"; - String topic2 = "kafka.testTopic2"; - String topic3 = "oooof.testTopic1"; - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, topic2, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, topic3, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--topic", "kafka.*")); - - assertTrue(output.contains(topic1), "Expected topic name " + topic1 + " to be present in output: " + output); - assertTrue(output.contains(topic2), "Expected topic name " + topic2 + " to be present in output: " + output); - assertFalse(output.contains(topic3), "Do not expect topic name " + topic3 + " to be present in output: " + output); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testListTopicsWithExcludeInternal(String quorum) { - String topic1 = "kafka.testTopic1"; - String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME; - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, hiddenConsumerTopic, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal")); - - assertTrue(output.contains(topic1), "Expected topic name " + topic1 + " to be present in output: " + output); - assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic name " + hiddenConsumerTopic + " to be present in output: " + output); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testAlterPartitionCount(String quorum) throws ExecutionException, InterruptedException { - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "3")); - - TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); - kafka.utils.TestUtils.waitUntilTrue( - () -> brokers().forall(b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), - () -> "Timeout waiting for new assignment propagating to broker", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); - assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testAlterAssignment(String quorum) throws ExecutionException, InterruptedException { - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", - "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3")); - - TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); - kafka.utils.TestUtils.waitUntilTrue( - () -> brokers().forall(b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), - () -> "Timeout waiting for new assignment propagating to broker", - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - - TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); - assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size()); - List partitionReplicas = getPartitionReplicas(topicDescription.partitions(), 2); - assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have replicas 4,2. Got: " + partitionReplicas); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testAlterAssignmentWithMoreAssignmentThanPartitions(String quorum) { - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - assertThrows(ExecutionException.class, - () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", - "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3")), - "Expected to fail with ExecutionException"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testAlterAssignmentWithMorePartitionsThanAssignment(String quorum) { - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - assertThrows(ExecutionException.class, - () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, - "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")), - "Expected to fail with ExecutionException"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testAlterWithInvalidPartitionCount(String quorum) { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - assertThrows(ExecutionException.class, - () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", "-1", "--topic", testTopicName)), - "Expected to fail with ExecutionException"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testAlterWhenTopicDoesntExist(String quorum) { - // alter a topic that does not exist without --if-exists - TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "1"); - TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); - assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts), "Expected to fail with IllegalArgumentException"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testAlterWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException { - topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "1", "--if-exists")); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateAlterTopicWithRackAware(String quorum) throws Exception { - Map rackInfo = new HashMap<>(); - rackInfo.put(0, "rack1"); - rackInfo.put(1, "rack2"); - rackInfo.put(2, "rack2"); - rackInfo.put(3, "rack1"); - rackInfo.put(4, "rack3"); - rackInfo.put(5, "rack3"); - - int numPartitions = 18; - int replicationFactor = 3; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, numPartitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - Map> assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions() - .stream() - .collect(Collectors.toMap( - info -> info.partition(), - info -> info.replicas().stream().map(Node::id).collect(Collectors.toList()))); - checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), numPartitions, - replicationFactor, true, true, true); - - int alteredNumPartitions = 36; - // verify that adding partitions will also be rack aware - TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", - "--partitions", Integer.toString(alteredNumPartitions), - "--topic", testTopicName); - topicService.alterTopic(alterOpts); - - TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); - kafka.utils.TestUtils.waitUntilTrue( - () -> brokers().forall(p -> p.metadataCache().getTopicPartitions(testTopicName).size() == alteredNumPartitions), - () -> "Timeout waiting for new assignment propagating to broker", - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - - assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().stream() - .collect(Collectors.toMap(info -> info.partition(), info -> info.replicas().stream().map(Node::id).collect(Collectors.toList()))); - checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), alteredNumPartitions, replicationFactor, - true, true, true); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testConfigPreservationAcrossPartitionAlteration(String quorum) throws Exception { - String cleanUpPolicy = "compact"; - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy); - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Config props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); - // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName) - assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG), "Properties after creation don't contain " + cleanUpPolicy); - assertEquals(cleanUpPolicy, props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after creation have incorrect value"); - - // pre-create the topic config changes path to avoid a NoNodeException - if (!isKRaftTest()) { - zkClient().makeSurePersistentPathExists(kafka.zk.ConfigEntityChangeNotificationZNode.path()); - } - - // modify the topic to add new partitions - int numPartitionsModified = 3; - TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", - "--partitions", Integer.toString(numPartitionsModified), "--topic", testTopicName); - topicService.alterTopic(alterOpts); - - TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); - Config newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); - assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG), "Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG); - assertEquals(cleanUpPolicy, newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties have incorrect value"); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testTopicDeletion(String quorum) throws Exception { - // create the NormalTopic - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - // delete the NormalTopic - TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName); - - if (!isKRaftTest()) { - String deletePath = kafka.zk.DeleteTopicsTopicZNode.path(testTopicName); - assertFalse(zkClient().pathExists(deletePath), "Delete path for topic shouldn't exist before deletion."); - } - topicService.deleteTopic(deleteOpts); - TestUtils.verifyTopicDeletion(zkClientOrNull(), testTopicName, 1, brokers()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testTopicWithCollidingCharDeletionAndCreateAgain(String quorum) throws Exception { - // create the topic with colliding chars - String topicWithCollidingChar = "test.a"; - TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - // delete the topic - TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", topicWithCollidingChar); - - if (!isKRaftTest()) { - String deletePath = kafka.zk.DeleteTopicsTopicZNode.path(topicWithCollidingChar); - assertFalse(zkClient().pathExists(deletePath), "Delete path for topic shouldn't exist before deletion."); - } - topicService.deleteTopic(deleteOpts); - TestUtils.verifyTopicDeletion(zkClientOrNull(), topicWithCollidingChar, 1, brokers()); - assertDoesNotThrow(() -> TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ), "Should be able to create a topic with colliding chars after deletion."); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteInternalTopic(String quorum) throws Exception { - // create the offset topic - TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is allowed by default. - // This is a difference between the new and the old command as the old one didn't allow internal topic deletion. - // If deleting internal topics is not desired, ACLS should be used to control it. - TopicCommand.TopicCommandOptions deleteOffsetTopicOpts = - buildTopicCommandOptionsWithBootstrap("--delete", "--topic", Topic.GROUP_METADATA_TOPIC_NAME); - String deleteOffsetTopicPath = kafka.zk.DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME); - if (!isKRaftTest()) { - assertFalse(zkClient().pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion."); - } - topicService.deleteTopic(deleteOffsetTopicOpts); - TestUtils.verifyTopicDeletion(zkClientOrNull(), Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, brokers()); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteWhenTopicDoesntExist(String quorum) { - // delete a topic that does not exist - TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.deleteTopic(deleteOpts), - "Expected an exception when trying to delete a topic that does not exist."); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException { - topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName, "--if-exists")); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribe(String quorum) { - int partition = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); - String[] rows = output.split(System.lineSeparator()); - assertEquals(3, rows.length, "Expected 3 rows in output, got " + rows.length); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]); - } - - @ParameterizedTest - @ValueSource(strings = {"quorum=zk", "quorum=kraft"}) - public void testDescribeWithDescribeTopicPartitionsApi(String quorum) throws ExecutionException, InterruptedException { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 20, (short) 2, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, "test-2", scalaBrokers, scalaControllers, 41, (short) 2, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, "test-3", scalaBrokers, scalaControllers, 5, (short) 2, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, "test-4", scalaBrokers, scalaControllers, 5, (short) 2, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, "test-5", scalaBrokers, scalaControllers, 100, (short) 2, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap( - "--describe", "--partition-size-limit-per-response=20")); - String[] rows = output.split("\n"); - assertEquals(176, rows.length, String.join("\n", rows)); - assertTrue(rows[2].contains("\tElr"), rows[2]); - assertTrue(rows[2].contains("LastKnownElr"), rows[2]); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeWhenTopicDoesntExist(String quorum) { - assertThrows(IllegalArgumentException.class, - () -> topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)), - "Expected an exception when trying to describe a topic that does not exist."); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException { - topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName, "--if-exists")); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnavailablePartitions(String quorum) throws ExecutionException, InterruptedException { - int partitions = 6; - short replicationFactor = 1; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - try { - // check which partition is on broker 0 which we'll kill - TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames().get().get(testTopicName); - int partitionOnBroker0 = testTopicDescription.partitions().stream() - .filter(partition -> partition.leader().id() == 0) - .findFirst().get().partition(); - - killBroker(0); - - // wait until the topic metadata for the test topic is propagated to each alive broker - kafka.utils.TestUtils.waitUntilTrue( - () -> { - boolean result = true; - for (KafkaBroker server : JavaConverters.asJavaCollection(brokers())) { - if (server.config().brokerId() != 0) { - Set topicNames = Collections.singleton(testTopicName); - Collection topicMetadatas = - JavaConverters.asJavaCollection(server.dataPlaneRequestProcessor().metadataCache() - .getTopicMetadata(JavaConverters.asScalaSetConverter(topicNames).asScala().toSet(), - ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), - false, false) - ); - Optional testTopicMetadata = topicMetadatas.stream() - .filter(metadata -> metadata.name().equals(testTopicName)) - .findFirst(); - if (!testTopicMetadata.isPresent()) { - throw new AssertionError("Topic metadata is not found in metadata cache"); - } - Optional testPartitionMetadata = testTopicMetadata.get().partitions().stream() - .filter(metadata -> metadata.partitionIndex() == partitionOnBroker0) - .findFirst(); - if (!testPartitionMetadata.isPresent()) { - throw new AssertionError("Partition metadata is not found in metadata cache"); - } - result = result && testPartitionMetadata.get().errorCode() == Errors.LEADER_NOT_AVAILABLE.code(); - } - } - return result; - }, - () -> String.format("Partition metadata for %s is not propagated", testTopicName), - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - - // grab the console output and assert - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName, "--unavailable-partitions")); - String[] rows = output.split(System.lineSeparator()); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), - "Unexpected Topic " + rows[0] + " received. Expect " + String.format("Topic: %s", testTopicName)); - assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"), - "Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'"); - } finally { - restartDeadBrokers(false); - } - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnderReplicatedPartitions(String quorum) { - int partitions = 1; - short replicationFactor = 6; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - try { - killBroker(0); - if (isKRaftTest()) { - ensureConsistentKRaftMetadata(); - } else { - TestUtils.waitForPartitionMetadata(aliveBrokers(), testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS); - } - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); - String[] rows = output.split(System.lineSeparator()); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), String.format("Unexpected output: %s", rows[0])); - } finally { - restartDeadBrokers(false); - } - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnderMinIsrPartitions(String quorum) { - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); - int partitions = 1; - short replicationFactor = 6; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); - try { - killBroker(0); - if (isKRaftTest()) { - ensureConsistentKRaftMetadata(); - } else { - kafka.utils.TestUtils.waitUntilTrue( - () -> aliveBrokers().forall(b -> b.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5), - () -> String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName), - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L - ); - } - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")); - String[] rows = output.split(System.lineSeparator()); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), - "Unexpected topic: " + rows[0]); - } finally { - restartDeadBrokers(false); - } - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String quorum) throws ExecutionException, InterruptedException { - TopicPartition tp = new TopicPartition(testTopicName, 0); - - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - // Produce multiple batches. - TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1); - TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1); - - // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication - // throughput so the reassignment doesn't complete quickly. - List brokerIds = JavaConverters.seqAsJavaList(brokers()).stream() - .map(broker -> broker.config().brokerId()).collect(Collectors.toList()); - - ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp), 1); - - TopicDescription testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName); - TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0); - - List replicasOfFirstPartition = firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList()); - List replicasDiff = new ArrayList<>(brokerIds); - replicasDiff.removeAll(replicasOfFirstPartition); - Integer targetReplica = replicasDiff.get(0); - - adminClient.alterPartitionReassignments(Collections.singletonMap(tp, - Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get(); - - // let's wait until the LAIR is propagated - kafka.utils.TestUtils.waitUntilTrue( - () -> { - try { - return !adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get() - .get(tp).addingReplicas().isEmpty(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }, - () -> "Reassignment didn't add the second node", - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - - ensureConsistentKRaftMetadata(); - - // describe the topic and test if it's under-replicated - String simpleDescribeOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); - String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator()); - assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)), - "Unexpected describe output: " + simpleDescribeOutputRows[0]); - assertEquals(2, simpleDescribeOutputRows.length, - "Unexpected describe output length: " + simpleDescribeOutputRows.length); - - String underReplicatedOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); - assertEquals("", underReplicatedOutput, - String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput)); - - int maxRetries = 20; - long pause = 100L; - long waitTimeMs = maxRetries * pause; - AtomicReference reassignmentsRef = new AtomicReference<>(); - - TestUtils.waitUntilTrue(() -> { - try { - PartitionReassignment tempReassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp); - reassignmentsRef.set(tempReassignments); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Error while fetching reassignments", e); - } - return reassignmentsRef.get() != null; - }, () -> "Reassignments did not become non-null within the specified time", waitTimeMs, pause); - - assertFalse(reassignmentsRef.get().addingReplicas().isEmpty()); - - ToolsTestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp)); - TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeAtMinIsrPartitions(String quorum) { - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4"); - - int partitions = 1; - short replicationFactor = 6; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); - try { - killBroker(0); - killBroker(1); - - if (isKRaftTest()) { - ensureConsistentKRaftMetadata(); - } else { - kafka.utils.TestUtils.waitUntilTrue( - () -> aliveBrokers().forall(broker -> broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 4), - () -> String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName), - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L - ); - } - - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--at-min-isr-partitions")); - String[] rows = output.split(System.lineSeparator()); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), - "Unexpected output: " + rows[0]); - assertEquals(1, rows.length); - } finally { - restartDeadBrokers(false); - } - } - - /** - * Test describe --under-min-isr-partitions option with four topics: - * (1) topic with partition under the configured min ISR count - * (2) topic with under-replicated partition (but not under min ISR count) - * (3) topic with offline partition - * (4) topic with fully replicated partition - * - * Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition - */ - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeUnderMinIsrPartitionsMixed(String quorum) { - String underMinIsrTopic = "under-min-isr-topic"; - String notUnderMinIsrTopic = "not-under-min-isr-topic"; - String offlineTopic = "offline-topic"; - String fullyReplicatedTopic = "fully-replicated-topic"; - - scala.collection.mutable.HashMap> fullyReplicatedReplicaAssignmentMap = new scala.collection.mutable.HashMap<>(); - fullyReplicatedReplicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 2, (Object) 3)).asScala().toSeq()); - - scala.collection.mutable.HashMap> offlineReplicaAssignmentMap = new scala.collection.mutable.HashMap<>(); - offlineReplicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 0)).asScala().toSeq()); - - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); - - int partitions = 1; - short replicationFactor = 6; - int negativePartition = -1; - short negativeReplicationFactor = -1; - TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); - TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, offlineTopic, scalaBrokers, scalaControllers, negativePartition, negativeReplicationFactor, - offlineReplicaAssignmentMap, new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, scalaBrokers, scalaControllers, negativePartition, negativeReplicationFactor, - fullyReplicatedReplicaAssignmentMap, new Properties() - ); - - - try { - killBroker(0); - if (isKRaftTest()) { - ensureConsistentKRaftMetadata(); - } else { - kafka.utils.TestUtils.waitUntilTrue( - () -> aliveBrokers().forall(broker -> - broker.metadataCache().getPartitionInfo(underMinIsrTopic, 0).get().isr().size() < 6 && - broker.metadataCache().getPartitionInfo(offlineTopic, 0).get().leader() == MetadataResponse.NO_LEADER_ID), - () -> "Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic", - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - } - TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")); - String[] rows = output.split(System.lineSeparator()); - assertTrue(rows[0].startsWith(String.format("Topic: %s", underMinIsrTopic)), - "Unexpected output: " + rows[0]); - assertTrue(rows[1].startsWith(String.format("\tTopic: %s", offlineTopic)), - "Unexpected output: " + rows[1]); - assertEquals(2, rows.length); - } finally { - restartDeadBrokers(false); - } - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeReportOverriddenConfigs(String quorum) { - String config = "file.delete.delay.ms=1000"; - Properties topicConfig = new Properties(); - topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1000"); - - int partitions = 2; - short replicationFactor = 2; - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), topicConfig - ); - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe")); - assertTrue(output.contains(config), String.format("Describe output should have contained %s", config)); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeAndListTopicsWithoutInternalTopics(String quorum) { - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - // test describe - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--describe", "--exclude-internal")); - assertTrue(output.contains(testTopicName), - String.format("Output should have contained %s", testTopicName)); - assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), - "Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME); - - // test list - output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal")); - assertTrue(output.contains(testTopicName), String.format("Output should have contained %s", testTopicName)); - assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), - "Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(String quorum) throws Exception { - adminClient = spy(adminClient); - topicService.close(); // need to be closed before initializing a new one with the spy version of adminclient, otherwise will have the extra adminclient(s) not closed. - ListPartitionReassignmentsResult result = AdminClientTestUtils.listPartitionReassignmentsResult( - new ClusterAuthorizationException("Unauthorized")); - - doReturn(result).when(adminClient).listPartitionReassignments( - Collections.singleton(new TopicPartition(testTopicName, 0)) - ); - - topicService = new TopicCommand.TopicService(adminClient); - - TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - - String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); - String[] rows = output.split(System.lineSeparator()); - assertEquals(2, rows.length, "Unexpected output: " + output); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Unexpected output: " + rows[0]); - } - - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testCreateWithTopicNameCollision(String quorum) { - String topic = "foo_bar"; - int partitions = 1; - short replicationFactor = 6; - TestUtils.createTopicWithAdmin(adminClient, topic, scalaBrokers, scalaControllers, partitions, replicationFactor, - scala.collection.immutable.Map$.MODULE$.empty(), new Properties() - ); - assertThrows(TopicExistsException.class, - () -> topicService.createTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", topic))); - } - - private void checkReplicaDistribution(Map> assignment, - Map brokerRackMapping, - Integer numBrokers, - Integer numPartitions, - Integer replicationFactor, - Boolean verifyRackAware, - Boolean verifyLeaderDistribution, - Boolean verifyReplicasDistribution) { - // always verify that no broker will be assigned for more than one replica - assignment.entrySet().stream() - .forEach(entry -> assertEquals(new HashSet<>(entry.getValue()).size(), entry.getValue().size(), - "More than one replica is assigned to same broker for the same partition")); - - ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); - - if (verifyRackAware) { - Map> partitionRackMap = distribution.partitionRacks; - - List partitionRackMapValueSize = partitionRackMap.values().stream() - .map(value -> (int) value.stream().distinct().count()) - .collect(Collectors.toList()); - - List expected = Collections.nCopies(numPartitions, replicationFactor); - assertEquals(expected, partitionRackMapValueSize, "More than one replica of the same partition is assigned to the same rack"); - } - - if (verifyLeaderDistribution) { - Map leaderCount = distribution.brokerLeaderCount; - int leaderCountPerBroker = numPartitions / numBrokers; - List expected = Collections.nCopies(numBrokers, leaderCountPerBroker); - assertEquals(expected, new ArrayList<>(leaderCount.values()), "Preferred leader count is not even for brokers"); - } - - if (verifyReplicasDistribution) { - Map replicasCount = distribution.brokerReplicasCount; - int numReplicasPerBroker = numPartitions * replicationFactor / numBrokers; - List expected = Collections.nCopies(numBrokers, numReplicasPerBroker); - assertEquals(expected, new ArrayList<>(replicasCount.values()), "Replica count is not even for broker"); - } - } - - private String captureDescribeTopicStandardOut(TopicCommand.TopicCommandOptions opts) { - Runnable runnable = () -> { - try { - topicService.describeTopic(opts); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - return ToolsTestUtils.captureStandardOut(runnable); - } - - private String captureListTopicStandardOut(TopicCommand.TopicCommandOptions opts) { - Runnable runnable = () -> { - try { - topicService.listTopics(opts); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - return ToolsTestUtils.captureStandardOut(runnable); - } - - private static ReplicaDistributions getReplicaDistribution(Map> assignment, Map brokerRackMapping) { - Map leaderCount = new HashMap<>(); - Map partitionCount = new HashMap<>(); - Map> partitionRackMap = new HashMap<>(); - - assignment.entrySet().stream().forEach(entry -> { - Integer partitionId = entry.getKey(); - List replicaList = entry.getValue(); - Integer leader = replicaList.get(0); - leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); - replicaList.stream().forEach(brokerId -> { - partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); - String rack; - if (brokerRackMapping.containsKey(brokerId)) { - rack = brokerRackMapping.get(brokerId); - List partitionRackValues = Stream.of(Collections.singletonList(rack), partitionRackMap.getOrDefault(partitionId, Collections.emptyList())) - .flatMap(List::stream) - .collect(Collectors.toList()); - partitionRackMap.put(partitionId, partitionRackValues); - } else { - System.err.printf("No mapping found for %s in `brokerRackMapping`%n", brokerId); - } - }); - }); - return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount); - } - - private static class ReplicaDistributions { - private final Map> partitionRacks; - private final Map brokerLeaderCount; - private final Map brokerReplicasCount; - - public ReplicaDistributions(Map> partitionRacks, - Map brokerLeaderCount, - Map brokerReplicasCount) { - this.partitionRacks = partitionRacks; - this.brokerLeaderCount = brokerLeaderCount; - this.brokerReplicasCount = brokerReplicasCount; - } - } - -} diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java index e6399a870729..c1892996a27b 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java @@ -14,30 +14,55 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.kafka.tools; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; +import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.CreatePartitionsResult; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsOptions; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult; import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.UpdateMetadataRequestData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.server.common.AdminCommandFailedException; import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.ArrayList; import java.util.Arrays; @@ -48,24 +73,38 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@Timeout(value = 60) +@ExtendWith(ClusterTestExtensions.class) public class TopicCommandTest { + private final short defaultReplicationFactor = 1; + private final int defaultNumPartitions = 1; + + private static final int CLUSTER_WAIT_MS = 60000; + private final String bootstrapServer = "localhost:9092"; private final String topicName = "topicName"; @@ -78,9 +117,9 @@ public void testIsNotUnderReplicatedWhenAdding() { } TopicCommand.PartitionDescription partitionDescription = new TopicCommand.PartitionDescription("test-topic", - new TopicPartitionInfo(0, new Node(1, "localhost", 9091), replicas, - Collections.singletonList(new Node(1, "localhost", 9091))), - null, false, + new TopicPartitionInfo(0, new Node(1, "localhost", 9091), replicas, + Collections.singletonList(new Node(1, "localhost", 9091))), + null, false, new PartitionReassignment(replicaIds, Arrays.asList(2), Collections.emptyList()) ); @@ -221,11 +260,11 @@ public void testCreateTopicDoesNotRetryThrottlingQuotaExceededException() { }))); NewTopic expectedNewTopic = new NewTopic(topicName, Optional.empty(), Optional.empty()) - .configs(Collections.emptyMap()); + .configs(Collections.emptyMap()); verify(adminClient, times(1)).createTopics( - eq(new HashSet<>(Arrays.asList(expectedNewTopic))), - argThat(exception -> !exception.shouldRetryOnQuotaViolation()) + eq(new HashSet<>(Arrays.asList(expectedNewTopic))), + argThat(exception -> !exception.shouldRetryOnQuotaViolation()) ); } @@ -249,8 +288,8 @@ public void testDeleteTopicDoesNotRetryThrottlingQuotaExceededException() { assertInstanceOf(ThrottlingQuotaExceededException.class, exception.getCause()); verify(adminClient).deleteTopics( - argThat((Collection topics) -> topics.equals(Arrays.asList(topicName))), - argThat((DeleteTopicsOptions options) -> !options.shouldRetryOnQuotaViolation())); + argThat((Collection topics) -> topics.equals(Arrays.asList(topicName))), + argThat((DeleteTopicsOptions options) -> !options.shouldRetryOnQuotaViolation())); } @Test @@ -262,9 +301,9 @@ public void testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException() { when(adminClient.listTopics(any())).thenReturn(listResult); TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0), - Collections.emptyList(), Collections.emptyList()); + Collections.emptyList(), Collections.emptyList()); DescribeTopicsResult describeResult = AdminClientTestUtils.describeTopicsResult(topicName, - new TopicDescription(topicName, false, Collections.singletonList(topicPartitionInfo))); + new TopicDescription(topicName, false, Collections.singletonList(topicPartitionInfo))); when(adminClient.describeTopics(anyCollection())).thenReturn(describeResult); CreatePartitionsResult result = AdminClientTestUtils.createPartitionsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception()); @@ -278,8 +317,8 @@ public void testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException() { assertInstanceOf(ThrottlingQuotaExceededException.class, exception.getCause()); verify(adminClient, times(1)).createPartitions( - argThat(newPartitions -> newPartitions.get(topicName).totalCount() == 3), - argThat(createPartitionOption -> !createPartitionOption.shouldRetryOnQuotaViolation())); + argThat(newPartitions -> newPartitions.get(topicName).totalCount() == 3), + argThat(createPartitionOption -> !createPartitionOption.shouldRetryOnQuotaViolation())); } public void assertInitializeInvalidOptionsExitCode(int expected, String[] options) { @@ -293,4 +332,1151 @@ public void assertInitializeInvalidOptionsExitCode(int expected, String[] option Exit.resetExitProcedure(); } } + + private TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap(ClusterInstance clusterInstance, String... opts) { + String bootstrapServer = clusterInstance.bootstrapServers(); + String[] finalOptions = Stream.concat(Arrays.stream(opts), + Stream.of("--bootstrap-server", bootstrapServer) + ).toArray(String[]::new); + return new TopicCommand.TopicCommandOptions(finalOptions); + } + + static List generate() { + Map serverProp = new HashMap<>(); + serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if config name error, no exception throw + + Map> rackInfo = new HashMap<>(); + Map infoPerBroker1 = new HashMap<>(); + infoPerBroker1.put("broker.rack", "rack1"); + Map infoPerBroker2 = new HashMap<>(); + infoPerBroker2.put("broker.rack", "rack2"); + Map infoPerBroker3 = new HashMap<>(); + infoPerBroker3.put("broker.rack", "rack2"); + Map infoPerBroker4 = new HashMap<>(); + infoPerBroker4.put("broker.rack", "rack1"); + Map infoPerBroker5 = new HashMap<>(); + infoPerBroker5.put("broker.rack", "rack3"); + Map infoPerBroker6 = new HashMap<>(); + infoPerBroker6.put("broker.rack", "rack3"); + + rackInfo.put(0, infoPerBroker1); + rackInfo.put(1, infoPerBroker2); + rackInfo.put(2, infoPerBroker3); + rackInfo.put(3, infoPerBroker4); + rackInfo.put(4, infoPerBroker5); + rackInfo.put(5, infoPerBroker6); + + return Collections.singletonList(ClusterConfig.defaultBuilder() + .setBrokers(6) + .setServerProperties(serverProp) + .setPerServerProperties(rackInfo) + .setTypes(Stream.of(Type.ZK, Type.KRAFT).collect(Collectors.toSet())) + .build() + ); + } + + @ClusterTemplate("generate") + public void testCreate(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName), + "Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get()); + + adminClient.deleteTopics(Collections.singletonList(testTopicName)); + clusterInstance.waitForTopic(testTopicName, 0); + Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(), + "Admin client see the created topic. It saw: " + adminClient.listTopics().names().get()); + } + } + + @ClusterTemplate("generate") + public void testCreateWithDefaults(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName), + "Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get()); + + List partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + Assertions.assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); + Assertions.assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); + + adminClient.deleteTopics(Collections.singletonList(testTopicName)); + clusterInstance.waitForTopic(testTopicName, 0); + Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(), + "Admin client see the created topic. It saw: " + adminClient.listTopics().names().get()); + } + } + + @ClusterTemplate("generate") + public void testCreateWithDefaultReplication(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 2, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, 2); + List partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + assertEquals(2, partitions.size(), "Unequal partition size: " + partitions.size()); + assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); + } + } + + @ClusterTemplate("generate") + public void testCreateWithDefaultPartitions(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, (short) 2))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + List partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + + assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); + assertEquals(2, (short) partitions.get(0).replicas().size(), "Partitions not replicated: " + partitions.get(0).replicas().size()); + } + } + + @ClusterTemplate("generate") + public void testCreateWithConfigs(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Map topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000"); + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 2, (short) 2).configs(topicConfig))); + clusterInstance.waitForTopic(testTopicName, 2); + + + Config configs = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); + assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()), + "Config not set correctly: " + configs.get("delete.retention.ms").value()); + } + } + + @ClusterTemplate("generate") + public void testCreateWhenAlreadyExists(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap( + clusterInstance, "--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1", + "--topic", testTopicName); + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + // try to re-create the topic + assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts), + "Expected TopicExistsException to throw"); + } + } + + @ClusterTemplate("generate") + public void testCreateWhenAlreadyExistsWithIfNotExists(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + TopicCommand.TopicCommandOptions createOpts = + buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--topic", testTopicName, "--if-not-exists"); + topicService.createTopic(createOpts); + } + } + + private List getPartitionReplicas(List partitions, int partitionNumber) { + return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList()); + } + + @ClusterTemplate("generate") + public void testCreateWithReplicaAssignment(ClusterInstance clusterInstance) throws Exception { + Map> replicaAssignmentMap = new HashMap<>(); + try (Admin adminClient = clusterInstance.createAdminClient()) { + String testTopicName = TestUtils.randomString(10); + + replicaAssignmentMap.put(0, Arrays.asList(5, 4)); + replicaAssignmentMap.put(1, Arrays.asList(3, 2)); + replicaAssignmentMap.put(2, Arrays.asList(1, 0)); + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, replicaAssignmentMap))); + clusterInstance.waitForTopic(testTopicName, 3); + + List partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + + adminClient.close(); + assertEquals(3, partitions.size(), + "Unequal partition size: " + partitions.size()); + assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 0)); + assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 1)); + assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 2)); + } + } + + @ClusterTemplate("generate") + public void testCreateWithInvalidReplicationFactor(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1), + "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + } + } + + @ClusterTemplate("generate") + public void testCreateWithNegativeReplicationFactor(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", + "--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + } + } + + @ClusterTemplate("generate") + public void testCreateWithNegativePartitionCount(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); + } + } + + @ClusterTemplate("generate") + public void testInvalidTopicLevelConfig(ClusterInstance clusterInstance) { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); + + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", + "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, + "--config", "message.timestamp.type=boom"); + assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), "Expected ConfigException to throw"); + } + } + + @ClusterTemplate("generate") + public void testListTopics(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + String output = captureListTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list")); + assertTrue(output.contains(testTopicName), "Expected topic name to be present in output: " + output); + } + } + + @ClusterTemplate("generate") + public void testListTopicsWithIncludeList(ClusterInstance clusterInstance) throws InterruptedException { + try (Admin adminClient = clusterInstance.createAdminClient()) { + String topic1 = "kafka.testTopic1"; + String topic2 = "kafka.testTopic2"; + String topic3 = "oooof.testTopic1"; + int partition = 2; + short replicationFactor = 2; + adminClient.createTopics(Collections.singletonList(new NewTopic(topic1, partition, replicationFactor))); + adminClient.createTopics(Collections.singletonList(new NewTopic(topic2, partition, replicationFactor))); + adminClient.createTopics(Collections.singletonList(new NewTopic(topic3, partition, replicationFactor))); + clusterInstance.waitForTopic(topic1, partition); + clusterInstance.waitForTopic(topic2, partition); + clusterInstance.waitForTopic(topic3, partition); + + String output = captureListTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list", "--topic", "kafka.*")); + assertTrue(output.contains(topic1), "Expected topic name " + topic1 + " to be present in output: " + output); + assertTrue(output.contains(topic2), "Expected topic name " + topic2 + " to be present in output: " + output); + assertFalse(output.contains(topic3), "Do not expect topic name " + topic3 + " to be present in output: " + output); + } + } + + @ClusterTemplate("generate") + public void testListTopicsWithExcludeInternal(ClusterInstance clusterInstance) throws InterruptedException { + try (Admin adminClient = clusterInstance.createAdminClient()) { + String topic1 = "kafka.testTopic1"; + String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME; + int partition = 2; + short replicationFactor = 2; + adminClient.createTopics(Collections.singletonList(new NewTopic(topic1, partition, replicationFactor))); + clusterInstance.waitForTopic(topic1, partition); + + String output = captureListTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list", "--exclude-internal")); + assertTrue(output.contains(topic1), "Expected topic name " + topic1 + " to be present in output: " + output); + assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic name " + hiddenConsumerTopic + " to be present in output: " + output); + } + } + + @ClusterTemplate("generate") + public void testAlterPartitionCount(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + int partition = 2; + short replicationFactor = 2; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partition); + topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "3")); + + TestUtils.waitForCondition( + () -> adminClient.listPartitionReassignments().reassignments().get().isEmpty(), + CLUSTER_WAIT_MS, testTopicName + String.format("reassignmet not finished after %s ms", CLUSTER_WAIT_MS) + ); + + TestUtils.waitForCondition( + () -> clusterInstance.brokers().values().stream().allMatch( + b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), + TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new assignment propagating to broker"); + TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); + assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size()); + } + } + + @ClusterTemplate("generate") + public void testAlterAssignment(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + int partition = 2; + short replicationFactor = 2; + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partition); + + topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", + "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3")); + + TestUtils.waitForCondition( + () -> adminClient.listPartitionReassignments().reassignments().get().isEmpty(), + CLUSTER_WAIT_MS, testTopicName + String.format("reassignmet not finished after %s ms", CLUSTER_WAIT_MS) + ); + + TestUtils.waitForCondition( + () -> clusterInstance.brokers().values().stream().allMatch( + b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), + TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new assignment propagating to broker"); + + TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); + assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size()); + List partitionReplicas = getPartitionReplicas(topicDescription.partitions(), 2); + assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have replicas 4,2. Got: " + partitionReplicas); + + } + } + + @ClusterTemplate("generate") + public void testAlterAssignmentWithMoreAssignmentThanPartitions(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + + int partition = 2; + short replicationFactor = 2; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partition); + + assertThrows(ExecutionException.class, + () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", + "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3")), + "Expected to fail with ExecutionException"); + + } + } + + @ClusterTemplate("generate") + public void testAlterAssignmentWithMorePartitionsThanAssignment(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + int partition = 2; + short replicationFactor = 2; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partition); + + assertThrows(ExecutionException.class, + () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, + "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")), + "Expected to fail with ExecutionException"); + + } + } + + @ClusterTemplate("generate") + public void testAlterWithInvalidPartitionCount(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + assertThrows(ExecutionException.class, + () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--partitions", "-1", "--topic", testTopicName)), + "Expected to fail with ExecutionException"); + } + } + + @ClusterTemplate("generate") + public void testAlterWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + // alter a topic that does not exist without --if-exists + TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "1"); + assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts), "Expected to fail with IllegalArgumentException"); + + } + } + + @ClusterTemplate("generate") + public void testAlterWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + Admin adminClient = clusterInstance.createAdminClient(); + + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); + topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "1", "--if-exists")); + adminClient.close(); + topicService.close(); + } + + @ClusterTemplate("generate") + public void testCreateAlterTopicWithRackAware(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + + Map rackInfo = new HashMap<>(); + rackInfo.put(0, "rack1"); + rackInfo.put(1, "rack2"); + rackInfo.put(2, "rack2"); + rackInfo.put(3, "rack1"); + rackInfo.put(4, "rack3"); + rackInfo.put(5, "rack3"); + + int numPartitions = 18; + int replicationFactor = 3; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, numPartitions, (short) replicationFactor))); + clusterInstance.waitForTopic(testTopicName, numPartitions); + + Map> assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions() + .stream() + .collect(Collectors.toMap( + info -> info.partition(), + info -> info.replicas().stream().map(Node::id).collect(Collectors.toList()))); + checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), numPartitions, + replicationFactor, true, true, true); + + int alteredNumPartitions = 36; + // verify that adding partitions will also be rack aware + TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", + "--partitions", Integer.toString(alteredNumPartitions), + "--topic", testTopicName); + topicService.alterTopic(alterOpts); + + TestUtils.waitForCondition( + () -> adminClient.listPartitionReassignments().reassignments().get().isEmpty(), + CLUSTER_WAIT_MS, testTopicName + String.format("reassignmet not finished after %s ms", CLUSTER_WAIT_MS) + ); + TestUtils.waitForCondition( + () -> clusterInstance.brokers().values().stream().allMatch(p -> p.metadataCache().getTopicPartitions(testTopicName).size() == alteredNumPartitions), + TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new assignment propagating to broker"); + + assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().stream() + .collect(Collectors.toMap(info -> info.partition(), info -> info.replicas().stream().map(Node::id).collect(Collectors.toList()))); + checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), alteredNumPartitions, replicationFactor, + true, true, true); + + } + } + + @ClusterTemplate("generate") + public void testConfigPreservationAcrossPartitionAlteration(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + + String cleanUpPolicy = "compact"; + HashMap topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy); + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor).configs(topicConfig))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Config props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); + assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG), "Properties after creation don't contain " + cleanUpPolicy); + assertEquals(cleanUpPolicy, props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after creation have incorrect value"); + + // modify the topic to add new partitions + int numPartitionsModified = 3; + TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", + "--partitions", Integer.toString(numPartitionsModified), "--topic", testTopicName); + topicService.alterTopic(alterOpts); + + TestUtils.waitForCondition( + () -> clusterInstance.brokers().values().stream().allMatch(p -> p.metadataCache().getTopicPartitions(testTopicName).size() == numPartitionsModified), + TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new assignment propagating to broker"); + + Config newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); + assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG), "Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG); + assertEquals(cleanUpPolicy, newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties have incorrect value"); + + } + } + + @ClusterTemplate("generate") + public void testTopicDeletion(ClusterInstance clusterInstance) throws Exception { + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + String testTopicName = TestUtils.randomString(10); + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + // delete the NormalTopic + TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName); + topicService.deleteTopic(deleteOpts); + + TestUtils.waitForCondition( + () -> adminClient.listTopics().listings().get().stream().noneMatch(topic -> topic.name().equals(testTopicName)), + CLUSTER_WAIT_MS, String.format("Delete topic fail in %s ms", CLUSTER_WAIT_MS) + ); + } + } + + @ClusterTemplate("generate") + public void testTopicWithCollidingCharDeletionAndCreateAgain(ClusterInstance clusterInstance) throws Exception { + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + // create the topic with colliding chars + String topicWithCollidingChar = "test.a"; + adminClient.createTopics(Collections.singletonList(new NewTopic(topicWithCollidingChar, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(topicWithCollidingChar, defaultNumPartitions); + + // delete the topic + TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", topicWithCollidingChar); + topicService.deleteTopic(deleteOpts); + TestUtils.waitForCondition( + () -> adminClient.listTopics().listings().get().stream().noneMatch(topic -> topic.name().equals(topicWithCollidingChar)), + CLUSTER_WAIT_MS, String.format("Delete topic fail in %s ms", CLUSTER_WAIT_MS) + ); + + clusterInstance.waitTopicDeletion(topicWithCollidingChar); + + // recreate same topic + adminClient.createTopics(Collections.singletonList(new NewTopic(topicWithCollidingChar, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(topicWithCollidingChar, defaultNumPartitions); + } + } + + @ClusterTemplate("generate") + public void testDeleteInternalTopic(ClusterInstance clusterInstance) throws Exception { + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + + // create the offset topic + // In ZK mode, Topic.GROUP_METADATA_TOPIC_NAME exist when cluster is created. + if (clusterInstance.isKRaftTest()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions); + } + + // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is allowed by default. + // This is a difference between the new and the old command as the old one didn't allow internal topic deletion. + // If deleting internal topics is not desired, ACLS should be used to control it. + TopicCommand.TopicCommandOptions deleteOffsetTopicOpts = + buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", Topic.GROUP_METADATA_TOPIC_NAME); + + topicService.deleteTopic(deleteOffsetTopicOpts); + TestUtils.waitForCondition( + () -> adminClient.listTopics().listings().get().stream().noneMatch(topic -> topic.name().equals(Topic.GROUP_METADATA_TOPIC_NAME)), + CLUSTER_WAIT_MS, String.format("Delete topic fail in %s ms", CLUSTER_WAIT_MS) + ); + + } + } + + @ClusterTemplate("generate") + public void testDeleteWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + // delete a topic that does not exist + TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.deleteTopic(deleteOpts), + "Expected an exception when trying to delete a topic that does not exist."); + } + } + + @ClusterTemplate("generate") + public void testDeleteWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName, "--if-exists")); + } + } + + @ClusterTemplate("generate") + public void testDescribe(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + int partition = 2; + short replicationFactor = 2; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partition); + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName)); + String[] rows = output.split(System.lineSeparator()); + assertEquals(3, rows.length, "Expected 3 rows in output, got " + rows.length); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]); + } + } + + @ClusterTemplate("generate") + public void testDescribeWithDescribeTopicPartitionsApi(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + + List topics = new ArrayList<>(); + topics.add(new NewTopic(testTopicName, 20, (short) 2)); + topics.add(new NewTopic("test-2", 41, (short) 2)); + topics.add(new NewTopic("test-3", 5, (short) 2)); + topics.add(new NewTopic("test-4", 5, (short) 2)); + topics.add(new NewTopic("test-5", 100, (short) 2)); + + adminClient.createTopics(topics); + clusterInstance.waitForTopic(testTopicName, 20); + clusterInstance.waitForTopic("test-2", 41); + clusterInstance.waitForTopic("test-3", 5); + clusterInstance.waitForTopic("test-4", 5); + clusterInstance.waitForTopic("test-5", 100); + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, + "--describe", "--partition-size-limit-per-response=20", "--exclude-internal")); + String[] rows = output.split("\n"); + + assertEquals(176, rows.length, String.join("\n", rows)); + assertTrue(rows[2].contains("\tElr"), rows[2]); + assertTrue(rows[2].contains("LastKnownElr"), rows[2]); + + } + } + + @ClusterTemplate("generate") + public void testDescribeWhenTopicDoesntExist(ClusterInstance clusterInstance) { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); + + assertThrows(IllegalArgumentException.class, + () -> topicService.describeTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName)), + "Expected an exception when trying to describe a topic that does not exist."); + } + + } + + @ClusterTemplate("generate") + public void testDescribeWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); + + topicService.describeTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName, "--if-exists")); + + adminClient.close(); + topicService.close(); + } + } + + @ClusterTemplate("generate") + public void testDescribeUnavailablePartitions(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + int partitions = 6; + short replicationFactor = 1; + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partitions); + + // check which partition is on broker 0 which we'll kill + clusterInstance.shutdownBroker(0); + assertEquals(5, clusterInstance.aliveBrokers().size()); + + // wait until the topic metadata for the test topic is propagated to each alive broker + clusterInstance.waitForTopic(testTopicName, 6); + + // grab the console output and assert + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName, "--unavailable-partitions")); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected Topic " + rows[0] + " received. Expect " + String.format("Topic: %s", testTopicName)); + assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"), + "Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'"); + + } + } + + @ClusterTemplate("generate") + public void testDescribeUnderReplicatedPartitions(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + int partitions = 1; + short replicationFactor = 6; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor))); + clusterInstance.waitForTopic(testTopicName, partitions); + + clusterInstance.shutdownBroker(0); + Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 5); + + TestUtils.waitForCondition( + () -> clusterInstance.aliveBrokers().values().stream().allMatch( + broker -> { + Optional partitionState = + Optional.ofNullable(broker.metadataCache().getPartitionInfo(testTopicName, 0).getOrElse(null)); + return partitionState.map(s -> FetchRequest.isValidBrokerId(s.leader())).orElse(false); + } + ), CLUSTER_WAIT_MS, String.format("Meta data propogation fail in %s ms", CLUSTER_WAIT_MS)); + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-replicated-partitions")); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), String.format("Unexpected output: %s", rows[0])); + } + } + + + @ClusterTemplate("generate") + public void testDescribeUnderMinIsrPartitions(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + Map topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + int partitions = 1; + short replicationFactor = 6; + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig))); + clusterInstance.waitForTopic(testTopicName, partitions); + + clusterInstance.shutdownBroker(0); + assertEquals(5, clusterInstance.aliveBrokers().size()); + + TestUtils.waitForCondition( + () -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5), + CLUSTER_WAIT_MS, String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName) + ); + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-min-isr-partitions", "--exclude-internal")); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected topic: " + rows[0]); + } + } + + @ClusterTemplate("generate") + public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient(); + KafkaProducer producer = createProducer(clusterInstance)) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + TopicPartition tp = new TopicPartition(testTopicName, 0); + + // Produce multiple batches. + sendProducerRecords(testTopicName, producer, 10); + sendProducerRecords(testTopicName, producer, 10); + + // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication + // throughput so the reassignment doesn't complete quickly. + List brokerIds = new ArrayList<>(clusterInstance.brokerIds()); + + ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp), 1); + + TopicDescription testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName); + TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0); + + List replicasOfFirstPartition = firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList()); + List replicasDiff = new ArrayList<>(brokerIds); + replicasDiff.removeAll(replicasOfFirstPartition); + Integer targetReplica = replicasDiff.get(0); + + adminClient.alterPartitionReassignments(Collections.singletonMap(tp, + Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get(); + + // let's wait until the LAIR is propagated + TestUtils.waitForCondition( + () -> !adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get() + .get(tp).addingReplicas().isEmpty(), CLUSTER_WAIT_MS, "Reassignment didn't add the second node" + ); + + // describe the topic and test if it's under-replicated + String simpleDescribeOutput = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName)); + String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator()); + assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected describe output: " + simpleDescribeOutputRows[0]); + assertEquals(2, simpleDescribeOutputRows.length, + "Unexpected describe output length: " + simpleDescribeOutputRows.length); + + String underReplicatedOutput = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-replicated-partitions")); + assertEquals("", underReplicatedOutput, + String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput)); + + int maxRetries = 20; + long pause = 100L; + long waitTimeMs = maxRetries * pause; + AtomicReference reassignmentsRef = new AtomicReference<>(); + + TestUtils.waitForCondition( + () -> { + PartitionReassignment tempReassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp); + reassignmentsRef.set(tempReassignments); + return reassignmentsRef.get() != null; + }, waitTimeMs, "Reassignments did not become non-null within the specified time" + ); + + assertFalse(reassignmentsRef.get().addingReplicas().isEmpty()); + + ToolsTestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp)); + TestUtils.waitForCondition( + () -> adminClient.listPartitionReassignments().reassignments().get().isEmpty(), + CLUSTER_WAIT_MS, String.format("reassignmet not finished after %s ms", CLUSTER_WAIT_MS) + ); + } + } + + @ClusterTemplate("generate") + public void testDescribeAtMinIsrPartitions(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + + try (Admin adminClient = clusterInstance.createAdminClient()) { + Map topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4"); + + int partitions = 1; + short replicationFactor = 6; + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig))); + clusterInstance.waitForTopic(testTopicName, partitions); + + clusterInstance.shutdownBroker(0); + clusterInstance.shutdownBroker(1); + assertEquals(4, clusterInstance.aliveBrokers().size()); + + TestUtils.waitForCondition( + () -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 4), + CLUSTER_WAIT_MS, String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName) + ); + + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--at-min-isr-partitions", "--exclude-internal")); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected output: " + rows[0]); + assertEquals(1, rows.length); + } + } + + /** + * Test describe --under-min-isr-partitions option with four topics: + * (1) topic with partition under the configured min ISR count + * (2) topic with under-replicated partition (but not under min ISR count) + * (3) topic with offline partition + * (4) topic with fully replicated partition + * + * Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition + */ + @ClusterTemplate("generate") + public void testDescribeUnderMinIsrPartitionsMixed(ClusterInstance clusterInstance) throws InterruptedException { + try (Admin adminClient = clusterInstance.createAdminClient()) { + String underMinIsrTopic = "under-min-isr-topic"; + String notUnderMinIsrTopic = "not-under-min-isr-topic"; + String offlineTopic = "offline-topic"; + String fullyReplicatedTopic = "fully-replicated-topic"; + int partitions = 1; + short replicationFactor = 6; + + List newTopics = new ArrayList<>(); + + Map> fullyReplicatedReplicaAssignmentMap = new HashMap<>(); + fullyReplicatedReplicaAssignmentMap.put(0, Arrays.asList(1, 2, 3)); + + Map> offlineReplicaAssignmentMap = new HashMap<>(); + offlineReplicaAssignmentMap.put(0, Arrays.asList(0)); + + Map topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + + newTopics.add(new NewTopic(underMinIsrTopic, partitions, replicationFactor).configs(topicConfig)); + newTopics.add(new NewTopic(notUnderMinIsrTopic, partitions, replicationFactor)); + newTopics.add(new NewTopic(offlineTopic, offlineReplicaAssignmentMap)); + newTopics.add(new NewTopic(fullyReplicatedTopic, fullyReplicatedReplicaAssignmentMap)); + + adminClient.createTopics(newTopics); + for (NewTopic topioc: newTopics) { + clusterInstance.waitForTopic(topioc.name(), partitions); + } + + clusterInstance.shutdownBroker(0); + Assertions.assertEquals(5, clusterInstance.aliveBrokers().size()); + + TestUtils.waitForCondition( + () -> clusterInstance.aliveBrokers().values().stream().allMatch(broker -> + broker.metadataCache().getPartitionInfo(underMinIsrTopic, 0).get().isr().size() < 6 && + broker.metadataCache().getPartitionInfo(offlineTopic, 0).get().leader() == MetadataResponse.NO_LEADER_ID), + CLUSTER_WAIT_MS, "Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic" + ); + + TestUtils.waitForCondition( + () -> adminClient.listPartitionReassignments().reassignments().get().isEmpty(), + CLUSTER_WAIT_MS, String.format("reassignmet not finished after %s ms", CLUSTER_WAIT_MS) + ); + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--under-min-isr-partitions", "--exclude-internal")); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", underMinIsrTopic)), + "Unexpected output: " + rows[0]); + assertTrue(rows[1].startsWith(String.format("\tTopic: %s", offlineTopic)), + "Unexpected output: " + rows[1]); + assertEquals(2, rows.length); + + } + } + + @ClusterTemplate("generate") + public void testDescribeReportOverriddenConfigs(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + String config = "file.delete.delay.ms=1000"; + Map topicConfig = new HashMap<>(); + topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1000"); + + int partitions = 2; + short replicationFactor = 2; + + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig))); + clusterInstance.waitForTopic(testTopicName, partitions); + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe")); + assertTrue(output.contains(config), String.format("Describe output should have contained %s", config)); + } + } + + @ClusterTemplate("generate") + public void testDescribeAndListTopicsWithoutInternalTopics(ClusterInstance clusterInstance) throws InterruptedException { + String testTopicName = TestUtils.randomString(10); + try (Admin adminClient = clusterInstance.createAdminClient()) { + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + // test describe + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--describe", "--exclude-internal")); + assertTrue(output.contains(testTopicName), + String.format("Output should have contained %s", testTopicName)); + assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), + "Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME); + + // test list + output = captureListTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--list", "--exclude-internal")); + assertTrue(output.contains(testTopicName), String.format("Output should have contained %s", testTopicName)); + assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), + "Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME); + } + } + + @ClusterTemplate("generate") + public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(ClusterInstance clusterInstance) throws Exception { + String testTopicName = TestUtils.randomString(10); + Admin adminClient = clusterInstance.createAdminClient(); + + adminClient = spy(adminClient); + + ListPartitionReassignmentsResult result = AdminClientTestUtils.listPartitionReassignmentsResult( + new ClusterAuthorizationException("Unauthorized")); + + doReturn(result).when(adminClient).listPartitionReassignments( + Collections.singleton(new TopicPartition(testTopicName, 0)) + ); + adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor))); + clusterInstance.waitForTopic(testTopicName, defaultNumPartitions); + + String output = captureDescribeTopicStandardOut(clusterInstance, buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName)); + String[] rows = output.split(System.lineSeparator()); + assertEquals(2, rows.length, "Unexpected output: " + output); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Unexpected output: " + rows[0]); + + adminClient.close(); + } + + @ClusterTemplate("generate") + public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception { + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + + String topic = "foo_bar"; + int partitions = 1; + short replicationFactor = 6; + adminClient.createTopics(Collections.singletonList(new NewTopic(topic, partitions, replicationFactor))); + clusterInstance.waitForTopic(topic, defaultNumPartitions); + + assertThrows(TopicExistsException.class, + () -> topicService.createTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--topic", topic))); + + } + } + + private void checkReplicaDistribution(Map> assignment, + Map brokerRackMapping, + Integer numBrokers, + Integer numPartitions, + Integer replicationFactor, + Boolean verifyRackAware, + Boolean verifyLeaderDistribution, + Boolean verifyReplicasDistribution) { + // always verify that no broker will be assigned for more than one replica + assignment.entrySet().stream() + .forEach(entry -> assertEquals(new HashSet<>(entry.getValue()).size(), entry.getValue().size(), + "More than one replica is assigned to same broker for the same partition")); + + ReplicaDistributions distribution = getReplicaDistribution(assignment, brokerRackMapping); + + if (verifyRackAware) { + Map> partitionRackMap = distribution.partitionRacks; + + List partitionRackMapValueSize = partitionRackMap.values().stream() + .map(value -> (int) value.stream().distinct().count()) + .collect(Collectors.toList()); + + List expected = Collections.nCopies(numPartitions, replicationFactor); + assertEquals(expected, partitionRackMapValueSize, "More than one replica of the same partition is assigned to the same rack"); + } + + if (verifyLeaderDistribution) { + Map leaderCount = distribution.brokerLeaderCount; + int leaderCountPerBroker = numPartitions / numBrokers; + List expected = Collections.nCopies(numBrokers, leaderCountPerBroker); + assertEquals(expected, new ArrayList<>(leaderCount.values()), "Preferred leader count is not even for brokers"); + } + + if (verifyReplicasDistribution) { + Map replicasCount = distribution.brokerReplicasCount; + int numReplicasPerBroker = numPartitions * replicationFactor / numBrokers; + List expected = Collections.nCopies(numBrokers, numReplicasPerBroker); + assertEquals(expected, new ArrayList<>(replicasCount.values()), "Replica count is not even for broker"); + } + } + + private String captureDescribeTopicStandardOut(ClusterInstance clusterInstance, TopicCommand.TopicCommandOptions opts) { + Runnable runnable = () -> { + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + topicService.describeTopic(opts); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + return ToolsTestUtils.captureStandardOut(runnable); + } + + private String captureListTopicStandardOut(ClusterInstance clusterInstance, TopicCommand.TopicCommandOptions opts) { + Runnable runnable = () -> { + try (Admin adminClient = clusterInstance.createAdminClient(); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) { + topicService.listTopics(opts); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + return ToolsTestUtils.captureStandardOut(runnable); + } + + private static ReplicaDistributions getReplicaDistribution(Map> assignment, Map brokerRackMapping) { + Map leaderCount = new HashMap<>(); + Map partitionCount = new HashMap<>(); + Map> partitionRackMap = new HashMap<>(); + + assignment.entrySet().stream().forEach(entry -> { + Integer partitionId = entry.getKey(); + List replicaList = entry.getValue(); + Integer leader = replicaList.get(0); + leaderCount.put(leader, leaderCount.getOrDefault(leader, 0) + 1); + replicaList.stream().forEach(brokerId -> { + partitionCount.put(brokerId, partitionCount.getOrDefault(brokerId, 0) + 1); + String rack; + if (brokerRackMapping.containsKey(brokerId)) { + rack = brokerRackMapping.get(brokerId); + List partitionRackValues = Stream.of(Collections.singletonList(rack), partitionRackMap.getOrDefault(partitionId, Collections.emptyList())) + .flatMap(List::stream) + .collect(Collectors.toList()); + partitionRackMap.put(partitionId, partitionRackValues); + } else { + System.err.printf("No mapping found for %s in `brokerRackMapping`%n", brokerId); + } + }); + }); + return new ReplicaDistributions(partitionRackMap, leaderCount, partitionCount); + } + + private static class ReplicaDistributions { + private final Map> partitionRacks; + private final Map brokerLeaderCount; + private final Map brokerReplicasCount; + + public ReplicaDistributions(Map> partitionRacks, + Map brokerLeaderCount, + Map brokerReplicasCount) { + this.partitionRacks = partitionRacks; + this.brokerLeaderCount = brokerLeaderCount; + this.brokerReplicasCount = brokerReplicasCount; + } + } + + private KafkaProducer createProducer(ClusterInstance clusterInstance) { + Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); + return new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()); + } + + private void sendProducerRecords(String testTopicName, KafkaProducer producer, int numMessage) { + IntStream.range(0, numMessage).forEach(i -> producer.send(new ProducerRecord<>(testTopicName, "test-" + i))); + producer.flush(); + } }