Skip to content

Commit

Permalink
Fix kroxylicious#176: Changes needed to get the tests working, at least
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Bentley <[email protected]>
  • Loading branch information
tombentley committed Sep 12, 2023
1 parent 56a64f8 commit fcda683
Show file tree
Hide file tree
Showing 13 changed files with 569 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public interface KafkaCluster extends AutoCloseable {
void close() throws Exception;

/**
* Gets the number of brokers in the cluster, including any that are stopped.
* @return the size of the cluster.
* Gets the number of brokers in the cluster, including any that are stopped, excluding any pure controller nodes.
* @return the number of brokers in the cluster.
*/
int getNumOfBrokers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public static ClusterId clusterId(String clusterId) {
* @return the kraft cluster
*/
public static KRaftCluster kraftCluster(int numControllers) {
return mkAnnotation(KRaftCluster.class, Map.of("numControllers", numControllers));
return mkAnnotation(KRaftCluster.class, Map.of(
"numControllers", numControllers,
"combinedMode", true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,42 @@
/**
* Annotation constraining a {@link KafkaClusterProvisioningStrategy} to use
* a {@link KafkaCluster} that is KRaft-based.
*
* <table>
* <caption>Breakdown of the interaction between numControllers and numBrokers</caption>
* <tr><th>numBrokers</th><th>numControllers</th><th>combinedMode</th><th>roles</th></tr>
* <tr><td>1</td><td>1</td><td>true</td><td>1×<code>"broker,controller"</code></td></tr>
* <tr><td>1</td><td>1</td><td>false</td><td>1×<code>"broker"</code>, 1×<code>"controller"</code></td></tr>
*
* <tr><td>3</td><td>1</td><td>true</td><td>1×<code>"broker,controller"</code>, 2×<code>"broker"</code></td></tr>
 * <tr><td>3</td><td>1</td><td>false</td><td>3×<code>"broker"</code>, 1×<code>"controller"</code></td></tr>
*
* <tr><td>1</td><td>3</td><td>true</td><td>1×<code>"broker,controller"</code>, 2×<code>"controller"</code></td></tr>
* <tr><td>1</td><td>3</td><td>false</td><td>1×<code>"broker"</code>, 3×<code>"controller"</code></td></tr>
*
* <tr><td>3</td><td>3</td><td>true</td><td>3×<code>"broker,controller"</code></td></tr>
 * <tr><td>3</td><td>3</td><td>false</td><td>3×<code>"broker"</code>, 3×<code>"controller"</code></td></tr>
* </table>
*/
@Target({ ElementType.PARAMETER, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
@KafkaClusterConstraint
public @interface KRaftCluster {
/**
* The number of kraft controllers
* The extension will ensure there are enough nodes started with the <code>controller</code> role.
* The extension will combine this with the <code>numBrokers</code> to generate a cluster topology.
* <table>
* <caption>Breakdown of the interaction between numControllers and numBrokers</caption>
* <tr><th>numBrokers</th><th>numControllers</th><th>roles</th></tr>
* <tr><td>1</td><td>1</td><td><code>"broker,controller"</code></td></tr>
* <tr><td>3</td><td>1</td><td><code>"broker,controller"</code>, <code>"broker"</code>, <code>"broker"</code></td></tr>
* <tr><td>1</td><td>3</td><td><code>"broker,controller"</code>, <code>"controller"</code>, <code>"controller"</code></td></tr>
* <tr><td>3</td><td>3</td><td><code>"broker,controller"</code>, <code>"broker,controller"</code>, <code>"broker,controller"</code></td></tr>
* </table>
* The number of kraft controllers.
* The extension will ensure there are this many nodes started with the <code>controller</code> role.
 * It combines this with the <code>numBrokers</code> and <code>combinedMode</code> to generate a cluster topology.
*
* See the class JavaDoc for example topologies.
* @return The number of KRaft controllers
*/
public int numControllers() default 1;

/**
* Whether to use combined mode, where controllers can share a JVM with brokers.
* See the class JavaDoc for example topologies.
* @return true to use combined mode.
*/
public boolean combinedMode() default true;

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,20 @@ public static KafkaCluster create(KafkaClusterConfig clusterConfig) {
}

var clusterMode = getExecutionMode(clusterConfig);
var kraftMode = convertClusterKraftMode(System.getenv().get(TEST_CLUSTER_KRAFT_MODE), true);
var kraftMode = convertClusterKraftMode(System.getenv().get(TEST_CLUSTER_KRAFT_MODE), MetadataMode.KRAFT_COMBINED);
var builder = clusterConfig.toBuilder();

if (clusterConfig.getExecMode() == null) {
builder.execMode(clusterMode);
}

if (clusterConfig.getKraftMode() == null) {
builder.kraftMode(kraftMode);
if (clusterConfig.getMetadataMode() == null) {
builder.metadataMode(kraftMode);
}

if (KafkaClusterExecutionMode.CONTAINER == clusterMode && kraftMode && clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) {
if (KafkaClusterExecutionMode.CONTAINER == clusterMode
&& kraftMode != MetadataMode.ZOOKEEPER
&& clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) {
throw new IllegalStateException(
"Due to https://github.com/ozangunalp/kafka-native/issues/88 we can't support controller only nodes in " + KafkaClusterExecutionMode.CONTAINER
+ " mode so we need to fail fast. This cluster has "
Expand Down Expand Up @@ -104,10 +106,10 @@ private static KafkaClusterExecutionMode getExecutionMode(KafkaClusterConfig clu
clusterConfig.getExecMode() == null ? KafkaClusterExecutionMode.IN_VM : clusterConfig.getExecMode());
}

private static boolean convertClusterKraftMode(String mode, boolean defaultMode) {
private static MetadataMode convertClusterKraftMode(String mode, MetadataMode defaultMode) {
if (mode == null) {
return defaultMode;
}
return Boolean.parseBoolean(mode);
return Boolean.parseBoolean(mode) ? MetadataMode.KRAFT_COMBINED : MetadataMode.ZOOKEEPER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.kroxylicious.testing.kafka.common;

/**
 * The kind of metadata storage a Kafka cluster uses, which determines
 * how controller and broker roles are distributed over nodes.
 */
public enum MetadataMode {
    /** Metadata kept in ZooKeeper; all Kafka nodes are brokers. */
    ZOOKEEPER,
    /** KRaft with controllers sharing JVMs with brokers where possible. */
    KRAFT_COMBINED,
    /** KRaft with controllers and brokers on separate nodes. */
    KRAFT_SEPARATE;

    /**
     * Computes the total node count for a cluster in this mode.
     *
     * @param numControllers the number of nodes with the controller role
     * @param numBrokers the number of nodes with the broker role
     * @return The total number of Kafka nodes (excludes any ZooKeeper nodes).
     */
    public int numNodes(int numControllers, int numBrokers) {
        if (this == ZOOKEEPER) {
            // ZooKeeper holds the metadata, so no Kafka node is a controller.
            return numBrokers;
        }
        if (this == KRAFT_COMBINED) {
            // Roles overlap on the same nodes as far as possible.
            return Math.max(numControllers, numBrokers);
        }
        // KRAFT_SEPARATE: each role gets its own dedicated nodes.
        return numControllers + numBrokers;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.kroxylicious.testing.kafka.common;

import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * The roles a Kafka node may have, together with helpers for reasoning
 * about a node's role set and rendering it for configuration.
 */
public enum NodeRole {
    /** The node hosts partitions and serves client traffic. */
    BROKER("broker"),
    /** The node participates in the KRaft metadata quorum. */
    CONTROLLER("controller");

    private final String configRole;

    // Enum constructors are implicitly private; no modifier needed.
    NodeRole(String configRole) {
        this.configRole = configRole;
    }

    /**
     * Renders the given roles as a comma-separated string, suitable for the
     * {@code process.roles} property in a Kafka {@code server.properties} file.
     * Duplicate roles are emitted once.
     *
     * @param roles the roles to render
     * @return The role list, as it can be configured in a Kafka {@code server.properties} file.
     */
    public static String forConfig(Collection<NodeRole> roles) {
        return roles.stream().map(x -> x.configRole).distinct().collect(Collectors.joining(","));
    }

    /**
     * @param roles a node's role set
     * @return true if the node is a controller and nothing else.
     */
    public static boolean isPureController(Set<NodeRole> roles) {
        return EnumSet.of(NodeRole.CONTROLLER).equals(roles);
    }

    /**
     * @param roles a node's role set
     * @return true if the node is both a broker and a controller (combined mode).
     */
    public static boolean isCombinedNode(Set<NodeRole> roles) {
        return EnumSet.of(NodeRole.CONTROLLER, NodeRole.BROKER).equals(roles);
    }

    /**
     * @param roles a node's role set
     * @return true if the node is a broker and nothing else.
     */
    public static boolean isPureBroker(Set<NodeRole> roles) {
        return EnumSet.of(NodeRole.BROKER).equals(roles);
    }

    /**
     * @param roles a node's role set
     * @return true if the node has the broker role (possibly among others).
     */
    public static boolean hasBrokerRole(Set<NodeRole> roles) {
        return roles.contains(NodeRole.BROKER);
    }

    /**
     * @param roles a node's role set
     * @return true if the node has the controller role (possibly among others).
     */
    public static boolean hasControllerRole(Set<NodeRole> roles) {
        return roles.contains(NodeRole.CONTROLLER);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Optional<Object> construct(Class<?> clazz, Object... parameters) {
private KafkaConfig buildBrokerConfig(KafkaClusterConfig.ConfigHolder c) {
Properties properties = new Properties();
properties.putAll(c.getProperties());
var logsDir = getBrokerLogDir(c.getBrokerNum());
var logsDir = getBrokerLogDir(c.getNodeId());
properties.setProperty(KafkaConfig.LogDirProp(), logsDir.toAbsolutePath().toString());
LOGGER.log(System.Logger.Level.DEBUG, "Generated config {0}", properties);
return new KafkaConfig(properties);
Expand All @@ -174,18 +174,33 @@ private Path getBrokerLogDir(int brokerNum) {

@Override
public synchronized void start() {
// kraft mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port + 1 anon port
// zk mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port + 1 anon port
// zookeeper mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port + 1 anon port
// kraft combined mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port + 1 anon port
// kraft separate mode: per-controller: 1 controller port
// kraft separate mode: per-broker: 1 external port + 1 inter-broker port + 1 anon port
try (PortAllocator.PortAllocationSession portAllocationSession = portsAllocator.allocationSession()) {
portAllocationSession.allocate(Set.of(Listener.EXTERNAL, Listener.ANON, Listener.INTERNAL), 0, clusterConfig.getBrokersNum());
portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0, clusterConfig.isKraftMode() ? clusterConfig.getKraftControllers() : 1);
if (!clusterConfig.isKraftMode()) {
portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0);
}
for (int nodeId = 0; nodeId < clusterConfig.numNodes(); nodeId++) {
Set<Listener> listeners = new HashSet<>();
if (clusterConfig.hasControllerRole(nodeId)) {
listeners.add(Listener.CONTROLLER);
}
if (clusterConfig.hasBrokerRole(nodeId)) {
listeners.add(Listener.EXTERNAL);
listeners.add(Listener.ANON);
listeners.add(Listener.INTERNAL);
}
portAllocationSession.allocate(listeners, nodeId);
}
}

buildAndStartZookeeper();
clusterConfig.getBrokerConfigs(() -> this).parallel().forEach(configHolder -> {
maybeBuildAndStartZookeeper();
clusterConfig.getNodeConfigs(() -> this).parallel().forEach(configHolder -> {
final Server server = this.buildKafkaServer(configHolder);
tryToStartServerWithRetry(configHolder, server);
servers.put(configHolder.getBrokerNum(), server);
servers.put(configHolder.getNodeId(), server);
});
Utils.awaitExpectedBrokerCountInClusterViaTopic(
clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120,
Expand All @@ -198,18 +213,18 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol
.until(() -> {
// Hopefully we can remove this once a fix for https://issues.apache.org/jira/browse/KAFKA-14908 actually lands.
try {
LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", configHolder.getBrokerNum(),
LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", configHolder.getNodeId(),
configHolder.getProperties().get("process.roles"));
server.startup();
return true;
}
catch (Throwable t) {
LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: " + t.getMessage());
LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ",
this.getEndpointPair(Listener.ANON, configHolder.getBrokerNum()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getBrokerNum()).getBind(),
this.getEndpointPair(Listener.CONTROLLER, configHolder.getBrokerNum()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getBrokerNum()).getBind());
this.getEndpointPair(Listener.ANON, configHolder.getNodeId()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind(),
this.getEndpointPair(Listener.CONTROLLER, configHolder.getNodeId()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind());

server.shutdown();
server.awaitShutdown();
Expand All @@ -218,7 +233,7 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol
});
}

private void buildAndStartZookeeper() {
private void maybeBuildAndStartZookeeper() {
if (!clusterConfig.isKraftMode()) {
try {
final int zookeeperPort = portsAllocator.getPort(Listener.CONTROLLER, 0);
Expand Down Expand Up @@ -274,7 +289,7 @@ public Map<String, Object> getKafkaClientConfiguration(String user, String passw
@Override
public synchronized int addBroker() {
// find next free kafka node.id
var first = IntStream.rangeClosed(0, getNumOfBrokers()).filter(cand -> !servers.containsKey(cand)).findFirst();
var first = IntStream.rangeClosed(0, numNodes()).filter(cand -> !servers.containsKey(cand)).findFirst();
if (first.isEmpty()) {
throw new IllegalStateException("Could not determine new nodeId, existing set " + servers.keySet());
}
Expand All @@ -297,16 +312,16 @@ public synchronized int addBroker() {
clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120,
TimeUnit.SECONDS,
getNumOfBrokers());
return configHolder.getBrokerNum();
return configHolder.getNodeId();
}

@Override
public synchronized void removeBroker(int nodeId) throws IllegalArgumentException, UnsupportedOperationException {
if (!servers.containsKey(nodeId)) {
throw new IllegalArgumentException("Broker node " + nodeId + " is not a member of the cluster.");
throw new IllegalArgumentException("Node " + nodeId + " is not a member of the cluster.");
}
if (clusterConfig.isKraftMode() && isController(nodeId)) {
throw new UnsupportedOperationException("Cannot remove controller node " + nodeId + " from a kraft cluster.");
if (!clusterConfig.isPureBroker(nodeId)) {
throw new UnsupportedOperationException("Node " + nodeId + " is not a pure broker.");
}
if (servers.size() < 2) {
throw new IllegalArgumentException("Cannot remove a node from a cluster with only %d nodes".formatted(servers.size()));
Expand Down Expand Up @@ -434,6 +449,12 @@ private static void ensureDirectoryIsEmpty(Path path) {

@Override
public synchronized int getNumOfBrokers() {
return (int) servers.keySet().stream().filter(nodeId -> nodeId >= clusterConfig.numNodes() // added nodes are always brokers
|| clusterConfig.hasBrokerRole(nodeId))
.count(); // initial nodes have broker role depending on the metadata mode
}

public synchronized int numNodes() {
return servers.size();
}

Expand Down
Loading

0 comments on commit fcda683

Please sign in to comment.