Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka TLS Registry integration: include tls-configuration-name in Kafka config #43116

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Kafka TLS Registry integration: include tls-configuration-name in Kaf…
…ka config, when it is configured

Fixes #43107
  • Loading branch information
ozangunalp committed Sep 9, 2024
commit 965bc6990a510cb2189e245bf77216fd837db6d4
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package io.quarkus.kafka.client.runtime;

import java.util.*;
import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

@@ -33,8 +40,10 @@ void init() {
Map<String, Object> conf = new HashMap<>();
conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_ADMIN_CLIENT_TIMEOUT);
for (Map.Entry<String, Object> entry : config.entrySet()) {
if (AdminClientConfig.configNames().contains(entry.getKey())) {
conf.put(entry.getKey(), entry.getValue().toString());
String key = entry.getKey();
// include TLS config name if it has been configured
if (TLS_CONFIG_NAME_KEY.equals(key) || AdminClientConfig.configNames().contains(key)) {
conf.put(key, entry.getValue().toString());
}
}
client = AdminClient.create(conf);
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@
@Singleton
public class KafkaRuntimeConfigProducer {

public static final String TLS_CONFIG_NAME_KEY = "tls-configuration-name";

// not "kafka.", because we also inspect env vars, which start with "KAFKA_"
private static final String CONFIG_PREFIX = "kafka";
private static final String UI_CONFIG_PREFIX = CONFIG_PREFIX + ".ui";
@@ -45,7 +47,7 @@ public Map<String, Object> createKafkaRuntimeConfig(Config config, ApplicationCo
.replace("_", ".");
String value = config.getOptionalValue(propertyName, String.class).orElse("");
result.put(effectivePropertyName, value);
if (effectivePropertyName.equals("tls-configuration-name")) {
if (effectivePropertyName.equals(TLS_CONFIG_NAME_KEY)) {
result.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.kafka.client.tls;

import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;

import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
@@ -92,7 +94,11 @@ public void close() throws IOException {

@Override
public void configure(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");
String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY);
if (tlsConfigName == null) {
throw new IllegalArgumentException(
"The 'tls-configuration-name' property is required for Kafka Quarkus TLS Registry integration.");
}

Instance<TlsConfigurationRegistry> tlsConfig = CDI.current().getBeanManager().createInstance()
.select(TlsConfigurationRegistry.class);
@@ -118,7 +124,7 @@ public void configure(Map<String, ?> configs) {
* @param configs the Kafka client configuration
*/
public static void checkForOtherSslConfigs(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");
String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY);
for (String sslConfig : KAFKA_SSL_CONFIGS) {
if (configs.containsKey(sslConfig)) {
log.warnf(
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.kafka.streams.runtime;

import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;
import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER;

import java.net.InetSocketAddress;
@@ -362,6 +363,10 @@ private static void waitForTopicsToBeCreated(Admin adminClient, Collection<Strin

private static Properties getAdminClientConfig(Properties properties) {
Properties adminClientConfig = new Properties(properties);
// include TLS config name if it has been configured
if (properties.containsKey(TLS_CONFIG_NAME_KEY)) {
adminClientConfig.put(TLS_CONFIG_NAME_KEY, properties.get(TLS_CONFIG_NAME_KEY));
}
// include other AdminClientConfig(s) that have been configured
for (final String knownAdminClientConfig : AdminClientConfig.configNames()) {
// give preference to admin.<propname> first
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
@@ -17,6 +18,7 @@
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.quarkus.kafka.client.runtime.KafkaAdminClient;
import io.smallrye.common.annotation.Identifier;

/**
@@ -29,8 +31,13 @@ public class SslKafkaEndpoint {
@Identifier("default-kafka-broker")
Map<String, Object> kafkaConfig;

@Inject
KafkaAdminClient adminClient;

@GET
public String get(@QueryParam("format") CertificateFormat format) {
public String get(@QueryParam("format") CertificateFormat format) throws ExecutionException, InterruptedException {
// prevent admin client to be removed
adminClient.getTopics();
Consumer<Integer, String> consumer = createConsumer(format);
final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(60000));
if (records.isEmpty()) {