Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: MetaProperties refactor, part 1 #14678

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.FileLock
import kafka.utils.Logging
Expand Down Expand Up @@ -128,7 +128,7 @@ trait RaftManager[T] {
}

class KafkaRaftManager[T](
metaProperties: MetaProperties,
clusterId: String,
config: KafkaConfig,
recordSerde: RecordSerde[T],
topicPartition: TopicPartition,
Expand Down Expand Up @@ -241,7 +241,7 @@ class KafkaRaftManager[T](
metrics,
expirationService,
logContext,
metaProperties.clusterId,
clusterId,
nodeId,
raftConfig
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ControllerApis(
val controller: Controller,
val raftManager: RaftManager[ApiMessageAndVersion],
val config: KafkaConfig,
val metaProperties: MetaProperties,
val clusterId: String,
val registrationsPublisher: ControllerRegistrationsPublisher,
val apiVersionManager: ApiVersionManager,
val metadataCache: KRaftMetadataCache
Expand Down Expand Up @@ -1066,7 +1066,7 @@ class ControllerApis(
val response = authHelper.computeDescribeClusterResponse(
request,
EndpointType.CONTROLLER,
metaProperties.clusterId,
clusterId,
() => registrationsPublisher.describeClusterControllers(request.context.listenerName()),
() => raftManager.leaderAndEpoch.leaderId().orElse(-1)
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class ControllerServer(
true
}

def clusterId: String = sharedServer.metaProps.clusterId
def clusterId: String = sharedServer.clusterId()

def startup(): Unit = {
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
Expand Down Expand Up @@ -319,7 +319,7 @@ class ControllerServer(
controller,
raftManager,
config,
sharedServer.metaProps,
clusterId,
registrationsPublisher,
apiVersionManager,
metadataCache)
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,10 @@ class KafkaServer(
isZkBroker = true)

// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
RaftConfig.parseVoterConnections(config.quorumVoters))
val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
kraftMetaProps,
clusterId,
config,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
Expand Down Expand Up @@ -436,7 +435,7 @@ class KafkaServer(
lifecycleManager.start(
() => listener.highestOffset,
brokerToQuorumChannelManager,
kraftMetaProps.clusterId,
clusterId,
networkListeners,
ibpAsFeature,
OptionalLong.empty()
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class SharedServer(
@volatile var snapshotGenerator: SnapshotGenerator = _
@volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _

def clusterId(): String = metaProps.clusterId

def isUsed(): Boolean = synchronized {
usedByController || usedByBroker
}
Expand Down Expand Up @@ -248,7 +250,7 @@ class SharedServer(
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaProps,
clusterId(),
sharedServerConfig,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
Expand Down
9 changes: 2 additions & 7 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import joptsimple.OptionException
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Exit, Logging}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand Down Expand Up @@ -82,13 +82,8 @@ class TestRaftServer(
() => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION))
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

val metaProperties = MetaProperties(
clusterId = Uuid.ZERO_UUID.toString,
nodeId = config.nodeId
)

raftManager = new KafkaRaftManager[Array[Byte]](
metaProperties,
Uuid.ZERO_UUID.toString,
Comment on lines -91 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want Uuid.randomUuid.toString instead, which is what we use in RaftManagerTest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question. I don't know why this test chose Uuid.ZERO_UUID. But I figured changing it here was out of scope.

config,
new ByteArraySerde,
partition,
Expand Down
6 changes: 5 additions & 1 deletion core/src/test/java/kafka/testkit/BrokerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public Builder setMetadataDirectory(String metadataDirectory) {
return this;
}

public BrokerNode build() {
BrokerNode build(
String baseDirectory
) {
if (id == -1) {
throw new RuntimeException("You must set the node id");
}
Expand All @@ -60,9 +62,11 @@ public BrokerNode build() {
logDataDirectories = Collections.
singletonList(String.format("broker_%d_data0", id));
}
logDataDirectories = TestKitNodes.absolutize(baseDirectory, logDataDirectories);
if (metadataDirectory == null) {
metadataDirectory = logDataDirectories.get(0);
}
metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory);
return new BrokerNode(id, incarnationId, metadataDirectory,
logDataDirectories);
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/test/java/kafka/testkit/ControllerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ public Builder setMetadataDirectory(String metadataDirectory) {
return this;
}

public ControllerNode build() {
public ControllerNode build(
String baseDirectory
) {
if (id == -1) {
throw new RuntimeException("You must set the node id");
}
if (metadataDirectory == null) {
metadataDirectory = String.format("controller_%d", id);
}
metadataDirectory = TestKitNodes.absolutize(baseDirectory, metadataDirectory);
return new ControllerNode(id, metadataDirectory);
}
}
Expand Down
4 changes: 1 addition & 3 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,9 @@ public KafkaClusterTestKit build() throws Exception {
ExecutorService executorService = null;
ControllerQuorumVotersFutureManager connectFutureManager =
new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
File baseDirectory = null;
File baseDirectory = new File(nodes.baseDirectory());

try {
baseDirectory = TestUtils.tempDirectory();
nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
for (ControllerNode node : nodes.controllerNodes().values()) {
Expand Down
126 changes: 64 additions & 62 deletions core/src/test/java/kafka/testkit/TestKitNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import kafka.server.MetaProperties;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;

Expand All @@ -36,8 +39,8 @@ public static class Builder {
private boolean combined = false;
private Uuid clusterId = null;
private MetadataVersion bootstrapMetadataVersion = null;
private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
private final NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
private final NavigableMap<Integer, ControllerNode.Builder> controllerNodeBuilders = new TreeMap<>();
private final NavigableMap<Integer, BrokerNode.Builder> brokerNodeBuilders = new TreeMap<>();

public Builder setClusterId(Uuid clusterId) {
this.clusterId = clusterId;
Expand All @@ -54,42 +57,21 @@ public Builder setCombined(boolean combined) {
return this;
}

public Builder addNodes(TestKitNode[] nodes) {
for (TestKitNode node : nodes) {
addNode(node);
}
return this;
}

public Builder addNode(TestKitNode node) {
if (node instanceof ControllerNode) {
ControllerNode controllerNode = (ControllerNode) node;
controllerNodes.put(node.id(), controllerNode);
} else if (node instanceof BrokerNode) {
BrokerNode brokerNode = (BrokerNode) node;
brokerNodes.put(node.id(), brokerNode);
} else {
throw new RuntimeException("Can't handle TestKitNode subclass " +
node.getClass().getSimpleName());
}
return this;
}

public Builder setNumControllerNodes(int numControllerNodes) {
if (numControllerNodes < 0) {
throw new RuntimeException("Invalid negative value for numControllerNodes");
}

while (controllerNodes.size() > numControllerNodes) {
controllerNodes.pollFirstEntry();
while (controllerNodeBuilders.size() > numControllerNodes) {
controllerNodeBuilders.pollFirstEntry();
}
while (controllerNodes.size() < numControllerNodes) {
while (controllerNodeBuilders.size() < numControllerNodes) {
int nextId = startControllerId();
if (!controllerNodes.isEmpty()) {
nextId = controllerNodes.lastKey() + 1;
if (!controllerNodeBuilders.isEmpty()) {
nextId = controllerNodeBuilders.lastKey() + 1;
}
controllerNodes.put(nextId, new ControllerNode.Builder().
setId(nextId).build());
controllerNodeBuilders.put(nextId, new ControllerNode.Builder().
setId(nextId));
}
return this;
}
Expand All @@ -98,16 +80,16 @@ public Builder setNumBrokerNodes(int numBrokerNodes) {
if (numBrokerNodes < 0) {
throw new RuntimeException("Invalid negative value for numBrokerNodes");
}
while (brokerNodes.size() > numBrokerNodes) {
brokerNodes.pollFirstEntry();
while (brokerNodeBuilders.size() > numBrokerNodes) {
brokerNodeBuilders.pollFirstEntry();
}
while (brokerNodes.size() < numBrokerNodes) {
while (brokerNodeBuilders.size() < numBrokerNodes) {
int nextId = startBrokerId();
if (!brokerNodes.isEmpty()) {
nextId = brokerNodes.lastKey() + 1;
if (!brokerNodeBuilders.isEmpty()) {
nextId = brokerNodeBuilders.lastKey() + 1;
}
brokerNodes.put(nextId, new BrokerNode.Builder().
setId(nextId).build());
brokerNodeBuilders.put(nextId, new BrokerNode.Builder().
setId(nextId));
}
return this;
}
Expand All @@ -119,7 +101,35 @@ public TestKitNodes build() {
if (bootstrapMetadataVersion == null) {
bootstrapMetadataVersion = MetadataVersion.latest();
}
return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
String baseDirectory = TestUtils.tempDirectory("kafka_" + clusterId).getAbsolutePath();
try {
NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
for (ControllerNode.Builder controllerNodeBuilder : controllerNodeBuilders.values()) {
ControllerNode controllerNode = controllerNodeBuilder.build(baseDirectory);
if (controllerNodes.put(controllerNode.id(), controllerNode) != null) {
throw new RuntimeException("More than one controller claimed ID " + controllerNode.id());
}
}
NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
for (BrokerNode.Builder brokerNodeBuilder : brokerNodeBuilders.values()) {
BrokerNode brokerNode = brokerNodeBuilder.build(baseDirectory);
if (brokerNodes.put(brokerNode.id(), brokerNode) != null) {
throw new RuntimeException("More than one broker claimed ID " + brokerNode.id());
}
}
return new TestKitNodes(baseDirectory,
clusterId,
bootstrapMetadataVersion,
controllerNodes,
brokerNodes);
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can be catch (RuntimeException e) { since the code doesn't throw any checked exceptions and the signature of this method does not declare any checked exceptions as being potentially thrown.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's fair. We can only have RuntimException here. Still, I can see no logical reason why we wouldn't do the deletion if the code was restructured to throw checked exceptions here. So it seems clearer this way.

try {
Utils.delete(new File(baseDirectory));
} catch (IOException x) {
throw new RuntimeException(x);
}
throw e;
}
}

private int startBrokerId() {
Expand All @@ -134,6 +144,7 @@ private int startControllerId() {
}
}

private final String baseDirectory;
private final Uuid clusterId;
private final MetadataVersion bootstrapMetadataVersion;
private final NavigableMap<Integer, ControllerNode> controllerNodes;
Expand All @@ -143,16 +154,24 @@ public boolean isCombined(int node) {
return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
}

private TestKitNodes(Uuid clusterId,
MetadataVersion bootstrapMetadataVersion,
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes) {
private TestKitNodes(
String baseDirectory,
Uuid clusterId,
MetadataVersion bootstrapMetadataVersion,
NavigableMap<Integer, ControllerNode> controllerNodes,
NavigableMap<Integer, BrokerNode> brokerNodes
) {
this.baseDirectory = baseDirectory;
this.clusterId = clusterId;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.controllerNodes = controllerNodes;
this.brokerNodes = brokerNodes;
}

public String baseDirectory() {
return baseDirectory;
}

public Uuid clusterId() {
return clusterId;
}
Expand Down Expand Up @@ -189,32 +208,15 @@ public ListenerName controllerListenerName() {
return new ListenerName("CONTROLLER");
}

public TestKitNodes copyWithAbsolutePaths(String baseDirectory) {
NavigableMap<Integer, ControllerNode> newControllerNodes = new TreeMap<>();
NavigableMap<Integer, BrokerNode> newBrokerNodes = new TreeMap<>();
for (Entry<Integer, ControllerNode> entry : controllerNodes.entrySet()) {
ControllerNode node = entry.getValue();
newControllerNodes.put(entry.getKey(), new ControllerNode(node.id(),
absolutize(baseDirectory, node.metadataDirectory())));
}
for (Entry<Integer, BrokerNode> entry : brokerNodes.entrySet()) {
BrokerNode node = entry.getValue();
newBrokerNodes.put(entry.getKey(), new BrokerNode(node.id(),
node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()),
absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides()));
}
return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes);
}

private static List<String> absolutize(String base, Collection<String> directories) {
static List<String> absolutize(String base, Collection<String> directories) {
List<String> newDirectories = new ArrayList<>();
for (String directory : directories) {
newDirectories.add(absolutize(base, directory));
}
return newDirectories;
}

private static String absolutize(String base, String directory) {
static String absolutize(String base, String directory) {
if (Paths.get(directory).isAbsolute()) {
return directory;
}
Expand Down
Loading