From 13e8afad1335ed5e0ac4974cc13c84529b41a890 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Tue, 14 May 2024 09:42:48 +0800 Subject: [PATCH 01/11] fix KAFKA-16705: the flag "started" of RaftClusterInstance is false even though the cluster is started --- .../junit/RaftClusterInvocationContext.java | 154 +++++++++--------- 1 file changed, 74 insertions(+), 80 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index ad4d549e6ae80..4b8d166ea117c 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -21,9 +21,9 @@ import kafka.server.BrokerFeatures; import kafka.server.BrokerServer; import kafka.server.ControllerServer; -import kafka.test.annotation.Type; import kafka.test.ClusterConfig; import kafka.test.ClusterInstance; +import kafka.test.annotation.Type; import kafka.testkit.KafkaClusterTestKit; import kafka.testkit.TestKitNodes; import kafka.zk.EmbeddedZookeeper; @@ -47,7 +47,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -66,96 +65,66 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte private final String baseDisplayName; private final ClusterConfig clusterConfig; - private final AtomicReference clusterReference; - private final AtomicReference zkReference; private final boolean isCombined; public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig, boolean isCombined) { this.baseDisplayName = baseDisplayName; this.clusterConfig = clusterConfig; - this.clusterReference = new AtomicReference<>(); - this.zkReference = new AtomicReference<>(); this.isCombined = isCombined; } @Override public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() - .map(Object::toString) - .collect(Collectors.joining(", ")); + .map(Object::toString) + .collect(Collectors.joining(", ")); return String.format("%s [%d] Type=Raft-%s, %s", baseDisplayName, invocationIndex, isCombined ? "Combined" : "Isolated", clusterDesc); } @Override public List getAdditionalExtensions() { - RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig, isCombined); + RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined); return Arrays.asList( - (BeforeTestExecutionCallback) context -> { - TestKitNodes nodes = new TestKitNodes.Builder(). - setBootstrapMetadataVersion(clusterConfig.metadataVersion()). - setCombined(isCombined). - setNumBrokerNodes(clusterConfig.numBrokers()). - setPerServerProperties(clusterConfig.perServerOverrideProperties()). - setNumDisksPerBroker(clusterConfig.numDisksPerBroker()). - setNumControllerNodes(clusterConfig.numControllers()).build(); - KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); - - if (Boolean.parseBoolean(clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable", "false"))) { - zkReference.set(new EmbeddedZookeeper()); - builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port())); - } - // Copy properties into the TestKit builder - clusterConfig.serverProperties().forEach(builder::setConfigProp); - // KAFKA-12512 need to pass security protocol and listener name here - KafkaClusterTestKit cluster = builder.build(); - clusterReference.set(cluster); - cluster.format(); - if (clusterConfig.isAutoStart()) { - cluster.startup(); - kafka.utils.TestUtils.waitUntilTrue( - () -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING, - () -> "Broker never made it to RUNNING state.", - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, - 100L); - } - }, - (AfterTestExecutionCallback) context -> clusterInstance.stop(), - new ClusterInstanceParameterResolver(clusterInstance) + (BeforeTestExecutionCallback) context -> { + if (clusterConfig.isAutoStart()) { + clusterInstance.start(); + } + }, + (AfterTestExecutionCallback) context -> clusterInstance.stop(), + new ClusterInstanceParameterResolver(clusterInstance) ); } public static class RaftClusterInstance implements ClusterInstance { - private final AtomicReference clusterReference; - private final AtomicReference zkReference; private final ClusterConfig clusterConfig; final AtomicBoolean started = new AtomicBoolean(false); final AtomicBoolean stopped = new AtomicBoolean(false); private final ConcurrentLinkedQueue admins = new ConcurrentLinkedQueue<>(); + private EmbeddedZookeeper embeddedZookeeper; + private KafkaClusterTestKit clusterTestKit; private final boolean isCombined; - RaftClusterInstance(AtomicReference clusterReference, AtomicReference zkReference, ClusterConfig clusterConfig, boolean isCombined) { - this.clusterReference = clusterReference; - this.zkReference = zkReference; + RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) { this.clusterConfig = clusterConfig; this.isCombined = isCombined; } @Override public String bootstrapServers() { - return clusterReference.get().bootstrapServers(); + return clusterTestKit.bootstrapServers(); } @Override public String bootstrapControllers() { - return clusterReference.get().bootstrapControllers(); + return clusterTestKit.bootstrapControllers(); } @Override public Collection brokerSocketServers() { return brokers() - .map(BrokerServer::socketServer) - .collect(Collectors.toList()); + .map(BrokerServer::socketServer) + .collect(Collectors.toList()); } @Override @@ -171,39 +140,39 @@ public Optional controllerListenerName() { @Override public Collection controllerSocketServers() { return controllers() - .map(ControllerServer::socketServer) - .collect(Collectors.toList()); + .map(ControllerServer::socketServer) + .collect(Collectors.toList()); } @Override public SocketServer anyBrokerSocketServer() { return brokers() - .map(BrokerServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); + .map(BrokerServer::socketServer) + .findFirst() + .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); } @Override public SocketServer anyControllerSocketServer() { return controllers() - .map(ControllerServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); + .map(ControllerServer::socketServer) + .findFirst() + .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); } @Override public Map brokerFeatures() { return brokers().collect(Collectors.toMap( - brokerServer -> brokerServer.config().nodeId(), - BrokerServer::brokerFeatures + brokerServer -> brokerServer.config().nodeId(), + BrokerServer::brokerFeatures )); } @Override public String clusterId() { return controllers().findFirst().map(ControllerServer::clusterId).orElse( - brokers().findFirst().map(BrokerServer::clusterId).orElseThrow( - () -> new RuntimeException("No controllers or brokers!")) + brokers().findFirst().map(BrokerServer::clusterId).orElseThrow( + () -> new RuntimeException("No controllers or brokers!")) ); } @@ -224,26 +193,25 @@ public ClusterConfig config() { @Override public Set controllerIds() { return controllers() - .map(controllerServer -> controllerServer.config().nodeId()) - .collect(Collectors.toSet()); + .map(controllerServer -> controllerServer.config().nodeId()) + .collect(Collectors.toSet()); } @Override public Set brokerIds() { return brokers() - .map(brokerServer -> brokerServer.config().nodeId()) - .collect(Collectors.toSet()); + .map(brokerServer -> brokerServer.config().nodeId()) + .collect(Collectors.toSet()); } @Override public KafkaClusterTestKit getUnderlying() { - return clusterReference.get(); + return clusterTestKit; } @Override public Admin createAdminClient(Properties configOverrides) { - Admin admin = Admin.create(clusterReference.get(). - newClientPropertiesBuilder(configOverrides).build()); + Admin admin = Admin.create(clusterTestKit.newClientPropertiesBuilder(configOverrides).build()); admins.add(admin); return admin; } @@ -252,7 +220,13 @@ public Admin createAdminClient(Properties configOverrides) { public void start() { if (started.compareAndSet(false, true)) { try { - clusterReference.get().startup(); + buildAndFormatCluster(); + clusterTestKit.startup(); + kafka.utils.TestUtils.waitUntilTrue( + () -> this.clusterTestKit.brokers().get(0).brokerState() == BrokerState.RUNNING, + () -> "Broker never made it to RUNNING state.", + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, + 100L); } catch (Exception e) { throw new RuntimeException("Failed to start Raft server", e); } @@ -264,9 +238,9 @@ public void stop() { if (stopped.compareAndSet(false, true)) { admins.forEach(admin -> Utils.closeQuietly(admin, "admin")); admins.clear(); - Utils.closeQuietly(clusterReference.get(), "cluster"); - if (zkReference.get() != null) { - Utils.closeQuietly(zkReference.get(), "zk"); + Utils.closeQuietly(clusterTestKit, "cluster"); + if (embeddedZookeeper != null) { + Utils.closeQuietly(embeddedZookeeper, "zk"); } } } @@ -284,24 +258,44 @@ public void startBroker(int brokerId) { @Override public void waitForReadyBrokers() throws InterruptedException { try { - clusterReference.get().waitForReadyBrokers(); + clusterTestKit.waitForReadyBrokers(); } catch (ExecutionException e) { throw new AssertionError("Failed while waiting for brokers to become ready", e); } } - private BrokerServer findBrokerOrThrow(int brokerId) { - return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) - .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); - } - public Stream brokers() { - return clusterReference.get().brokers().values().stream(); + return clusterTestKit.brokers().values().stream(); } public Stream controllers() { - return clusterReference.get().controllers().values().stream(); + return clusterTestKit.controllers().values().stream(); } + private BrokerServer findBrokerOrThrow(int brokerId) { + return Optional.ofNullable(clusterTestKit.brokers().get(brokerId)) + .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); + } + + private void buildAndFormatCluster() throws Exception { + TestKitNodes nodes = new TestKitNodes.Builder() + .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) + .setCombined(isCombined) + .setNumBrokerNodes(clusterConfig.numBrokers()) + .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) + .setPerServerProperties(clusterConfig.perServerOverrideProperties()) + .setNumControllerNodes(clusterConfig.numControllers()).build(); + KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); + if (Boolean.parseBoolean(clusterConfig.serverProperties() + .getOrDefault("zookeeper.metadata.migration.enable", "false"))) { + this.embeddedZookeeper = new EmbeddedZookeeper(); + builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port())); + } + // Copy properties into the TestKit builder + clusterConfig.serverProperties().forEach(builder::setConfigProp); + // KAFKA-12512 need to pass security protocol and listener name here + this.clusterTestKit = builder.build(); + this.clusterTestKit.format(); + } } } From 3b61f33f3cf76fda515d1c5077e65dcb89cf7b26 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Wed, 15 May 2024 11:32:03 +0800 Subject: [PATCH 02/11] add a format public method --- .../kafka/test/junit/RaftClusterInvocationContext.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 4b8d166ea117c..702eb681c1e09 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -272,6 +272,13 @@ public Stream controllers() { return clusterTestKit.controllers().values().stream(); } + public void format() throws Exception { + if(this.clusterTestKit == null){ + throw new IllegalStateException("Cluster not started yet"); + } + this.clusterTestKit.format(); + } + private BrokerServer findBrokerOrThrow(int brokerId) { return Optional.ofNullable(clusterTestKit.brokers().get(brokerId)) .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); From e6cac8542407046373b410e8ec9290388c78dcd0 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Wed, 15 May 2024 14:36:56 +0800 Subject: [PATCH 03/11] refactor build and format --- .../junit/RaftClusterInvocationContext.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 702eb681c1e09..44e525745b13a 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -78,7 +78,8 @@ public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() .map(Object::toString) .collect(Collectors.joining(", ")); - return String.format("%s [%d] Type=Raft-%s, %s", baseDisplayName, invocationIndex, isCombined ? "Combined" : "Isolated", clusterDesc); + return String.format("%s [%d] Type=Raft-%s, %s", baseDisplayName, invocationIndex, isCombined ? "Combined" : + "Isolated", clusterDesc); } @Override @@ -102,7 +103,7 @@ public static class RaftClusterInstance implements ClusterInstance { final AtomicBoolean stopped = new AtomicBoolean(false); private final ConcurrentLinkedQueue admins = new ConcurrentLinkedQueue<>(); private EmbeddedZookeeper embeddedZookeeper; - private KafkaClusterTestKit clusterTestKit; + private volatile KafkaClusterTestKit clusterTestKit; private final boolean isCombined; RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) { @@ -220,7 +221,7 @@ public Admin createAdminClient(Properties configOverrides) { public void start() { if (started.compareAndSet(false, true)) { try { - buildAndFormatCluster(); + this.format(); clusterTestKit.startup(); kafka.utils.TestUtils.waitUntilTrue( () -> this.clusterTestKit.brokers().get(0).brokerState() == BrokerState.RUNNING, @@ -273,9 +274,7 @@ public Stream controllers() { } public void format() throws Exception { - if(this.clusterTestKit == null){ - throw new IllegalStateException("Cluster not started yet"); - } + safeBuildCluster(); this.clusterTestKit.format(); } @@ -284,7 +283,18 @@ private BrokerServer findBrokerOrThrow(int brokerId) { .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); } - private void buildAndFormatCluster() throws Exception { + private void safeBuildCluster() throws Exception { + if (this.clusterTestKit != null) { + return; + } + synchronized (this) { + if (this.clusterTestKit == null) { + doBuild(); + } + } + } + + private void doBuild() throws Exception { TestKitNodes nodes = new TestKitNodes.Builder() .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) .setCombined(isCombined) @@ -302,7 +312,6 @@ private void buildAndFormatCluster() throws Exception { clusterConfig.serverProperties().forEach(builder::setConfigProp); // KAFKA-12512 need to pass security protocol and listener name here this.clusterTestKit = builder.build(); - this.clusterTestKit.format(); } } } From b944fb07146e1e8635679fa1cc7953b5cc7b9094 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Thu, 16 May 2024 16:47:46 +0800 Subject: [PATCH 04/11] change back code that was modified by formatting code --- .../junit/RaftClusterInvocationContext.java | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 44e525745b13a..a24119efc5375 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -21,9 +21,9 @@ import kafka.server.BrokerFeatures; import kafka.server.BrokerServer; import kafka.server.ControllerServer; +import kafka.test.annotation.Type; import kafka.test.ClusterConfig; import kafka.test.ClusterInstance; -import kafka.test.annotation.Type; import kafka.testkit.KafkaClusterTestKit; import kafka.testkit.TestKitNodes; import kafka.zk.EmbeddedZookeeper; @@ -76,10 +76,9 @@ public RaftClusterInvocationContext(String baseDisplayName, ClusterConfig cluste @Override public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() - .map(Object::toString) - .collect(Collectors.joining(", ")); - return String.format("%s [%d] Type=Raft-%s, %s", baseDisplayName, invocationIndex, isCombined ? "Combined" : - "Isolated", clusterDesc); + .map(Object::toString) + .collect(Collectors.joining(", ")); + return String.format("%s [%d] Type=Raft-%s, %s", baseDisplayName, invocationIndex, isCombined ? "Combined" : "Isolated", clusterDesc); } @Override @@ -141,39 +140,39 @@ public Optional controllerListenerName() { @Override public Collection controllerSocketServers() { return controllers() - .map(ControllerServer::socketServer) - .collect(Collectors.toList()); + .map(ControllerServer::socketServer) + .collect(Collectors.toList()); } @Override public SocketServer anyBrokerSocketServer() { return brokers() - .map(BrokerServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); + .map(BrokerServer::socketServer) + .findFirst() + .orElseThrow(() -> new RuntimeException("No broker SocketServers found")); } @Override public SocketServer anyControllerSocketServer() { return controllers() - .map(ControllerServer::socketServer) - .findFirst() - .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); + .map(ControllerServer::socketServer) + .findFirst() + .orElseThrow(() -> new RuntimeException("No controller SocketServers found")); } @Override public Map brokerFeatures() { return brokers().collect(Collectors.toMap( - brokerServer -> brokerServer.config().nodeId(), - BrokerServer::brokerFeatures + brokerServer -> brokerServer.config().nodeId(), + BrokerServer::brokerFeatures )); } @Override public String clusterId() { return controllers().findFirst().map(ControllerServer::clusterId).orElse( - brokers().findFirst().map(BrokerServer::clusterId).orElseThrow( - () -> new RuntimeException("No controllers or brokers!")) + brokers().findFirst().map(BrokerServer::clusterId).orElseThrow( + () -> new RuntimeException("No controllers or brokers!")) ); } @@ -194,15 +193,15 @@ public ClusterConfig config() { @Override public Set controllerIds() { return controllers() - .map(controllerServer -> controllerServer.config().nodeId()) - .collect(Collectors.toSet()); + .map(controllerServer -> controllerServer.config().nodeId()) + .collect(Collectors.toSet()); } @Override public Set brokerIds() { return brokers() - .map(brokerServer -> brokerServer.config().nodeId()) - .collect(Collectors.toSet()); + .map(brokerServer -> brokerServer.config().nodeId()) + .collect(Collectors.toSet()); } @Override From ed88a63e574eda2704f0fc5628beaf81dd693032 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Thu, 16 May 2024 17:38:06 +0800 Subject: [PATCH 05/11] remove sync init. separate format and start --- .../junit/RaftClusterInvocationContext.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index a24119efc5375..3a3edbc4d7aa6 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -100,9 +100,10 @@ public static class RaftClusterInstance implements ClusterInstance { private final ClusterConfig clusterConfig; final AtomicBoolean started = new AtomicBoolean(false); final AtomicBoolean stopped = new AtomicBoolean(false); + final AtomicBoolean formated = new AtomicBoolean(false); private final ConcurrentLinkedQueue admins = new ConcurrentLinkedQueue<>(); private EmbeddedZookeeper embeddedZookeeper; - private volatile KafkaClusterTestKit clusterTestKit; + private KafkaClusterTestKit clusterTestKit; private final boolean isCombined; RaftClusterInstance(ClusterConfig clusterConfig, boolean isCombined) { @@ -220,7 +221,7 @@ public Admin createAdminClient(Properties configOverrides) { public void start() { if (started.compareAndSet(false, true)) { try { - this.format(); + safeBuildCluster(); clusterTestKit.startup(); kafka.utils.TestUtils.waitUntilTrue( () -> this.clusterTestKit.brokers().get(0).brokerState() == BrokerState.RUNNING, @@ -273,8 +274,10 @@ public Stream controllers() { } public void format() throws Exception { - safeBuildCluster(); - this.clusterTestKit.format(); + if (formated.compareAndSet(false,true)) { + safeBuildCluster(); + this.clusterTestKit.format(); + } } private BrokerServer findBrokerOrThrow(int brokerId) { @@ -283,13 +286,8 @@ private BrokerServer findBrokerOrThrow(int brokerId) { } private void safeBuildCluster() throws Exception { - if (this.clusterTestKit != null) { - return; - } - synchronized (this) { - if (this.clusterTestKit == null) { - doBuild(); - } + if (this.clusterTestKit == null) { + doBuild(); } } From 5b08ffefc8256f6ea9ac8451c6e82f4400158d67 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Fri, 26 Apr 2024 11:36:30 +0800 Subject: [PATCH 06/11] a slight change. The modifiers should be in uniform order --- .../main/java/org/apache/kafka/timeline/BaseHashTable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java index 783d7e1064336..9d41aa65a520c 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java @@ -37,17 +37,17 @@ class BaseHashTable { /** * The maximum load factor we will allow the hash table to climb to before expanding. */ - private final static double MAX_LOAD_FACTOR = 0.75f; + private static final double MAX_LOAD_FACTOR = 0.75f; /** * The minimum number of slots we can have in the hash table. */ - final static int MIN_CAPACITY = 2; + static final int MIN_CAPACITY = 2; /** * The maximum number of slots we can have in the hash table. */ - final static int MAX_CAPACITY = 1 << 30; + static final int MAX_CAPACITY = 1 << 30; private Object[] elements; private int size = 0; From e3e1f312c283e5d3834ef8db09fa09784be0587f Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Fri, 17 May 2024 09:54:25 +0800 Subject: [PATCH 07/11] rebase merge --- .../junit/RaftClusterInvocationContext.java | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 730c583ae905b..111b8cdc69a28 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -119,13 +119,6 @@ public String bootstrapControllers() { return clusterTestKit.bootstrapControllers(); } - @Override - public Collection brokerSocketServers() { - return brokers() - .map(BrokerServer::socketServer) - .collect(Collectors.toList()); - } - @Override public ListenerName clientListener() { return ListenerName.normalised("EXTERNAL"); @@ -228,23 +221,17 @@ public void waitForReadyBrokers() throws InterruptedException { } } - private BrokerServer findBrokerOrThrow(int brokerId) { - return Optional.ofNullable(clusterReference.get().brokers().get(brokerId)) - .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); - } @Override public Map brokers() { - return clusterReference.get().brokers().entrySet() + return clusterTestKit.brokers().entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @Override public Map controllers() { - return Collections.unmodifiableMap(clusterReference.get().controllers()); - public Stream controllers() { - return clusterTestKit.controllers().values().stream(); + return Collections.unmodifiableMap(clusterTestKit.controllers()); } public void format() throws Exception { From 2a3644e0e2a855f086a6293070f3c09cea2db1d0 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Fri, 17 May 2024 09:55:56 +0800 Subject: [PATCH 08/11] revert base hash table --- .../main/java/org/apache/kafka/timeline/BaseHashTable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java index 9d41aa65a520c..783d7e1064336 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java @@ -37,17 +37,17 @@ class BaseHashTable { /** * The maximum load factor we will allow the hash table to climb to before expanding. */ - private static final double MAX_LOAD_FACTOR = 0.75f; + private final static double MAX_LOAD_FACTOR = 0.75f; /** * The minimum number of slots we can have in the hash table. */ - static final int MIN_CAPACITY = 2; + final static int MIN_CAPACITY = 2; /** * The maximum number of slots we can have in the hash table. */ - static final int MAX_CAPACITY = 1 << 30; + final static int MAX_CAPACITY = 1 << 30; private Object[] elements; private int size = 0; From 55a3cf9595b52762ab067070ea3225eab4d55f42 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Mon, 20 May 2024 09:37:45 +0800 Subject: [PATCH 09/11] fix checkstyle --- .../java/kafka/test/junit/RaftClusterInvocationContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 111b8cdc69a28..0535504d197f2 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -235,7 +235,7 @@ public Map controllers() { } public void format() throws Exception { - if (formated.compareAndSet(false,true)) { + if (formated.compareAndSet(false, true)) { safeBuildCluster(); this.clusterTestKit.format(); } From f24c0b2ecc67f51fda82bf7aedc5df3997c2a5c2 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Tue, 21 May 2024 10:10:22 +0800 Subject: [PATCH 10/11] merge `format` and `doBuild` --- .../junit/RaftClusterInvocationContext.java | 53 ++++++++----------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 0535504d197f2..95ace6aa9c90a 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -175,18 +175,18 @@ public Admin createAdminClient(Properties configOverrides) { @Override public void start() { - if (started.compareAndSet(false, true)) { - try { - safeBuildCluster(); + try { + format(); + if (started.compareAndSet(false, true)) { clusterTestKit.startup(); kafka.utils.TestUtils.waitUntilTrue( () -> this.clusterTestKit.brokers().get(0).brokerState() == BrokerState.RUNNING, () -> "Broker never made it to RUNNING state.", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); - } catch (Exception e) { - throw new RuntimeException("Failed to start Raft server", e); } + } catch (Exception e) { + throw new RuntimeException("Failed to start Raft server", e); } } @@ -236,7 +236,23 @@ public Map controllers() { public void format() throws Exception { if (formated.compareAndSet(false, true)) { - safeBuildCluster(); + TestKitNodes nodes = new TestKitNodes.Builder() + .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) + .setCombined(isCombined) + .setNumBrokerNodes(clusterConfig.numBrokers()) + .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) + .setPerServerProperties(clusterConfig.perServerOverrideProperties()) + .setNumControllerNodes(clusterConfig.numControllers()).build(); + KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); + if (Boolean.parseBoolean(clusterConfig.serverProperties() + .getOrDefault("zookeeper.metadata.migration.enable", "false"))) { + this.embeddedZookeeper = new EmbeddedZookeeper(); + builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port())); + } + // Copy properties into the TestKit builder + clusterConfig.serverProperties().forEach(builder::setConfigProp); + // KAFKA-12512 need to pass security protocol and listener name here + this.clusterTestKit = builder.build(); this.clusterTestKit.format(); } } @@ -246,30 +262,5 @@ private BrokerServer findBrokerOrThrow(int brokerId) { .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); } - private void safeBuildCluster() throws Exception { - if (this.clusterTestKit == null) { - doBuild(); - } - } - - private void doBuild() throws Exception { - TestKitNodes nodes = new TestKitNodes.Builder() - .setBootstrapMetadataVersion(clusterConfig.metadataVersion()) - .setCombined(isCombined) - .setNumBrokerNodes(clusterConfig.numBrokers()) - .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) - .setPerServerProperties(clusterConfig.perServerOverrideProperties()) - .setNumControllerNodes(clusterConfig.numControllers()).build(); - KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); - if (Boolean.parseBoolean(clusterConfig.serverProperties() - .getOrDefault("zookeeper.metadata.migration.enable", "false"))) { - this.embeddedZookeeper = new EmbeddedZookeeper(); - builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", embeddedZookeeper.port())); - } - // Copy properties into the TestKit builder - clusterConfig.serverProperties().forEach(builder::setConfigProp); - // KAFKA-12512 need to pass security protocol and listener name here - this.clusterTestKit = builder.build(); - } } } From a06864a7ea4bcbabc8454d522af2f5d4b99a8892 Mon Sep 17 00:00:00 2001 From: gongxuanzhang Date: Wed, 22 May 2024 11:24:43 +0800 Subject: [PATCH 11/11] in order to compatibility,add before callback `format` --- .../test/java/kafka/test/junit/RaftClusterInvocationContext.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 95ace6aa9c90a..9857d4c92cd39 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -84,6 +84,7 @@ public List getAdditionalExtensions() { RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterConfig, isCombined); return Arrays.asList( (BeforeTestExecutionCallback) context -> { + clusterInstance.format(); if (clusterConfig.isAutoStart()) { clusterInstance.start(); }