From f34a35598f06200fdd14106013c90f9b60635bee Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 17 Jul 2024 15:53:53 +0300 Subject: [PATCH 1/4] TLS config utils : configure TLS options for Vert.x-based clients --- .../grpc/runtime/supports/Channels.java | 31 +----- .../client/VertxRedisClientFactory.java | 35 +------ .../tls/runtime/config/TlsConfigUtils.java | 97 +++++++++++++++++++ .../next/runtime/WebSocketConnectorBase.java | 38 +------- 4 files changed, 103 insertions(+), 98 deletions(-) diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java index 248592ca448ba..c7e3b159759bd 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java @@ -69,6 +69,7 @@ import io.quarkus.runtime.util.ClassPathUtils; import io.quarkus.tls.TlsConfiguration; import io.quarkus.tls.TlsConfigurationRegistry; +import io.quarkus.tls.runtime.config.TlsConfigUtils; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.stork.Stork; import io.vertx.core.Vertx; @@ -76,7 +77,6 @@ import io.vertx.core.http.HttpClientOptions; import io.vertx.core.net.PemKeyCertOptions; import io.vertx.core.net.PemTrustOptions; -import io.vertx.core.net.SSLOptions; import io.vertx.core.net.SocketAddress; import io.vertx.grpc.client.GrpcClientChannel; @@ -285,34 +285,7 @@ public static Channel createChannel(String name, Set perClientIntercepto } if (configuration != null) { - if (configuration.getTrustStoreOptions() != null) { - options.setTrustOptions(configuration.getTrustStoreOptions()); - } - if (configuration.getKeyStoreOptions() != null) { - options.setKeyCertOptions(configuration.getKeyStoreOptions()); - } - - options.setForceSni(configuration.usesSni()); - if (configuration.isTrustAll()) { - options.setTrustAll(true); - } - if (configuration.getHostnameVerificationAlgorithm().isPresent() - && configuration.getHostnameVerificationAlgorithm().get().equals("NONE")) { - // Only disable hostname verification if the algorithm is explicitly set to NONE - options.setVerifyHost(false); - } - - SSLOptions sslOptions = configuration.getSSLOptions(); - options.setSslHandshakeTimeout(sslOptions.getSslHandshakeTimeout()); - options.setSslHandshakeTimeoutUnit(sslOptions.getSslHandshakeTimeoutUnit()); - for (String suite : sslOptions.getEnabledCipherSuites()) { - options.addEnabledCipherSuite(suite); - } - for (Buffer buffer : sslOptions.getCrlValues()) { - options.addCrlValue(buffer); - } - options.setEnabledSecureTransportProtocols(sslOptions.getEnabledSecureTransportProtocols()); - + TlsConfigUtils.configure(options, configuration); } else if (config.tls.enabled) { TlsClientConfig tls = config.tls; options.setSsl(true).setTrustAll(tls.trustAll); diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/VertxRedisClientFactory.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/VertxRedisClientFactory.java index a4ea37284e844..524c83e458ac6 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/VertxRedisClientFactory.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/VertxRedisClientFactory.java @@ -26,12 +26,11 @@ import io.quarkus.runtime.configuration.ConfigurationException; import io.quarkus.tls.TlsConfiguration; import io.quarkus.tls.TlsConfigurationRegistry; +import io.quarkus.tls.runtime.config.TlsConfigUtils; import io.smallrye.common.annotation.Identifier; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.ProxyOptions; -import io.vertx.core.net.SSLOptions; import io.vertx.redis.client.Redis; import io.vertx.redis.client.RedisClientType; import io.vertx.redis.client.RedisOptions; @@ -216,38 +215,8 @@ private static void configureTLS(String name, RedisClientConfig config, TlsConfi // Apply the configuration if (configuration != null) { // This part is often the same (or close) for every Vert.x client: + TlsConfigUtils.configure(net, configuration); net.setSsl(tlsFromHosts); - - if (configuration.getTrustStoreOptions() != null) { - net.setTrustOptions(configuration.getTrustStoreOptions()); - } - - // For mTLS: - if (configuration.getKeyStoreOptions() != null) { - net.setKeyCertOptions(configuration.getKeyStoreOptions()); - } - - if (configuration.isTrustAll()) { - net.setTrustAll(true); - } - if (configuration.getHostnameVerificationAlgorithm().isPresent()) { - net.setHostnameVerificationAlgorithm(configuration.getHostnameVerificationAlgorithm().get()); - } - - SSLOptions sslOptions = configuration.getSSLOptions(); - if (sslOptions != null) { - net.setSslHandshakeTimeout(sslOptions.getSslHandshakeTimeout()); - net.setSslHandshakeTimeoutUnit(sslOptions.getSslHandshakeTimeoutUnit()); - for (String suite : sslOptions.getEnabledCipherSuites()) { - net.addEnabledCipherSuite(suite); - } - for (Buffer buffer : sslOptions.getCrlValues()) { - net.addCrlValue(buffer); - } - net.setEnabledSecureTransportProtocols(sslOptions.getEnabledSecureTransportProtocols()); - net.setUseAlpn(sslOptions.isUseAlpn()); - } - } else { config.tcp().alpn().ifPresent(net::setUseAlpn); diff --git a/extensions/tls-registry/runtime/src/main/java/io/quarkus/tls/runtime/config/TlsConfigUtils.java b/extensions/tls-registry/runtime/src/main/java/io/quarkus/tls/runtime/config/TlsConfigUtils.java index 05140f08e5ec1..567d25d7b4de4 100644 --- a/extensions/tls-registry/runtime/src/main/java/io/quarkus/tls/runtime/config/TlsConfigUtils.java +++ b/extensions/tls-registry/runtime/src/main/java/io/quarkus/tls/runtime/config/TlsConfigUtils.java @@ -7,6 +7,14 @@ import java.nio.file.Path; import io.quarkus.runtime.util.ClassPathUtils; +import io.quarkus.tls.TlsConfiguration; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.WebSocketClientOptions; +import io.vertx.core.net.ClientOptionsBase; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.SSLOptions; +import io.vertx.core.net.TCPSSLOptions; public class TlsConfigUtils { @@ -42,4 +50,93 @@ public static byte[] read(Path path) { return data; } + /** + * Configure the {@link TCPSSLOptions} with the given {@link TlsConfiguration}. + * + * @param options the options to configure + * @param configuration the configuration to use + */ + public static void configure(TCPSSLOptions options, TlsConfiguration configuration) { + options.setSsl(true); + if (configuration.getTrustStoreOptions() != null) { + options.setTrustOptions(configuration.getTrustStoreOptions()); + } + + // For mTLS: + if (configuration.getKeyStoreOptions() != null) { + options.setKeyCertOptions(configuration.getKeyStoreOptions()); + } + + SSLOptions sslOptions = configuration.getSSLOptions(); + if (sslOptions != null) { + options.setSslHandshakeTimeout(sslOptions.getSslHandshakeTimeout()); + options.setSslHandshakeTimeoutUnit(sslOptions.getSslHandshakeTimeoutUnit()); + for (String suite : sslOptions.getEnabledCipherSuites()) { + options.addEnabledCipherSuite(suite); + } + for (Buffer buffer : sslOptions.getCrlValues()) { + options.addCrlValue(buffer); + } + options.setEnabledSecureTransportProtocols(sslOptions.getEnabledSecureTransportProtocols()); + options.setUseAlpn(sslOptions.isUseAlpn()); + } + } + + /** + * Configure the {@link ClientOptionsBase} with the given {@link TlsConfiguration}. + * + * @param options the options to configure + * @param configuration the configuration to use + */ + public static void configure(ClientOptionsBase options, TlsConfiguration configuration) { + configure((TCPSSLOptions) options, configuration); + if (configuration.isTrustAll()) { + options.setTrustAll(true); + } + } + + /** + * Configure the {@link NetClientOptions} with the given {@link TlsConfiguration}. + * + * @param options the options to configure + * @param configuration the configuration to use + */ + public static void configure(NetClientOptions options, TlsConfiguration configuration) { + configure((ClientOptionsBase) options, configuration); + if (configuration.getHostnameVerificationAlgorithm().isPresent()) { + options.setHostnameVerificationAlgorithm(configuration.getHostnameVerificationAlgorithm().get()); + } + } + + /** + * Configure the {@link HttpClientOptions} with the given {@link TlsConfiguration}. + * + * @param options the options to configure + * @param configuration the configuration to use + */ + public static void configure(HttpClientOptions options, TlsConfiguration configuration) { + configure((ClientOptionsBase) options, configuration); + options.setForceSni(configuration.usesSni()); + if (configuration.getHostnameVerificationAlgorithm().isPresent() + && configuration.getHostnameVerificationAlgorithm().get().equals("NONE")) { + // Only disable hostname verification if the algorithm is explicitly set to NONE + options.setVerifyHost(false); + } + } + + /** + * Configure the {@link WebSocketClientOptions} with the given {@link TlsConfiguration}. + * + * @param options the options to configure + * @param configuration the configuration to use + */ + public static void configure(WebSocketClientOptions options, TlsConfiguration configuration) { + configure((ClientOptionsBase) options, configuration); + if (configuration.getHostnameVerificationAlgorithm().isPresent() + && configuration.getHostnameVerificationAlgorithm().get().equals("NONE")) { + // Only disable hostname verification if the algorithm is explicitly set to NONE + options.setVerifyHost(false); + } + } + } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorBase.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorBase.java index 1556899636c9f..1a878b6b6cb18 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorBase.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorBase.java @@ -16,13 +16,12 @@ import io.quarkus.tls.TlsConfiguration; import io.quarkus.tls.TlsConfigurationRegistry; +import io.quarkus.tls.runtime.config.TlsConfigUtils; import io.quarkus.websockets.next.WebSocketClientException; import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; import io.vertx.core.http.WebSocketClientOptions; import io.vertx.core.http.WebSocketConnectOptions; -import io.vertx.core.net.SSLOptions; abstract class WebSocketConnectorBase> { @@ -155,40 +154,7 @@ protected WebSocketClientOptions populateClientOptions() { Optional maybeTlsConfiguration = TlsConfiguration.from(tlsConfigurationRegistry, config.tlsConfigurationName()); if (maybeTlsConfiguration.isPresent()) { - clientOptions.setSsl(true); - - TlsConfiguration tlsConfiguration = maybeTlsConfiguration.get(); - if (tlsConfiguration.getTrustStoreOptions() != null) { - clientOptions.setTrustOptions(tlsConfiguration.getTrustStoreOptions()); - } - - // For mTLS: - if (tlsConfiguration.getKeyStoreOptions() != null) { - clientOptions.setKeyCertOptions(tlsConfiguration.getKeyStoreOptions()); - } - - if (tlsConfiguration.isTrustAll()) { - clientOptions.setTrustAll(true); - } - if (tlsConfiguration.getHostnameVerificationAlgorithm().isPresent() - && tlsConfiguration.getHostnameVerificationAlgorithm().get().equals("NONE")) { - // Only disable hostname verification if the algorithm is explicitly set to NONE - clientOptions.setVerifyHost(false); - } - - SSLOptions sslOptions = tlsConfiguration.getSSLOptions(); - if (sslOptions != null) { - clientOptions.setSslHandshakeTimeout(sslOptions.getSslHandshakeTimeout()); - clientOptions.setSslHandshakeTimeoutUnit(sslOptions.getSslHandshakeTimeoutUnit()); - for (String suite : sslOptions.getEnabledCipherSuites()) { - clientOptions.addEnabledCipherSuite(suite); - } - for (Buffer buffer : sslOptions.getCrlValues()) { - clientOptions.addCrlValue(buffer); - } - clientOptions.setEnabledSecureTransportProtocols(sslOptions.getEnabledSecureTransportProtocols()); - clientOptions.setUseAlpn(sslOptions.isUseAlpn()); - } + TlsConfigUtils.configure(clientOptions, maybeTlsConfiguration.get()); } return clientOptions; } From 9539593b5ce3583e00834ba39f6ff08a12bce766 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 17 Jul 2024 15:53:53 +0300 Subject: [PATCH 2/4] TLS registry config for messaging extensions --- bom/application/pom.xml | 2 +- extensions/kafka-client/deployment/pom.xml | 4 +- .../client/deployment/KafkaProcessor.java | 3 + extensions/kafka-client/runtime/pom.xml | 2 +- .../runtime/KafkaRuntimeConfigProducer.java | 4 + .../tls/QuarkusKafkaSslEngineFactory.java | 130 ++++++++++++++++++ .../deployment/pom.xml | 4 + ...mallRyeReactiveMessagingAmqpProcessor.java | 9 ++ .../runtime/pom.xml | 4 + .../runtime/AmqpClientConfigCustomizer.java | 38 +++++ ...allRyeReactiveMessagingKafkaProcessor.java | 5 +- .../kafka/KafkaConfigCustomizer.java | 24 ++++ .../deployment/pom.xml | 4 + ...mallRyeReactiveMessagingMqttProcessor.java | 10 ++ .../runtime/pom.xml | 4 + .../runtime/MqttClientConfigCustomizer.java | 38 +++++ .../deployment/pom.xml | 4 + .../runtime/pom.xml | 4 + .../RabbitmqClientConfigCustomizer.java | 38 +++++ .../it/kafka/ssl/SslKafkaEndpoint.java | 27 +--- .../src/main/resources/application.properties | 11 ++ .../it/kafka/KafkaSSLTestResource.java | 9 +- .../it/kafka/SslJksKafkaConsumerITCase.java | 13 ++ .../it/kafka/SslJksKafkaConsumerTest.java | 25 ++++ .../it/kafka/SslKafkaConsumerITCase.java | 10 -- .../it/kafka/SslKafkaConsumerTest.java | 51 +++---- .../kafka/SslPKCS12KafkaConsumerITCase.java | 13 ++ .../it/kafka/SslPKCS12KafkaConsumerTest.java | 25 ++++ .../it/kafka/SslPemKafkaConsumerITCase.java | 13 ++ .../it/kafka/SslPemKafkaConsumerTest.java | 25 ++++ 30 files changed, 483 insertions(+), 70 deletions(-) create mode 100644 extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java create mode 100644 extensions/smallrye-reactive-messaging-amqp/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/runtime/AmqpClientConfigCustomizer.java create mode 100644 extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/KafkaConfigCustomizer.java create mode 100644 extensions/smallrye-reactive-messaging-mqtt/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/runtime/MqttClientConfigCustomizer.java create mode 100644 extensions/smallrye-reactive-messaging-rabbitmq/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/rabbitmq/runtime/RabbitmqClientConfigCustomizer.java create mode 100644 integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerITCase.java create mode 100644 integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerTest.java delete mode 100644 integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerITCase.java create mode 100644 integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerITCase.java create mode 100644 integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerTest.java create mode 100644 integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerITCase.java create mode 100644 integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerTest.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 5471e46431630..43b8ab451360b 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -62,7 +62,7 @@ 1.0.13 3.0.1 3.14.0 - 4.23.0 + 4.24.0 2.6.0 2.1.3 3.0.0 diff --git a/extensions/kafka-client/deployment/pom.xml b/extensions/kafka-client/deployment/pom.xml index 83ff1869de99b..9329e0e43e0ed 100644 --- a/extensions/kafka-client/deployment/pom.xml +++ b/extensions/kafka-client/deployment/pom.xml @@ -42,9 +42,9 @@ io.quarkus - quarkus-caffeine-deployment + quarkus-tls-registry-deployment - + org.testcontainers testcontainers diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 295c223570ee9..d80337f11a013 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -104,6 +104,7 @@ import io.quarkus.kafka.client.serialization.JsonbSerializer; import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; import io.quarkus.kafka.client.serialization.ObjectMapperSerializer; +import io.quarkus.kafka.client.tls.QuarkusKafkaSslEngineFactory; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; public class KafkaProcessor { @@ -280,6 +281,8 @@ public void build( handleAvro(reflectiveClass, proxies, serviceProviders, sslNativeSupport, capabilities); + reflectiveClass.produce( + ReflectiveClassBuildItem.builder(QuarkusKafkaSslEngineFactory.class).build()); } @BuildStep(onlyIf = { HasSnappy.class, NativeOrNativeSourcesBuild.class }) diff --git a/extensions/kafka-client/runtime/pom.xml b/extensions/kafka-client/runtime/pom.xml index c4e6c311c7832..5162715561574 100644 --- a/extensions/kafka-client/runtime/pom.xml +++ b/extensions/kafka-client/runtime/pom.xml @@ -45,7 +45,7 @@ io.quarkus - quarkus-caffeine + quarkus-tls-registry diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java index ba0f407ea5dec..7490541fc174d 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java @@ -9,6 +9,7 @@ import org.eclipse.microprofile.config.Config; import io.quarkus.arc.DefaultBean; +import io.quarkus.kafka.client.tls.QuarkusKafkaSslEngineFactory; import io.quarkus.runtime.ApplicationConfig; import io.smallrye.common.annotation.Identifier; @@ -44,6 +45,9 @@ public Map createKafkaRuntimeConfig(Config config, ApplicationCo .replace("_", "."); String value = config.getOptionalValue(propertyName, String.class).orElse(""); result.put(effectivePropertyName, value); + if (effectivePropertyName.equals("tls-configuration-name")) { + result.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName()); + } } if (!result.isEmpty() && !result.containsKey(GROUP_ID) && app.name.isPresent()) { diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java new file mode 100644 index 0000000000000..42aab3be3e781 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/tls/QuarkusKafkaSslEngineFactory.java @@ -0,0 +1,130 @@ +package io.quarkus.kafka.client.tls; + +import java.io.IOException; +import java.security.KeyStore; +import java.util.Map; +import java.util.Set; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.spi.CDI; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SslEngineFactory; +import org.jboss.logging.Logger; + +import io.quarkus.tls.TlsConfiguration; +import io.quarkus.tls.TlsConfigurationRegistry; + +public class QuarkusKafkaSslEngineFactory implements SslEngineFactory { + + private static final Logger log = Logger.getLogger(QuarkusKafkaSslEngineFactory.class); + + /** + * Omits 'ssl.endpoint.identification.algorithm' because it is set by the user and it is not ignored + */ + private static final Set KAFKA_SSL_CONFIGS = Set.of( + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + SslConfigs.SSL_KEY_PASSWORD_CONFIG, + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, + SslConfigs.SSL_KEYSTORE_KEY_CONFIG, + SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, + SslConfigs.SSL_PROTOCOL_CONFIG, + SslConfigs.SSL_PROVIDER_CONFIG, + SslConfigs.SSL_CIPHER_SUITES_CONFIG, + SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, + SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, + SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, + SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG); + + private TlsConfiguration configuration; + private SSLContext sslContext; + + @Override + public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) { + SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort); + sslEngine.setUseClientMode(true); + SSLParameters sslParameters = sslEngine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm(endpointIdentification); + sslEngine.setSSLParameters(sslParameters); + return sslEngine; + } + + @Override + public SSLEngine createServerSslEngine(String peerHost, int peerPort) { + throw new IllegalStateException("Server mode is not supported"); + } + + @Override + public boolean shouldBeRebuilt(Map nextConfigs) { + return false; + } + + @Override + public Set reconfigurableConfigs() { + return Set.of(); + } + + @Override + public KeyStore keystore() { + return configuration.getKeyStore(); + } + + @Override + public KeyStore truststore() { + return configuration.getTrustStore(); + } + + @Override + public void close() throws IOException { + this.sslContext = null; + this.configuration = null; + } + + @Override + public void configure(Map configs) { + String tlsConfigName = (String) configs.get("tls-configuration-name"); + + Instance tlsConfig = CDI.current().getBeanManager().createInstance() + .select(TlsConfigurationRegistry.class); + if (!tlsConfig.isUnsatisfied()) { + TlsConfigurationRegistry registry = tlsConfig.get(); + configuration = registry.get(tlsConfigName) + .orElseThrow(() -> new IllegalArgumentException("No TLS configuration found for name " + tlsConfigName)); + try { + sslContext = configuration.createSSLContext(); + } catch (Exception e) { + throw new RuntimeException("Failed to create SSLContext", e); + } + String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG); + log.debugf("Configured Kafka client '%s' QuarkusKafkaSslEngineFactory with TLS configuration : %s", + clientId, tlsConfigName); + } + + } + + /** + * Check if any SSL configuration is set for the Kafka client that will be ignored because the TLS configuration is set + * + * @param configs the Kafka client configuration + */ + public static void checkForOtherSslConfigs(Map configs) { + String tlsConfigName = (String) configs.get("tls-configuration-name"); + for (String sslConfig : KAFKA_SSL_CONFIGS) { + if (configs.containsKey(sslConfig)) { + log.warnf( + "The SSL configuration '%s' is set for Kafka client '%s' but it will be ignored because the TLS configuration '%s' is set", + sslConfig, configs.get(CommonClientConfigs.CLIENT_ID_CONFIG), tlsConfigName); + } + } + } +} diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/pom.xml b/extensions/smallrye-reactive-messaging-amqp/deployment/pom.xml index 69691de56ac95..7924dd5684e4e 100644 --- a/extensions/smallrye-reactive-messaging-amqp/deployment/pom.xml +++ b/extensions/smallrye-reactive-messaging-amqp/deployment/pom.xml @@ -38,6 +38,10 @@ io.quarkus quarkus-jackson-deployment + + io.quarkus + quarkus-tls-registry-deployment + io.quarkus quarkus-junit5-internal diff --git a/extensions/smallrye-reactive-messaging-amqp/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/deployment/SmallRyeReactiveMessagingAmqpProcessor.java b/extensions/smallrye-reactive-messaging-amqp/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/deployment/SmallRyeReactiveMessagingAmqpProcessor.java index e833218bad8ef..7c4be9cefeb9f 100644 --- a/extensions/smallrye-reactive-messaging-amqp/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/deployment/SmallRyeReactiveMessagingAmqpProcessor.java +++ b/extensions/smallrye-reactive-messaging-amqp/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/deployment/SmallRyeReactiveMessagingAmqpProcessor.java @@ -1,8 +1,10 @@ package io.quarkus.smallrye.reactivemessaging.amqp.deployment; +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.deployment.Feature; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.smallrye.reactivemessaging.amqp.runtime.AmqpClientConfigCustomizer; public class SmallRyeReactiveMessagingAmqpProcessor { @@ -11,4 +13,11 @@ FeatureBuildItem feature() { return new FeatureBuildItem(Feature.MESSAGING_AMQP); } + @BuildStep + AdditionalBeanBuildItem build() { + return AdditionalBeanBuildItem.builder() + .addBeanClass(AmqpClientConfigCustomizer.class) + .setUnremovable() + .build(); + } } diff --git a/extensions/smallrye-reactive-messaging-amqp/runtime/pom.xml b/extensions/smallrye-reactive-messaging-amqp/runtime/pom.xml index 4ca01fc3f0219..a86915d0d5a7e 100644 --- a/extensions/smallrye-reactive-messaging-amqp/runtime/pom.xml +++ b/extensions/smallrye-reactive-messaging-amqp/runtime/pom.xml @@ -34,6 +34,10 @@ io.quarkus quarkus-jackson + + io.quarkus + quarkus-tls-registry + io.smallrye.reactive smallrye-reactive-messaging-provider diff --git a/extensions/smallrye-reactive-messaging-amqp/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/runtime/AmqpClientConfigCustomizer.java b/extensions/smallrye-reactive-messaging-amqp/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/runtime/AmqpClientConfigCustomizer.java new file mode 100644 index 0000000000000..361985914f193 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-amqp/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/amqp/runtime/AmqpClientConfigCustomizer.java @@ -0,0 +1,38 @@ +package io.quarkus.smallrye.reactivemessaging.amqp.runtime; + +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.Config; +import org.jboss.logging.Logger; + +import io.quarkus.tls.TlsConfiguration; +import io.quarkus.tls.TlsConfigurationRegistry; +import io.quarkus.tls.runtime.config.TlsConfigUtils; +import io.smallrye.reactive.messaging.ClientCustomizer; +import io.vertx.amqp.AmqpClientOptions; + +@ApplicationScoped +public class AmqpClientConfigCustomizer implements ClientCustomizer { + + private static final Logger log = Logger.getLogger(AmqpClientConfigCustomizer.class); + + @Inject + TlsConfigurationRegistry tlsRegistry; + + @Override + public AmqpClientOptions customize(String channel, Config channelConfig, AmqpClientOptions options) { + Optional tlsConfigName = channelConfig.getOptionalValue("tls-configuration-name", String.class); + if (tlsConfigName.isPresent()) { + String tlsConfig = tlsConfigName.get(); + Optional maybeTlsConfig = tlsRegistry.get(tlsConfig); + if (maybeTlsConfig.isPresent()) { + TlsConfigUtils.configure(options, maybeTlsConfig.get()); + log.debugf("Configured RabbitMQOptions for channel %s with TLS configuration %s", channel, tlsConfig); + } + } + return options; + } +} diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 16aefe55c77b4..d385bc9aec825 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -46,6 +46,7 @@ import io.quarkus.smallrye.reactivemessaging.kafka.DatabindProcessingStateCodec; import io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore; import io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore; +import io.quarkus.smallrye.reactivemessaging.kafka.KafkaConfigCustomizer; import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig; import io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore; import io.smallrye.mutiny.tuples.Functions.TriConsumer; @@ -68,8 +69,10 @@ FeatureBuildItem feature() { } @BuildStep - public void build(BuildProducer reflectiveClass) { + public void build(BuildProducer reflectiveClass, + BuildProducer additionalBean) { reflectiveClass.produce(ReflectiveClassBuildItem.builder(ProcessingState.class).methods().fields().build()); + additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(KafkaConfigCustomizer.class)); } @BuildStep diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/KafkaConfigCustomizer.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/KafkaConfigCustomizer.java new file mode 100644 index 0000000000000..651714d707bf9 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/KafkaConfigCustomizer.java @@ -0,0 +1,24 @@ +package io.quarkus.smallrye.reactivemessaging.kafka; + +import java.util.Map; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.config.Config; + +import io.quarkus.kafka.client.tls.QuarkusKafkaSslEngineFactory; +import io.smallrye.reactive.messaging.ClientCustomizer; + +@ApplicationScoped +public class KafkaConfigCustomizer implements ClientCustomizer> { + + @Override + public Map customize(String channel, Config channelConfig, Map config) { + // TODO verify other ssl properties + if (config.containsKey("tls-configuration-name")) { + QuarkusKafkaSslEngineFactory.checkForOtherSslConfigs(config); + config.put("ssl.engine.factory.class", QuarkusKafkaSslEngineFactory.class.getName()); + } + return config; + } +} diff --git a/extensions/smallrye-reactive-messaging-mqtt/deployment/pom.xml b/extensions/smallrye-reactive-messaging-mqtt/deployment/pom.xml index 9c5788bf52222..6a7e072646ff0 100644 --- a/extensions/smallrye-reactive-messaging-mqtt/deployment/pom.xml +++ b/extensions/smallrye-reactive-messaging-mqtt/deployment/pom.xml @@ -33,6 +33,10 @@ io.quarkus quarkus-vertx-deployment + + io.quarkus + quarkus-tls-registry-deployment + io.quarkus quarkus-devservices-deployment diff --git a/extensions/smallrye-reactive-messaging-mqtt/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/deployment/SmallRyeReactiveMessagingMqttProcessor.java b/extensions/smallrye-reactive-messaging-mqtt/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/deployment/SmallRyeReactiveMessagingMqttProcessor.java index 7241ee579c0db..cf33e2b873d6a 100644 --- a/extensions/smallrye-reactive-messaging-mqtt/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/deployment/SmallRyeReactiveMessagingMqttProcessor.java +++ b/extensions/smallrye-reactive-messaging-mqtt/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/deployment/SmallRyeReactiveMessagingMqttProcessor.java @@ -1,12 +1,22 @@ package io.quarkus.smallrye.reactivemessaging.mqtt.deployment; +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.deployment.Feature; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.smallrye.reactivemessaging.mqtt.runtime.MqttClientConfigCustomizer; public class SmallRyeReactiveMessagingMqttProcessor { @BuildStep FeatureBuildItem feature() { return new FeatureBuildItem(Feature.MESSAGING_MQTT); } + + @BuildStep + AdditionalBeanBuildItem build() { + return AdditionalBeanBuildItem.builder() + .addBeanClass(MqttClientConfigCustomizer.class) + .setUnremovable() + .build(); + } } diff --git a/extensions/smallrye-reactive-messaging-mqtt/runtime/pom.xml b/extensions/smallrye-reactive-messaging-mqtt/runtime/pom.xml index 527a0b524f013..8fa79a139df82 100644 --- a/extensions/smallrye-reactive-messaging-mqtt/runtime/pom.xml +++ b/extensions/smallrye-reactive-messaging-mqtt/runtime/pom.xml @@ -30,6 +30,10 @@ io.quarkus quarkus-vertx + + io.quarkus + quarkus-tls-registry + io.smallrye.reactive smallrye-reactive-messaging-provider diff --git a/extensions/smallrye-reactive-messaging-mqtt/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/runtime/MqttClientConfigCustomizer.java b/extensions/smallrye-reactive-messaging-mqtt/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/runtime/MqttClientConfigCustomizer.java new file mode 100644 index 0000000000000..ba96fc45a0fc9 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-mqtt/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/mqtt/runtime/MqttClientConfigCustomizer.java @@ -0,0 +1,38 @@ +package io.quarkus.smallrye.reactivemessaging.mqtt.runtime; + +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.Config; +import org.jboss.logging.Logger; + +import io.quarkus.tls.TlsConfiguration; +import io.quarkus.tls.TlsConfigurationRegistry; +import io.quarkus.tls.runtime.config.TlsConfigUtils; +import io.smallrye.reactive.messaging.ClientCustomizer; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; + +@ApplicationScoped +public class MqttClientConfigCustomizer implements ClientCustomizer { + + private static final Logger log = Logger.getLogger(MqttClientConfigCustomizer.class); + + @Inject + TlsConfigurationRegistry tlsRegistry; + + @Override + public MqttClientSessionOptions customize(String channel, Config channelConfig, MqttClientSessionOptions options) { + Optional tlsConfigName = channelConfig.getOptionalValue("tls-configuration-name", String.class); + if (tlsConfigName.isPresent()) { + String tlsConfig = tlsConfigName.get(); + Optional maybeTlsConfig = tlsRegistry.get(tlsConfig); + if (maybeTlsConfig.isPresent()) { + TlsConfigUtils.configure(options, maybeTlsConfig.get()); + log.debugf("Configured MqttClientSessionOptions for channel %s with TLS configuration %s", channel, tlsConfig); + } + } + return options; + } +} diff --git a/extensions/smallrye-reactive-messaging-rabbitmq/deployment/pom.xml b/extensions/smallrye-reactive-messaging-rabbitmq/deployment/pom.xml index 4114737367326..333f46d4c14ed 100644 --- a/extensions/smallrye-reactive-messaging-rabbitmq/deployment/pom.xml +++ b/extensions/smallrye-reactive-messaging-rabbitmq/deployment/pom.xml @@ -38,6 +38,10 @@ io.quarkus quarkus-credentials-deployment + + io.quarkus + quarkus-tls-registry-deployment + io.quarkus quarkus-vertx-http-dev-ui-spi diff --git a/extensions/smallrye-reactive-messaging-rabbitmq/runtime/pom.xml b/extensions/smallrye-reactive-messaging-rabbitmq/runtime/pom.xml index f2bc1771b142e..120da89852860 100644 --- a/extensions/smallrye-reactive-messaging-rabbitmq/runtime/pom.xml +++ b/extensions/smallrye-reactive-messaging-rabbitmq/runtime/pom.xml @@ -34,6 +34,10 @@ io.quarkus quarkus-credentials + + io.quarkus + quarkus-tls-registry + io.quarkus quarkus-jackson diff --git a/extensions/smallrye-reactive-messaging-rabbitmq/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/rabbitmq/runtime/RabbitmqClientConfigCustomizer.java b/extensions/smallrye-reactive-messaging-rabbitmq/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/rabbitmq/runtime/RabbitmqClientConfigCustomizer.java new file mode 100644 index 0000000000000..f2d794ff4cde3 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-rabbitmq/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/rabbitmq/runtime/RabbitmqClientConfigCustomizer.java @@ -0,0 +1,38 @@ +package io.quarkus.smallrye.reactivemessaging.rabbitmq.runtime; + +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.Config; +import org.jboss.logging.Logger; + +import io.quarkus.tls.TlsConfiguration; +import io.quarkus.tls.TlsConfigurationRegistry; +import io.quarkus.tls.runtime.config.TlsConfigUtils; +import io.smallrye.reactive.messaging.ClientCustomizer; +import io.vertx.rabbitmq.RabbitMQOptions; + +@ApplicationScoped +public class RabbitmqClientConfigCustomizer implements ClientCustomizer { + + private static final Logger log = Logger.getLogger(RabbitmqClientConfigCustomizer.class); + + @Inject + TlsConfigurationRegistry tlsRegistry; + + @Override + public RabbitMQOptions customize(String channel, Config channelConfig, RabbitMQOptions options) { + Optional tlsConfigName = channelConfig.getOptionalValue("tls-configuration-name", String.class); + if (tlsConfigName.isPresent()) { + String tlsConfig = tlsConfigName.get(); + Optional maybeTlsConfig = tlsRegistry.get(tlsConfig); + if (maybeTlsConfig.isPresent()) { + TlsConfigUtils.configure(options, maybeTlsConfig.get()); + log.debugf("Configured RabbitMQOptions for channel %s with TLS configuration %s", channel, tlsConfig); + } + } + return options; + } +} diff --git a/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java b/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java index ab872e9ee0ac0..8f1f00f7df085 100644 --- a/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java +++ b/integration-tests/kafka-ssl/src/main/java/io/quarkus/it/kafka/ssl/SslKafkaEndpoint.java @@ -1,8 +1,8 @@ package io.quarkus.it.kafka.ssl; -import java.io.File; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Properties; import jakarta.inject.Inject; @@ -10,15 +10,14 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.QueryParam; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.eclipse.microprofile.config.Config; + +import io.smallrye.common.annotation.Identifier; /** * Endpoint to check the SSL connection. @@ -27,7 +26,8 @@ public class SslKafkaEndpoint { @Inject - Config config; + @Identifier("default-kafka-broker") + Map kafkaConfig; @GET public String get(@QueryParam("format") CertificateFormat format) { @@ -42,28 +42,13 @@ public String get(@QueryParam("format") CertificateFormat format) { public KafkaConsumer createConsumer(CertificateFormat format) { Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getValue("kafka.bootstrap.servers", String.class)); + props.putAll(kafkaConfig); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - String truststore = switch (format) { - case PKCS12 -> "kafka-truststore.p12"; - case JKS -> "kafka-truststore.jks"; - case PEM -> "kafka-ca.crt"; - }; - - File tsFile = new File(config.getValue("ssl-dir", String.class), truststore); - props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); - props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsFile.getPath()); - if (format != CertificateFormat.PEM) { - props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"); - } - props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, format.name()); - props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-ssl-consumer")); return consumer; diff --git a/integration-tests/kafka-ssl/src/main/resources/application.properties b/integration-tests/kafka-ssl/src/main/resources/application.properties index c0f22a629a999..6513c3ba49a7b 100644 --- a/integration-tests/kafka-ssl/src/main/resources/application.properties +++ b/integration-tests/kafka-ssl/src/main/resources/application.properties @@ -7,3 +7,14 @@ quarkus.kafka.health.enabled=true # using QuarkusTestResourceLifecycleManager in this test quarkus.kafka.devservices.enabled=false + +kafka.security.protocol=ssl +#kafka.tls-configuration-name=set-by-test-resource + +quarkus.tls.custom-p12.trust-store.p12.path=target/certs/kafka-truststore.p12 +quarkus.tls.custom-p12.trust-store.p12.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L + +quarkus.tls.custom-pem.trust-store.pem.certs=target/certs/kafka.crt,target/certs/kafka-ca.crt + +quarkus.tls.custom-jks.trust-store.jks.path=target/certs/kafka-truststore.jks +quarkus.tls.custom-jks.trust-store.jks.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java index 429c927722cc2..24b6a1ca7dff6 100644 --- a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/KafkaSSLTestResource.java @@ -21,13 +21,20 @@ public class KafkaSSLTestResource implements QuarkusTestResourceLifecycleManager .withCopyFileToContainer(MountableFile.forHostPath("target/certs/kafka-truststore.p12"), "/opt/kafka/config/kafka-truststore.p12"); + private Map initProps; + + @Override + public void init(Map initArgs) { + initProps = initArgs; + } + @Override public Map start() { kafka.start(); // Used by the test System.setProperty("bootstrap.servers", kafka.getBootstrapServers()); // Used by the application - Map properties = new HashMap<>(); + Map properties = new HashMap<>(initProps); properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); properties.put("ssl-dir", new File("target/certs").getAbsolutePath()); diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerITCase.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerITCase.java new file mode 100644 index 0000000000000..8c94592a57aa3 --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerITCase.java @@ -0,0 +1,13 @@ +package io.quarkus.it.kafka; + +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +@WithTestResource(value = KafkaSSLTestResource.class, initArgs = { + @ResourceArg(name = "kafka.tls-configuration-name", value = "custom-jks") +}) +public class SslJksKafkaConsumerITCase extends SslJksKafkaConsumerTest { + +} diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerTest.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerTest.java new file mode 100644 index 0000000000000..4496cc9b99f19 --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslJksKafkaConsumerTest.java @@ -0,0 +1,25 @@ +package io.quarkus.it.kafka; + +import io.quarkus.it.kafka.ssl.CertificateFormat; +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.certs.Format; +import io.smallrye.certs.junit5.Certificate; +import io.smallrye.certs.junit5.Certificates; + +@Certificates(certificates = { + @Certificate(name = "kafka", formats = { Format.PKCS12, Format.JKS, + Format.PEM }, password = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L") +}, baseDir = "target/certs") +@QuarkusTest +@WithTestResource(value = KafkaSSLTestResource.class, initArgs = { + @ResourceArg(name = "kafka.tls-configuration-name", value = "custom-jks") +}) +public class SslJksKafkaConsumerTest extends SslKafkaConsumerTest { + + @Override + public CertificateFormat getFormat() { + return CertificateFormat.JKS; + } +} diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerITCase.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerITCase.java deleted file mode 100644 index 88c27cfdd99f2..0000000000000 --- a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerITCase.java +++ /dev/null @@ -1,10 +0,0 @@ -package io.quarkus.it.kafka; - -import io.quarkus.test.common.WithTestResource; -import io.quarkus.test.junit.QuarkusIntegrationTest; - -@QuarkusIntegrationTest -@WithTestResource(value = KafkaSSLTestResource.class, restrictToAnnotatedClass = false) -public class SslKafkaConsumerITCase extends SslKafkaConsumerTest { - -} diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java index 188761217d32d..450ef4468b450 100644 --- a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslKafkaConsumerTest.java @@ -12,24 +12,27 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.api.Test; import io.quarkus.it.kafka.ssl.CertificateFormat; -import io.quarkus.test.common.WithTestResource; -import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; -import io.smallrye.certs.Format; -import io.smallrye.certs.junit5.Certificate; -import io.smallrye.certs.junit5.Certificates; -@Certificates(certificates = { - @Certificate(name = "kafka", formats = { Format.PKCS12, Format.JKS, - Format.PEM }, password = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L") -}, baseDir = "target/certs") -@QuarkusTest -@WithTestResource(value = KafkaSSLTestResource.class, restrictToAnnotatedClass = false) -public class SslKafkaConsumerTest { +public abstract class SslKafkaConsumerTest { + + public abstract CertificateFormat getFormat(); + + @Test + public void testReception() { + String format = getFormat().name(); + try (Producer producer = createProducer(CertificateFormat.valueOf(format))) { + producer.send(new ProducerRecord<>("test-ssl-consumer", 1, "hi world")); + String string = RestAssured + .given().queryParam("format", format) + .when().get("/ssl") + .andReturn().asString(); + Assertions.assertEquals("hi world", string); + } + } public static Producer createProducer(CertificateFormat format) { Properties props = new Properties(); @@ -41,7 +44,7 @@ public static Producer createProducer(CertificateFormat format) String truststore = switch (format) { case PKCS12 -> "kafka-truststore.p12"; case JKS -> "kafka-truststore.jks"; - case PEM -> "kafka-ca.crt"; + case PEM -> "kafka.crt"; }; File tsFile = new File("target/certs/" + truststore); @@ -55,22 +58,4 @@ public static Producer createProducer(CertificateFormat format) return new KafkaProducer<>(props); } - - @ParameterizedTest - @CsvSource({ - "PKCS12", - "JKS", - "PEM" - }) - public void testReception(String format) { - try (Producer producer = createProducer(CertificateFormat.valueOf(format))) { - producer.send(new ProducerRecord<>("test-ssl-consumer", 1, "hi world")); - String string = RestAssured - .given().queryParam("format", format) - .when().get("/ssl") - .andReturn().asString(); - Assertions.assertEquals("hi world", string); - } - } - } diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerITCase.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerITCase.java new file mode 100644 index 0000000000000..d8f4bd47503fe --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerITCase.java @@ -0,0 +1,13 @@ +package io.quarkus.it.kafka; + +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +@WithTestResource(value = KafkaSSLTestResource.class, initArgs = { + @ResourceArg(name = "kafka.tls-configuration-name", value = "custom-p12") +}) +public class SslPKCS12KafkaConsumerITCase extends SslPKCS12KafkaConsumerTest { + +} diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerTest.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerTest.java new file mode 100644 index 0000000000000..22baaf6c182e6 --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPKCS12KafkaConsumerTest.java @@ -0,0 +1,25 @@ +package io.quarkus.it.kafka; + +import io.quarkus.it.kafka.ssl.CertificateFormat; +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.certs.Format; +import io.smallrye.certs.junit5.Certificate; +import io.smallrye.certs.junit5.Certificates; + +@Certificates(certificates = { + @Certificate(name = "kafka", formats = { Format.PKCS12, Format.JKS, + Format.PEM }, password = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L") +}, baseDir = "target/certs") +@QuarkusTest +@WithTestResource(value = KafkaSSLTestResource.class, initArgs = { + @ResourceArg(name = "kafka.tls-configuration-name", value = "custom-p12") +}) +public class SslPKCS12KafkaConsumerTest extends SslKafkaConsumerTest { + + @Override + public CertificateFormat getFormat() { + return CertificateFormat.PKCS12; + } +} diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerITCase.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerITCase.java new file mode 100644 index 0000000000000..4c620d062c4f3 --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerITCase.java @@ -0,0 +1,13 @@ +package io.quarkus.it.kafka; + +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +@WithTestResource(value = KafkaSSLTestResource.class, initArgs = { + @ResourceArg(name = "kafka.tls-configuration-name", value = "custom-pem") +}) +public class SslPemKafkaConsumerITCase extends SslPemKafkaConsumerTest { + +} diff --git a/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerTest.java b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerTest.java new file mode 100644 index 0000000000000..2e14f29ae1cda --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/io/quarkus/it/kafka/SslPemKafkaConsumerTest.java @@ -0,0 +1,25 @@ +package io.quarkus.it.kafka; + +import io.quarkus.it.kafka.ssl.CertificateFormat; +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.smallrye.certs.Format; +import io.smallrye.certs.junit5.Certificate; +import io.smallrye.certs.junit5.Certificates; + +@Certificates(certificates = { + @Certificate(name = "kafka", formats = { Format.PKCS12, Format.JKS, + Format.PEM }, password = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L") +}, baseDir = "target/certs") +@QuarkusTest +@WithTestResource(value = KafkaSSLTestResource.class, initArgs = { + @ResourceArg(name = "kafka.tls-configuration-name", value = "custom-pem") +}) +public class SslPemKafkaConsumerTest extends SslKafkaConsumerTest { + + @Override + public CertificateFormat getFormat() { + return CertificateFormat.PEM; + } +} From 8da7f12638b236723cee675d300f4ff77888b30e Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 25 Jul 2024 08:14:26 +0300 Subject: [PATCH 3/4] Bump pulsar devservice image to 3.3.0 --- .../reactivemessaging/pulsar/deployment/PulsarContainer.java | 2 +- .../pulsar/deployment/PulsarDevServicesBuildTimeConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarContainer.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarContainer.java index 1bdd4b93b94b5..248b8723c6839 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarContainer.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarContainer.java @@ -13,7 +13,7 @@ public class PulsarContainer extends GenericContainer { - public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.0.0"); + public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.3.0"); public static final String STARTER_SCRIPT = "/run_pulsar.sh"; diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesBuildTimeConfig.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesBuildTimeConfig.java index b6c59449488b4..0e5169104cd22 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesBuildTimeConfig.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarDevServicesBuildTimeConfig.java @@ -34,7 +34,7 @@ public class PulsarDevServicesBuildTimeConfig { * * Check https://hub.docker.com/r/apachepulsar/pulsar to find the available versions. */ - @ConfigItem(defaultValue = "apachepulsar/pulsar:3.0.0") + @ConfigItem(defaultValue = "apachepulsar/pulsar:3.3.0") public String imageName; /** From 4ddfb601060b688d364a8c99880160675c0e52e4 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 17 Jul 2024 15:53:53 +0300 Subject: [PATCH 4/4] TLS registry config for pulsar extension Disabled message encryption in pulsar IT, as it seems incompatible with client TLS configuration --- .../deployment/pom.xml | 4 + ...llRyeReactiveMessagingPulsarProcessor.java | 2 + .../runtime/pom.xml | 4 + .../pulsar/PulsarClientConfigCustomizer.java | 74 ++++++++++ .../QuarkusPulsarKeyStoreAuthentication.java | 127 ++++++++++++++++++ .../reactive-messaging-pulsar/pom.xml | 39 ++++++ .../io/quarkus/it/pulsar/PulsarConfig.java | 74 ---------- .../src/main/resources/application.properties | 18 +++ .../it/pulsar/JksPulsarConnectorIT.java | 8 ++ .../it/pulsar/JksPulsarConnectorTest.java | 33 +++++ .../it/pulsar/PKCS12PulsarConnectorIT.java | 8 ++ .../it/pulsar/PKCS12PulsarConnectorTest.java | 33 +++++ .../it/pulsar/PemPulsarConnectorIT.java | 8 ++ .../it/pulsar/PemPulsarConnectorTest.java | 33 +++++ .../it/pulsar/PulsarConnectorTest.java | 2 + .../io/quarkus/it/pulsar/PulsarContainer.java | 93 +++++++++++++ .../io/quarkus/it/pulsar/PulsarResource.java | 122 +++++++++++++++++ 17 files changed, 608 insertions(+), 74 deletions(-) create mode 100644 extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/PulsarClientConfigCustomizer.java create mode 100644 extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/QuarkusPulsarKeyStoreAuthentication.java delete mode 100644 integration-tests/reactive-messaging-pulsar/src/main/java/io/quarkus/it/pulsar/PulsarConfig.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorIT.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorTest.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorIT.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorTest.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorIT.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorTest.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarContainer.java create mode 100644 integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarResource.java diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/pom.xml b/extensions/smallrye-reactive-messaging-pulsar/deployment/pom.xml index 1b459de02cf68..7fe0775136ff7 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/pom.xml +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/pom.xml @@ -46,6 +46,10 @@ io.quarkus quarkus-devservices-deployment + + io.quarkus + quarkus-tls-registry-deployment + io.quarkus quarkus-junit5-internal diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SmallRyeReactiveMessagingPulsarProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SmallRyeReactiveMessagingPulsarProcessor.java index 021642151f1d9..897b73601674e 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SmallRyeReactiveMessagingPulsarProcessor.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/SmallRyeReactiveMessagingPulsarProcessor.java @@ -30,6 +30,7 @@ import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.gizmo.Gizmo; +import io.quarkus.pulsar.PulsarClientConfigCustomizer; import io.quarkus.pulsar.PulsarRuntimeConfigProducer; public class SmallRyeReactiveMessagingPulsarProcessor { @@ -45,6 +46,7 @@ FeatureBuildItem feature() { public AdditionalBeanBuildItem runtimeConfig() { return AdditionalBeanBuildItem.builder() .addBeanClass(PulsarRuntimeConfigProducer.class) + .addBeanClass(PulsarClientConfigCustomizer.class) .setUnremovable() .build(); } diff --git a/extensions/smallrye-reactive-messaging-pulsar/runtime/pom.xml b/extensions/smallrye-reactive-messaging-pulsar/runtime/pom.xml index b8714a8ed69d7..b700040e0157c 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/runtime/pom.xml +++ b/extensions/smallrye-reactive-messaging-pulsar/runtime/pom.xml @@ -22,6 +22,10 @@ io.quarkus quarkus-vertx + + io.quarkus + quarkus-tls-registry + io.netty netty-transport-classes-kqueue diff --git a/extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/PulsarClientConfigCustomizer.java b/extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/PulsarClientConfigCustomizer.java new file mode 100644 index 0000000000000..07e04f1cb9e74 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/PulsarClientConfigCustomizer.java @@ -0,0 +1,74 @@ +package io.quarkus.pulsar; + +import java.io.ByteArrayInputStream; +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.eclipse.microprofile.config.Config; +import org.jboss.logging.Logger; + +import io.quarkus.tls.TlsConfiguration; +import io.quarkus.tls.TlsConfigurationRegistry; +import io.smallrye.reactive.messaging.ClientCustomizer; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.KeyCertOptions; +import io.vertx.core.net.KeyStoreOptionsBase; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.core.net.PemTrustOptions; +import io.vertx.core.net.SSLOptions; +import io.vertx.core.net.TrustOptions; + +@ApplicationScoped +public class PulsarClientConfigCustomizer implements ClientCustomizer { + + private static final Logger log = Logger.getLogger(PulsarClientConfigCustomizer.class); + + @Inject + TlsConfigurationRegistry tlsRegistry; + + @Override + public ClientBuilder customize(String channel, Config channelConfig, ClientBuilder builder) { + Optional tlsConfigName = channelConfig.getOptionalValue("tls-configuration-name", String.class); + if (tlsConfigName.isPresent()) { + String tlsConfig = tlsConfigName.get(); + Optional maybeTlsConfig = tlsRegistry.get(tlsConfig); + if (maybeTlsConfig.isPresent()) { + TlsConfiguration configuration = maybeTlsConfig.get(); + SSLOptions sslOptions = configuration.getSSLOptions(); + builder.tlsCiphers(sslOptions.getEnabledCipherSuites()); + builder.tlsProtocols(sslOptions.getEnabledSecureTransportProtocols()); + builder.allowTlsInsecureConnection(false); + + KeyCertOptions keyStoreOptions = configuration.getKeyStoreOptions(); + TrustOptions trustStoreOptions = configuration.getTrustStoreOptions(); + + if (keyStoreOptions instanceof PemKeyCertOptions keyCertOptions + && trustStoreOptions instanceof PemTrustOptions trustCertOptions) { + Buffer trust = trustCertOptions.getCertValues().stream() + .collect(Buffer::buffer, Buffer::appendBuffer, Buffer::appendBuffer); + builder.authentication(new AuthenticationTls( + () -> new ByteArrayInputStream(keyCertOptions.getCertValue().getBytes()), + () -> new ByteArrayInputStream(keyCertOptions.getKeyValue().getBytes()), + () -> new ByteArrayInputStream(trust.getBytes()))); + log.debugf("Configured PulsarClientConfiguration for channel %s with TLS configuration %s", + channel, tlsConfig); + } else if (keyStoreOptions instanceof KeyStoreOptionsBase + && trustStoreOptions instanceof KeyStoreOptionsBase) { + // Set to false even though we use keyStore TLS + builder.useKeyStoreTls(false); + builder.authentication(new QuarkusPulsarKeyStoreAuthentication(configuration)); + log.debugf("Configured PulsarClientConfiguration for channel %s with TLS configuration %s", + channel, tlsConfig); + } else { + log.warnf("Unsupported TLS configuration for channel %s with TLS configuration %s", channel, tlsConfig); + } + } + } + return builder; + } + +} diff --git a/extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/QuarkusPulsarKeyStoreAuthentication.java b/extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/QuarkusPulsarKeyStoreAuthentication.java new file mode 100644 index 0000000000000..a64518cf35bb2 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-pulsar/runtime/src/main/java/io/quarkus/pulsar/QuarkusPulsarKeyStoreAuthentication.java @@ -0,0 +1,127 @@ +package io.quarkus.pulsar; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateEncodingException; +import java.security.cert.X509Certificate; +import java.util.Base64; +import java.util.Map; +import java.util.stream.Stream; + +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.PulsarClientException; + +import io.quarkus.tls.TlsConfiguration; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.KeyStoreOptionsBase; + +public class QuarkusPulsarKeyStoreAuthentication implements Authentication { + public static final String BEGIN_CERT = "-----BEGIN CERTIFICATE-----"; + public static final String END_CERT = "-----END CERTIFICATE-----"; + public static final String LINE_SEPARATOR = System.getProperty("line.separator", "\n"); + + private final TlsConfiguration configuration; + + public QuarkusPulsarKeyStoreAuthentication(TlsConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public String getAuthMethodName() { + return "quarkus-pulsar"; + } + + @Override + public void configure(Map authParams) { + + } + + @Override + public void start() throws PulsarClientException { + + } + + @Override + public void close() throws IOException { + + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + try { + // Extract keystore password + String keyStorePwd = ((KeyStoreOptionsBase) configuration.getKeyStoreOptions()).getPassword(); + // Certificates + KeyStore keyStore = configuration.getKeyStore(); + String alias = keyStore.aliases().nextElement(); + // TODO do we need to handle certificate chains here? + X509Certificate cert = (X509Certificate) keyStore.getCertificate(alias); + X509Certificate[] certificates = Stream.of(cert).toArray(X509Certificate[]::new); + + // Private key + KeyStore.PrivateKeyEntry privateKeyEntry = (KeyStore.PrivateKeyEntry) keyStore.getEntry(alias, + new KeyStore.PasswordProtection(keyStorePwd.toCharArray())); + PrivateKey privateKey = privateKeyEntry.getPrivateKey(); + + // Trust store certificate + KeyStore trustStore = configuration.getTrustStore(); + String trustAlias = trustStore.aliases().nextElement(); + Certificate trustStoreCertificate = trustStore.getCertificate(trustAlias); + Buffer formatCrtFileContents = formatCrtFileContents(trustStoreCertificate); + + return new QuarkusPulsarAuthenticationData(certificates, privateKey, formatCrtFileContents); + } catch (Exception e) { + throw new PulsarClientException(e); + } + } + + public static Buffer formatCrtFileContents(Certificate certificate) throws CertificateEncodingException { + Buffer buffer = Buffer.buffer(); + final Base64.Encoder encoder = Base64.getMimeEncoder(64, LINE_SEPARATOR.getBytes()); + + buffer.appendString(BEGIN_CERT) + .appendString(LINE_SEPARATOR) + .appendBytes(encoder.encode(certificate.getEncoded())) + .appendString(LINE_SEPARATOR) + .appendString(END_CERT); + return buffer; + } + + private static class QuarkusPulsarAuthenticationData implements AuthenticationDataProvider { + + private final Certificate[] certs; + private final PrivateKey privateKey; + private final InputStream trustStoreStream; + + public QuarkusPulsarAuthenticationData(Certificate[] certs, PrivateKey privateKey, Buffer trustStore) { + this.certs = certs; + this.privateKey = privateKey; + this.trustStoreStream = new ByteArrayInputStream(trustStore.getBytes()); + } + + public boolean hasDataForTls() { + return true; + } + + @Override + public Certificate[] getTlsCertificates() { + return certs; + } + + @Override + public PrivateKey getTlsPrivateKey() { + return privateKey; + } + + @Override + public InputStream getTlsTrustStoreStream() { + return trustStoreStream; + } + + } +} diff --git a/integration-tests/reactive-messaging-pulsar/pom.xml b/integration-tests/reactive-messaging-pulsar/pom.xml index 98cf5bdf5dc7d..9dc0b2da16664 100644 --- a/integration-tests/reactive-messaging-pulsar/pom.xml +++ b/integration-tests/reactive-messaging-pulsar/pom.xml @@ -178,6 +178,16 @@ awaitility test + + org.testcontainers + testcontainers + test + + + io.smallrye.certs + smallrye-certificate-generator-junit5 + test + @@ -195,6 +205,35 @@ + + io.smallrye.certs + smallrye-certificate-generator-maven-plugin + ${smallrye-certificate-generator.version} + + + generate-test-resources + + generate + + + + + ${project.build.directory}/certs + + + pulsar + + PEM + PKCS12 + JKS + + Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L + true + + + + + maven-failsafe-plugin diff --git a/integration-tests/reactive-messaging-pulsar/src/main/java/io/quarkus/it/pulsar/PulsarConfig.java b/integration-tests/reactive-messaging-pulsar/src/main/java/io/quarkus/it/pulsar/PulsarConfig.java deleted file mode 100644 index da2113bb45d59..0000000000000 --- a/integration-tests/reactive-messaging-pulsar/src/main/java/io/quarkus/it/pulsar/PulsarConfig.java +++ /dev/null @@ -1,74 +0,0 @@ -package io.quarkus.it.pulsar; - -import java.io.IOException; -import java.util.Map; -import java.util.Set; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Produces; -import jakarta.inject.Singleton; - -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.EncryptionKeyInfo; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; - -import io.smallrye.common.annotation.Identifier; - -@Singleton -public class PulsarConfig { - - @Produces - @Identifier("fruits-out") - @ApplicationScoped - public ProducerConfigurationData producer() { - ProducerConfigurationData data = new ProducerConfigurationData(); - data.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem")); - data.setEncryptionKeys(Set.of("myappkey")); - return data; - } - - @Produces - @Identifier("fruits-in") - @ApplicationScoped - public ConsumerConfigurationData consumer() { - ConsumerConfigurationData data = new ConsumerConfigurationData<>(); - data.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem")); - return data; - } - - class RawFileKeyReader implements CryptoKeyReader { - - String publicKeyFile = ""; - String privateKeyFile = ""; - - RawFileKeyReader(String pubKeyFile, String privKeyFile) { - publicKeyFile = pubKeyFile; - privateKeyFile = privKeyFile; - } - - @Override - public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) { - EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); - try { - keyInfo.setKey(PulsarConfig.class.getResourceAsStream("/" + publicKeyFile).readAllBytes()); - } catch (IOException e) { - System.out.println("ERROR: Failed to read public key from file " + publicKeyFile); - e.printStackTrace(); - } - return keyInfo; - } - - @Override - public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) { - EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); - try { - keyInfo.setKey(PulsarConfig.class.getResourceAsStream("/" + privateKeyFile).readAllBytes()); - } catch (IOException e) { - System.out.println("ERROR: Failed to read private key from file " + privateKeyFile); - e.printStackTrace(); - } - return keyInfo; - } - } -} diff --git a/integration-tests/reactive-messaging-pulsar/src/main/resources/application.properties b/integration-tests/reactive-messaging-pulsar/src/main/resources/application.properties index 4f485cb597f2e..b4098d37a1e11 100644 --- a/integration-tests/reactive-messaging-pulsar/src/main/resources/application.properties +++ b/integration-tests/reactive-messaging-pulsar/src/main/resources/application.properties @@ -10,3 +10,21 @@ pulsar.client.sslProvider=JDK pulsar.producer.compressionType=SNAPPY quarkus.security.security-providers=BC quarkus.native.resources.includes=*.pem + +quarkus.pulsar.devservices.enabled=false + +#mp.messaging.connector.smallrye-pulsar.tls-configuration-name=set-by-tests + +quarkus.tls.custom-pem.trust-store.pem.certs=target/certs/pulsar-client-ca.crt +quarkus.tls.custom-pem.key-store.pem.a.cert=target/certs/pulsar-client.crt +quarkus.tls.custom-pem.key-store.pem.a.key=target/certs/pulsar-client.key + +quarkus.tls.custom-jks.trust-store.jks.path=target/certs/pulsar-client-truststore.jks +quarkus.tls.custom-jks.trust-store.jks.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L +quarkus.tls.custom-jks.key-store.jks.path=target/certs/pulsar-client-keystore.jks +quarkus.tls.custom-jks.key-store.jks.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L + +quarkus.tls.custom-p12.trust-store.p12.path=target/certs/pulsar-client-truststore.p12 +quarkus.tls.custom-p12.trust-store.p12.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L +quarkus.tls.custom-p12.key-store.p12.path=target/certs/pulsar-client-keystore.p12 +quarkus.tls.custom-p12.key-store.p12.password=Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorIT.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorIT.java new file mode 100644 index 0000000000000..483b75559a04a --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorIT.java @@ -0,0 +1,8 @@ +package io.quarkus.it.pulsar; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class JksPulsarConnectorIT extends JksPulsarConnectorTest { + +} diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorTest.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorTest.java new file mode 100644 index 0000000000000..12b2a10cebc24 --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/JksPulsarConnectorTest.java @@ -0,0 +1,33 @@ +package io.quarkus.it.pulsar; + +import static io.restassured.RestAssured.get; +import static org.awaitility.Awaitility.await; + +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.common.mapper.TypeRef; + +@WithTestResource(value = PulsarResource.class, initArgs = { + @ResourceArg(name = "isJks", value = "true"), + @ResourceArg(name = "keyStorePassword", value = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"), + @ResourceArg(name = "trustStorePassword", value = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"), + @ResourceArg(name = "pulsar.tls-configuration-name", value = "custom-jks"), +}, restrictToAnnotatedClass = true) +@QuarkusTest +public class JksPulsarConnectorTest { + + protected static final TypeRef> TYPE_REF = new TypeRef>() { + }; + + @Test + public void testFruits() { + await().untilAsserted(() -> Assertions.assertEquals(get("/pulsar/fruits").as(TYPE_REF).size(), 4)); + } + +} diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorIT.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorIT.java new file mode 100644 index 0000000000000..d4fbe57510904 --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorIT.java @@ -0,0 +1,8 @@ +package io.quarkus.it.pulsar; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class PKCS12PulsarConnectorIT extends PKCS12PulsarConnectorTest { + +} diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorTest.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorTest.java new file mode 100644 index 0000000000000..01752dfaf78ef --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PKCS12PulsarConnectorTest.java @@ -0,0 +1,33 @@ +package io.quarkus.it.pulsar; + +import static io.restassured.RestAssured.get; +import static org.awaitility.Awaitility.await; + +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.common.mapper.TypeRef; + +@WithTestResource(value = PulsarResource.class, initArgs = { + @ResourceArg(name = "isPem", value = "true"), + @ResourceArg(name = "keyStorePassword", value = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"), + @ResourceArg(name = "trustStorePassword", value = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"), + @ResourceArg(name = "pulsar.tls-configuration-name", value = "custom-p12"), +}, restrictToAnnotatedClass = true) +@QuarkusTest +public class PKCS12PulsarConnectorTest { + + protected static final TypeRef> TYPE_REF = new TypeRef>() { + }; + + @Test + public void testFruits() { + await().untilAsserted(() -> Assertions.assertEquals(get("/pulsar/fruits").as(TYPE_REF).size(), 4)); + } + +} diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorIT.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorIT.java new file mode 100644 index 0000000000000..b3bb4b35101c2 --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorIT.java @@ -0,0 +1,8 @@ +package io.quarkus.it.pulsar; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class PemPulsarConnectorIT extends PemPulsarConnectorTest { + +} diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorTest.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorTest.java new file mode 100644 index 0000000000000..eda8d1c668f61 --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PemPulsarConnectorTest.java @@ -0,0 +1,33 @@ +package io.quarkus.it.pulsar; + +import static io.restassured.RestAssured.get; +import static org.awaitility.Awaitility.await; + +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.common.mapper.TypeRef; + +@WithTestResource(value = PulsarResource.class, initArgs = { + @ResourceArg(name = "isPem", value = "true"), + @ResourceArg(name = "keyStorePassword", value = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"), + @ResourceArg(name = "trustStorePassword", value = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L"), + @ResourceArg(name = "pulsar.tls-configuration-name", value = "custom-pem"), +}, restrictToAnnotatedClass = true) +@QuarkusTest +public class PemPulsarConnectorTest { + + protected static final TypeRef> TYPE_REF = new TypeRef>() { + }; + + @Test + public void testFruits() { + await().untilAsserted(() -> Assertions.assertEquals(get("/pulsar/fruits").as(TYPE_REF).size(), 4)); + } + +} diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarConnectorTest.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarConnectorTest.java index 283c9d48e2572..4b6a667db2978 100644 --- a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarConnectorTest.java +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarConnectorTest.java @@ -8,9 +8,11 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import io.quarkus.test.common.WithTestResource; import io.quarkus.test.junit.QuarkusTest; import io.restassured.common.mapper.TypeRef; +@WithTestResource(value = PulsarResource.class, restrictToAnnotatedClass = true) @QuarkusTest public class PulsarConnectorTest { diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarContainer.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarContainer.java new file mode 100644 index 0000000000000..9b913f842f6fb --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarContainer.java @@ -0,0 +1,93 @@ +package io.quarkus.it.pulsar; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +import com.github.dockerjava.api.command.InspectContainerResponse; + +public class PulsarContainer extends GenericContainer { + + public static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.3.0"); + + public static final String STARTER_SCRIPT = "/run_pulsar.sh"; + + public static final int BROKER_PORT = 6650; + public static final int TLS_BROKER_PORT = 6651; + public static final int BROKER_HTTP_PORT = 8080; + public static final int TLS_BROKER_HTTP_PORT = 8443; + + private boolean useTls; + + public PulsarContainer() { + this(PULSAR_IMAGE); + } + + public PulsarContainer(DockerImageName imageName) { + super(imageName); + super.withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT); + super.withStartupTimeout(Duration.ofSeconds(60)); + super.waitingFor(Wait.forLogMessage(".*Created namespace public/default.*", 1)); + super.withCommand("sh", "-c", runStarterScript()); + super.withTmpFs(Collections.singletonMap("/pulsar/data", "rw")); + } + + protected String runStarterScript() { + return "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT; + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + String advertisedListeners; + if (useTls) { + advertisedListeners = String.format("internal:pulsar+ssl://localhost:%s,external:pulsar+ssl://%s:%s", + TLS_BROKER_PORT, this.getHost(), this.getMappedPort(TLS_BROKER_PORT)); + } else { + advertisedListeners = String.format("internal:pulsar://localhost:%s,external:pulsar://%s:%s", + BROKER_PORT, this.getHost(), this.getMappedPort(BROKER_PORT)); + } + + String command = "#!/bin/bash \n"; + command += "export PULSAR_PREFIX_advertisedListeners=" + advertisedListeners + " \n"; + command += "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"; + copyFileToContainer( + Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), + STARTER_SCRIPT); + } + + public PulsarContainer useTls() { + addExposedPorts(TLS_BROKER_PORT, TLS_BROKER_HTTP_PORT); + useTls = true; + return self(); + } + + public PulsarContainer withPort(final int fixedPort) { + if (fixedPort <= 0) { + throw new IllegalArgumentException("The fixed port must be greater than 0"); + } + addFixedExposedPort(fixedPort, BROKER_PORT); + return self(); + } + + public String getPulsarBrokerUrl() { + if (useTls) { + return String.format("pulsar+ssl://%s:%s", this.getHost(), this.getMappedPort(TLS_BROKER_PORT)); + } else { + return String.format("pulsar://%s:%s", this.getHost(), this.getMappedPort(BROKER_PORT)); + } + } + + public String getHttpServiceUrl() { + if (useTls) { + return String.format("https://%s:%s", this.getHost(), this.getMappedPort(TLS_BROKER_HTTP_PORT)); + } else { + return String.format("http://%s:%s", this.getHost(), this.getMappedPort(BROKER_HTTP_PORT)); + } + } +} diff --git a/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarResource.java b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarResource.java new file mode 100644 index 0000000000000..ca0164fc0ade2 --- /dev/null +++ b/integration-tests/reactive-messaging-pulsar/src/test/java/io/quarkus/it/pulsar/PulsarResource.java @@ -0,0 +1,122 @@ +package io.quarkus.it.pulsar; + +import java.util.HashMap; +import java.util.Map; + +import org.testcontainers.utility.MountableFile; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class PulsarResource implements QuarkusTestResourceLifecycleManager { + + private PulsarContainer container; + + private boolean pem = false; + private boolean jks = false; + private String keyStorePassword; + private String trustStorePassword; + private String tlsConfigName; + + @Override + public void init(Map initArgs) { + pem = Boolean.parseBoolean(initArgs.get("isPem")); + jks = Boolean.parseBoolean(initArgs.get("isJks")); + keyStorePassword = initArgs.get("keyStorePassword"); + trustStorePassword = initArgs.get("trustStorePassword"); + tlsConfigName = initArgs.get("pulsar.tls-configuration-name"); + } + + @Override + public Map start() { + container = new PulsarContainer(); + + if (pem) { + configurePem(); + } else if (jks) { + configureJks(); + } + + container.start(); + Map cfg = new HashMap<>(); + cfg.put("pulsar.client.serviceUrl", container.getPulsarBrokerUrl()); + if (tlsConfigName != null) { + cfg.put("mp.messaging.connector.smallrye-pulsar.tls-configuration-name", tlsConfigName); + } + + return cfg; + } + + private void configureJks() { + configureCommonTls(); + container + .useTls() + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-keystore.jks"), + "/pulsar/conf/pulsar-keystore.jks") + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-server-truststore.jks"), + "/pulsar/conf/pulsar-server-truststore.jks") + // broker client + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-client-keystore.jks"), + "/pulsar/conf/pulsar-client-keystore.jks") + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-client-truststore.jks"), + "/pulsar/conf/pulsar-client-truststore.jks"); + addConf("tlsEnabledWithKeyStore", "true"); + addConf("tlsKeyStoreType", "JKS"); + addConf("tlsKeyStore", "/pulsar/conf/pulsar-keystore.jks"); + addConf("tlsKeyStorePassword", keyStorePassword); + addConf("tlsTrustStoreType", "JKS"); + addConf("tlsTrustStore", "/pulsar/conf/pulsar-server-truststore.jks"); + addConf("tlsTrustStorePassword", trustStorePassword); + // broker client + addConf("brokerClientTlsEnabledWithKeyStore", "true"); + addConf("brokerClientTlsTrustStoreType", "JKS"); + addConf("brokerClientTlsTrustStore", "/pulsar/conf/pulsar-client-truststore.jks"); + addConf("brokerClientTlsTrustStorePassword", trustStorePassword); + addConf("brokerClientTlsKeyStoreType", "JKS"); + addConf("brokerClientTlsKeyStore", "/pulsar/conf/pulsar-client-keystore.jks"); + addConf("brokerClientTlsKeyStorePassword", keyStorePassword); + } + + private void configurePem() { + configureCommonTls(); + container + .useTls() + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar.crt"), "/pulsar/conf/pulsar.crt") + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar.key"), "/pulsar/conf/pulsar.key") + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-server-ca.crt"), + "/pulsar/conf/pulsar-server-ca.crt") + // broker client + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-client.crt"), + "/pulsar/conf/pulsar-client.crt") + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-client.key"), + "/pulsar/conf/pulsar-client.key") + .withCopyFileToContainer(MountableFile.forHostPath("target/certs/pulsar-client-ca.crt"), + "/pulsar/conf/pulsar-client-ca.crt"); + addConf("tlsRequireTrustedClientCertOnConnect", "true"); + addConf("tlsTrustCertsFilePath", "/pulsar/conf/pulsar-server-ca.crt"); + addConf("tlsCertificateFilePath", "/pulsar/conf/pulsar.crt"); + addConf("tlsKeyFilePath", "/pulsar/conf/pulsar.key"); + // broker client + addConf("brokerClientTrustCertsFilePath", "/pulsar/conf/pulsar-client-ca.crt"); + addConf("brokerClientCertificateFilePath", "/pulsar/conf/pulsar-client.crt"); + addConf("brokerClientKeyFilePath", "/pulsar/conf/pulsar-client.key"); + } + + private void addConf(String key, String value) { + container.addEnv("PULSAR_PREFIX_" + key, value); + } + + private void configureCommonTls() { + addConf("brokerServicePort", ""); + addConf("brokerServicePortTls", "6651"); + addConf("webServicePortTls", "8443"); + addConf("tlsEnabled", "true"); + addConf("brokerClientTlsEnabled", "true"); + } + + @Override + public void stop() { + if (container != null) { + container.close(); + } + } +}