From c919ccdfe2eea6f43b2466b16b8f1dfc03c26fc6 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 27 Apr 2021 14:49:09 +0200 Subject: [PATCH] Autodetect Kafka serializer/deserializer with Reactive Messaging --- docs/src/main/asciidoc/kafka.adoc | 90 +- .../client/deployment/KafkaProcessor.java | 1 - .../deployment/pom.xml | 21 + .../DefaultSerdeDiscoveryState.java | 128 ++ .../kafka/deployment/DotNames.java | 37 + ...ReactiveMessagingKafkaBuildTimeConfig.java | 14 + ...allRyeReactiveMessagingKafkaProcessor.java | 515 +++++ .../deployment/DefaultSerdeConfigTest.java | 1980 +++++++++++++++++ .../io/quarkus/it/kafka/PersonSerializer.java | 6 + .../src/main/resources/application.properties | 2 - 10 files changed, 2783 insertions(+), 11 deletions(-) create mode 100644 extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java create mode 100644 extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java create mode 100644 extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/ReactiveMessagingKafkaBuildTimeConfig.java create mode 100644 extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java create mode 100644 integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PersonSerializer.java diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 37471ab56a4a5..9c96b97477766 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -262,12 +262,12 @@ kafka.bootstrap.servers=localhost:9092 # Configure the Kafka sink (we write to it) mp.messaging.outgoing.generated-price.connector=smallrye-kafka mp.messaging.outgoing.generated-price.topic=prices -mp.messaging.incoming.prices.health-readiness-enabled=false mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer # Configure the Kafka source (we read from it) mp.messaging.incoming.prices.connector=smallrye-kafka mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer +mp.messaging.incoming.prices.health-readiness-enabled=false ---- More details about this configuration is available on the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration] section from the Kafka documentation. These properties are configured with the prefix `kafka`. @@ -495,6 +495,7 @@ To do this, we will need to setup JSON serialization with Jackson or JSON-B. NOTE: With JSON serialization correctly configured, you can also use `Publisher` and `Emitter`. +[[jackson-serialization]] === Serializing via Jackson First, you need to include the `quarkus-jackson` extension (if you already use the `quarkus-resteasy-jackson` extension, this is not needed). @@ -507,9 +508,10 @@ First, you need to include the `quarkus-jackson` extension (if you already use t ---- -There is an existing `ObjectMapperSerializer` that can be used to serialize all pojos via Jackson, -but the corresponding deserializer is generic, so it needs to be subclassed. +There is an existing `ObjectMapperSerializer` that can be used to serialize all pojos via Jackson. 
+You may create an empty subclass if you want to use <<serialization-autodetection>>. +The corresponding deserializer class needs to be subclassed. So, let's create a `FruitDeserializer` that extends the `ObjectMapperDeserializer`. [source,java] ---- @@ -519,7 +521,7 @@ package com.acme.fruit.jackson; import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> { - public FruitDeserializer(){ + public FruitDeserializer() { // pass the class to the parent. super(Fruit.class); } @@ -543,6 +545,7 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali Now, your Kafka messages will contain a Jackson serialized representation of your Fruit pojo. +[[jsonb-serialization]] === Serializing via JSON-B First, you need to include the `quarkus-jsonb` extension (if you already use the `quarkus-resteasy-jsonb` extension, this is not needed). @@ -555,9 +558,10 @@ First, you need to include the `quarkus-jsonb` extension (if you already use the ---- -There is an existing `JsonbSerializer` that can be used to serialize all pojos via JSON-B, -but the corresponding deserializer is generic, so it needs to be subclassed. +There is an existing `JsonbSerializer` that can be used to serialize all pojos via JSON-B. +You may create an empty subclass if you want to use <<serialization-autodetection>>. +The corresponding deserializer class needs to be subclassed. So, let's create a `FruitDeserializer` that extends the generic `JsonbDeserializer`. [source,java] ---- @@ -567,7 +571,7 @@ package com.acme.fruit.jsonb; import io.quarkus.kafka.client.serialization.JsonbDeserializer; public class FruitDeserializer extends JsonbDeserializer<Fruit> { - public FruitDeserializer(){ + public FruitDeserializer() { // pass the class to the parent. super(Fruit.class); } @@ -575,7 +579,7 @@ public class FruitDeserializer extends JsonbDeserializer<Fruit> { ---- NOTE: If you don't want to create a deserializer for each of your pojo, you can use the generic `io.vertx.kafka.client.serialization.JsonObjectDeserializer` -that will deserialize to a `javax.json.JsonObject`. The corresponding serializer can also be used: `io.vertx.kafka.client.serialization.JsonObjectSerializer`. +that will deserialize to an `io.vertx.core.json.JsonObject`. The corresponding serializer can also be used: `io.vertx.kafka.client.serialization.JsonObjectSerializer`. Finally, configure your streams to use the JSON-B serializer and deserializer. @@ -629,6 +633,76 @@ public class FruitResource { } ---- +== Avro serialization + +This is described in a dedicated guide: link:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro]. + +[[serialization-autodetection]] +== Serializer/deserializer autodetection + +When using SmallRye Reactive Messaging with Kafka, Quarkus can often automatically detect the correct serializer and deserializer class. +This autodetection is based on declarations of `@Incoming` and `@Outgoing` methods, as well as injected ``@Channel``s. + +For example, if you declare + +[source,java] +---- +@Outgoing("generated-price") +public Multi<Integer> generate() { + ... +} +---- + +and your configuration indicates that the `generated-price` channel uses the `smallrye-kafka` connector, then Quarkus will automatically set the `value.serializer` to Kafka's built-in `IntegerSerializer`. + +Similarly, if you declare + +[source,java] +---- +@Incoming("my-kafka-records") +public void consume(KafkaRecord<Long, byte[]> record) { + ...
+} +---- + +and your configuration indicates that the `my-kafka-records` channel uses the `smallrye-kafka` connector, then Quarkus will automatically set the `key.deserializer` to Kafka's built-in `LongDeserializer`, as well as the `value.deserializer` to `ByteArrayDeserializer`. + +Finally, if you declare + +[source,java] +---- +@Inject +@Channel("price-create") +Emitter<Double> priceEmitter; +---- + +and your configuration indicates that the `price-create` channel uses the `smallrye-kafka` connector, then Quarkus will automatically set the `value.serializer` to Kafka's built-in `DoubleSerializer`. + +The full set of types supported by the serializer/deserializer autodetection is: + +* `short` and `java.lang.Short` +* `int` and `java.lang.Integer` +* `long` and `java.lang.Long` +* `float` and `java.lang.Float` +* `double` and `java.lang.Double` +* `byte[]` +* `java.lang.String` +* `java.util.UUID` +* `java.nio.ByteBuffer` +* `org.apache.kafka.common.utils.Bytes` +* `io.vertx.core.buffer.Buffer` +* `io.vertx.core.json.JsonObject` +* `io.vertx.core.json.JsonArray` +* classes generated from Avro schemas, if the Confluent or Apicurio _serde_ library is present +** see link:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro] for more information about using the Confluent or Apicurio libraries +* classes for which a subclass of `ObjectMapperSerializer` / `ObjectMapperDeserializer` is present, as described in <<jackson-serialization>> +** it is technically not needed to subclass `ObjectMapperSerializer`, but in that case, autodetection isn't possible +* classes for which a subclass of `JsonbSerializer` / `JsonbDeserializer` is present, as described in <<jsonb-serialization>> +** it is technically not needed to subclass `JsonbSerializer`, but in that case, autodetection isn't possible + +If you have any issues with serializer autodetection, you can switch it off completely by setting `quarkus.reactive-messaging.kafka.serializer-autodetection.enabled=false`. +If you find you need to do this, please file a bug in the link:https://github.com/quarkusio/quarkus/issues[Quarkus issue tracker] so we can fix whatever problem you have. + == Blocking processing You often need to combine Reactive Messaging with blocking processing such as database interactions.
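As an illustration of the autodetection documentation above, here is a minimal sketch of a bean that the new build step added later in this patch would pick up. The package, class and channel names (`org.acme.kafka`, `TemperatureConversions`, `fahrenheit`, `celsius`) are made up for this example; the comments show the configuration defaults the build step is expected to derive, assuming both channels are mapped to the `smallrye-kafka` connector.

[source,java]
----
package org.acme.kafka; // hypothetical package, for illustration only

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;

@ApplicationScoped
public class TemperatureConversions {

    // Outgoing channel with a Double payload; the expected default is:
    // mp.messaging.outgoing.fahrenheit.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer
    @Outgoing("fahrenheit")
    public Multi<Double> generate() {
        return Multi.createFrom().items(98.6, 100.4, 104.0);
    }

    // Incoming channel consuming Record<Long, String>; the expected defaults are:
    // mp.messaging.incoming.celsius.key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    // mp.messaging.incoming.celsius.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    @Incoming("celsius")
    public void consume(Record<Long, String> record) {
        // the key and value arrive already deserialized as Long and String
        System.out.println(record.key() + " -> " + record.value());
    }
}
----

Because these values are produced as configuration _defaults_, an explicit `key.deserializer`, `value.deserializer` or `value.serializer` entry in `application.properties` should still take precedence over the autodetected classes.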
diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 8f1561411e642..50358ffca240d 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -377,7 +377,6 @@ private void handleAvro(BuildProducer reflectiveClass, "io.apicurio.registry.serde.DefaultSchemaResolver", "io.apicurio.registry.serde.DefaultIdHandler", "io.apicurio.registry.serde.Legacy4ByteIdHandler", - "io.apicurio.registry.serde.DefaultSchemaResolver", "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider", "io.apicurio.registry.serde.headers.DefaultHeadersHandler")); diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml b/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml index 5ff767776ce32..44abd424e3920 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml @@ -48,6 +48,27 @@ quarkus-junit5-internal test + + org.assertj + assertj-core + test + + + io.quarkus + quarkus-avro-deployment + test + + + io.quarkus + quarkus-jsonb-deployment + test + + + io.apicurio + apicurio-registry-serdes-avro-serde + 2.0.0.Final + test + diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java new file mode 100644 index 0000000000000..f4740fe592478 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java @@ -0,0 +1,128 @@ +package io.quarkus.smallrye.reactivemessaging.kafka.deployment; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.AnnotationTarget; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.DotName; +import org.jboss.jandex.IndexView; +import org.jboss.jandex.Type; + +import io.smallrye.reactive.messaging.kafka.KafkaConnector; + +class DefaultSerdeDiscoveryState { + private final IndexView index; + + private final Map isKafkaConnector = new HashMap<>(); + + private Boolean hasConfluent; + private Boolean hasApicurio1; + private Boolean hasApicurio2; + private Boolean hasJsonb; + + DefaultSerdeDiscoveryState(IndexView index) { + this.index = index; + } + + boolean isKafkaConnector(boolean incoming, String channelName) { + String channelType = incoming ? "incoming" : "outgoing"; + return isKafkaConnector.computeIfAbsent(channelType + "|" + channelName, ignored -> { + String connectorKey = "mp.messaging." + channelType + "." 
+ channelName + ".connector"; + String connector = ConfigProvider.getConfig() + .getOptionalValue(connectorKey, String.class) + .orElse("ignored"); + return KafkaConnector.CONNECTOR_NAME.equals(connector); + }); + } + + boolean isAvroGenerated(DotName className) { + ClassInfo clazz = index.getClassByName(className); + return clazz != null && clazz.classAnnotation(DotNames.AVRO_GENERATED) != null; + } + + boolean hasConfluent() { + if (hasConfluent == null) { + try { + Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer", false, + Thread.currentThread().getContextClassLoader()); + hasConfluent = true; + } catch (ClassNotFoundException e) { + hasConfluent = false; + } + } + + return hasConfluent; + } + + boolean hasApicurio1() { + if (hasApicurio1 == null) { + try { + Class.forName("io.apicurio.registry.utils.serde.AvroKafkaDeserializer", false, + Thread.currentThread().getContextClassLoader()); + hasApicurio1 = true; + } catch (ClassNotFoundException e) { + hasApicurio1 = false; + } + } + + return hasApicurio1; + } + + boolean hasApicurio2() { + if (hasApicurio2 == null) { + try { + Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false, + Thread.currentThread().getContextClassLoader()); + hasApicurio2 = true; + } catch (ClassNotFoundException e) { + hasApicurio2 = false; + } + } + + return hasApicurio2; + } + + boolean hasJsonb() { + if (hasJsonb == null) { + try { + Class.forName("javax.json.bind.Jsonb", false, + Thread.currentThread().getContextClassLoader()); + hasJsonb = true; + } catch (ClassNotFoundException e) { + hasJsonb = false; + } + } + + return hasJsonb; + } + + ClassInfo getSubclassOfWithTypeArgument(DotName superclass, DotName expectedTypeArgument) { + return index.getKnownDirectSubclasses(superclass) + .stream() + .filter(it -> it.superClassType().kind() == Type.Kind.PARAMETERIZED_TYPE + && it.superClassType().asParameterizedType().arguments().size() == 1 + && it.superClassType().asParameterizedType().arguments().get(0).name().equals(expectedTypeArgument)) + .findAny() + .orElse(null); + } + + List findAnnotationsOnMethods(DotName annotation) { + return index.getAnnotations(annotation) + .stream() + .filter(it -> it.target().kind() == AnnotationTarget.Kind.METHOD) + .collect(Collectors.toList()); + } + + List findAnnotationsOnInjectionPoints(DotName annotation) { + return index.getAnnotations(annotation) + .stream() + .filter(it -> it.target().kind() == AnnotationTarget.Kind.FIELD + || it.target().kind() == AnnotationTarget.Kind.METHOD_PARAMETER) + .collect(Collectors.toList()); + } +} diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java new file mode 100644 index 0000000000000..8381663444fdb --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java @@ -0,0 +1,37 @@ +package io.quarkus.smallrye.reactivemessaging.kafka.deployment; + +import org.jboss.jandex.DotName; + +final class DotNames { + // @formatter:off + static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName()); + static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName()); + static final DotName CHANNEL = 
DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Channel.class.getName()); + + static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName()); + static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName()); + + static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName()); + static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName()); + static final DotName RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.Record.class.getName()); + static final DotName CONSUMER_RECORD = DotName.createSimple(org.apache.kafka.clients.consumer.ConsumerRecord.class.getName()); + static final DotName PRODUCER_RECORD = DotName.createSimple(org.apache.kafka.clients.producer.ProducerRecord.class.getName()); + + static final DotName COMPLETION_STAGE = DotName.createSimple(java.util.concurrent.CompletionStage.class.getName()); + static final DotName UNI = DotName.createSimple(io.smallrye.mutiny.Uni.class.getName()); + + static final DotName SUBSCRIBER = DotName.createSimple(org.reactivestreams.Subscriber.class.getName()); + static final DotName SUBSCRIBER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder.class.getName()); + static final DotName PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName()); + static final DotName PUBLISHER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder.class.getName()); + static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName()); + static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName()); + static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName()); + + static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated"); + static final DotName OBJECT_MAPPER_DESERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.ObjectMapperDeserializer.class.getName()); + static final DotName OBJECT_MAPPER_SERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.ObjectMapperSerializer.class.getName()); + static final DotName JSONB_DESERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.JsonbDeserializer.class.getName()); + static final DotName JSONB_SERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.JsonbSerializer.class.getName()); + // @formatter:on +} diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/ReactiveMessagingKafkaBuildTimeConfig.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/ReactiveMessagingKafkaBuildTimeConfig.java new file mode 100644 index 0000000000000..f02383ea6b5a3 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/ReactiveMessagingKafkaBuildTimeConfig.java @@ -0,0 +1,14 @@ +package io.quarkus.smallrye.reactivemessaging.kafka.deployment; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import 
io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "reactive-messaging.kafka", phase = ConfigPhase.BUILD_TIME) +public class ReactiveMessagingKafkaBuildTimeConfig { + /** + * Whether or not Kafka serializer/deserializer autodetection is enabled. + */ + @ConfigItem(name = "serializer-autodetection.enabled", defaultValue = "true") + public boolean serializerAutodetectionEnabled; +} diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 3dbf037eb5b1a..64694bd764b1d 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -1,9 +1,22 @@ package io.quarkus.smallrye.reactivemessaging.kafka.deployment; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.DotName; +import org.jboss.jandex.MethodInfo; +import org.jboss.jandex.MethodParameterInfo; +import org.jboss.jandex.Type; + import io.quarkus.deployment.Feature; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl; @@ -26,4 +39,506 @@ public void build(BuildProducer reflectiveClass) { .build()); } + @BuildStep + public void defaultSerdeConfig(ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig, + CombinedIndexBuildItem combinedIndex, + BuildProducer defaultConfigProducer) { + + if (!buildTimeConfig.serializerAutodetectionEnabled) { + return; + } + + DefaultSerdeDiscoveryState discoveryState = new DefaultSerdeDiscoveryState(combinedIndex.getIndex()); + + discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer); + } + + // visible for testing + void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, + BuildProducer config) { + for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.INCOMING)) { + String channelName = annotation.value().asString(); + if (!discovery.isKafkaConnector(true, channelName)) { + continue; + } + + MethodInfo method = annotation.target().asMethod(); + + processIncomingMethod(discovery, method, (keyDeserializer, valueDeserializer) -> { + if (keyDeserializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer)); + } + if (valueDeserializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.incoming." 
+ channelName + ".value.deserializer", valueDeserializer)); + } + }); + } + + for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.OUTGOING)) { + String channelName = annotation.value().asString(); + if (!discovery.isKafkaConnector(false, channelName)) { + continue; + } + + MethodInfo method = annotation.target().asMethod(); + + processOutgoingMethod(discovery, method, (keySerializer, valueSerializer) -> { + if (keySerializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer)); + } + if (valueSerializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.outgoing." + channelName + ".value.serializer", valueSerializer)); + } + }); + } + + for (AnnotationInstance annotation : discovery.findAnnotationsOnInjectionPoints(DotNames.CHANNEL)) { + String channelName = annotation.value().asString(); + if (!discovery.isKafkaConnector(false, channelName)) { + continue; + } + + Type injectionPointType; + switch (annotation.target().kind()) { + case FIELD: + injectionPointType = annotation.target().asField().type(); + break; + case METHOD_PARAMETER: + MethodParameterInfo parameter = annotation.target().asMethodParameter(); + injectionPointType = parameter.method().parameters().get(parameter.position()); + break; + default: + continue; + } + + processIncomingChannelInjectionPoint(discovery, injectionPointType, (keyDeserializer, valueDeserializer) -> { + if (keyDeserializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.incoming." + channelName + ".key.deserializer", keyDeserializer)); + } + if (valueDeserializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.incoming." + channelName + ".value.deserializer", valueDeserializer)); + } + }); + + processOutgoingChannelInjectionPoint(discovery, injectionPointType, (keySerializer, valueSerializer) -> { + if (keySerializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.outgoing." + channelName + ".key.serializer", keySerializer)); + } + if (valueSerializer != null) { + config.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.outgoing." 
+ channelName + ".value.serializer", valueSerializer)); + } + }); + } + } + + private void processIncomingMethod(DefaultSerdeDiscoveryState discovery, MethodInfo method, + BiConsumer deserializerAcceptor) { + List parameterTypes = method.parameters(); + int parametersCount = parameterTypes.size(); + Type returnType = method.returnType(); + + Type incomingType = null; + + // @Incoming + if ((isVoid(returnType) && parametersCount == 1) + || (isCompletionStage(returnType) && parametersCount == 1) + || (isUni(returnType) && parametersCount == 1)) { + incomingType = parameterTypes.get(0); + } else if ((isSubscriber(returnType) && parametersCount == 0) + || (isSubscriberBuilder(returnType) && parametersCount == 0)) { + incomingType = returnType.asParameterizedType().arguments().get(0); + } + + // @Incoming @Outgoing + if (method.hasAnnotation(DotNames.OUTGOING)) { + if ((isCompletionStage(returnType) && parametersCount == 1) + || (isUni(returnType) && parametersCount == 1) + || (isPublisher(returnType) && parametersCount == 1) + || (isPublisherBuilder(returnType) && parametersCount == 1) + || (isMulti(returnType) && parametersCount == 1)) { + incomingType = parameterTypes.get(0); + } else if ((isProcessor(returnType) && parametersCount == 0) + || (isProcessorBuilder(returnType) && parametersCount == 0)) { + incomingType = returnType.asParameterizedType().arguments().get(0); + } else if (parametersCount == 1) { + incomingType = parameterTypes.get(0); + } + + // @Incoming @Outgoing stream manipulation + if (incomingType != null + && (isPublisher(incomingType) || isPublisherBuilder(incomingType) || isMulti(incomingType))) { + incomingType = incomingType.asParameterizedType().arguments().get(0); + } + } + + processIncomingType(discovery, incomingType, deserializerAcceptor); + } + + private void processIncomingChannelInjectionPoint(DefaultSerdeDiscoveryState discovery, Type injectionPointType, + BiConsumer deserializerAcceptor) { + Type incomingType = null; + + if (isPublisher(injectionPointType) || isPublisherBuilder(injectionPointType) || isMulti(injectionPointType)) { + incomingType = injectionPointType.asParameterizedType().arguments().get(0); + } + + processIncomingType(discovery, incomingType, deserializerAcceptor); + } + + private void processIncomingType(DefaultSerdeDiscoveryState discovery, Type incomingType, + BiConsumer deserializerAcceptor) { + if (incomingType == null) { + return; + } + + if (isMessage(incomingType)) { + List typeArguments = incomingType.asParameterizedType().arguments(); + String deserializer = deserializerFor(discovery, typeArguments.get(0)); + deserializerAcceptor.accept(null, deserializer); + } else if (isKafkaRecord(incomingType) || isRecord(incomingType) || isConsumerRecord(incomingType)) { + List typeArguments = incomingType.asParameterizedType().arguments(); + String keyDeserializer = deserializerFor(discovery, typeArguments.get(0)); + String valueDeserializer = deserializerFor(discovery, typeArguments.get(1)); + deserializerAcceptor.accept(keyDeserializer, valueDeserializer); + } else if (isRawMessage(incomingType)) { + String deserializer = deserializerFor(discovery, incomingType); + deserializerAcceptor.accept(null, deserializer); + } + } + + private void processOutgoingMethod(DefaultSerdeDiscoveryState discovery, MethodInfo method, + BiConsumer serializerAcceptor) { + List parameterTypes = method.parameters(); + int parametersCount = parameterTypes.size(); + Type returnType = method.returnType(); + + Type outgoingType = null; + + // @Outgoing + if 
((isPublisher(returnType) && parametersCount == 0) + || (isPublisherBuilder(returnType) && parametersCount == 0) + || (isMulti(returnType) && parametersCount == 0) + || (isCompletionStage(returnType) && parametersCount == 0) + || (isUni(returnType) && parametersCount == 0)) { + outgoingType = returnType.asParameterizedType().arguments().get(0); + } else if (parametersCount == 0) { + outgoingType = returnType; + } + + // @Incoming @Outgoing + if (method.hasAnnotation(DotNames.INCOMING)) { + if ((isCompletionStage(returnType) && parametersCount == 1) + || (isUni(returnType) && parametersCount == 1) + || (isPublisher(returnType) && parametersCount == 1) + || (isPublisherBuilder(returnType) && parametersCount == 1) + || (isMulti(returnType) && parametersCount == 1)) { + outgoingType = returnType.asParameterizedType().arguments().get(0); + } else if ((isProcessor(returnType) && parametersCount == 0) + || (isProcessorBuilder(returnType) && parametersCount == 0)) { + outgoingType = returnType.asParameterizedType().arguments().get(1); + } else if (parametersCount == 1) { + outgoingType = returnType; + } + + // @Incoming @Outgoing stream manipulation + if (outgoingType != null + && (isPublisher(outgoingType) || isPublisherBuilder(outgoingType) || isMulti(outgoingType))) { + outgoingType = outgoingType.asParameterizedType().arguments().get(0); + } + } + + processOutgoingType(discovery, outgoingType, serializerAcceptor); + } + + private void processOutgoingChannelInjectionPoint(DefaultSerdeDiscoveryState discovery, Type injectionPointType, + BiConsumer serializerAcceptor) { + Type outgoingType = null; + + if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType)) { + outgoingType = injectionPointType.asParameterizedType().arguments().get(0); + } + + processOutgoingType(discovery, outgoingType, serializerAcceptor); + } + + private void processOutgoingType(DefaultSerdeDiscoveryState discovery, Type outgoingType, + BiConsumer serializerAcceptor) { + if (outgoingType == null) { + return; + } + + if (isMessage(outgoingType)) { + List typeArguments = outgoingType.asParameterizedType().arguments(); + String serializer = serializerFor(discovery, typeArguments.get(0)); + serializerAcceptor.accept(null, serializer); + } else if (isKafkaRecord(outgoingType) || isRecord(outgoingType) || isProducerRecord(outgoingType)) { + List typeArguments = outgoingType.asParameterizedType().arguments(); + String keySerializer = serializerFor(discovery, typeArguments.get(0)); + String valueSerializer = serializerFor(discovery, typeArguments.get(1)); + serializerAcceptor.accept(keySerializer, valueSerializer); + } else if (isRawMessage(outgoingType)) { + String serializer = serializerFor(discovery, outgoingType); + serializerAcceptor.accept(null, serializer); + } + } + + // --- + + private static boolean isVoid(Type type) { + return type.kind() == Type.Kind.VOID; + } + + private static boolean isCompletionStage(Type type) { + // raw type CompletionStage is wrong, must be CompletionStage + return DotNames.COMPLETION_STAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isUni(Type type) { + // raw type Uni is wrong, must be Uni + return DotNames.UNI.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isMulti(Type type) { + // raw type Multi is wrong, must be Multi + return 
DotNames.MULTI.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isSubscriber(Type type) { + // raw type Subscriber is wrong, must be Subscriber + return DotNames.SUBSCRIBER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isSubscriberBuilder(Type type) { + // raw type SubscriberBuilder is wrong, must be SubscriberBuilder + return DotNames.SUBSCRIBER_BUILDER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isPublisher(Type type) { + // raw type Publisher is wrong, must be Publisher + return DotNames.PUBLISHER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isPublisherBuilder(Type type) { + // raw type PublisherBuilder is wrong, must be PublisherBuilder + return DotNames.PUBLISHER_BUILDER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isProcessor(Type type) { + // raw type Processor is wrong, must be Processor + return DotNames.PROCESSOR.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isProcessorBuilder(Type type) { + // raw type ProcessorBuilder is wrong, must be ProcessorBuilder + return DotNames.PROCESSOR_BUILDER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + // --- + + private static boolean isEmitter(Type type) { + // raw type Emitter is wrong, must be Emitter + return DotNames.EMITTER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isMutinyEmitter(Type type) { + // raw type MutinyEmitter is wrong, must be MutinyEmitter + return DotNames.MUTINY_EMITTER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + // --- + + private static boolean isMessage(Type type) { + // raw type Message is wrong, must be Message + return DotNames.MESSAGE.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + + private static boolean isKafkaRecord(Type type) { + // raw type KafkaRecord is wrong, must be KafkaRecord + return DotNames.KAFKA_RECORD.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isRecord(Type type) { + // raw type Record is wrong, must be Record + return DotNames.RECORD.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isConsumerRecord(Type type) { + // raw type ConsumerRecord is wrong, must be ConsumerRecord + return DotNames.CONSUMER_RECORD.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isProducerRecord(Type type) { + // raw type ProducerRecord is wrong, must be ProducerRecord + return 
DotNames.PRODUCER_RECORD.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + + private static boolean isRawMessage(Type type) { + switch (type.kind()) { + case PRIMITIVE: + case CLASS: + case ARRAY: + return true; + default: + return false; + } + } + + // --- + + // @formatter:off + private static final Map KNOWN_DESERIALIZERS = Map.ofEntries( + // Java types with built-in Kafka deserializer + // primitives + Map.entry(DotName.createSimple("short"), org.apache.kafka.common.serialization.ShortDeserializer.class.getName()), + Map.entry(DotName.createSimple("int"), org.apache.kafka.common.serialization.IntegerDeserializer.class.getName()), + Map.entry(DotName.createSimple("long"), org.apache.kafka.common.serialization.LongDeserializer.class.getName()), + Map.entry(DotName.createSimple("float"), org.apache.kafka.common.serialization.FloatDeserializer.class.getName()), + Map.entry(DotName.createSimple("double"), org.apache.kafka.common.serialization.DoubleDeserializer.class.getName()), + // primitive wrappers + Map.entry(DotName.createSimple(java.lang.Short.class.getName()), org.apache.kafka.common.serialization.ShortDeserializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Integer.class.getName()), org.apache.kafka.common.serialization.IntegerDeserializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Long.class.getName()), org.apache.kafka.common.serialization.LongDeserializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Float.class.getName()), org.apache.kafka.common.serialization.FloatDeserializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Double.class.getName()), org.apache.kafka.common.serialization.DoubleDeserializer.class.getName()), + // arrays + Map.entry(DotName.createSimple("[B"), org.apache.kafka.common.serialization.ByteArrayDeserializer.class.getName()), + // other + Map.entry(DotName.createSimple(java.lang.Void.class.getName()), org.apache.kafka.common.serialization.VoidDeserializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.String.class.getName()), org.apache.kafka.common.serialization.StringDeserializer.class.getName()), + Map.entry(DotName.createSimple(java.util.UUID.class.getName()), org.apache.kafka.common.serialization.UUIDDeserializer.class.getName()), + Map.entry(DotName.createSimple(java.nio.ByteBuffer.class.getName()), org.apache.kafka.common.serialization.ByteBufferDeserializer.class.getName()), + // Kafka types + Map.entry(DotName.createSimple(org.apache.kafka.common.utils.Bytes.class.getName()), org.apache.kafka.common.serialization.BytesDeserializer.class.getName()), + // Vert.x types + Map.entry(DotName.createSimple(io.vertx.core.buffer.Buffer.class.getName()), io.vertx.kafka.client.serialization.BufferDeserializer.class.getName()), + Map.entry(DotName.createSimple(io.vertx.core.json.JsonObject.class.getName()), io.vertx.kafka.client.serialization.JsonObjectDeserializer.class.getName()), + Map.entry(DotName.createSimple(io.vertx.core.json.JsonArray.class.getName()), io.vertx.kafka.client.serialization.JsonArrayDeserializer.class.getName()) + ); + + private static final Map KNOWN_SERIALIZERS = Map.ofEntries( + // Java types with built-in Kafka serializer + // primitives + Map.entry(DotName.createSimple("short"), org.apache.kafka.common.serialization.ShortSerializer.class.getName()), + Map.entry(DotName.createSimple("int"), org.apache.kafka.common.serialization.IntegerSerializer.class.getName()), + 
Map.entry(DotName.createSimple("long"), org.apache.kafka.common.serialization.LongSerializer.class.getName()), + Map.entry(DotName.createSimple("float"), org.apache.kafka.common.serialization.FloatSerializer.class.getName()), + Map.entry(DotName.createSimple("double"), org.apache.kafka.common.serialization.DoubleSerializer.class.getName()), + // primitives wrappers + Map.entry(DotName.createSimple(java.lang.Short.class.getName()), org.apache.kafka.common.serialization.ShortSerializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Integer.class.getName()), org.apache.kafka.common.serialization.IntegerSerializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Long.class.getName()), org.apache.kafka.common.serialization.LongSerializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Float.class.getName()), org.apache.kafka.common.serialization.FloatSerializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.Double.class.getName()), org.apache.kafka.common.serialization.DoubleSerializer.class.getName()), + // arrays + Map.entry(DotName.createSimple("[B"), org.apache.kafka.common.serialization.ByteArraySerializer.class.getName()), + // other + Map.entry(DotName.createSimple(java.lang.Void.class.getName()), org.apache.kafka.common.serialization.VoidSerializer.class.getName()), + Map.entry(DotName.createSimple(java.lang.String.class.getName()), org.apache.kafka.common.serialization.StringSerializer.class.getName()), + Map.entry(DotName.createSimple(java.util.UUID.class.getName()), org.apache.kafka.common.serialization.UUIDSerializer.class.getName()), + Map.entry(DotName.createSimple(java.nio.ByteBuffer.class.getName()), org.apache.kafka.common.serialization.ByteBufferSerializer.class.getName()), + // Kafka types + Map.entry(DotName.createSimple(org.apache.kafka.common.utils.Bytes.class.getName()), org.apache.kafka.common.serialization.BytesSerializer.class.getName()), + // Vert.x types + Map.entry(DotName.createSimple(io.vertx.core.buffer.Buffer.class.getName()), io.vertx.kafka.client.serialization.BufferSerializer.class.getName()), + Map.entry(DotName.createSimple(io.vertx.core.json.JsonObject.class.getName()), io.vertx.kafka.client.serialization.JsonObjectSerializer.class.getName()), + Map.entry(DotName.createSimple(io.vertx.core.json.JsonArray.class.getName()), io.vertx.kafka.client.serialization.JsonArraySerializer.class.getName()) + ); + // @formatter:on + + private String deserializerFor(DefaultSerdeDiscoveryState discovery, Type type) { + return serializerDeserializerFor(discovery, type, false); + } + + private String serializerFor(DefaultSerdeDiscoveryState discovery, Type type) { + return serializerDeserializerFor(discovery, type, true); + } + + private String serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, Type type, boolean serializer) { + DotName typeName = type.name(); + + // statically known serializer/deserializer + Map map = serializer ? KNOWN_SERIALIZERS : KNOWN_DESERIALIZERS; + if (map.containsKey(typeName)) { + return map.get(typeName); + } + + // Avro generated class (serializer/deserializer provided by Confluent or Apicurio) + if (discovery.isAvroGenerated(typeName)) { + if (discovery.hasConfluent()) { + return serializer + ? "io.confluent.kafka.serializers.KafkaAvroSerializer" + : "io.confluent.kafka.serializers.KafkaAvroDeserializer"; + } else if (discovery.hasApicurio1()) { + return serializer + ? 
"io.apicurio.registry.utils.serde.AvroKafkaSerializer" + : "io.apicurio.registry.utils.serde.AvroKafkaDeserializer"; + } else if (discovery.hasApicurio2()) { + return serializer + ? "io.apicurio.registry.serde.avro.AvroKafkaSerializer" + : "io.apicurio.registry.serde.avro.AvroKafkaDeserializer"; + } + } + + // Jackson-based serializer/deserializer + // note that Jackson is always present with Kafka, so no need to check + { + ClassInfo subclass = discovery.getSubclassOfWithTypeArgument( + serializer ? DotNames.OBJECT_MAPPER_SERIALIZER : DotNames.OBJECT_MAPPER_DESERIALIZER, typeName); + if (subclass != null) { + return subclass.name().toString(); + } + } + + // Jsonb-based serializer/deserializer + if (discovery.hasJsonb()) { + ClassInfo subclass = discovery.getSubclassOfWithTypeArgument( + serializer ? DotNames.JSONB_SERIALIZER : DotNames.JSONB_DESERIALIZER, typeName); + if (subclass != null) { + return subclass.name().toString(); + } + } + + // unknown + return null; + } } diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java new file mode 100644 index 0000000000000..b4c3dcfe3c3af --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java @@ -0,0 +1,1980 @@ +package io.quarkus.smallrye.reactivemessaging.kafka.deployment; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.groups.Tuple.tuple; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; + +import javax.inject.Inject; + +import org.apache.avro.specific.AvroGenerated; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.assertj.core.groups.Tuple; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import org.jboss.jandex.IndexView; +import org.jboss.jandex.Indexer; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; +import io.quarkus.kafka.client.serialization.JsonbSerializer; +import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.MutinyEmitter; +import io.smallrye.reactive.messaging.kafka.KafkaRecord; +import io.smallrye.reactive.messaging.kafka.Record; +import io.vertx.core.json.JsonArray; + +public class DefaultSerdeConfigTest { + private static void doTest(Tuple[] expectations, Class... 
classesToIndex) { + List configs = new ArrayList<>(); + + DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classesToIndex)) { + @Override + boolean isKafkaConnector(boolean incoming, String channelName) { + return true; + } + }; + new SmallRyeReactiveMessagingKafkaProcessor().discoverDefaultSerdeConfig(discovery, configs::add); + + assertThat(configs) + .extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue) + .containsOnly(expectations); + } + + private static IndexView index(Class... classes) { + Indexer indexer = new Indexer(); + for (Class clazz : classes) { + try { + try (InputStream stream = DefaultSerdeConfigTest.class.getClassLoader() + .getResourceAsStream(clazz.getName().replace('.', '/') + ".class")) { + indexer.index(stream); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return indexer.complete(); + } + + // --- + + @Test + public void stringInLongOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel2.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel4.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel5.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel6.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel7.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel8.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel9.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel10.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel11.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.channel12.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + + tuple("mp.messaging.incoming.channel13.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel14.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel15.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel16.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel17.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel18.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel19.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel20.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.incoming.channel21.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + + 
tuple("mp.messaging.incoming.channel22.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel23.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel24.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel25.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel26.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel27.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel28.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel29.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel30.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel31.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel32.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel33.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel34.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel35.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel36.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel37.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel38.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel39.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel40.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel41.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel42.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel43.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel44.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel45.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel46.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel47.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel48.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel49.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel50.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel51.value.serializer", 
"org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel52.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel53.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + + tuple("mp.messaging.incoming.channel54.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel55.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel56.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel57.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel58.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel59.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel60.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel61.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel62.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel63.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.incoming.channel64.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.channel65.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + }; + // @formatter:on + + doTest(expectations, StringInLongOut.class); + } + + private static class StringInLongOut { + // @Outgoing + + @Outgoing("channel1") + Publisher> method1() { + return null; + } + + @Outgoing("channel2") + Publisher method2() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder> method3() { + return null; + } + + @Outgoing("channel4") + PublisherBuilder method4() { + return null; + } + + @Outgoing("channel5") + Multi> method5() { + return null; + } + + @Outgoing("channel6") + Multi method6() { + return null; + } + + @Outgoing("channel7") + Message method7() { + return null; + } + + @Outgoing("channel8") + Long method8() { + return null; + } + + @Outgoing("channel9") + CompletionStage> method9() { + return null; + } + + @Outgoing("channel10") + CompletionStage method10() { + return null; + } + + @Outgoing("channel11") + Uni> method11() { + return null; + } + + @Outgoing("channel12") + Uni method12() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber> method13() { + return null; + } + + @Incoming("channel14") + Subscriber method14() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder, Void> method15() { + return null; + } + + @Incoming("channel16") + SubscriberBuilder method16() { + return null; + } + + @Incoming("channel17") + void method17(String msg) { + } + + @Incoming("channel18") + CompletionStage method18(Message msg) { + return null; + } + + @Incoming("channel19") + CompletionStage method19(String payload) { + return null; + } + + @Incoming("channel20") + Uni method20(Message msg) { + return null; + } + + @Incoming("channel21") + Uni method21(String payload) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor, Message> method22() { + 
return null; + } + + @Incoming("channel24") + @Outgoing("channel25") + Processor method23() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder, Message> method24() { + return null; + } + + @Incoming("channel28") + @Outgoing("channel29") + ProcessorBuilder method25() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher> method26(Message msg) { + return null; + } + + @Incoming("channel32") + @Outgoing("channel33") + Publisher method27(String payload) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder> method28(Message msg) { + return null; + } + + @Incoming("channel36") + @Outgoing("channel37") + PublisherBuilder method29(String payload) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi> method30(Message msg) { + return null; + } + + @Incoming("channel40") + @Outgoing("channel41") + Multi method31(String payload) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + Message method32(Message msg) { + return null; + } + + @Incoming("channel44") + @Outgoing("channel45") + Long method33(String payload) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage> method34(Message msg) { + return null; + } + + @Incoming("channel48") + @Outgoing("channel49") + CompletionStage method35(String payload) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni> method36(Message msg) { + return null; + } + + @Incoming("channel52") + @Outgoing("channel53") + Uni method37(String payload) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher> method38(Publisher> msg) { + return null; + } + + @Incoming("channel56") + @Outgoing("channel57") + Publisher method39(Publisher payload) { + return null; + } + + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder> method40(PublisherBuilder> msg) { + return null; + } + + @Incoming("channel60") + @Outgoing("channel61") + PublisherBuilder method41(PublisherBuilder payload) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi> method42(Multi> msg) { + return null; + } + + @Incoming("channel64") + @Outgoing("channel65") + Multi method43(Multi payload) { + return null; + } + } + + // --- + + @Test + public void byteArrayInAvroDtoOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel2.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel4.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel5.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel6.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel7.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel8.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel9.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + 
tuple("mp.messaging.outgoing.channel10.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel11.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.outgoing.channel12.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + + tuple("mp.messaging.incoming.channel13.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel14.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel15.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel16.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel17.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel18.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel19.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel20.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.incoming.channel21.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + + tuple("mp.messaging.incoming.channel22.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel23.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel24.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel25.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel26.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel27.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel28.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel29.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel30.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel31.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel32.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel33.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel34.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel35.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel36.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel37.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel38.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + 
tuple("mp.messaging.outgoing.channel39.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel40.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel41.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel42.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel43.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel44.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel45.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel46.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel47.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel48.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel49.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel50.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel51.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel52.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel53.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + + tuple("mp.messaging.incoming.channel54.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel55.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel56.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel57.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel58.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel59.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel60.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel61.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel62.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel63.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + tuple("mp.messaging.incoming.channel64.value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"), + tuple("mp.messaging.outgoing.channel65.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"), + }; + // @formatter:on + + doTest(expectations, AvroDto.class, ByteArrayInAvroDtoOut.class); + } + + // simulating an Avro-generated class, the autodetection code only looks for this annotation + @AvroGenerated + private static class AvroDto { + } + + private static class 
ByteArrayInAvroDtoOut { + // @Outgoing + + @Outgoing("channel1") + Publisher> method1() { + return null; + } + + @Outgoing("channel2") + Publisher method2() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder> method3() { + return null; + } + + @Outgoing("channel4") + PublisherBuilder method4() { + return null; + } + + @Outgoing("channel5") + Multi> method5() { + return null; + } + + @Outgoing("channel6") + Multi method6() { + return null; + } + + @Outgoing("channel7") + Message method7() { + return null; + } + + @Outgoing("channel8") + AvroDto method8() { + return null; + } + + @Outgoing("channel9") + CompletionStage> method9() { + return null; + } + + @Outgoing("channel10") + CompletionStage method10() { + return null; + } + + @Outgoing("channel11") + Uni> method11() { + return null; + } + + @Outgoing("channel12") + Uni method12() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber> method13() { + return null; + } + + @Incoming("channel14") + Subscriber method14() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder, Void> method15() { + return null; + } + + @Incoming("channel16") + SubscriberBuilder method16() { + return null; + } + + @Incoming("channel17") + void method17(byte[] msg) { + } + + @Incoming("channel18") + CompletionStage method18(Message msg) { + return null; + } + + @Incoming("channel19") + CompletionStage method19(byte[] payload) { + return null; + } + + @Incoming("channel20") + Uni method20(Message msg) { + return null; + } + + @Incoming("channel21") + Uni method21(byte[] payload) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor, Message> method22() { + return null; + } + + @Incoming("channel24") + @Outgoing("channel25") + Processor method23() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder, Message> method24() { + return null; + } + + @Incoming("channel28") + @Outgoing("channel29") + ProcessorBuilder method25() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher> method26(Message msg) { + return null; + } + + @Incoming("channel32") + @Outgoing("channel33") + Publisher method27(byte[] payload) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder> method28(Message msg) { + return null; + } + + @Incoming("channel36") + @Outgoing("channel37") + PublisherBuilder method29(byte[] payload) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi> method30(Message msg) { + return null; + } + + @Incoming("channel40") + @Outgoing("channel41") + Multi method31(byte[] payload) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + Message method32(Message msg) { + return null; + } + + @Incoming("channel44") + @Outgoing("channel45") + AvroDto method33(byte[] payload) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage> method34(Message msg) { + return null; + } + + @Incoming("channel48") + @Outgoing("channel49") + CompletionStage method35(byte[] payload) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni> method36(Message msg) { + return null; + } + + @Incoming("channel52") + @Outgoing("channel53") + Uni method37(byte[] payload) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher> method38(Publisher> msg) { + return null; + } + + @Incoming("channel56") + 
@Outgoing("channel57") + Publisher method39(Publisher payload) { + return null; + } + + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder> method40(PublisherBuilder> msg) { + return null; + } + + @Incoming("channel60") + @Outgoing("channel61") + PublisherBuilder method41(PublisherBuilder payload) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi> method42(Multi> msg) { + return null; + } + + @Incoming("channel64") + @Outgoing("channel65") + Multi method43(Multi payload) { + return null; + } + } + + // --- + + @Test + public void jacksonDtoInVertxJsonObjectOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel2.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel4.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel5.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel6.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel7.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel8.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel9.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel10.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel11.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.outgoing.channel12.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + + tuple("mp.messaging.incoming.channel13.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel14.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel15.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel16.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel17.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel18.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel19.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel20.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.incoming.channel21.value.deserializer", 
"io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + + tuple("mp.messaging.incoming.channel22.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel23.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel24.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel25.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel26.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel27.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel28.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel29.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel30.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel31.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel32.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel33.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel34.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel35.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel36.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel37.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel38.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel39.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel40.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel41.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel42.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel43.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel44.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel45.value.serializer", 
"io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel46.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel47.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel48.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel49.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel50.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel51.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel52.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel53.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + + tuple("mp.messaging.incoming.channel54.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel55.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel56.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel57.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel58.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel59.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel60.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel61.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel62.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel63.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel64.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JacksonDtoDeserializer"), + tuple("mp.messaging.outgoing.channel65.value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer"), + }; + // @formatter:on + + doTest(expectations, JacksonDto.class, JacksonDtoDeserializer.class, JacksonDtoInVertxJsonObjectOut.class); + } + + private static class JacksonDto { + } + + private static class JacksonDtoDeserializer extends ObjectMapperDeserializer { + public JacksonDtoDeserializer() { + super(JacksonDto.class); + } + + @Override + public JacksonDto deserialize(String topic, Headers headers, byte[] data) { + return null; + } + } + + private static class JacksonDtoInVertxJsonObjectOut { + // @Outgoing + + @Outgoing("channel1") + Publisher> method1() { + return null; + } + + 
@Outgoing("channel2") + Publisher method2() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder> method3() { + return null; + } + + @Outgoing("channel4") + PublisherBuilder method4() { + return null; + } + + @Outgoing("channel5") + Multi> method5() { + return null; + } + + @Outgoing("channel6") + Multi method6() { + return null; + } + + @Outgoing("channel7") + Message method7() { + return null; + } + + @Outgoing("channel8") + io.vertx.core.json.JsonObject method8() { + return null; + } + + @Outgoing("channel9") + CompletionStage> method9() { + return null; + } + + @Outgoing("channel10") + CompletionStage method10() { + return null; + } + + @Outgoing("channel11") + Uni> method11() { + return null; + } + + @Outgoing("channel12") + Uni method12() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber> method13() { + return null; + } + + @Incoming("channel14") + Subscriber method14() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder, Void> method15() { + return null; + } + + @Incoming("channel16") + SubscriberBuilder method16() { + return null; + } + + @Incoming("channel17") + void method17(JacksonDto msg) { + } + + @Incoming("channel18") + CompletionStage method18(Message msg) { + return null; + } + + @Incoming("channel19") + CompletionStage method19(JacksonDto payload) { + return null; + } + + @Incoming("channel20") + Uni method20(Message msg) { + return null; + } + + @Incoming("channel21") + Uni method21(JacksonDto payload) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor, Message> method22() { + return null; + } + + @Incoming("channel24") + @Outgoing("channel25") + Processor method23() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder, Message> method24() { + return null; + } + + @Incoming("channel28") + @Outgoing("channel29") + ProcessorBuilder method25() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher> method26(Message msg) { + return null; + } + + @Incoming("channel32") + @Outgoing("channel33") + Publisher method27(JacksonDto payload) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder> method28(Message msg) { + return null; + } + + @Incoming("channel36") + @Outgoing("channel37") + PublisherBuilder method29(JacksonDto payload) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi> method30(Message msg) { + return null; + } + + @Incoming("channel40") + @Outgoing("channel41") + Multi method31(JacksonDto payload) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + Message method32(Message msg) { + return null; + } + + @Incoming("channel44") + @Outgoing("channel45") + io.vertx.core.json.JsonObject method33(JacksonDto payload) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage> method34(Message msg) { + return null; + } + + @Incoming("channel48") + @Outgoing("channel49") + CompletionStage method35(JacksonDto payload) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni> method36(Message msg) { + return null; + } + + @Incoming("channel52") + @Outgoing("channel53") + Uni method37(JacksonDto payload) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher> method38(Publisher> msg) { + return null; + } + + @Incoming("channel56") + @Outgoing("channel57") + Publisher 
method39(Publisher payload) { + return null; + } + + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder> method40(PublisherBuilder> msg) { + return null; + } + + @Incoming("channel60") + @Outgoing("channel61") + PublisherBuilder method41(PublisherBuilder payload) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi> method42(Multi> msg) { + return null; + } + + @Incoming("channel64") + @Outgoing("channel65") + Multi method43(Multi payload) { + return null; + } + } + + // --- + + @Test + public void kafkaBytesInJsonbDtoOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel2.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel4.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel5.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel6.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel7.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel8.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel9.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel10.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel11.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.outgoing.channel12.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + + tuple("mp.messaging.incoming.channel13.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.incoming.channel14.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.incoming.channel15.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.incoming.channel16.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.incoming.channel17.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.incoming.channel18.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.incoming.channel19.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.incoming.channel20.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + 
tuple("mp.messaging.incoming.channel21.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + + tuple("mp.messaging.incoming.channel22.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel23.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel24.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel25.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel26.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel27.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel28.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel29.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel30.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel31.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel32.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel33.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel34.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel35.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel36.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel37.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel38.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel39.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel40.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel41.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel42.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel43.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel44.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel45.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + 
tuple("mp.messaging.incoming.channel46.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel47.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel48.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel49.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel50.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel51.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel52.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel53.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + + tuple("mp.messaging.incoming.channel54.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel55.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel56.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel57.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel58.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel59.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel60.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel61.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel62.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel63.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + tuple("mp.messaging.incoming.channel64.value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"), + tuple("mp.messaging.outgoing.channel65.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$JsonbDtoSerializer"), + }; + // @formatter:on + + doTest(expectations, JsonbDto.class, JsonbDtoSerializer.class, KafkaBytesInJsonbDtoOut.class); + } + + private static class JsonbDto { + } + + private static class JsonbDtoSerializer extends JsonbSerializer { + @Override + public byte[] serialize(String topic, Headers headers, JsonbDto data) { + return null; + } + } + + private static class KafkaBytesInJsonbDtoOut { + // @Outgoing + + @Outgoing("channel1") + Publisher> method1() { + return null; + } + + @Outgoing("channel2") + Publisher method2() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder> method3() { + return null; + } + + @Outgoing("channel4") + PublisherBuilder method4() { + return null; + } + + 
@Outgoing("channel5") + Multi> method5() { + return null; + } + + @Outgoing("channel6") + Multi method6() { + return null; + } + + @Outgoing("channel7") + Message method7() { + return null; + } + + @Outgoing("channel8") + JsonbDto method8() { + return null; + } + + @Outgoing("channel9") + CompletionStage> method9() { + return null; + } + + @Outgoing("channel10") + CompletionStage method10() { + return null; + } + + @Outgoing("channel11") + Uni> method11() { + return null; + } + + @Outgoing("channel12") + Uni method12() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber> method13() { + return null; + } + + @Incoming("channel14") + Subscriber method14() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder, Void> method15() { + return null; + } + + @Incoming("channel16") + SubscriberBuilder method16() { + return null; + } + + @Incoming("channel17") + void method17(org.apache.kafka.common.utils.Bytes msg) { + } + + @Incoming("channel18") + CompletionStage method18(Message msg) { + return null; + } + + @Incoming("channel19") + CompletionStage method19(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + @Incoming("channel20") + Uni method20(Message msg) { + return null; + } + + @Incoming("channel21") + Uni method21(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor, Message> method22() { + return null; + } + + @Incoming("channel24") + @Outgoing("channel25") + Processor method23() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder, Message> method24() { + return null; + } + + @Incoming("channel28") + @Outgoing("channel29") + ProcessorBuilder method25() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher> method26(Message msg) { + return null; + } + + @Incoming("channel32") + @Outgoing("channel33") + Publisher method27(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder> method28(Message msg) { + return null; + } + + @Incoming("channel36") + @Outgoing("channel37") + PublisherBuilder method29(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi> method30(Message msg) { + return null; + } + + @Incoming("channel40") + @Outgoing("channel41") + Multi method31(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + Message method32(Message msg) { + return null; + } + + @Incoming("channel44") + @Outgoing("channel45") + JsonbDto method33(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage> method34(Message msg) { + return null; + } + + @Incoming("channel48") + @Outgoing("channel49") + CompletionStage method35(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni> method36(Message msg) { + return null; + } + + @Incoming("channel52") + @Outgoing("channel53") + Uni method37(org.apache.kafka.common.utils.Bytes payload) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher> method38(Publisher> msg) { + return null; + } + + @Incoming("channel56") + @Outgoing("channel57") + Publisher method39(Publisher payload) { + return null; + } 
+ + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder> method40(PublisherBuilder> msg) { + return null; + } + + @Incoming("channel60") + @Outgoing("channel61") + PublisherBuilder method41(PublisherBuilder payload) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi> method42(Multi> msg) { + return null; + } + + @Incoming("channel64") + @Outgoing("channel65") + Multi method43(Multi payload) { + return null; + } + } + + // --- + + @Test + public void kafkaRecordIntUuidInRecordDoubleByteBufferOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel3.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel5.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel5.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel7.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel7.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel9.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel9.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel11.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel11.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + + tuple("mp.messaging.incoming.channel13.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel13.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.incoming.channel15.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel15.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.incoming.channel18.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel18.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.incoming.channel20.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel20.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + + tuple("mp.messaging.incoming.channel22.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel22.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel23.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel23.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel26.key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel26.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel27.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel27.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel30.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel30.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel31.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel31.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel34.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel34.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel35.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel35.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel38.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel38.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel39.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel39.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel42.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel42.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel43.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel43.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel46.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel46.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel47.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel47.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel50.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel50.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel51.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel51.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + + tuple("mp.messaging.incoming.channel54.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel54.value.deserializer", 
"org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel55.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel55.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel58.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel58.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel59.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel59.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel62.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel62.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel63.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel63.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + }; + // @formatter:on + + doTest(expectations, KafkaRecordIntUuidInRecordDoubleByteBufferOut.class); + } + + private static class KafkaRecordIntUuidInRecordDoubleByteBufferOut { + // @Outgoing + + @Outgoing("channel1") + Publisher> method1() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder> method3() { + return null; + } + + @Outgoing("channel5") + Multi> method5() { + return null; + } + + @Outgoing("channel7") + Record method7() { + return null; + } + + @Outgoing("channel9") + CompletionStage> method9() { + return null; + } + + @Outgoing("channel11") + Uni> method11() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber> method13() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder, Void> method15() { + return null; + } + + @Incoming("channel18") + CompletionStage method18(KafkaRecord msg) { + return null; + } + + @Incoming("channel20") + Uni method20(KafkaRecord msg) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor, Record> method22() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder, Record> method24() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher> method26(KafkaRecord msg) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder> method28(KafkaRecord msg) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi> method30(KafkaRecord msg) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + Record method32(KafkaRecord msg) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage> method34(KafkaRecord msg) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni> method36(KafkaRecord msg) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher> method38(Publisher> msg) { + return null; + } + + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder> method40( + PublisherBuilder> msg) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi> method42(Multi> msg) { + return null; + } + } + + // --- + + 
@Test + public void consumerRecordIntUuidInProducerRecordDoubleByteBufferOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel3.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel5.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel5.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel7.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel7.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel9.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel9.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.outgoing.channel11.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel11.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + + tuple("mp.messaging.incoming.channel13.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel13.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.incoming.channel15.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel15.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.incoming.channel18.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel18.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.incoming.channel20.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel20.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + + tuple("mp.messaging.incoming.channel22.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel22.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel23.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel23.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel26.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel26.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel27.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel27.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + 
tuple("mp.messaging.incoming.channel30.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel30.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel31.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel31.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel34.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel34.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel35.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel35.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel38.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel38.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel39.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel39.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel42.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel42.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel43.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel43.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel46.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel46.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel47.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel47.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel50.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel50.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel51.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel51.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + + tuple("mp.messaging.incoming.channel54.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel54.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel55.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel55.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel58.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + 
tuple("mp.messaging.incoming.channel58.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel59.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel59.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + tuple("mp.messaging.incoming.channel62.key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.incoming.channel62.value.deserializer", "org.apache.kafka.common.serialization.UUIDDeserializer"), + tuple("mp.messaging.outgoing.channel63.key.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + tuple("mp.messaging.outgoing.channel63.value.serializer", "org.apache.kafka.common.serialization.ByteBufferSerializer"), + }; + // @formatter:on + + doTest(expectations, ConsumerRecordIntUuidInProducerRecordDoubleByteBufferOut.class); + } + + private static class ConsumerRecordIntUuidInProducerRecordDoubleByteBufferOut { + // @Outgoing + + @Outgoing("channel1") + Publisher> method1() { + return null; + } + + @Outgoing("channel3") + PublisherBuilder> method3() { + return null; + } + + @Outgoing("channel5") + Multi> method5() { + return null; + } + + @Outgoing("channel7") + ProducerRecord method7() { + return null; + } + + @Outgoing("channel9") + CompletionStage> method9() { + return null; + } + + @Outgoing("channel11") + Uni> method11() { + return null; + } + + // @Incoming + + @Incoming("channel13") + Subscriber> method13() { + return null; + } + + @Incoming("channel15") + SubscriberBuilder, Void> method15() { + return null; + } + + @Incoming("channel18") + CompletionStage method18(ConsumerRecord msg) { + return null; + } + + @Incoming("channel20") + Uni method20(ConsumerRecord msg) { + return null; + } + + // @Incoming @Outgoing + + @Incoming("channel22") + @Outgoing("channel23") + Processor, ProducerRecord> method22() { + return null; + } + + @Incoming("channel26") + @Outgoing("channel27") + ProcessorBuilder, ProducerRecord> method24() { + return null; + } + + @Incoming("channel30") + @Outgoing("channel31") + Publisher> method26(ConsumerRecord msg) { + return null; + } + + @Incoming("channel34") + @Outgoing("channel35") + PublisherBuilder> method28(ConsumerRecord msg) { + return null; + } + + @Incoming("channel38") + @Outgoing("channel39") + Multi> method30(ConsumerRecord msg) { + return null; + } + + @Incoming("channel42") + @Outgoing("channel43") + ProducerRecord method32(ConsumerRecord msg) { + return null; + } + + @Incoming("channel46") + @Outgoing("channel47") + CompletionStage> method34(ConsumerRecord msg) { + return null; + } + + @Incoming("channel50") + @Outgoing("channel51") + Uni> method36(ConsumerRecord msg) { + return null; + } + + // @Incoming @Outgoing stream manipulation + + @Incoming("channel54") + @Outgoing("channel55") + Publisher> method38( + Publisher> msg) { + return null; + } + + @Incoming("channel58") + @Outgoing("channel59") + PublisherBuilder> method40( + PublisherBuilder> msg) { + return null; + } + + @Incoming("channel62") + @Outgoing("channel63") + Multi> method42(Multi> msg) { + return null; + } + } + + // --- + + @Test + public void floatJsonArrayInShortByteArrayOut() { + // @formatter:off + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel2.value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel3.key.serializer", "org.apache.kafka.common.serialization.ShortSerializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel4.key.serializer", "org.apache.kafka.common.serialization.ShortSerializer"), + tuple("mp.messaging.outgoing.channel4.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel5.key.serializer", "org.apache.kafka.common.serialization.ShortSerializer"), + tuple("mp.messaging.outgoing.channel5.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel6.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel7.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel8.key.serializer", "org.apache.kafka.common.serialization.ShortSerializer"), + tuple("mp.messaging.outgoing.channel8.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel9.key.serializer", "org.apache.kafka.common.serialization.ShortSerializer"), + tuple("mp.messaging.outgoing.channel9.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + tuple("mp.messaging.outgoing.channel10.key.serializer", "org.apache.kafka.common.serialization.ShortSerializer"), + tuple("mp.messaging.outgoing.channel10.value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"), + + tuple("mp.messaging.incoming.channel11.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel12.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel13.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel13.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel14.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel14.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel15.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel15.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel16.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel17.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel18.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel18.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel19.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel19.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel20.key.deserializer", 
"org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel20.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel21.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel22.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel23.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel23.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel24.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel24.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + tuple("mp.messaging.incoming.channel25.key.deserializer", "org.apache.kafka.common.serialization.FloatDeserializer"), + tuple("mp.messaging.incoming.channel25.value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer"), + }; + // @formatter:on + + doTest(expectations, FloatJsonArrayInShortByteArrayOut.class); + } + + private static class FloatJsonArrayInShortByteArrayOut { + // outgoing + + @Inject + @Channel("channel1") + Emitter emitter1; + + @Inject + @Channel("channel2") + Emitter> emitter2; + + @Inject + @Channel("channel3") + Emitter> emitter3; + + @Inject + @Channel("channel4") + Emitter> emitter4; + + @Inject + @Channel("channel5") + Emitter> emitter5; + + @Inject + @Channel("channel6") + MutinyEmitter emitter6; + + @Inject + @Channel("channel7") + MutinyEmitter> emitter7; + + @Inject + @Channel("channel8") + MutinyEmitter> emitter8; + + @Inject + @Channel("channel9") + MutinyEmitter> emitter9; + + @Inject + @Channel("channel10") + MutinyEmitter> emitter10; + + // incoming + + @Inject + @Channel("channel11") + Publisher consumer11; + + @Inject + @Channel("channel12") + Publisher> consumer12; + + @Inject + @Channel("channel13") + Publisher> consumer13; + + @Inject + @Channel("channel14") + Publisher> consumer14; + + @Inject + @Channel("channel15") + Publisher> consumer15; + + @Inject + @Channel("channel16") + PublisherBuilder consumer16; + + @Inject + @Channel("channel17") + PublisherBuilder> consumer17; + + @Inject + @Channel("channel18") + PublisherBuilder> consumer18; + + @Inject + @Channel("channel19") + PublisherBuilder> consumer19; + + @Inject + @Channel("channel20") + PublisherBuilder> consumer20; + + @Inject + @Channel("channel21") + Multi consumer21; + + @Inject + @Channel("channel22") + Multi> consumer22; + + @Inject + @Channel("channel23") + Multi> consumer23; + + @Inject + @Channel("channel24") + Multi> consumer24; + + @Inject + @Channel("channel25") + Multi> consumer25; + } +} diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PersonSerializer.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PersonSerializer.java new file mode 100644 index 0000000000000..59c1c8b01713a --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PersonSerializer.java @@ -0,0 +1,6 @@ +package io.quarkus.it.kafka; + +import io.quarkus.kafka.client.serialization.JsonbSerializer; + +public class PersonSerializer extends JsonbSerializer { +} diff --git a/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties 
index cd67849140cb3..e4a06037410c1 100644
--- a/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties
+++ b/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties
@@ -7,8 +7,6 @@ quarkus.kafka.health.enabled=true
 
 mp.messaging.outgoing.people-out.connector=smallrye-kafka
 mp.messaging.outgoing.people-out.topic=people
-mp.messaging.outgoing.people-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
 mp.messaging.incoming.people-in.connector=smallrye-kafka
 mp.messaging.incoming.people-in.topic=people
 mp.messaging.incoming.people-in.auto.offset.reset=earliest
-mp.messaging.incoming.people-in.value.deserializer=io.quarkus.it.kafka.PersonDeserializer