diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index f9311d20d9593..cd8a973965499 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -26,7 +26,7 @@ import kafka.log.LogManager import kafka.log.UnifiedLog import kafka.raft.KafkaRaftManager.RaftIoThread import kafka.server.KafkaRaftServer.ControllerRole -import kafka.server.{KafkaConfig, MetaProperties} +import kafka.server.KafkaConfig import kafka.utils.CoreUtils import kafka.utils.FileLock import kafka.utils.Logging @@ -128,7 +128,7 @@ trait RaftManager[T] { } class KafkaRaftManager[T]( - metaProperties: MetaProperties, + clusterId: String, config: KafkaConfig, recordSerde: RecordSerde[T], topicPartition: TopicPartition, @@ -241,7 +241,7 @@ class KafkaRaftManager[T]( metrics, expirationService, logContext, - metaProperties.clusterId, + clusterId, nodeId, raftConfig ) diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 856d9365ae883..48d4f840fd60a 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -73,7 +73,7 @@ class ControllerApis( val controller: Controller, val raftManager: RaftManager[ApiMessageAndVersion], val config: KafkaConfig, - val metaProperties: MetaProperties, + val clusterId: String, val registrationsPublisher: ControllerRegistrationsPublisher, val apiVersionManager: ApiVersionManager, val metadataCache: KRaftMetadataCache @@ -1066,7 +1066,7 @@ class ControllerApis( val response = authHelper.computeDescribeClusterResponse( request, EndpointType.CONTROLLER, - metaProperties.clusterId, + clusterId, () => registrationsPublisher.describeClusterControllers(request.context.listenerName()), () => raftManager.leaderAndEpoch.leaderId().orElse(-1) ) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 9e49b7ffd4ccb..f684ee4ede148 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -137,7 +137,7 @@ class ControllerServer( true } - def clusterId: String = sharedServer.metaProps.clusterId + def clusterId: String = sharedServer.clusterId() def startup(): Unit = { if (!maybeChangeStatus(SHUTDOWN, STARTING)) return @@ -319,7 +319,7 @@ class ControllerServer( controller, raftManager, config, - sharedServer.metaProps, + clusterId, registrationsPublisher, apiVersionManager, metadataCache) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b6cea90abeacb..1f87f752139df 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -387,11 +387,10 @@ class KafkaServer( isZkBroker = true) // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller - val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId) val controllerQuorumVotersFuture = CompletableFuture.completedFuture( RaftConfig.parseVoterConnections(config.quorumVoters)) val raftManager = new KafkaRaftManager[ApiMessageAndVersion]( - kraftMetaProps, + clusterId, config, new MetadataRecordSerde, KafkaRaftServer.MetadataPartition, @@ -436,7 +435,7 @@ class KafkaServer( lifecycleManager.start( () => listener.highestOffset, brokerToQuorumChannelManager, - kraftMetaProps.clusterId, + clusterId, networkListeners, ibpAsFeature, OptionalLong.empty() diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index dc627f9eaaac7..7bbccf93bf027 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -110,6 +110,8 @@ class SharedServer( @volatile var snapshotGenerator: SnapshotGenerator = _ @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _ + def clusterId(): String = metaProps.clusterId + def isUsed(): Boolean = synchronized { usedByController || usedByBroker } @@ -248,7 +250,7 @@ class SharedServer( controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry())) } val _raftManager = new KafkaRaftManager[ApiMessageAndVersion]( - metaProps, + clusterId(), sharedServerConfig, new MetadataRecordSerde, KafkaRaftServer.MetadataPartition, diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 1026c52847337..f2cdc26723a74 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -23,7 +23,7 @@ import joptsimple.OptionException import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.{KafkaRaftManager, RaftManager} import kafka.security.CredentialProvider -import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager} +import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager} import kafka.utils.{CoreUtils, Exit, Logging} import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -82,13 +82,8 @@ class TestRaftServer( () => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION)) socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) - val metaProperties = MetaProperties( - clusterId = Uuid.ZERO_UUID.toString, - nodeId = config.nodeId - ) - raftManager = new KafkaRaftManager[Array[Byte]]( - metaProperties, + Uuid.ZERO_UUID.toString, config, new ByteArraySerde, partition, diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java b/core/src/test/java/kafka/testkit/BrokerNode.java index 005d498c25f3b..1cc90276a7976 100644 --- a/core/src/test/java/kafka/testkit/BrokerNode.java +++ b/core/src/test/java/kafka/testkit/BrokerNode.java @@ -49,7 +49,9 @@ public Builder setMetadataDirectory(String metadataDirectory) { return this; } - public BrokerNode build() { + BrokerNode build( + String baseDirectory + ) { if (id == -1) { throw new RuntimeException("You must set the node id"); } @@ -60,9 +62,11 @@ public BrokerNode build() { logDataDirectories = Collections. singletonList(String.format("broker_%d_data0", id)); } + logDataDirectories = TestKitNodes.absolutize(baseDirectory, logDataDirectories); if (metadataDirectory == null) { metadataDirectory = logDataDirectories.get(0); } + metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory); return new BrokerNode(id, incarnationId, metadataDirectory, logDataDirectories); } diff --git a/core/src/test/java/kafka/testkit/ControllerNode.java b/core/src/test/java/kafka/testkit/ControllerNode.java index 3ee2b4d081a9d..11901d1f38731 100644 --- a/core/src/test/java/kafka/testkit/ControllerNode.java +++ b/core/src/test/java/kafka/testkit/ControllerNode.java @@ -32,13 +32,16 @@ public Builder setMetadataDirectory(String metadataDirectory) { return this; } - public ControllerNode build() { + public ControllerNode build( + String baseDirectory + ) { if (id == -1) { throw new RuntimeException("You must set the node id"); } if (metadataDirectory == null) { metadataDirectory = String.format("controller_%d", id); } + metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory); return new ControllerNode(id, metadataDirectory); } } diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 14b711c3fec92..7b6bef6f64c62 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -216,11 +216,9 @@ public KafkaClusterTestKit build() throws Exception { ExecutorService executorService = null; ControllerQuorumVotersFutureManager connectFutureManager = new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size()); - File baseDirectory = null; + File baseDirectory = new File(nodes.baseDirectory()); try { - baseDirectory = TestUtils.tempDirectory(); - nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath()); executorService = Executors.newFixedThreadPool(numOfExecutorThreads, ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false)); for (ControllerNode node : nodes.controllerNodes().values()) { diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java index a27dfc0c8ff2c..29aee50fb9cd8 100644 --- a/core/src/test/java/kafka/testkit/TestKitNodes.java +++ b/core/src/test/java/kafka/testkit/TestKitNodes.java @@ -20,14 +20,17 @@ import kafka.server.MetaProperties; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NavigableMap; import java.util.TreeMap; @@ -36,8 +39,8 @@ public static class Builder { private boolean combined = false; private Uuid clusterId = null; private MetadataVersion bootstrapMetadataVersion = null; - private final NavigableMap controllerNodes = new TreeMap<>(); - private final NavigableMap brokerNodes = new TreeMap<>(); + private final NavigableMap controllerNodeBuilders = new TreeMap<>(); + private final NavigableMap brokerNodeBuilders = new TreeMap<>(); public Builder setClusterId(Uuid clusterId) { this.clusterId = clusterId; @@ -54,42 +57,21 @@ public Builder setCombined(boolean combined) { return this; } - public Builder addNodes(TestKitNode[] nodes) { - for (TestKitNode node : nodes) { - addNode(node); - } - return this; - } - - public Builder addNode(TestKitNode node) { - if (node instanceof ControllerNode) { - ControllerNode controllerNode = (ControllerNode) node; - controllerNodes.put(node.id(), controllerNode); - } else if (node instanceof BrokerNode) { - BrokerNode brokerNode = (BrokerNode) node; - brokerNodes.put(node.id(), brokerNode); - } else { - throw new RuntimeException("Can't handle TestKitNode subclass " + - node.getClass().getSimpleName()); - } - return this; - } - public Builder setNumControllerNodes(int numControllerNodes) { if (numControllerNodes < 0) { throw new RuntimeException("Invalid negative value for numControllerNodes"); } - while (controllerNodes.size() > numControllerNodes) { - controllerNodes.pollFirstEntry(); + while (controllerNodeBuilders.size() > numControllerNodes) { + controllerNodeBuilders.pollFirstEntry(); } - while (controllerNodes.size() < numControllerNodes) { + while (controllerNodeBuilders.size() < numControllerNodes) { int nextId = startControllerId(); - if (!controllerNodes.isEmpty()) { - nextId = controllerNodes.lastKey() + 1; + if (!controllerNodeBuilders.isEmpty()) { + nextId = controllerNodeBuilders.lastKey() + 1; } - controllerNodes.put(nextId, new ControllerNode.Builder(). - setId(nextId).build()); + controllerNodeBuilders.put(nextId, new ControllerNode.Builder(). + setId(nextId)); } return this; } @@ -98,16 +80,16 @@ public Builder setNumBrokerNodes(int numBrokerNodes) { if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } - while (brokerNodes.size() > numBrokerNodes) { - brokerNodes.pollFirstEntry(); + while (brokerNodeBuilders.size() > numBrokerNodes) { + brokerNodeBuilders.pollFirstEntry(); } - while (brokerNodes.size() < numBrokerNodes) { + while (brokerNodeBuilders.size() < numBrokerNodes) { int nextId = startBrokerId(); - if (!brokerNodes.isEmpty()) { - nextId = brokerNodes.lastKey() + 1; + if (!brokerNodeBuilders.isEmpty()) { + nextId = brokerNodeBuilders.lastKey() + 1; } - brokerNodes.put(nextId, new BrokerNode.Builder(). - setId(nextId).build()); + brokerNodeBuilders.put(nextId, new BrokerNode.Builder(). + setId(nextId)); } return this; } @@ -119,7 +101,35 @@ public TestKitNodes build() { if (bootstrapMetadataVersion == null) { bootstrapMetadataVersion = MetadataVersion.latest(); } - return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes); + String baseDirectory = TestUtils.tempDirectory("kafka_" + clusterId).getAbsolutePath(); + try { + NavigableMap controllerNodes = new TreeMap<>(); + for (ControllerNode.Builder controllerNodeBuilder : controllerNodeBuilders.values()) { + ControllerNode controllerNode = controllerNodeBuilder.build(baseDirectory); + if (controllerNodes.put(controllerNode.id(), controllerNode) != null) { + throw new RuntimeException("More than one controller claimed ID " + controllerNode.id()); + } + } + NavigableMap brokerNodes = new TreeMap<>(); + for (BrokerNode.Builder brokerNodeBuilder : brokerNodeBuilders.values()) { + BrokerNode brokerNode = brokerNodeBuilder.build(baseDirectory); + if (brokerNodes.put(brokerNode.id(), brokerNode) != null) { + throw new RuntimeException("More than one broker claimed ID " + brokerNode.id()); + } + } + return new TestKitNodes(baseDirectory, + clusterId, + bootstrapMetadataVersion, + controllerNodes, + brokerNodes); + } catch (Exception e) { + try { + Utils.delete(new File(baseDirectory)); + } catch (IOException x) { + throw new RuntimeException(x); + } + throw e; + } } private int startBrokerId() { @@ -134,6 +144,7 @@ private int startControllerId() { } } + private final String baseDirectory; private final Uuid clusterId; private final MetadataVersion bootstrapMetadataVersion; private final NavigableMap controllerNodes; @@ -143,16 +154,24 @@ public boolean isCombined(int node) { return controllerNodes.containsKey(node) && brokerNodes.containsKey(node); } - private TestKitNodes(Uuid clusterId, - MetadataVersion bootstrapMetadataVersion, - NavigableMap controllerNodes, - NavigableMap brokerNodes) { + private TestKitNodes( + String baseDirectory, + Uuid clusterId, + MetadataVersion bootstrapMetadataVersion, + NavigableMap controllerNodes, + NavigableMap brokerNodes + ) { + this.baseDirectory = baseDirectory; this.clusterId = clusterId; this.bootstrapMetadataVersion = bootstrapMetadataVersion; this.controllerNodes = controllerNodes; this.brokerNodes = brokerNodes; } + public String baseDirectory() { + return baseDirectory; + } + public Uuid clusterId() { return clusterId; } @@ -189,24 +208,7 @@ public ListenerName controllerListenerName() { return new ListenerName("CONTROLLER"); } - public TestKitNodes copyWithAbsolutePaths(String baseDirectory) { - NavigableMap newControllerNodes = new TreeMap<>(); - NavigableMap newBrokerNodes = new TreeMap<>(); - for (Entry entry : controllerNodes.entrySet()) { - ControllerNode node = entry.getValue(); - newControllerNodes.put(entry.getKey(), new ControllerNode(node.id(), - absolutize(baseDirectory, node.metadataDirectory()))); - } - for (Entry entry : brokerNodes.entrySet()) { - BrokerNode node = entry.getValue(); - newBrokerNodes.put(entry.getKey(), new BrokerNode(node.id(), - node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()), - absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides())); - } - return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes); - } - - private static List absolutize(String base, Collection directories) { + static List absolutize(String base, Collection directories) { List newDirectories = new ArrayList<>(); for (String directory : directories) { newDirectories.add(absolutize(base, directory)); @@ -214,7 +216,7 @@ private static List absolutize(String base, Collection directori return newDirectories; } - private static String absolutize(String base, String directory) { + static String absolutize(String base, String directory) { if (Paths.get(directory).isAbsolute()) { return directory; } diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index 5d0dc6b1d5397..c79bd7e861ff8 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -28,7 +28,6 @@ import kafka.server.KafkaConfig import kafka.server.KafkaRaftServer.BrokerRole import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.KafkaRaftServer.ProcessRole -import kafka.server.MetaProperties import kafka.utils.TestUtils import kafka.tools.TestRaftServer.ByteArraySerde import org.apache.kafka.common.TopicPartition @@ -83,13 +82,9 @@ class RaftManagerTest { config: KafkaConfig ): KafkaRaftManager[Array[Byte]] = { val topicId = new Uuid(0L, 2L) - val metaProperties = MetaProperties( - clusterId = Uuid.randomUuid.toString, - nodeId = config.nodeId - ) new KafkaRaftManager[Array[Byte]]( - metaProperties, + Uuid.randomUuid.toString, config, new ByteArraySerde, topicPartition, diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 200a898c60596..7c86bc0d0bf62 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -159,7 +159,7 @@ class ControllerApisTest { controller, raftManager, new KafkaConfig(props), - MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId), + "JgxuGe9URy-E-ceaL04lEw", new ControllerRegistrationsPublisher(), new SimpleApiVersionManager( ListenerType.CONTROLLER,