Skip to content

Commit

Permalink
Merge pull request #29720 from ozangunalp/kafka_checkpointing_state_s…
Browse files Browse the repository at this point in the history
…tore_beans

Check config before creating Kafka checkpoint state store beans
  • Loading branch information
ozangunalp authored Dec 8, 2022
2 parents 25b8a47 + 8e9d971 commit f030a3e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class RedisClientConfig {
*
* @see <a href="https://www.iana.org/assignments/uri-schemes/prov/redis">Redis scheme on www.iana.org</a>
*/
@ConfigItem(defaultValueDocumentation = "redis://localhost:6379", name = RedisConfig.HOSTS_CONFIG_NAME)
@ConfigItem(name = RedisConfig.HOSTS_CONFIG_NAME)
public Optional<Set<URI>> hosts;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore.HIBERNATE_ORM_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore.HIBERNATE_REACTIVE_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore.REDIS_STATE_STORE;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand Down Expand Up @@ -41,12 +46,16 @@
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore;
import io.smallrye.mutiny.tuples.Functions.TriConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;

public class SmallRyeReactiveMessagingKafkaProcessor {

private static final Logger LOGGER = Logger.getLogger("io.quarkus.smallrye-reactive-messaging-kafka.deployment.processor");

public static final String CHECKPOINT_STATE_STORE_MESSAGE = "Quarkus detected the use of `%s` for the" +
" Kafka checkpoint commit strategy but the extension has not been added. Consider adding '%s'.";

@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.SMALLRYE_REACTIVE_MESSAGING_KAFKA);
Expand Down Expand Up @@ -74,25 +83,61 @@ public void ignoreDuplicateJmxRegistrationInDevAndTestModes(LaunchModeBuildItem
}
}

static boolean hasStateStoreConfig(String stateStoreName, Config config) {
Optional<String> connectorStrategy = getConnectorProperty("checkpoint.state-store", config);
if (connectorStrategy.isPresent() && connectorStrategy.get().equals(stateStoreName)) {
return true;
}
List<String> stateStores = getChannelProperties("checkpoint.state-store", config);
return stateStores.contains(stateStoreName);
}

private static Optional<String> getConnectorProperty(String keySuffix, Config config) {
return config.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + "." + keySuffix,
String.class);
}

private static List<String> getChannelProperties(String keySuffix, Config config) {
List<String> values = new ArrayList<>();
for (String propertyName : config.getPropertyNames()) {
if (propertyName.startsWith("mp.messaging.incoming.") && propertyName.endsWith("." + keySuffix)) {
values.add(config.getValue(propertyName, String.class));
}
}
return values;
}

@BuildStep
public void checkpointRedis(BuildProducer<AdditionalBeanBuildItem> additionalBean, Capabilities capabilities) {
if (capabilities.isPresent(Capability.REDIS_CLIENT)) {
additionalBean.produce(new AdditionalBeanBuildItem(RedisStateStore.Factory.class));
additionalBean.produce(new AdditionalBeanBuildItem(DatabindProcessingStateCodec.Factory.class));
if (hasStateStoreConfig(REDIS_STATE_STORE, ConfigProvider.getConfig())) {
if (capabilities.isPresent(Capability.REDIS_CLIENT)) {
additionalBean.produce(new AdditionalBeanBuildItem(RedisStateStore.Factory.class));
additionalBean.produce(new AdditionalBeanBuildItem(DatabindProcessingStateCodec.Factory.class));
} else {
LOGGER.warnf(CHECKPOINT_STATE_STORE_MESSAGE, REDIS_STATE_STORE, "quarkus-redis-client");
}
}
}

@BuildStep
public void checkpointHibernateReactive(BuildProducer<AdditionalBeanBuildItem> additionalBean, Capabilities capabilities) {
if (capabilities.isPresent(Capability.HIBERNATE_REACTIVE)) {
additionalBean.produce(new AdditionalBeanBuildItem(HibernateReactiveStateStore.Factory.class));
if (hasStateStoreConfig(HIBERNATE_REACTIVE_STATE_STORE, ConfigProvider.getConfig())) {
if (capabilities.isPresent(Capability.HIBERNATE_REACTIVE)) {
additionalBean.produce(new AdditionalBeanBuildItem(HibernateReactiveStateStore.Factory.class));
} else {
LOGGER.warnf(CHECKPOINT_STATE_STORE_MESSAGE, HIBERNATE_REACTIVE_STATE_STORE, "quarkus-hibernate-reactive");
}
}
}

@BuildStep
public void checkpointHibernateOrm(BuildProducer<AdditionalBeanBuildItem> additionalBean, Capabilities capabilities) {
if (capabilities.isPresent(Capability.HIBERNATE_ORM)) {
additionalBean.produce(new AdditionalBeanBuildItem(HibernateOrmStateStore.Factory.class));
if (hasStateStoreConfig(HIBERNATE_ORM_STATE_STORE, ConfigProvider.getConfig())) {
if (capabilities.isPresent(Capability.HIBERNATE_ORM)) {
additionalBean.produce(new AdditionalBeanBuildItem(HibernateOrmStateStore.Factory.class));
} else {
LOGGER.warnf(CHECKPOINT_STATE_STORE_MESSAGE, HIBERNATE_ORM_STATE_STORE, "quarkus-hibernate-orm");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateOrmStateStore.HIBERNATE_ORM_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.kafka.HibernateReactiveStateStore.HIBERNATE_REACTIVE_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.kafka.RedisStateStore.REDIS_STATE_STORE;
import static io.quarkus.smallrye.reactivemessaging.kafka.deployment.SmallRyeReactiveMessagingKafkaProcessor.hasStateStoreConfig;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Map;

import org.eclipse.microprofile.config.spi.ConfigProviderResolver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import io.smallrye.config.SmallRyeConfig;
import io.smallrye.config.SmallRyeConfigBuilder;
import io.smallrye.config.common.MapBackedConfigSource;

public class CheckpointStateStoreConfigTest {

SmallRyeConfig config;

@AfterEach
void tearDown() {
if (config != null) {
ConfigProviderResolver.instance().releaseConfig(config);
}
}

private void createConfig(Map<String, String> configMap) {
config = new SmallRyeConfigBuilder()
.withSources(new MapBackedConfigSource("test", configMap) {
})
.build();
}

@Test
void testHasStateStoreConfigWithConnectorConfig() {
createConfig(Map.of("mp.messaging.connector.smallrye-kafka.checkpoint.state-store", HIBERNATE_ORM_STATE_STORE));
assertTrue(hasStateStoreConfig(HIBERNATE_ORM_STATE_STORE, config));
}

@Test
void testHasStateStoreConfigWithChannelConfig() {
createConfig(Map.of("mp.messaging.incoming.my-channel.checkpoint.state-store", HIBERNATE_REACTIVE_STATE_STORE));
assertTrue(hasStateStoreConfig(HIBERNATE_REACTIVE_STATE_STORE, config));
}

@Test
void testHasStateStoreConfigWithInvalidChannelConfig() {
createConfig(Map.of(
"mp.messaging.outgoing.my-channel.checkpoint.state-store", HIBERNATE_REACTIVE_STATE_STORE,
"mp.messaging.incoming.my-channel.state-store", HIBERNATE_ORM_STATE_STORE));
assertFalse(hasStateStoreConfig(HIBERNATE_REACTIVE_STATE_STORE, config));
assertFalse(hasStateStoreConfig(HIBERNATE_ORM_STATE_STORE, config));
}

@Test
void testHasStateStoreConfigEmptyConfig() {
createConfig(Map.of());
assertFalse(hasStateStoreConfig(REDIS_STATE_STORE, config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public class HibernateOrmStateStore implements CheckpointStateStore {

public static final String QUARKUS_HIBERNATE_ORM = "quarkus-hibernate-orm";
public static final String HIBERNATE_ORM_STATE_STORE = "quarkus-hibernate-orm";
private final String consumerGroupId;
private final SessionFactory sf;
private final Class<? extends CheckpointEntity> stateType;
Expand All @@ -42,7 +42,7 @@ public HibernateOrmStateStore(String consumerGroupId, SessionFactory sf,
}

@ApplicationScoped
@Identifier(QUARKUS_HIBERNATE_ORM)
@Identifier(HIBERNATE_ORM_STATE_STORE)
public static class Factory implements CheckpointStateStore.Factory {

@Inject
Expand All @@ -57,7 +57,7 @@ public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, V
throw new IllegalArgumentException("State type needs to extend `CheckpointEntity`");
}
String persistenceUnit = config.config().getOptionalValue(KafkaCommitHandler.Strategy.CHECKPOINT + "." +
QUARKUS_HIBERNATE_ORM + ".persistence-unit", String.class)
HIBERNATE_ORM_STATE_STORE + ".persistence-unit", String.class)
.orElse(null);
SessionFactory sf = persistenceUnit != null
? sessionFactories.select(new PersistenceUnit.PersistenceUnitLiteral(persistenceUnit)).get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

public class HibernateReactiveStateStore implements CheckpointStateStore {

public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive";
private final String consumerGroupId;
private final Mutiny.SessionFactory sf;
private final Class<? extends CheckpointEntity> stateType;
Expand All @@ -37,7 +38,7 @@ public HibernateReactiveStateStore(String consumerGroupId, Mutiny.SessionFactory
}

@ApplicationScoped
@Identifier("quarkus-hibernate-reactive")
@Identifier(HIBERNATE_REACTIVE_STATE_STORE)
public static class Factory implements CheckpointStateStore.Factory {

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public class RedisStateStore implements CheckpointStateStore {

public static final String REDIS_CHECKPOINT_NAME = "quarkus-redis";
public static final String REDIS_STATE_STORE = "quarkus-redis";

private final ReactiveRedisDataSource redis;
private final String consumerGroupId;
Expand All @@ -47,7 +47,7 @@ public RedisStateStore(ReactiveRedisDataSource redis, String consumerGroupId, Pr
}

@ApplicationScoped
@Identifier(REDIS_CHECKPOINT_NAME)
@Identifier(REDIS_STATE_STORE)
public static class Factory implements CheckpointStateStore.Factory {

@Inject
Expand All @@ -62,7 +62,7 @@ public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, V
KafkaConsumer<?, ?> consumer, Class<?> stateType) {
String consumerGroupId = (String) consumer.configuration().get(ConsumerConfig.GROUP_ID_CONFIG);
String clientName = config.config().getOptionalValue(KafkaCommitHandler.Strategy.CHECKPOINT + "." +
REDIS_CHECKPOINT_NAME + ".client-name", String.class)
REDIS_STATE_STORE + ".client-name", String.class)
.orElse(null);
ReactiveRedisDataSource rds = clientName != null
? redisDataSource.select(RedisClientName.Literal.of(clientName)).get()
Expand Down

0 comments on commit f030a3e

Please sign in to comment.