Skip to content

Commit

Permalink
Kafka TLS Registry integration: include tls-configuration-name in Kaf…
Browse files Browse the repository at this point in the history
…ka config, when it is configured

Fixes #43107
  • Loading branch information
ozangunalp committed Sep 9, 2024
1 parent 646c96b commit 965bc69
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package io.quarkus.kafka.client.runtime;

import java.util.*;
import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -33,8 +40,10 @@ void init() {
Map<String, Object> conf = new HashMap<>();
conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_ADMIN_CLIENT_TIMEOUT);
for (Map.Entry<String, Object> entry : config.entrySet()) {
if (AdminClientConfig.configNames().contains(entry.getKey())) {
conf.put(entry.getKey(), entry.getValue().toString());
String key = entry.getKey();
// include TLS config name if it has been configured
if (TLS_CONFIG_NAME_KEY.equals(key) || AdminClientConfig.configNames().contains(key)) {
conf.put(key, entry.getValue().toString());
}
}
client = AdminClient.create(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
@Singleton
public class KafkaRuntimeConfigProducer {

public static final String TLS_CONFIG_NAME_KEY = "tls-configuration-name";

// not "kafka.", because we also inspect env vars, which start with "KAFKA_"
private static final String CONFIG_PREFIX = "kafka";
private static final String UI_CONFIG_PREFIX = CONFIG_PREFIX + ".ui";
Expand Down Expand Up @@ -45,7 +47,7 @@ public Map<String, Object> createKafkaRuntimeConfig(Config config, ApplicationCo
.replace("_", ".");
String value = config.getOptionalValue(propertyName, String.class).orElse("");
result.put(effectivePropertyName, value);
if (effectivePropertyName.equals("tls-configuration-name")) {
if (effectivePropertyName.equals(TLS_CONFIG_NAME_KEY)) {
result.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.kafka.client.tls;

import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;

import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
Expand Down Expand Up @@ -92,7 +94,11 @@ public void close() throws IOException {

@Override
public void configure(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");
String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY);
if (tlsConfigName == null) {
throw new IllegalArgumentException(
"The 'tls-configuration-name' property is required for Kafka Quarkus TLS Registry integration.");
}

Instance<TlsConfigurationRegistry> tlsConfig = CDI.current().getBeanManager().createInstance()
.select(TlsConfigurationRegistry.class);
Expand All @@ -118,7 +124,7 @@ public void configure(Map<String, ?> configs) {
* @param configs the Kafka client configuration
*/
public static void checkForOtherSslConfigs(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");
String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY);
for (String sslConfig : KAFKA_SSL_CONFIGS) {
if (configs.containsKey(sslConfig)) {
log.warnf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.kafka.streams.runtime;

import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;
import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -362,6 +363,10 @@ private static void waitForTopicsToBeCreated(Admin adminClient, Collection<Strin

private static Properties getAdminClientConfig(Properties properties) {
Properties adminClientConfig = new Properties(properties);
// include TLS config name if it has been configured
if (properties.containsKey(TLS_CONFIG_NAME_KEY)) {
adminClientConfig.put(TLS_CONFIG_NAME_KEY, properties.get(TLS_CONFIG_NAME_KEY));
}
// include other AdminClientConfig(s) that have been configured
for (final String knownAdminClientConfig : AdminClientConfig.configNames()) {
// give preference to admin.<propname> first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
Expand All @@ -17,6 +18,7 @@
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.quarkus.kafka.client.runtime.KafkaAdminClient;
import io.smallrye.common.annotation.Identifier;

/**
Expand All @@ -29,8 +31,13 @@ public class SslKafkaEndpoint {
@Identifier("default-kafka-broker")
Map<String, Object> kafkaConfig;

@Inject
KafkaAdminClient adminClient;

@GET
public String get(@QueryParam("format") CertificateFormat format) {
public String get(@QueryParam("format") CertificateFormat format) throws ExecutionException, InterruptedException {
// prevent admin client to be removed
adminClient.getTopics();
Consumer<Integer, String> consumer = createConsumer(format);
final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(60000));
if (records.isEmpty()) {
Expand Down

0 comments on commit 965bc69

Please sign in to comment.