Skip to content

Commit

Permalink
MINOR: MetaProperties refactor, part 1 (apache#14678)
Browse files Browse the repository at this point in the history
Since we have added directory.id to MetaProperties, it is no longer safe
to assume that all directories on a node contain the same MetaProperties.
Therefore, we should get rid of places where we are using a single
MetaProperties object to represent the settings of an entire cluster.
This PR removes a few such cases. In each case, it is sufficient just to
pass cluster ID.

The second part of this change refactors KafkaClusterTestKit so that we
convert paths to absolute before creating BrokerNode and ControllerNode
objects, rather than after. This prepares the way for storing an
ensemble of MetaProperties objects in BrokerNode and ControllerNode,
which we will do in a follow-up change.

Reviewers: Ron Dagostino <[email protected]>
  • Loading branch information
cmccabe authored and yyu1993 committed Feb 15, 2024
1 parent b9d7a33 commit 0ad2750
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 92 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.FileLock
import kafka.utils.Logging
Expand Down Expand Up @@ -128,7 +128,7 @@ trait RaftManager[T] {
}

class KafkaRaftManager[T](
metaProperties: MetaProperties,
clusterId: String,
config: KafkaConfig,
recordSerde: RecordSerde[T],
topicPartition: TopicPartition,
Expand Down Expand Up @@ -241,7 +241,7 @@ class KafkaRaftManager[T](
metrics,
expirationService,
logContext,
metaProperties.clusterId,
clusterId,
nodeId,
raftConfig
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ControllerApis(
val controller: Controller,
val raftManager: RaftManager[ApiMessageAndVersion],
val config: KafkaConfig,
val metaProperties: MetaProperties,
val clusterId: String,
val registrationsPublisher: ControllerRegistrationsPublisher,
val apiVersionManager: ApiVersionManager,
val metadataCache: KRaftMetadataCache
Expand Down Expand Up @@ -1067,7 +1067,7 @@ class ControllerApis(
val response = authHelper.computeDescribeClusterResponse(
request,
EndpointType.CONTROLLER,
metaProperties.clusterId,
clusterId,
() => registrationsPublisher.describeClusterControllers(request.context.listenerName()),
() => raftManager.leaderAndEpoch.leaderId().orElse(-1)
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class ControllerServer(
true
}

def clusterId: String = sharedServer.metaProps.clusterId
def clusterId: String = sharedServer.clusterId()

def startup(): Unit = {
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
Expand Down Expand Up @@ -319,7 +319,7 @@ class ControllerServer(
controller,
raftManager,
config,
sharedServer.metaProps,
clusterId,
registrationsPublisher,
apiVersionManager,
metadataCache)
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,10 @@ class KafkaServer(
isZkBroker = true)

// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
RaftConfig.parseVoterConnections(config.quorumVoters))
val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
kraftMetaProps,
clusterId,
config,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
Expand Down Expand Up @@ -436,7 +435,7 @@ class KafkaServer(
lifecycleManager.start(
() => listener.highestOffset,
brokerToQuorumChannelManager,
kraftMetaProps.clusterId,
clusterId,
networkListeners,
ibpAsFeature,
OptionalLong.empty()
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class SharedServer(
@volatile var snapshotGenerator: SnapshotGenerator = _
@volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _

def clusterId(): String = metaProps.clusterId

def isUsed(): Boolean = synchronized {
usedByController || usedByBroker
}
Expand Down Expand Up @@ -248,7 +250,7 @@ class SharedServer(
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaProps,
clusterId(),
sharedServerConfig,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
Expand Down
9 changes: 2 additions & 7 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import joptsimple.OptionException
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Exit, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand Down Expand Up @@ -82,13 +82,8 @@ class TestRaftServer(
() => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION))
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

val metaProperties = MetaProperties(
clusterId = Uuid.ZERO_UUID.toString,
nodeId = config.nodeId
)

raftManager = new KafkaRaftManager[Array[Byte]](
metaProperties,
Uuid.ZERO_UUID.toString,
config,
new ByteArraySerde,
partition,
Expand Down
6 changes: 5 additions & 1 deletion core/src/test/java/kafka/testkit/BrokerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public Builder setMetadataDirectory(String metadataDirectory) {
return this;
}

public BrokerNode build() {
BrokerNode build(
String baseDirectory
) {
if (id == -1) {
throw new RuntimeException("You must set the node id");
}
Expand All @@ -60,9 +62,11 @@ public BrokerNode build() {
logDataDirectories = Collections.
singletonList(String.format("broker_%d_data0", id));
}
logDataDirectories = TestKitNodes.absolutize(baseDirectory, logDataDirectories);
if (metadataDirectory == null) {
metadataDirectory = logDataDirectories.get(0);
}
metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory);
return new BrokerNode(id, incarnationId, metadataDirectory,
logDataDirectories);
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/test/java/kafka/testkit/ControllerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ public Builder setMetadataDirectory(String metadataDirectory) {
return this;
}

public ControllerNode build() {
public ControllerNode build(
String baseDirectory
) {
if (id == -1) {
throw new RuntimeException("You must set the node id");
}
if (metadataDirectory == null) {
metadataDirectory = String.format("controller_%d", id);
}
metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory);
return new ControllerNode(id, metadataDirectory);
}
}
Expand Down
4 changes: 1 addition & 3 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,9 @@ public KafkaClusterTestKit build() throws Exception {
ExecutorService executorService = null;
ControllerQuorumVotersFutureManager connectFutureManager =
new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
File baseDirectory = null;
File baseDirectory = new File(nodes.baseDirectory());

try {
baseDirectory = TestUtils.tempDirectory();
nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
for (ControllerNode node : nodes.controllerNodes().values()) {
Expand Down
126 changes: 64 additions & 62 deletions core/src/test/java/kafka/testkit/TestKitNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import kafka.server.MetaProperties;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;

Expand All @@ -36,8 +39,8 @@ public static class Builder {
private boolean combined = false;
private Uuid clusterId = null;
private MetadataVersion bootstrapMetadataVersion = null;
private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
private final NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
private final NavigableMap<Integer, ControllerNode.Builder> controllerNodeBuilders = new TreeMap<>();
private final NavigableMap<Integer, BrokerNode.Builder> brokerNodeBuilders = new TreeMap<>();

public Builder setClusterId(Uuid clusterId) {
this.clusterId = clusterId;
Expand All @@ -54,42 +57,21 @@ public Builder setCombined(boolean combined) {
return this;
}

public Builder addNodes(TestKitNode[] nodes) {
for (TestKitNode node : nodes) {
addNode(node);
}
return this;
}

public Builder addNode(TestKitNode node) {
if (node instanceof ControllerNode) {
ControllerNode controllerNode = (ControllerNode) node;
controllerNodes.put(node.id(), controllerNode);
} else if (node instanceof BrokerNode) {
BrokerNode brokerNode = (BrokerNode) node;
brokerNodes.put(node.id(), brokerNode);
} else {
throw new RuntimeException("Can't handle TestKitNode subclass " +
node.getClass().getSimpleName());
}
return this;
}

public Builder setNumControllerNodes(int numControllerNodes) {
if (numControllerNodes < 0) {
throw new RuntimeException("Invalid negative value for numControllerNodes");
}

while (controllerNodes.size() > numControllerNodes) {
controllerNodes.pollFirstEntry();
while (controllerNodeBuilders.size() > numControllerNodes) {
controllerNodeBuilders.pollFirstEntry();
}
while (controllerNodes.size() < numControllerNodes) {
while (controllerNodeBuilders.size() < numControllerNodes) {
int nextId = startControllerId();
if (!controllerNodes.isEmpty()) {
nextId = controllerNodes.lastKey() + 1;
if (!controllerNodeBuilders.isEmpty()) {
nextId = controllerNodeBuilders.lastKey() + 1;
}
controllerNodes.put(nextId, new ControllerNode.Builder().
setId(nextId).build());
controllerNodeBuilders.put(nextId, new ControllerNode.Builder().
setId(nextId));
}
return this;
}
Expand All @@ -98,16 +80,16 @@ public Builder setNumBrokerNodes(int numBrokerNodes) {
if (numBrokerNodes < 0) {
throw new RuntimeException("Invalid negative value for numBrokerNodes");
}
while (brokerNodes.size() > numBrokerNodes) {
brokerNodes.pollFirstEntry();
while (brokerNodeBuilders.size() > numBrokerNodes) {
brokerNodeBuilders.pollFirstEntry();
}
while (brokerNodes.size() < numBrokerNodes) {
while (brokerNodeBuilders.size() < numBrokerNodes) {
int nextId = startBrokerId();
if (!brokerNodes.isEmpty()) {
nextId = brokerNodes.lastKey() + 1;
if (!brokerNodeBuilders.isEmpty()) {
nextId = brokerNodeBuilders.lastKey() + 1;
}
brokerNodes.put(nextId, new BrokerNode.Builder().
setId(nextId).build());
brokerNodeBuilders.put(nextId, new BrokerNode.Builder().
setId(nextId));
}
return this;
}
Expand All @@ -119,7 +101,35 @@ public TestKitNodes build() {
if (bootstrapMetadataVersion == null) {
bootstrapMetadataVersion = MetadataVersion.latest();
}
return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
String baseDirectory = TestUtils.tempDirectory("kafka_" + clusterId).getAbsolutePath();
try {
NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
for (ControllerNode.Builder controllerNodeBuilder : controllerNodeBuilders.values()) {
ControllerNode controllerNode = controllerNodeBuilder.build(baseDirectory);
if (controllerNodes.put(controllerNode.id(), controllerNode) != null) {
throw new RuntimeException("More than one controller claimed ID " + controllerNode.id());
}
}
NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
for (BrokerNode.Builder brokerNodeBuilder : brokerNodeBuilders.values()) {
BrokerNode brokerNode = brokerNodeBuilder.build(baseDirectory);
if (brokerNodes.put(brokerNode.id(), brokerNode) != null) {
throw new RuntimeException("More than one broker claimed ID " + brokerNode.id());
}
}
return new TestKitNodes(baseDirectory,
clusterId,
bootstrapMetadataVersion,
controllerNodes,
brokerNodes);
} catch (Exception e) {
try {
Utils.delete(new File(baseDirectory));
} catch (IOException x) {
throw new RuntimeException(x);
}
throw e;
}
}

private int startBrokerId() {
Expand All @@ -134,6 +144,7 @@ private int startControllerId() {
}
}

private final String baseDirectory;
private final Uuid clusterId;
private final MetadataVersion bootstrapMetadataVersion;
private final NavigableMap<Integer, ControllerNode> controllerNodes;
Expand All @@ -143,16 +154,24 @@ public boolean isCombined(int node) {
return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
}

private TestKitNodes(Uuid clusterId,
MetadataVersion bootstrapMetadataVersion,
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes) {
private TestKitNodes(
String baseDirectory,
Uuid clusterId,
MetadataVersion bootstrapMetadataVersion,
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes
) {
this.baseDirectory = baseDirectory;
this.clusterId = clusterId;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.controllerNodes = controllerNodes;
this.brokerNodes = brokerNodes;
}

public String baseDirectory() {
return baseDirectory;
}

public Uuid clusterId() {
return clusterId;
}
Expand Down Expand Up @@ -189,32 +208,15 @@ public ListenerName controllerListenerName() {
return new ListenerName("CONTROLLER");
}

public TestKitNodes copyWithAbsolutePaths(String baseDirectory) {
NavigableMap<Integer, ControllerNode> newControllerNodes = new TreeMap<>();
NavigableMap<Integer, BrokerNode> newBrokerNodes = new TreeMap<>();
for (Entry<Integer, ControllerNode> entry : controllerNodes.entrySet()) {
ControllerNode node = entry.getValue();
newControllerNodes.put(entry.getKey(), new ControllerNode(node.id(),
absolutize(baseDirectory, node.metadataDirectory())));
}
for (Entry<Integer, BrokerNode> entry : brokerNodes.entrySet()) {
BrokerNode node = entry.getValue();
newBrokerNodes.put(entry.getKey(), new BrokerNode(node.id(),
node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()),
absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides()));
}
return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes);
}

private static List<String> absolutize(String base, Collection<String> directories) {
static List<String> absolutize(String base, Collection<String> directories) {
List<String> newDirectories = new ArrayList<>();
for (String directory : directories) {
newDirectories.add(absolutize(base, directory));
}
return newDirectories;
}

private static String absolutize(String base, String directory) {
static String absolutize(String base, String directory) {
if (Paths.get(directory).isAbsolute()) {
return directory;
}
Expand Down
Loading

0 comments on commit 0ad2750

Please sign in to comment.