diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java index 3153a56e7e55b..5dd492dbbcb1f 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java @@ -19,6 +19,9 @@ final class DotNames { static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName()); static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName()); + + static final DotName GENERIC_PAYLOAD = DotName.createSimple(io.smallrye.reactive.messaging.GenericPayload.class.getName()); + static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName()); static final DotName RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.Record.class.getName()); static final DotName CONSUMER_RECORD = DotName.createSimple(org.apache.kafka.clients.consumer.ConsumerRecord.class.getName()); @@ -30,6 +33,7 @@ final class DotNames { static final DotName SUBSCRIBER = DotName.createSimple(org.reactivestreams.Subscriber.class.getName()); static final DotName SUBSCRIBER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder.class.getName()); static final DotName PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName()); + static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName()); static final DotName PUBLISHER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder.class.getName()); static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName()); static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName()); diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index df4f15d53c438..16aefe55c77b4 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -420,6 +420,7 @@ private Type getIncomingTypeFromMethod(MethodInfo method) { if ((isCompletionStage(returnType) && parametersCount >= 1) || (isUni(returnType) && parametersCount >= 1) || (isPublisher(returnType) && parametersCount == 1) + || (isFlowPublisher(returnType) && parametersCount == 1) || (isPublisherBuilder(returnType) && parametersCount == 1) || (isMulti(returnType) && parametersCount == 1)) { incomingType = parameterTypes.get(0); @@ -433,8 +434,11 @@ private Type getIncomingTypeFromMethod(MethodInfo method) { } // @Incoming @Outgoing stream manipulation - if (incomingType != null - && (isPublisher(incomingType) || isPublisherBuilder(incomingType) || isMulti(incomingType))) { + if (incomingType != null && + (isPublisher(incomingType) + || isFlowPublisher(incomingType) + || isPublisherBuilder(incomingType) + || isMulti(incomingType))) { incomingType = incomingType.asParameterizedType().arguments().get(0); } } @@ -446,7 +450,10 @@ private Type getIncomingTypeFromChannelInjectionPoint(Type injectionPointType) { return null; } - if (isPublisher(injectionPointType) || isPublisherBuilder(injectionPointType) || isMulti(injectionPointType)) { + if (isPublisher(injectionPointType) + || isPublisherBuilder(injectionPointType) + || isFlowPublisher(injectionPointType) + || isMulti(injectionPointType)) { return injectionPointType.asParameterizedType().arguments().get(0); } else { return null; @@ -462,6 +469,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) { // @Outgoing if ((isPublisher(returnType) && parametersCount == 0) + || (isFlowPublisher(returnType) && parametersCount == 0) || (isPublisherBuilder(returnType) && parametersCount == 0) || (isMulti(returnType) && parametersCount == 0) || (isMultiSplitter(returnType) && parametersCount == 0) @@ -476,11 +484,11 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) { // @Incoming @Outgoing if (method.hasAnnotation(DotNames.INCOMING) || method.hasAnnotation(DotNames.INCOMINGS)) { - if ((isCompletionStage(returnType) && parametersCount == 1) - || (isUni(returnType) && parametersCount == 1) - || (isPublisher(returnType) && parametersCount == 1) + if (isCompletionStage(returnType) || isUni(returnType) || isMulti(returnType)) { + outgoingType = returnType.asParameterizedType().arguments().get(0); + } else if ((isPublisher(returnType) && parametersCount == 1) + || (isFlowPublisher(returnType) && parametersCount == 1) || (isPublisherBuilder(returnType) && parametersCount == 1) - || (isMulti(returnType) && parametersCount == 1) || (isMultiSplitter(returnType) && parametersCount == 1)) { outgoingType = returnType.asParameterizedType().arguments().get(0); } else if ((isProcessor(returnType) && parametersCount == 0) @@ -494,7 +502,10 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) { // @Incoming @Outgoing stream manipulation if (outgoingType != null - && (isPublisher(outgoingType) || isPublisherBuilder(outgoingType) || isMulti(outgoingType))) { + && (isPublisher(outgoingType) + || isFlowPublisher(outgoingType) + || isPublisherBuilder(outgoingType) + || isMulti(outgoingType))) { outgoingType = outgoingType.asParameterizedType().arguments().get(0); } } @@ -548,6 +559,11 @@ private void extractKeyValueType(Type type, TriConsumer key return; } + if (isGenericPayload(type)) { + extractKeyValueType(type.asParameterizedType().arguments().get(0), keyValueTypeAcceptor); + return; + } + if (isMessage(type)) { List typeArguments = type.asParameterizedType().arguments(); Type messageTypeParameter = typeArguments.get(0); @@ -627,6 +643,13 @@ private static boolean isPublisher(Type type) { && type.asParameterizedType().arguments().size() == 1; } + private static boolean isFlowPublisher(Type type) { + // raw type Flow.Publisher is wrong, must be Flow.Publisher + return DotNames.FLOW_PUBLISHER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + private static boolean isPublisherBuilder(Type type) { // raw type PublisherBuilder is wrong, must be PublisherBuilder return DotNames.PUBLISHER_BUILDER.equals(type.name()) @@ -687,6 +710,13 @@ private static boolean isMessage(Type type) { && type.asParameterizedType().arguments().size() == 1; } + private static boolean isGenericPayload(Type type) { + // raw type Message is wrong, must be Message + return DotNames.GENERIC_PAYLOAD.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + private static boolean isKafkaRecord(Type type) { // raw type KafkaRecord is wrong, must be KafkaRecord return DotNames.KAFKA_RECORD.equals(type.name()) diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java index 7cebd95da8802..c7cf0b045844b 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java @@ -53,6 +53,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.split.MultiSplitter; +import io.smallrye.reactive.messaging.GenericPayload; import io.smallrye.reactive.messaging.MutinyEmitter; import io.smallrye.reactive.messaging.Targeted; import io.smallrye.reactive.messaging.TargetedMessages; @@ -2960,5 +2961,43 @@ private static class RequestReply { } + @Test + void kafkaGenericPayload() { + Tuple[] expectations = { + tuple("mp.messaging.incoming.channel1.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + tuple("mp.messaging.outgoing.out1.key.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + tuple("mp.messaging.outgoing.out1.value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"), + tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"), + tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"), + tuple("mp.messaging.outgoing.channel4.key.serializer", "org.apache.kafka.common.serialization.StringSerializer"), + tuple("mp.messaging.outgoing.channel4.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"), + }; + doTest(expectations, GenericPayloadProducer.class); + } + + private static class GenericPayloadProducer { + @Incoming("channel1") + @Outgoing("out1") + GenericPayload> method1(String msg) { + return null; + } + + @Incoming("channel2") + void method2(GenericPayload> msg) { + } + + @Outgoing("channel3") + GenericPayload method3() { + return null; + } + + + @Outgoing("channel4") + Multi>> method4() { + return null; + } + } + } diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java index 6c0cbde2047e9..dc321cb954ec5 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java @@ -18,6 +18,8 @@ final class DotNames { static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName()); static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName()); + + static final DotName GENERIC_PAYLOAD = DotName.createSimple(io.smallrye.reactive.messaging.GenericPayload.class.getName()); static final DotName PULSAR_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarMessage.class.getName()); static final DotName PULSAR_BATCH_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarBatchMessage.class.getName()); static final DotName PULSAR_API_MESSAGE = DotName.createSimple(org.apache.pulsar.client.api.Message.class.getName()); @@ -35,6 +37,8 @@ final class DotNames { static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName()); static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName()); static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName()); + static final DotName MULTI_SPLITTER = DotName.createSimple(io.smallrye.mutiny.operators.multi.split.MultiSplitter.class.getName()); + static final DotName PULSAR_GENERIC_RECORD = DotName.createSimple(org.apache.pulsar.client.api.schema.GenericRecord.class.getName()); static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated"); diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java index 78ada8473793a..3f88ab082a283 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java @@ -252,12 +252,12 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) { // @Incoming @Outgoing if (method.hasAnnotation(DotNames.INCOMING) || method.hasAnnotation(DotNames.INCOMINGS)) { - if ((isCompletionStage(returnType) && parametersCount == 1) - || (isUni(returnType) && parametersCount == 1) - || (isPublisher(returnType) && parametersCount == 1) + if (isCompletionStage(returnType) || isUni(returnType) || isMulti(returnType)) { + outgoingType = returnType.asParameterizedType().arguments().get(0); + } else if ((isPublisher(returnType) && parametersCount == 1) || (isPublisherBuilder(returnType) && parametersCount == 1) || (isFlowPublisher(returnType) && parametersCount == 1) - || (isMulti(returnType) && parametersCount == 1)) { + || (isMultiSplitter(returnType) && parametersCount == 1)) { outgoingType = returnType.asParameterizedType().arguments().get(0); } else if ((isProcessor(returnType) && parametersCount == 0) || (isProcessorBuilder(returnType) && parametersCount == 0)) { @@ -328,6 +328,11 @@ private void extractValueType(Type type, BiConsumer schemaAccepto return; } + if (isGenericPayload(type)) { + extractValueType(type.asParameterizedType().arguments().get(0), schemaAcceptor); + return; + } + if (isMessage(type)) { List typeArguments = type.asParameterizedType().arguments(); Type messageTypeParameter = typeArguments.get(0); @@ -381,6 +386,13 @@ private static boolean isMulti(Type type) { && type.asParameterizedType().arguments().size() == 1; } + private static boolean isMultiSplitter(Type type) { + // raw type MultiSplitter is wrong, must be MultiSplitter + return DotNames.MULTI_SPLITTER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + private static boolean isTargeted(Type type) { return DotNames.TARGETED.equals(type.name()) || DotNames.TARGETED_MESSAGES.equals(type.name()); @@ -467,6 +479,13 @@ private static boolean isMessage(Type type) { && type.asParameterizedType().arguments().size() == 1; } + private static boolean isGenericPayload(Type type) { + // raw type Message is wrong, must be Message + return DotNames.GENERIC_PAYLOAD.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 1; + } + private static boolean isPulsarMessage(Type type) { // raw type PulsarMessage is wrong, must be PulsarMessage return DotNames.PULSAR_MESSAGE.equals(type.name()) diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java index 5f8fea4d404e5..0910e90ddd70b 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java @@ -53,6 +53,7 @@ import io.smallrye.config.common.MapBackedConfigSource; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.GenericPayload; import io.smallrye.reactive.messaging.MutinyEmitter; import io.smallrye.reactive.messaging.Targeted; import io.smallrye.reactive.messaging.TargetedMessages; @@ -2070,4 +2071,40 @@ TargetedMessages method2(String msg) { } + @Test + void pulsarGenericPayload() { + Tuple[] expectations = { + tuple("mp.messaging.incoming.channel1.schema", "STRING"), + tuple("mp.messaging.outgoing.out1.schema", "JsonObjectJSON_OBJECTSchema"), + tuple("mp.messaging.incoming.channel2.schema", "STRING"), + tuple("mp.messaging.outgoing.channel3.schema", "INT32"), + tuple("mp.messaging.outgoing.channel4.schema", "INT64"), + }; + var generatedSchemas = Map.of("io.vertx.core.json.JsonObject", "JsonObjectJSON_OBJECTSchema"); + doTest(expectations, generatedSchemas, GenericPayloadProducer.class); + } + + private static class GenericPayloadProducer { + @Incoming("channel1") + @Outgoing("out1") + GenericPayload method1(String msg) { + return null; + } + + @Incoming("channel2") + void method2(GenericPayload msg) { + } + + @Outgoing("channel3") + GenericPayload method3() { + return null; + } + + @Outgoing("channel4") + Multi> method4() { + return null; + } + } + + }