Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka TLS Registry integration: include tls-configuration-name in Kafka config #43116

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package io.quarkus.kafka.client.runtime;

import java.util.*;
import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -33,8 +40,10 @@ void init() {
Map<String, Object> conf = new HashMap<>();
conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_ADMIN_CLIENT_TIMEOUT);
for (Map.Entry<String, Object> entry : config.entrySet()) {
if (AdminClientConfig.configNames().contains(entry.getKey())) {
conf.put(entry.getKey(), entry.getValue().toString());
String key = entry.getKey();
// include TLS config name if it has been configured
if (TLS_CONFIG_NAME_KEY.equals(key) || AdminClientConfig.configNames().contains(key)) {
conf.put(key, entry.getValue().toString());
}
}
client = AdminClient.create(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
@Singleton
public class KafkaRuntimeConfigProducer {

public static final String TLS_CONFIG_NAME_KEY = "tls-configuration-name";

// not "kafka.", because we also inspect env vars, which start with "KAFKA_"
private static final String CONFIG_PREFIX = "kafka";
private static final String UI_CONFIG_PREFIX = CONFIG_PREFIX + ".ui";
Expand Down Expand Up @@ -45,7 +47,7 @@ public Map<String, Object> createKafkaRuntimeConfig(Config config, ApplicationCo
.replace("_", ".");
String value = config.getOptionalValue(propertyName, String.class).orElse("");
result.put(effectivePropertyName, value);
if (effectivePropertyName.equals("tls-configuration-name")) {
if (effectivePropertyName.equals(TLS_CONFIG_NAME_KEY)) {
result.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.kafka.client.tls;

import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;

import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
Expand Down Expand Up @@ -92,7 +94,11 @@ public void close() throws IOException {

@Override
public void configure(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");
String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY);
if (tlsConfigName == null) {
throw new IllegalArgumentException(
"The 'tls-configuration-name' property is required for Kafka Quarkus TLS Registry integration.");
}

Instance<TlsConfigurationRegistry> tlsConfig = CDI.current().getBeanManager().createInstance()
.select(TlsConfigurationRegistry.class);
Expand All @@ -118,7 +124,7 @@ public void configure(Map<String, ?> configs) {
* @param configs the Kafka client configuration
*/
public static void checkForOtherSslConfigs(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");
String tlsConfigName = (String) configs.get(TLS_CONFIG_NAME_KEY);
for (String sslConfig : KAFKA_SSL_CONFIGS) {
if (configs.containsKey(sslConfig)) {
log.warnf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.kafka.streams.runtime;

import static io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer.TLS_CONFIG_NAME_KEY;
import static io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig.DEFAULT_KAFKA_BROKER;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -362,6 +363,10 @@ private static void waitForTopicsToBeCreated(Admin adminClient, Collection<Strin

private static Properties getAdminClientConfig(Properties properties) {
Properties adminClientConfig = new Properties(properties);
// include TLS config name if it has been configured
if (properties.containsKey(TLS_CONFIG_NAME_KEY)) {
adminClientConfig.put(TLS_CONFIG_NAME_KEY, properties.get(TLS_CONFIG_NAME_KEY));
}
// include other AdminClientConfig(s) that have been configured
for (final String knownAdminClientConfig : AdminClientConfig.configNames()) {
// give preference to admin.<propname> first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
Expand All @@ -17,6 +18,7 @@
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.quarkus.kafka.client.runtime.KafkaAdminClient;
import io.smallrye.common.annotation.Identifier;

/**
Expand All @@ -29,8 +31,13 @@ public class SslKafkaEndpoint {
@Identifier("default-kafka-broker")
Map<String, Object> kafkaConfig;

@Inject
KafkaAdminClient adminClient;

@GET
public String get(@QueryParam("format") CertificateFormat format) {
public String get(@QueryParam("format") CertificateFormat format) throws ExecutionException, InterruptedException {
// prevent admin client to be removed
adminClient.getTopics();
Consumer<Integer, String> consumer = createConsumer(format);
final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(60000));
if (records.isEmpty()) {
Expand Down