Skip to content

Commit

Permalink
Kafka serde autodetection : keep inserted default config in discovery…
Browse files Browse the repository at this point in the history
… state, avoid duplicates.

Fixes quarkusio#18495
  • Loading branch information
ozangunalp committed Jul 8, 2021
1 parent a906fe2 commit 366573d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class DefaultSerdeDiscoveryState {
private final IndexView index;

private final Map<String, Boolean> isKafkaConnector = new HashMap<>();
private final Map<String, String> serdeConfigMap = new HashMap<>();

private Boolean hasConfluent;
private Boolean hasApicurio1;
Expand All @@ -40,6 +41,13 @@ boolean isKafkaConnector(boolean incoming, String channelName) {
});
}

void runIfConfigIsAbsent(String key, String value, Runnable runnable) {
if (value != null && !serdeConfigMap.containsKey(key)) {
serdeConfigMap.put(key, value);
runnable.run();
}
}

boolean isAvroGenerated(DotName className) {
ClassInfo clazz = index.getClassByName(className);
return clazz != null && clazz.classAnnotation(DotNames.AVRO_GENERATED) != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,10 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
MethodInfo method = annotation.target().asMethod();

processIncomingMethod(discovery, method, (keyDeserializer, valueDeserializer) -> {
if (keyDeserializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer));
}
if (valueDeserializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer));
}
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer);
});
}

Expand All @@ -85,14 +81,10 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
MethodInfo method = annotation.target().asMethod();

processOutgoingMethod(discovery, method, (keySerializer, valueSerializer) -> {
if (keySerializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer));
}
if (valueSerializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer));
}
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer);
});
}

Expand All @@ -117,29 +109,27 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
}

processIncomingChannelInjectionPoint(discovery, injectionPointType, (keyDeserializer, valueDeserializer) -> {
if (keyDeserializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer));
}
if (valueDeserializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer));
}
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer);
});

processOutgoingChannelInjectionPoint(discovery, injectionPointType, (keySerializer, valueSerializer) -> {
if (keySerializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer));
}
if (valueSerializer != null) {
config.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer));
}
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer);
});
}
}

void produceRuntimeConfigurationDefaultBuildItem(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, String key, String value) {
discovery.runIfConfigIsAbsent(key, value,
() -> config.produce(new RunTimeConfigurationDefaultBuildItem(key, value)));
}

private void processIncomingMethod(DefaultSerdeDiscoveryState discovery, MethodInfo method,
BiConsumer<String, String> deserializerAcceptor) {
List<Type> parameterTypes = method.parameters();
Expand Down

0 comments on commit 366573d

Please sign in to comment.