From 965bc6990a510cb2189e245bf77216fd837db6d4 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Sat, 7 Sep 2024 21:03:07 +0200 Subject: [PATCH] Kafka TLS Registry integration: include tls-configuration-name in Kafka config, when it is configured Fixes #43107 --- .../kafka/client/runtime/KafkaAdminClient.java | 15 ++++++++++++--- .../runtime/KafkaRuntimeConfigProducer.java | 4 +++- .../client/tls/QuarkusKafkaSslEngineFactory.java | 10 ++++++++-- .../streams/runtime/KafkaStreamsProducer.java | 5 +++++ .../io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java | 9 ++++++++- 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java index 1600561f3f1ef..0ced0504abcda 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java @@ -1,6 +1,13 @@ package io.quarkus.kafka.client.runtime; -import java.util.*; +import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -33,8 +40,10 @@ void init() { Map conf = new HashMap<>(); conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_ADMIN_CLIENT_TIMEOUT); for (Map.Entry entry : config.entrySet()) { - if (AdminClientConfig.configNames().contains(entry.getKey())) { - conf.put(entry.getKey(), entry.getValue().toString()); + String key = entry.getKey(); + // include TLS config name if it has been configured + if (TLS_CONFIG_NAME_KEY.equals(key) || AdminClientConfig.configNames().contains(key)) { + conf.put(key, entry.getValue().toString()); } } client = AdminClient.create(conf); diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java index 7490541fc174d..ff679cd61c80e 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java @@ -16,6 +16,8 @@ @Singleton public class KafkaRuntimeConfigProducer { + public static final String TLS_CONFIG_NAME_KEY = "tls-configuration-name"; + // not "kafka.", because we also inspect env vars, which start with "KAFKA_" private static final String CONFIG_PREFIX = "kafka"; private static final String UI_CONFIG_PREFIX = CONFIG_PREFIX + ".ui"; @@ -45,7 +47,7 @@ public Map createKafkaRuntimeConfig(Config config, ApplicationCo .replace("_", "."); String value = config.getOptionalValue(propertyName, String.class).orElse(""); result.put(effectivePropertyName, value); - if (effectivePropertyName.equals("tls-configuration-name")) { + if (effectivePropertyName.equals(TLS_CONFIG_NAME_KEY)) { result.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName()); } } diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java index 42aab3be3e781..1ebd9d795aed1 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java @@ -1,5 +1,7 @@ package io.quarkus.kafka.client.tls; +import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY; + import java.io.IOException; import java.security.KeyStore; import java.util.Map; @@ -92,7 +94,11 @@ public void close() throws IOException { @Override public void configure(Map configs) { - String tlsConfigName = (String) configs.get("tls-configuration-name"); + String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY); + if (tlsConfigName == null) { + throw new IllegalArgumentException( + "The 'tls-configuration-name' property is required for Kafka Quarkus TLS Registry integration."); + } Instance tlsConfig = CDI.current().getBeanManager().createInstance() .select(TlsConfigurationRegistry.class); @@ -118,7 +124,7 @@ public void configure(Map configs) { * @param configs the Kafka client configuration */ public static void checkForOtherSslConfigs(Map configs) { - String tlsConfigName = (String) configs.get("tls-configuration-name"); + String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY); for (String sslConfig : KAFKA_SSL_CONFIGS) { if (configs.containsKey(sslConfig)) { log.warnf( diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java index 604017f831591..570ab735c8f07 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java @@ -1,5 +1,6 @@ package io.quarkus.kafka.streams.runtime; +import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY; import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER; import java.net.InetSocketAddress; @@ -362,6 +363,10 @@ private static void waitForTopicsToBeCreated(Admin adminClient, Collection first diff --git a/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java b/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java index 8f1f00f7df085..0a6fff74a6c50 100644 --- a/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java +++ b/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java @@ -4,6 +4,7 @@ import java.util.Collections; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; import jakarta.inject.Inject; import jakarta.ws.rs.GET; @@ -17,6 +18,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; +import io.quarkus.kafka.client.runtime.KafkaAdminClient; import io.smallrye.common.annotation.Identifier; /** @@ -29,8 +31,13 @@ public class SslKafkaEndpoint { @Identifier("default-kafka-broker") Map kafkaConfig; + @Inject + KafkaAdminClient adminClient; + @GET - public String get(@QueryParam("format") CertificateFormat format) { + public String get(@QueryParam("format") CertificateFormat format) throws ExecutionException, InterruptedException { + // prevent admin client to be removed + adminClient.getTopics(); Consumer consumer = createConsumer(format); final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000)); if (records.isEmpty()) {