From 366573d2d4517b4442a6aed290e9089bc021e287 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 8 Jul 2021 11:57:53 +0100 Subject: [PATCH] Kafka serde autodetection : keep inserted default config in discovery state, avoid duplicates. Fixes #18495 --- .../DefaultSerdeDiscoveryState.java | 8 +++ ...allRyeReactiveMessagingKafkaProcessor.java | 54 ++++++++----------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java index f4740fe592478..b6bade7d88c25 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java @@ -19,6 +19,7 @@ class DefaultSerdeDiscoveryState { private final IndexView index; private final Map isKafkaConnector = new HashMap<>(); + private final Map serdeConfigMap = new HashMap<>(); private Boolean hasConfluent; private Boolean hasApicurio1; @@ -40,6 +41,13 @@ boolean isKafkaConnector(boolean incoming, String channelName) { }); } + void runIfConfigIsAbsent(String key, String value, Runnable runnable) { + if (value != null && !serdeConfigMap.containsKey(key)) { + serdeConfigMap.put(key, value); + runnable.run(); + } + } + boolean isAvroGenerated(DotName className) { ClassInfo clazz = index.getClassByName(className); return clazz != null && clazz.classAnnotation(DotNames.AVRO_GENERATED) != null; diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 2687f5f4a0f05..3fbec6398fe20 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -65,14 +65,10 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, MethodInfo method = annotation.target().asMethod(); processIncomingMethod(discovery, method, (keyDeserializer, valueDeserializer) -> { - if (keyDeserializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer)); - } - if (valueDeserializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer)); - } + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer); + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer); }); } @@ -85,14 +81,10 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, MethodInfo method = annotation.target().asMethod(); processOutgoingMethod(discovery, method, (keySerializer, valueSerializer) -> { - if (keySerializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer)); - } - if (valueSerializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer)); - } + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer); + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer); }); } @@ -117,29 +109,27 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, } processIncomingChannelInjectionPoint(discovery, injectionPointType, (keyDeserializer, valueDeserializer) -> { - if (keyDeserializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer)); - } - if (valueDeserializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer)); - } + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer); + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer); }); processOutgoingChannelInjectionPoint(discovery, injectionPointType, (keySerializer, valueSerializer) -> { - if (keySerializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer)); - } - if (valueSerializer != null) { - config.produce(new RunTimeConfigurationDefaultBuildItem( - "mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer)); - } + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer); + produceRuntimeConfigurationDefaultBuildItem(discovery, config, + "mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer); }); } } + void produceRuntimeConfigurationDefaultBuildItem(DefaultSerdeDiscoveryState discovery, + BuildProducer config, String key, String value) { + discovery.runIfConfigIsAbsent(key, value, + () -> config.produce(new RunTimeConfigurationDefaultBuildItem(key, value))); + } + private void processIncomingMethod(DefaultSerdeDiscoveryState discovery, MethodInfo method, BiConsumer deserializerAcceptor) { List parameterTypes = method.parameters();