diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java index 709454beccd6b..535db61a0267c 100644 --- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java +++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java @@ -18,11 +18,11 @@ import kafka.test.ClusterInstance; import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTests; import kafka.test.annotation.Type; import kafka.test.junit.ClusterTestExtensions; import kafka.test.junit.RaftClusterInvocationContext; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -40,6 +40,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; +import scala.jdk.javaapi.CollectionConverters; import java.io.IOException; import java.time.Duration; @@ -49,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -66,15 +68,20 @@ public LogManagerIntegrationTest(ClusterInstance cluster) { this.cluster = cluster; } - @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3) + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT, brokers = 3), + @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3) + }) public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException { RaftClusterInvocationContext.RaftClusterInstance raftInstance = (RaftClusterInvocationContext.RaftClusterInstance) cluster; try (Admin admin = cluster.createAdminClient()) { - admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 3))).all().get(); + kafka.utils.TestUtils.createTopicWithAdmin(admin, "foo", + CollectionConverters.asScala(raftInstance.brokers().iterator()).toSeq(), + CollectionConverters.asScala(raftInstance.controllers().iterator()).toSeq(), + 1, 3, CollectionConverters.asScala(Collections.emptyMap()), new Properties()); } - cluster.waitForTopic("foo", 1); Optional partitionMetadataFile = Optional.ofNullable( raftInstance.getUnderlying().brokers().get(0).logManager()