From 831dba12c0603b7d19104e38296ac4e88aa07110 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 13 Dec 2024 12:20:27 -0500 Subject: [PATCH] GH-3662: Support Kafka 3.9.0+ in EmbeddedKafkaKraftBroker Fixes: #3662 Issue: https://github.com/spring-projects/spring-kafka/issues/3662 Add compatibility for both Kafka 3.8.0 and 3.9.0+ by handling different method signatures for setConfigProp: - 3.9.0+: setConfigProp(String, Object) - 3.8.0: setConfigProp(String, String) The change uses reflection to detect Kafka version and call appropriate method. * Direct call to setConfigProp when using 3.8.0 * Addressing PR review * Addressing PR review --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 77b14b8a0..9566c3f1a 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.reflect.Method; import java.nio.file.Files; import java.time.Duration; import java.util.AbstractMap.SimpleEntry; @@ -56,6 +57,8 @@ import org.springframework.core.log.LogAccessor; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; /** * An embedded Kafka Broker(s) using KRaft. @@ -87,6 +90,23 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { public static final int DEFAULT_ADMIN_TIMEOUT = 10; + private static final boolean IS_KAFKA_39_OR_LATER = ClassUtils.isPresent( + "org.apache.kafka.server.config.AbstractKafkaConfig", EmbeddedKafkaKraftBroker.class.getClassLoader()); + + private static final Method SET_CONFIG_METHOD; + + static { + if (IS_KAFKA_39_OR_LATER) { + SET_CONFIG_METHOD = ReflectionUtils.findMethod( + KafkaClusterTestKit.Builder.class, + "setConfigProp", + String.class, Object.class); + } + else { + SET_CONFIG_METHOD = null; + } + } + private final int count; private final Set topics; @@ -212,7 +232,7 @@ private void start() { .setNumBrokerNodes(this.count) .setNumControllerNodes(this.count) .build()); - this.brokerProperties.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, (String) v)); + this.brokerProperties.forEach((k, v) -> setConfigProperty(clusterBuilder, (String) k, v)); this.cluster = clusterBuilder.build(); } catch (Exception ex) { @@ -238,6 +258,17 @@ private void start() { System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); } + private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, String key, Object value) { + if (IS_KAFKA_39_OR_LATER) { + // For Kafka 3.9.0+: use reflection + ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value); + } + else { + // For Kafka 3.8.0: direct call + clusterBuilder.setConfigProp(key, (String) value); + } + } + @Override public void destroy() { AtomicReference shutdownFailure = new AtomicReference<>();