From 8e9d9719f297dcb21fc28135d2c0197cac2c2260 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Fri, 2 Dec 2022 09:32:21 +0100 Subject: [PATCH] Kafka checkpoint state store: Check config before creating bean Fixes #29606 --- .../client/config/RedisClientConfig.java | 2 +- ...allRyeReactiveMessagingKafkaProcessor.java | 59 +++++++++++++++-- .../CheckpointStateStoreConfigTest.java | 64 +++++++++++++++++++ .../kafka/HibernateOrmStateStore.java | 6 +- .../kafka/HibernateReactiveStateStore.java | 3 +- .../kafka/RedisStateStore.java | 6 +- 6 files changed, 125 insertions(+), 15 deletions(-) create mode 100644 extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/CheckpointStateStoreConfigTest.java diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/config/RedisClientConfig.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/config/RedisClientConfig.java index 77ca63a9de075..d10e1f670b8fd 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/config/RedisClientConfig.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/config/RedisClientConfig.java @@ -25,7 +25,7 @@ public class RedisClientConfig { * * @see Redis scheme on www.iana.org */ - @ConfigItem(defaultValueDocumentation = "redis://localhost:6379", name = RedisConfig.HOSTS_CONFIG_NAME) + @ConfigItem(name = RedisConfig.HOSTS_CONFIG_NAME) public Optional> hosts; /** diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 6803bd135f0a2..898774bf6d92e 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -1,9 +1,14 @@ package io.quarkus.smallrye.reactivemessaging.kafka.deployment; +import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore.HIBERNATE_ORM_STATE_STORE; +import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore.HIBERNATE_REACTIVE_STATE_STORE; +import static io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore.REDIS_STATE_STORE; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Function; @@ -41,12 +46,16 @@ import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig; import io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore; import io.smallrye.mutiny.tuples.Functions.TriConsumer; +import io.smallrye.reactive.messaging.kafka.KafkaConnector; import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl; public class SmallRyeReactiveMessagingKafkaProcessor { private static final Logger LOGGER = Logger.getLogger("io.quarkus.smallrye-reactive-messaging-kafka.deployment.processor"); + public static final String CHECKPOINT_STATE_STORE_MESSAGE = "Quarkus detected the use of `%s` for the" + + " Kafka checkpoint commit strategy but the extension has not been added. Consider adding '%s'."; + @BuildStep FeatureBuildItem feature() { return new FeatureBuildItem(Feature.SMALLRYE_REACTIVE_MESSAGING_KAFKA); @@ -74,25 +83,61 @@ public void ignoreDuplicateJmxRegistrationInDevAndTestModes(LaunchModeBuildItem } } + static boolean hasStateStoreConfig(String stateStoreName, Config config) { + Optional connectorStrategy = getConnectorProperty("checkpoint.state-store", config); + if (connectorStrategy.isPresent() && connectorStrategy.get().equals(stateStoreName)) { + return true; + } + List stateStores = getChannelProperties("checkpoint.state-store", config); + return stateStores.contains(stateStoreName); + } + + private static Optional getConnectorProperty(String keySuffix, Config config) { + return config.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + "." + keySuffix, + String.class); + } + + private static List getChannelProperties(String keySuffix, Config config) { + List values = new ArrayList<>(); + for (String propertyName : config.getPropertyNames()) { + if (propertyName.startsWith("mp.messaging.incoming.") && propertyName.endsWith("." + keySuffix)) { + values.add(config.getValue(propertyName, String.class)); + } + } + return values; + } + @BuildStep public void checkpointRedis(BuildProducer additionalBean, Capabilities capabilities) { - if (capabilities.isPresent(Capability.REDIS_CLIENT)) { - additionalBean.produce(new AdditionalBeanBuildItem(RedisStateStore.Factory.class)); - additionalBean.produce(new AdditionalBeanBuildItem(DatabindProcessingStateCodec.Factory.class)); + if (hasStateStoreConfig(REDIS_STATE_STORE, ConfigProvider.getConfig())) { + if (capabilities.isPresent(Capability.REDIS_CLIENT)) { + additionalBean.produce(new AdditionalBeanBuildItem(RedisStateStore.Factory.class)); + additionalBean.produce(new AdditionalBeanBuildItem(DatabindProcessingStateCodec.Factory.class)); + } else { + LOGGER.warnf(CHECKPOINT_STATE_STORE_MESSAGE, REDIS_STATE_STORE, "quarkus-redis-client"); + } } } @BuildStep public void checkpointHibernateReactive(BuildProducer additionalBean, Capabilities capabilities) { - if (capabilities.isPresent(Capability.HIBERNATE_REACTIVE)) { - additionalBean.produce(new AdditionalBeanBuildItem(HibernateReactiveStateStore.Factory.class)); + if (hasStateStoreConfig(HIBERNATE_REACTIVE_STATE_STORE, ConfigProvider.getConfig())) { + if (capabilities.isPresent(Capability.HIBERNATE_REACTIVE)) { + additionalBean.produce(new AdditionalBeanBuildItem(HibernateReactiveStateStore.Factory.class)); + } else { + LOGGER.warnf(CHECKPOINT_STATE_STORE_MESSAGE, HIBERNATE_REACTIVE_STATE_STORE, "quarkus-hibernate-reactive"); + } } } @BuildStep public void checkpointHibernateOrm(BuildProducer additionalBean, Capabilities capabilities) { - if (capabilities.isPresent(Capability.HIBERNATE_ORM)) { - additionalBean.produce(new AdditionalBeanBuildItem(HibernateOrmStateStore.Factory.class)); + if (hasStateStoreConfig(HIBERNATE_ORM_STATE_STORE, ConfigProvider.getConfig())) { + if (capabilities.isPresent(Capability.HIBERNATE_ORM)) { + additionalBean.produce(new AdditionalBeanBuildItem(HibernateOrmStateStore.Factory.class)); + } else { + LOGGER.warnf(CHECKPOINT_STATE_STORE_MESSAGE, HIBERNATE_ORM_STATE_STORE, "quarkus-hibernate-orm"); + } } } diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/CheckpointStateStoreConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/CheckpointStateStoreConfigTest.java new file mode 100644 index 0000000000000..03167b1e4edcb --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/CheckpointStateStoreConfigTest.java @@ -0,0 +1,64 @@ +package io.quarkus.smallrye.reactivemessaging.kafka.deployment; + +import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore.HIBERNATE_ORM_STATE_STORE; +import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore.HIBERNATE_REACTIVE_STATE_STORE; +import static io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore.REDIS_STATE_STORE; +import static io.quarkus.smallrye.reactivemessaging.kafka.deployment.SmallRyeReactiveMessagingKafkaProcessor.hasStateStoreConfig; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; + +import org.eclipse.microprofile.config.spi.ConfigProviderResolver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.config.SmallRyeConfig; +import io.smallrye.config.SmallRyeConfigBuilder; +import io.smallrye.config.common.MapBackedConfigSource; + +public class CheckpointStateStoreConfigTest { + + SmallRyeConfig config; + + @AfterEach + void tearDown() { + if (config != null) { + ConfigProviderResolver.instance().releaseConfig(config); + } + } + + private void createConfig(Map configMap) { + config = new SmallRyeConfigBuilder() + .withSources(new MapBackedConfigSource("test", configMap) { + }) + .build(); + } + + @Test + void testHasStateStoreConfigWithConnectorConfig() { + createConfig(Map.of("mp.messaging.connector.smallrye-kafka.checkpoint.state-store", HIBERNATE_ORM_STATE_STORE)); + assertTrue(hasStateStoreConfig(HIBERNATE_ORM_STATE_STORE, config)); + } + + @Test + void testHasStateStoreConfigWithChannelConfig() { + createConfig(Map.of("mp.messaging.incoming.my-channel.checkpoint.state-store", HIBERNATE_REACTIVE_STATE_STORE)); + assertTrue(hasStateStoreConfig(HIBERNATE_REACTIVE_STATE_STORE, config)); + } + + @Test + void testHasStateStoreConfigWithInvalidChannelConfig() { + createConfig(Map.of( + "mp.messaging.outgoing.my-channel.checkpoint.state-store", HIBERNATE_REACTIVE_STATE_STORE, + "mp.messaging.incoming.my-channel.state-store", HIBERNATE_ORM_STATE_STORE)); + assertFalse(hasStateStoreConfig(HIBERNATE_REACTIVE_STATE_STORE, config)); + assertFalse(hasStateStoreConfig(HIBERNATE_ORM_STATE_STORE, config)); + } + + @Test + void testHasStateStoreConfigEmptyConfig() { + createConfig(Map.of()); + assertFalse(hasStateStoreConfig(REDIS_STATE_STORE, config)); + } +} diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java index b4f8637ea0a76..dacadc1645387 100644 --- a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateOrmStateStore.java @@ -29,7 +29,7 @@ public class HibernateOrmStateStore implements CheckpointStateStore { - public static final String QUARKUS_HIBERNATE_ORM = "quarkus-hibernate-orm"; + public static final String HIBERNATE_ORM_STATE_STORE = "quarkus-hibernate-orm"; private final String consumerGroupId; private final SessionFactory sf; private final Class stateType; @@ -42,7 +42,7 @@ public HibernateOrmStateStore(String consumerGroupId, SessionFactory sf, } @ApplicationScoped - @Identifier(QUARKUS_HIBERNATE_ORM) + @Identifier(HIBERNATE_ORM_STATE_STORE) public static class Factory implements CheckpointStateStore.Factory { @Inject @@ -57,7 +57,7 @@ public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, V throw new IllegalArgumentException("State type needs to extend `CheckpointEntity`"); } String persistenceUnit = config.config().getOptionalValue(KafkaCommitHandler.Strategy.CHECKPOINT + "." + - QUARKUS_HIBERNATE_ORM + ".persistence-unit", String.class) + HIBERNATE_ORM_STATE_STORE + ".persistence-unit", String.class) .orElse(null); SessionFactory sf = persistenceUnit != null ? sessionFactories.select(new PersistenceUnit.PersistenceUnitLiteral(persistenceUnit)).get() diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateReactiveStateStore.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateReactiveStateStore.java index 94fed9eb48e27..811b29bb10115 100644 --- a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateReactiveStateStore.java +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/HibernateReactiveStateStore.java @@ -25,6 +25,7 @@ public class HibernateReactiveStateStore implements CheckpointStateStore { + public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive"; private final String consumerGroupId; private final Mutiny.SessionFactory sf; private final Class stateType; @@ -37,7 +38,7 @@ public HibernateReactiveStateStore(String consumerGroupId, Mutiny.SessionFactory } @ApplicationScoped - @Identifier("quarkus-hibernate-reactive") + @Identifier(HIBERNATE_REACTIVE_STATE_STORE) public static class Factory implements CheckpointStateStore.Factory { @Inject diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/RedisStateStore.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/RedisStateStore.java index d73cbcbfc4a1b..b606a2e14a67b 100644 --- a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/RedisStateStore.java +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/RedisStateStore.java @@ -33,7 +33,7 @@ public class RedisStateStore implements CheckpointStateStore { - public static final String REDIS_CHECKPOINT_NAME = "quarkus-redis"; + public static final String REDIS_STATE_STORE = "quarkus-redis"; private final ReactiveRedisDataSource redis; private final String consumerGroupId; @@ -47,7 +47,7 @@ public RedisStateStore(ReactiveRedisDataSource redis, String consumerGroupId, Pr } @ApplicationScoped - @Identifier(REDIS_CHECKPOINT_NAME) + @Identifier(REDIS_STATE_STORE) public static class Factory implements CheckpointStateStore.Factory { @Inject @@ -62,7 +62,7 @@ public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, V KafkaConsumer consumer, Class stateType) { String consumerGroupId = (String) consumer.configuration().get(ConsumerConfig.GROUP_ID_CONFIG); String clientName = config.config().getOptionalValue(KafkaCommitHandler.Strategy.CHECKPOINT + "." + - REDIS_CHECKPOINT_NAME + ".client-name", String.class) + REDIS_STATE_STORE + ".client-name", String.class) .orElse(null); ReactiveRedisDataSource rds = clientName != null ? redisDataSource.select(RedisClientName.Literal.of(clientName)).get()