-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3662: Support Kafka 3.9.0+ in EmbeddedKafkaKraftBroker #3665
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.lang.reflect.InvocationTargetException; | ||
import java.lang.reflect.Method; | ||
import java.nio.file.Files; | ||
import java.time.Duration; | ||
import java.util.AbstractMap.SimpleEntry; | ||
|
@@ -77,6 +79,8 @@ | |
*/ | ||
public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { | ||
|
||
private static final String CLASS_EXISTS_ONLY_IN_390 = "org.apache.kafka.server.config.AbstractKafkaConfig"; | ||
|
||
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(EmbeddedKafkaKraftBroker.class)); | ||
|
||
/** | ||
|
@@ -217,7 +221,7 @@ private void start() { | |
.setNumBrokerNodes(this.count) | ||
.setNumControllerNodes(this.count) | ||
.build()); | ||
this.brokerProperties.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, (String) v)); | ||
this.brokerProperties.forEach((k, v) -> setConfigProperty(clusterBuilder, k, v)); | ||
this.cluster = clusterBuilder.build(); | ||
} | ||
catch (Exception ex) { | ||
|
@@ -243,6 +247,37 @@ private void start() { | |
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); | ||
} | ||
|
||
private static void setConfigProperty(Object clusterBuilder, Object key, Object value) { | ||
try { | ||
boolean isKafka39OrLater = isClassicConsumerPresent(); | ||
Class<?> builderClass = clusterBuilder.getClass(); | ||
|
||
if (isKafka39OrLater) { | ||
// For Kafka 3.9.0+: setConfigProp(String, Object) | ||
Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, Object.class); | ||
setConfigMethod.invoke(clusterBuilder, (String) key, value); | ||
} | ||
else { | ||
// For Kafka 3.8.0: setConfigProp(String, String) | ||
Method setConfigMethod = builderClass.getMethod("setConfigProp", String.class, String.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need a reflection for this method? Please, consider to find method via reflection only one in |
||
setConfigMethod.invoke(clusterBuilder, (String) key, (String) value); | ||
} | ||
} | ||
catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { | ||
throw new RuntimeException("Failed to set config property", e); | ||
} | ||
} | ||
|
||
private static boolean isClassicConsumerPresent() { | ||
try { | ||
Class.forName(CLASS_EXISTS_ONLY_IN_390); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See |
||
return true; // Class exists - Kafka 3.9.0+ | ||
} | ||
catch (ClassNotFoundException e) { | ||
return false; // Class doesn't exist - Kafka 3.8.0 | ||
} | ||
} | ||
|
||
@Override | ||
public void destroy() { | ||
AtomicReference<Throwable> shutdownFailure = new AtomicReference<>(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also see
ReflectionUtils.findMethod()
andReflectionUtils.invokeMethod()
.You won't need to deal with any exceptions then!