From 114f92fdb1c09685d09a2f44e957a45112bfeac1 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Tue, 31 Oct 2023 10:09:37 -0700 Subject: [PATCH] MINOR: MetaProperties refactor, part 1 Since we have added directory.id to MetaProperties, it is no longer safe to assume that all directories on a node contain the same MetaProperties. Therefore, we should get rid of places where we are using a single MetaProperties object to represent the settings of an entire cluster. This PR removes a few such cases. In each case, it is sufficient just to pass cluster ID. The second part of this change refactors KafkaClusterTestKit so that we convert paths to absolute before creating BrokerNode and ControllerNode objects, rather than after. This prepares the way for storing an ensemble of MetaProperties objects in BrokerNode and ControllerNode, which we will do in a follow-up change. --- .../main/scala/kafka/raft/RaftManager.scala | 6 +- .../scala/kafka/server/ControllerApis.scala | 4 +- .../scala/kafka/server/ControllerServer.scala | 4 +- .../main/scala/kafka/server/KafkaServer.scala | 5 +- .../scala/kafka/server/SharedServer.scala | 4 +- .../scala/kafka/tools/TestRaftServer.scala | 9 +- .../test/java/kafka/testkit/BrokerNode.java | 6 +- .../java/kafka/testkit/ControllerNode.java | 5 +- .../kafka/testkit/KafkaClusterTestKit.java | 4 +- .../test/java/kafka/testkit/TestKitNodes.java | 126 +++++++++--------- .../unit/kafka/raft/RaftManagerTest.scala | 7 +- .../kafka/server/ControllerApisTest.scala | 2 +- 12 files changed, 90 insertions(+), 92 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index f9311d20d9593..cd8a973965499 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -26,7 +26,7 @@ import kafka.log.LogManager import kafka.log.UnifiedLog import kafka.raft.KafkaRaftManager.RaftIoThread import kafka.server.KafkaRaftServer.ControllerRole -import kafka.server.{KafkaConfig, MetaProperties} +import kafka.server.KafkaConfig import kafka.utils.CoreUtils import kafka.utils.FileLock import kafka.utils.Logging @@ -128,7 +128,7 @@ trait RaftManager[T] { } class KafkaRaftManager[T]( - metaProperties: MetaProperties, + clusterId: String, config: KafkaConfig, recordSerde: RecordSerde[T], topicPartition: TopicPartition, @@ -241,7 +241,7 @@ class KafkaRaftManager[T]( metrics, expirationService, logContext, - metaProperties.clusterId, + clusterId, nodeId, raftConfig ) diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 856d9365ae883..48d4f840fd60a 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -73,7 +73,7 @@ class ControllerApis( val controller: Controller, val raftManager: RaftManager[ApiMessageAndVersion], val config: KafkaConfig, - val metaProperties: MetaProperties, + val clusterId: String, val registrationsPublisher: ControllerRegistrationsPublisher, val apiVersionManager: ApiVersionManager, val metadataCache: KRaftMetadataCache @@ -1066,7 +1066,7 @@ class ControllerApis( val response = authHelper.computeDescribeClusterResponse( request, EndpointType.CONTROLLER, - metaProperties.clusterId, + clusterId, () => registrationsPublisher.describeClusterControllers(request.context.listenerName()), () => raftManager.leaderAndEpoch.leaderId().orElse(-1) ) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 9e49b7ffd4ccb..f684ee4ede148 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -137,7 +137,7 @@ class ControllerServer( true } - def clusterId: String = sharedServer.metaProps.clusterId + def clusterId: String = sharedServer.clusterId() def startup(): Unit = { if (!maybeChangeStatus(SHUTDOWN, STARTING)) return @@ -319,7 +319,7 @@ class ControllerServer( controller, raftManager, config, - sharedServer.metaProps, + clusterId, registrationsPublisher, apiVersionManager, metadataCache) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b6cea90abeacb..1f87f752139df 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -387,11 +387,10 @@ class KafkaServer( isZkBroker = true) // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller - val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId) val controllerQuorumVotersFuture = CompletableFuture.completedFuture( RaftConfig.parseVoterConnections(config.quorumVoters)) val raftManager = new KafkaRaftManager[ApiMessageAndVersion]( - kraftMetaProps, + clusterId, config, new MetadataRecordSerde, KafkaRaftServer.MetadataPartition, @@ -436,7 +435,7 @@ class KafkaServer( lifecycleManager.start( () => listener.highestOffset, brokerToQuorumChannelManager, - kraftMetaProps.clusterId, + clusterId, networkListeners, ibpAsFeature, OptionalLong.empty() diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index dc627f9eaaac7..7bbccf93bf027 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -110,6 +110,8 @@ class SharedServer( @volatile var snapshotGenerator: SnapshotGenerator = _ @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _ + def clusterId(): String = metaProps.clusterId + def isUsed(): Boolean = synchronized { usedByController || usedByBroker } @@ -248,7 +250,7 @@ class SharedServer( controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry())) } val _raftManager = new KafkaRaftManager[ApiMessageAndVersion]( - metaProps, + clusterId(), sharedServerConfig, new MetadataRecordSerde, KafkaRaftServer.MetadataPartition, diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 1026c52847337..f2cdc26723a74 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -23,7 +23,7 @@ import joptsimple.OptionException import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.{KafkaRaftManager, RaftManager} import kafka.security.CredentialProvider -import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager} +import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager} import kafka.utils.{CoreUtils, Exit, Logging} import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -82,13 +82,8 @@ class TestRaftServer( () => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION)) socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) - val metaProperties = MetaProperties( - clusterId = Uuid.ZERO_UUID.toString, - nodeId = config.nodeId - ) - raftManager = new KafkaRaftManager[Array[Byte]]( - metaProperties, + Uuid.ZERO_UUID.toString, config, new ByteArraySerde, partition, diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java b/core/src/test/java/kafka/testkit/BrokerNode.java index 005d498c25f3b..1cc90276a7976 100644 --- a/core/src/test/java/kafka/testkit/BrokerNode.java +++ b/core/src/test/java/kafka/testkit/BrokerNode.java @@ -49,7 +49,9 @@ public Builder setMetadataDirectory(String metadataDirectory) { return this; } - public BrokerNode build() { + BrokerNode build( + String baseDirectory + ) { if (id == -1) { throw new RuntimeException("You must set the node id"); } @@ -60,9 +62,11 @@ public BrokerNode build() { logDataDirectories = Collections. singletonList(String.format("broker_%d_data0", id)); } + logDataDirectories = TestKitNodes.absolutize(baseDirectory, logDataDirectories); if (metadataDirectory == null) { metadataDirectory = logDataDirectories.get(0); } + metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory); return new BrokerNode(id, incarnationId, metadataDirectory, logDataDirectories); } diff --git a/core/src/test/java/kafka/testkit/ControllerNode.java b/core/src/test/java/kafka/testkit/ControllerNode.java index 3ee2b4d081a9d..11901d1f38731 100644 --- a/core/src/test/java/kafka/testkit/ControllerNode.java +++ b/core/src/test/java/kafka/testkit/ControllerNode.java @@ -32,13 +32,16 @@ public Builder setMetadataDirectory(String metadataDirectory) { return this; } - public ControllerNode build() { + public ControllerNode build( + String baseDirectory + ) { if (id == -1) { throw new RuntimeException("You must set the node id"); } if (metadataDirectory == null) { metadataDirectory = String.format("controller_%d", id); } + metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory); return new ControllerNode(id, metadataDirectory); } } diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 14b711c3fec92..7b6bef6f64c62 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -216,11 +216,9 @@ public KafkaClusterTestKit build() throws Exception { ExecutorService executorService = null; ControllerQuorumVotersFutureManager connectFutureManager = new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size()); - File baseDirectory = null; + File baseDirectory = new File(nodes.baseDirectory()); try { - baseDirectory = TestUtils.tempDirectory(); - nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath()); executorService = Executors.newFixedThreadPool(numOfExecutorThreads, ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false)); for (ControllerNode node : nodes.controllerNodes().values()) { diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java index a27dfc0c8ff2c..29aee50fb9cd8 100644 --- a/core/src/test/java/kafka/testkit/TestKitNodes.java +++ b/core/src/test/java/kafka/testkit/TestKitNodes.java @@ -20,14 +20,17 @@ import kafka.server.MetaProperties; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NavigableMap; import java.util.TreeMap; @@ -36,8 +39,8 @@ public static class Builder { private boolean combined = false; private Uuid clusterId = null; private MetadataVersion bootstrapMetadataVersion = null; - private final NavigableMap controllerNodes = new TreeMap<>(); - private final NavigableMap brokerNodes = new TreeMap<>(); + private final NavigableMap controllerNodeBuilders = new TreeMap<>(); + private final NavigableMap brokerNodeBuilders = new TreeMap<>(); public Builder setClusterId(Uuid clusterId) { this.clusterId = clusterId; @@ -54,42 +57,21 @@ public Builder setCombined(boolean combined) { return this; } - public Builder addNodes(TestKitNode[] nodes) { - for (TestKitNode node : nodes) { - addNode(node); - } - return this; - } - - public Builder addNode(TestKitNode node) { - if (node instanceof ControllerNode) { - ControllerNode controllerNode = (ControllerNode) node; - controllerNodes.put(node.id(), controllerNode); - } else if (node instanceof BrokerNode) { - BrokerNode brokerNode = (BrokerNode) node; - brokerNodes.put(node.id(), brokerNode); - } else { - throw new RuntimeException("Can't handle TestKitNode subclass " + - node.getClass().getSimpleName()); - } - return this; - } - public Builder setNumControllerNodes(int numControllerNodes) { if (numControllerNodes < 0) { throw new RuntimeException("Invalid negative value for numControllerNodes"); } - while (controllerNodes.size() > numControllerNodes) { - controllerNodes.pollFirstEntry(); + while (controllerNodeBuilders.size() > numControllerNodes) { + controllerNodeBuilders.pollFirstEntry(); } - while (controllerNodes.size() < numControllerNodes) { + while (controllerNodeBuilders.size() < numControllerNodes) { int nextId = startControllerId(); - if (!controllerNodes.isEmpty()) { - nextId = controllerNodes.lastKey() + 1; + if (!controllerNodeBuilders.isEmpty()) { + nextId = controllerNodeBuilders.lastKey() + 1; } - controllerNodes.put(nextId, new ControllerNode.Builder(). - setId(nextId).build()); + controllerNodeBuilders.put(nextId, new ControllerNode.Builder(). + setId(nextId)); } return this; } @@ -98,16 +80,16 @@ public Builder setNumBrokerNodes(int numBrokerNodes) { if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } - while (brokerNodes.size() > numBrokerNodes) { - brokerNodes.pollFirstEntry(); + while (brokerNodeBuilders.size() > numBrokerNodes) { + brokerNodeBuilders.pollFirstEntry(); } - while (brokerNodes.size() < numBrokerNodes) { + while (brokerNodeBuilders.size() < numBrokerNodes) { int nextId = startBrokerId(); - if (!brokerNodes.isEmpty()) { - nextId = brokerNodes.lastKey() + 1; + if (!brokerNodeBuilders.isEmpty()) { + nextId = brokerNodeBuilders.lastKey() + 1; } - brokerNodes.put(nextId, new BrokerNode.Builder(). - setId(nextId).build()); + brokerNodeBuilders.put(nextId, new BrokerNode.Builder(). + setId(nextId)); } return this; } @@ -119,7 +101,35 @@ public TestKitNodes build() { if (bootstrapMetadataVersion == null) { bootstrapMetadataVersion = MetadataVersion.latest(); } - return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes); + String baseDirectory = TestUtils.tempDirectory("kafka_" + clusterId).getAbsolutePath(); + try { + NavigableMap controllerNodes = new TreeMap<>(); + for (ControllerNode.Builder controllerNodeBuilder : controllerNodeBuilders.values()) { + ControllerNode controllerNode = controllerNodeBuilder.build(baseDirectory); + if (controllerNodes.put(controllerNode.id(), controllerNode) != null) { + throw new RuntimeException("More than one controller claimed ID " + controllerNode.id()); + } + } + NavigableMap brokerNodes = new TreeMap<>(); + for (BrokerNode.Builder brokerNodeBuilder : brokerNodeBuilders.values()) { + BrokerNode brokerNode = brokerNodeBuilder.build(baseDirectory); + if (brokerNodes.put(brokerNode.id(), brokerNode) != null) { + throw new RuntimeException("More than one broker claimed ID " + brokerNode.id()); + } + } + return new TestKitNodes(baseDirectory, + clusterId, + bootstrapMetadataVersion, + controllerNodes, + brokerNodes); + } catch (Exception e) { + try { + Utils.delete(new File(baseDirectory)); + } catch (IOException x) { + throw new RuntimeException(x); + } + throw e; + } } private int startBrokerId() { @@ -134,6 +144,7 @@ private int startControllerId() { } } + private final String baseDirectory; private final Uuid clusterId; private final MetadataVersion bootstrapMetadataVersion; private final NavigableMap controllerNodes; @@ -143,16 +154,24 @@ public boolean isCombined(int node) { return controllerNodes.containsKey(node) && brokerNodes.containsKey(node); } - private TestKitNodes(Uuid clusterId, - MetadataVersion bootstrapMetadataVersion, - NavigableMap controllerNodes, - NavigableMap brokerNodes) { + private TestKitNodes( + String baseDirectory, + Uuid clusterId, + MetadataVersion bootstrapMetadataVersion, + NavigableMap controllerNodes, + NavigableMap brokerNodes + ) { + this.baseDirectory = baseDirectory; this.clusterId = clusterId; this.bootstrapMetadataVersion = bootstrapMetadataVersion; this.controllerNodes = controllerNodes; this.brokerNodes = brokerNodes; } + public String baseDirectory() { + return baseDirectory; + } + public Uuid clusterId() { return clusterId; } @@ -189,24 +208,7 @@ public ListenerName controllerListenerName() { return new ListenerName("CONTROLLER"); } - public TestKitNodes copyWithAbsolutePaths(String baseDirectory) { - NavigableMap newControllerNodes = new TreeMap<>(); - NavigableMap newBrokerNodes = new TreeMap<>(); - for (Entry entry : controllerNodes.entrySet()) { - ControllerNode node = entry.getValue(); - newControllerNodes.put(entry.getKey(), new ControllerNode(node.id(), - absolutize(baseDirectory, node.metadataDirectory()))); - } - for (Entry entry : brokerNodes.entrySet()) { - BrokerNode node = entry.getValue(); - newBrokerNodes.put(entry.getKey(), new BrokerNode(node.id(), - node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()), - absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides())); - } - return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes); - } - - private static List absolutize(String base, Collection directories) { + static List absolutize(String base, Collection directories) { List newDirectories = new ArrayList<>(); for (String directory : directories) { newDirectories.add(absolutize(base, directory)); @@ -214,7 +216,7 @@ private static List absolutize(String base, Collection directori return newDirectories; } - private static String absolutize(String base, String directory) { + static String absolutize(String base, String directory) { if (Paths.get(directory).isAbsolute()) { return directory; } diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index 5d0dc6b1d5397..c79bd7e861ff8 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -28,7 +28,6 @@ import kafka.server.KafkaConfig import kafka.server.KafkaRaftServer.BrokerRole import kafka.server.KafkaRaftServer.ControllerRole import kafka.server.KafkaRaftServer.ProcessRole -import kafka.server.MetaProperties import kafka.utils.TestUtils import kafka.tools.TestRaftServer.ByteArraySerde import org.apache.kafka.common.TopicPartition @@ -83,13 +82,9 @@ class RaftManagerTest { config: KafkaConfig ): KafkaRaftManager[Array[Byte]] = { val topicId = new Uuid(0L, 2L) - val metaProperties = MetaProperties( - clusterId = Uuid.randomUuid.toString, - nodeId = config.nodeId - ) new KafkaRaftManager[Array[Byte]]( - metaProperties, + Uuid.randomUuid.toString, config, new ByteArraySerde, topicPartition, diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 200a898c60596..7c86bc0d0bf62 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -159,7 +159,7 @@ class ControllerApisTest { controller, raftManager, new KafkaConfig(props), - MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId), + "JgxuGe9URy-E-ceaL04lEw", new ControllerRegistrationsPublisher(), new SimpleApiVersionManager( ListenerType.CONTROLLER,