Skip to content

Commit

Permalink
Merge pull request #42134 from ozangunalp/tls_config_messaging
Browse files Browse the repository at this point in the history
Tls Config for Messaging extensions
  • Loading branch information
ozangunalp authored Jul 26, 2024
2 parents 7e8c405 + 4ddfb60 commit 7daab53
Show file tree
Hide file tree
Showing 53 changed files with 1,196 additions and 244 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.14.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.23.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.24.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.6.0</smallrye-stork.version>
<jakarta.activation.version>2.1.3</jakarta.activation.version>
<jakarta.annotation-api.version>3.0.0</jakarta.annotation-api.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@
import io.quarkus.runtime.util.ClassPathUtils;
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;
import io.quarkus.tls.runtime.config.TlsConfigUtils;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.stork.Stork;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.SSLOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.grpc.client.GrpcClientChannel;

Expand Down Expand Up @@ -285,34 +285,7 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
}

if (configuration != null) {
if (configuration.getTrustStoreOptions() != null) {
options.setTrustOptions(configuration.getTrustStoreOptions());
}
if (configuration.getKeyStoreOptions() != null) {
options.setKeyCertOptions(configuration.getKeyStoreOptions());
}

options.setForceSni(configuration.usesSni());
if (configuration.isTrustAll()) {
options.setTrustAll(true);
}
if (configuration.getHostnameVerificationAlgorithm().isPresent()
&& configuration.getHostnameVerificationAlgorithm().get().equals("NONE")) {
// Only disable hostname verification if the algorithm is explicitly set to NONE
options.setVerifyHost(false);
}

SSLOptions sslOptions = configuration.getSSLOptions();
options.setSslHandshakeTimeout(sslOptions.getSslHandshakeTimeout());
options.setSslHandshakeTimeoutUnit(sslOptions.getSslHandshakeTimeoutUnit());
for (String suite : sslOptions.getEnabledCipherSuites()) {
options.addEnabledCipherSuite(suite);
}
for (Buffer buffer : sslOptions.getCrlValues()) {
options.addCrlValue(buffer);
}
options.setEnabledSecureTransportProtocols(sslOptions.getEnabledSecureTransportProtocols());

TlsConfigUtils.configure(options, configuration);
} else if (config.tls.enabled) {
TlsClientConfig tls = config.tls;
options.setSsl(true).setTrustAll(tls.trustAll);
Expand Down
4 changes: 2 additions & 2 deletions extensions/kafka-client/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-caffeine-deployment</artifactId>
<artifactId>quarkus-tls-registry-deployment</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import io.quarkus.kafka.client.serialization.JsonbSerializer;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import io.quarkus.kafka.client.serialization.ObjectMapperSerializer;
import io.quarkus.kafka.client.tls.QuarkusKafkaSslEngineFactory;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;

public class KafkaProcessor {
Expand Down Expand Up @@ -280,6 +281,8 @@ public void build(

handleAvro(reflectiveClass, proxies, serviceProviders, sslNativeSupport, capabilities);

reflectiveClass.produce(
ReflectiveClassBuildItem.builder(QuarkusKafkaSslEngineFactory.class).build());
}

@BuildStep(onlyIf = { HasSnappy.class, NativeOrNativeSourcesBuild.class })
Expand Down
2 changes: 1 addition & 1 deletion extensions/kafka-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-caffeine</artifactId>
<artifactId>quarkus-tls-registry</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.eclipse.microprofile.config.Config;

import io.quarkus.arc.DefaultBean;
import io.quarkus.kafka.client.tls.QuarkusKafkaSslEngineFactory;
import io.quarkus.runtime.ApplicationConfig;
import io.smallrye.common.annotation.Identifier;

Expand Down Expand Up @@ -44,6 +45,9 @@ public Map<String, Object> createKafkaRuntimeConfig(Config config, ApplicationCo
.replace("_", ".");
String value = config.getOptionalValue(propertyName, String.class).orElse("");
result.put(effectivePropertyName, value);
if (effectivePropertyName.equals("tls-configuration-name")) {
result.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName());
}
}

if (!result.isEmpty() && !result.containsKey(GROUP_ID) && app.name.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package io.quarkus.kafka.client.tls;

import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.Set;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.CDI;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.jboss.logging.Logger;

import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;

public class QuarkusKafkaSslEngineFactory implements SslEngineFactory {

private static final Logger log = Logger.getLogger(QuarkusKafkaSslEngineFactory.class);

/**
* Omits 'ssl.endpoint.identification.algorithm' because it is set by the user and it is not ignored
*/
private static final Set<String> KAFKA_SSL_CONFIGS = Set.of(
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG,
SslConfigs.SSL_PROTOCOL_CONFIG,
SslConfigs.SSL_PROVIDER_CONFIG,
SslConfigs.SSL_CIPHER_SUITES_CONFIG,
SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);

private TlsConfiguration configuration;
private SSLContext sslContext;

@Override
public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
sslEngine.setUseClientMode(true);
SSLParameters sslParameters = sslEngine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm(endpointIdentification);
sslEngine.setSSLParameters(sslParameters);
return sslEngine;
}

@Override
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
throw new IllegalStateException("Server mode is not supported");
}

@Override
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
return false;
}

@Override
public Set<String> reconfigurableConfigs() {
return Set.of();
}

@Override
public KeyStore keystore() {
return configuration.getKeyStore();
}

@Override
public KeyStore truststore() {
return configuration.getTrustStore();
}

@Override
public void close() throws IOException {
this.sslContext = null;
this.configuration = null;
}

@Override
public void configure(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");

Instance<TlsConfigurationRegistry> tlsConfig = CDI.current().getBeanManager().createInstance()
.select(TlsConfigurationRegistry.class);
if (!tlsConfig.isUnsatisfied()) {
TlsConfigurationRegistry registry = tlsConfig.get();
configuration = registry.get(tlsConfigName)
.orElseThrow(() -> new IllegalArgumentException("No TLS configuration found for name " + tlsConfigName));
try {
sslContext = configuration.createSSLContext();
} catch (Exception e) {
throw new RuntimeException("Failed to create SSLContext", e);
}
String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
log.debugf("Configured Kafka client '%s' QuarkusKafkaSslEngineFactory with TLS configuration : %s",
clientId, tlsConfigName);
}

}

/**
* Check if any SSL configuration is set for the Kafka client that will be ignored because the TLS configuration is set
*
* @param configs the Kafka client configuration
*/
public static void checkForOtherSslConfigs(Map<String, ?> configs) {
String tlsConfigName = (String) configs.get("tls-configuration-name");
for (String sslConfig : KAFKA_SSL_CONFIGS) {
if (configs.containsKey(sslConfig)) {
log.warnf(
"The SSL configuration '%s' is set for Kafka client '%s' but it will be ignored because the TLS configuration '%s' is set",
sslConfig, configs.get(CommonClientConfigs.CLIENT_ID_CONFIG), tlsConfigName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;
import io.quarkus.tls.runtime.config.TlsConfigUtils;
import io.smallrye.common.annotation.Identifier;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.SSLOptions;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisClientType;
import io.vertx.redis.client.RedisOptions;
Expand Down Expand Up @@ -216,38 +215,8 @@ private static void configureTLS(String name, RedisClientConfig config, TlsConfi
// Apply the configuration
if (configuration != null) {
// This part is often the same (or close) for every Vert.x client:
TlsConfigUtils.configure(net, configuration);
net.setSsl(tlsFromHosts);

if (configuration.getTrustStoreOptions() != null) {
net.setTrustOptions(configuration.getTrustStoreOptions());
}

// For mTLS:
if (configuration.getKeyStoreOptions() != null) {
net.setKeyCertOptions(configuration.getKeyStoreOptions());
}

if (configuration.isTrustAll()) {
net.setTrustAll(true);
}
if (configuration.getHostnameVerificationAlgorithm().isPresent()) {
net.setHostnameVerificationAlgorithm(configuration.getHostnameVerificationAlgorithm().get());
}

SSLOptions sslOptions = configuration.getSSLOptions();
if (sslOptions != null) {
net.setSslHandshakeTimeout(sslOptions.getSslHandshakeTimeout());
net.setSslHandshakeTimeoutUnit(sslOptions.getSslHandshakeTimeoutUnit());
for (String suite : sslOptions.getEnabledCipherSuites()) {
net.addEnabledCipherSuite(suite);
}
for (Buffer buffer : sslOptions.getCrlValues()) {
net.addCrlValue(buffer);
}
net.setEnabledSecureTransportProtocols(sslOptions.getEnabledSecureTransportProtocols());
net.setUseAlpn(sslOptions.isUseAlpn());
}

} else {
config.tcp().alpn().ifPresent(net::setUseAlpn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-tls-registry-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.quarkus.smallrye.reactivemessaging.amqp.deployment;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.smallrye.reactivemessaging.amqp.runtime.AmqpClientConfigCustomizer;

public class SmallRyeReactiveMessagingAmqpProcessor {

Expand All @@ -11,4 +13,11 @@ FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.MESSAGING_AMQP);
}

@BuildStep
AdditionalBeanBuildItem build() {
return AdditionalBeanBuildItem.builder()
.addBeanClass(AmqpClientConfigCustomizer.class)
.setUnremovable()
.build();
}
}
4 changes: 4 additions & 0 deletions extensions/smallrye-reactive-messaging-amqp/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-tls-registry</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.quarkus.smallrye.reactivemessaging.amqp.runtime;

import java.util.Optional;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.Config;
import org.jboss.logging.Logger;

import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;
import io.quarkus.tls.runtime.config.TlsConfigUtils;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.vertx.amqp.AmqpClientOptions;

@ApplicationScoped
public class AmqpClientConfigCustomizer implements ClientCustomizer<AmqpClientOptions> {

private static final Logger log = Logger.getLogger(AmqpClientConfigCustomizer.class);

@Inject
TlsConfigurationRegistry tlsRegistry;

@Override
public AmqpClientOptions customize(String channel, Config channelConfig, AmqpClientOptions options) {
Optional<String> tlsConfigName = channelConfig.getOptionalValue("tls-configuration-name", String.class);
if (tlsConfigName.isPresent()) {
String tlsConfig = tlsConfigName.get();
Optional<TlsConfiguration> maybeTlsConfig = tlsRegistry.get(tlsConfig);
if (maybeTlsConfig.isPresent()) {
TlsConfigUtils.configure(options, maybeTlsConfig.get());
log.debugf("Configured RabbitMQOptions for channel %s with TLS configuration %s", channel, tlsConfig);
}
}
return options;
}
}
Loading

0 comments on commit 7daab53

Please sign in to comment.