Skip to content

Commit

Permalink
spring-projectsGH-3662: Support Kafka 3.9.0+ in EmbeddedKafkaKraftBroker
Browse files Browse the repository at this point in the history
Fixes: spring-projects#3662
Issue: spring-projects#3662

Add compatibility for both Kafka 3.8.0 and 3.9.0+ by handling different method signatures for setConfigProp:
- 3.9.0+: setConfigProp(String, Object)
- 3.8.0: setConfigProp(String, String)

The change uses reflection to detect Kafka version and call appropriate method.
  • Loading branch information
sobychacko committed Dec 12, 2024
1 parent fe71001 commit 85153c2
Showing 1 changed file with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
Expand Down Expand Up @@ -77,6 +79,8 @@
*/
public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

private static final String CLASS_EXISTS_ONLY_IN_390 = "org.apache.kafka.server.config.AbstractKafkaConfig";

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(EmbeddedKafkaKraftBroker.class));

/**
Expand Down Expand Up @@ -217,7 +221,7 @@ private void start() {
.setNumBrokerNodes(this.count)
.setNumControllerNodes(this.count)
.build());
this.brokerProperties.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, (String) v));
this.brokerProperties.forEach((k, v) -> setConfigProperty(clusterBuilder, k, v));
this.cluster = clusterBuilder.build();
}
catch (Exception ex) {
Expand All @@ -243,6 +247,37 @@ private void start() {
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
}

private static void setConfigProperty(Object clusterBuilder, Object key, Object value) {
try {
boolean isKafka39OrLater = isClassicConsumerPresent();
Class<?> builderClass = clusterBuilder.getClass();

if (isKafka39OrLater) {
// For Kafka 3.9.0+: setConfigProp(String, Object)
Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, Object.class);
setConfigMethod.invoke(clusterBuilder, (String) key, value);
}
else {
// For Kafka 3.8.0: setConfigProp(String, String)
Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, String.class);
setConfigMethod.invoke(clusterBuilder, (String) key, (String) value);
}
}
catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Failed to set config property", e);
}
}

private static boolean isClassicConsumerPresent() {
try {
Class.forName(CLASS_EXISTS_ONLY_IN_390);
return true; // Class exists - Kafka 3.9.0+
}
catch (ClassNotFoundException e) {
return false; // Class doesn't exist - Kafka 3.8.0
}
}

@Override
public void destroy() {
AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
Expand Down

0 comments on commit 85153c2

Please sign in to comment.