Skip to content

Commit

Permalink
Kafka & Pulsar schema discovery handling for GenericPayload type
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Mar 28, 2024
1 parent 7d4b497 commit 8fbc2bc
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ final class DotNames {
static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());

static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());

static final DotName GENERIC_PAYLOAD = DotName.createSimple(io.smallrye.reactive.messaging.GenericPayload.class.getName());

static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName());
static final DotName RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.Record.class.getName());
static final DotName CONSUMER_RECORD = DotName.createSimple(org.apache.kafka.clients.consumer.ConsumerRecord.class.getName());
Expand All @@ -30,6 +33,7 @@ final class DotNames {
static final DotName SUBSCRIBER = DotName.createSimple(org.reactivestreams.Subscriber.class.getName());
static final DotName SUBSCRIBER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder.class.getName());
static final DotName PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName());
static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName());
static final DotName PUBLISHER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder.class.getName());
static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName());
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ private Type getIncomingTypeFromMethod(MethodInfo method) {
if ((isCompletionStage(returnType) && parametersCount >= 1)
|| (isUni(returnType) && parametersCount >= 1)
|| (isPublisher(returnType) && parametersCount == 1)
|| (isFlowPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isMulti(returnType) && parametersCount == 1)) {
incomingType = parameterTypes.get(0);
Expand All @@ -433,8 +434,11 @@ private Type getIncomingTypeFromMethod(MethodInfo method) {
}

// @Incoming @Outgoing stream manipulation
if (incomingType != null
&& (isPublisher(incomingType) || isPublisherBuilder(incomingType) || isMulti(incomingType))) {
if (incomingType != null &&
(isPublisher(incomingType)
|| isFlowPublisher(incomingType)
|| isPublisherBuilder(incomingType)
|| isMulti(incomingType))) {
incomingType = incomingType.asParameterizedType().arguments().get(0);
}
}
Expand All @@ -446,7 +450,10 @@ private Type getIncomingTypeFromChannelInjectionPoint(Type injectionPointType) {
return null;
}

if (isPublisher(injectionPointType) || isPublisherBuilder(injectionPointType) || isMulti(injectionPointType)) {
if (isPublisher(injectionPointType)
|| isPublisherBuilder(injectionPointType)
|| isFlowPublisher(injectionPointType)
|| isMulti(injectionPointType)) {
return injectionPointType.asParameterizedType().arguments().get(0);
} else {
return null;
Expand All @@ -462,6 +469,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {

// @Outgoing
if ((isPublisher(returnType) && parametersCount == 0)
|| (isFlowPublisher(returnType) && parametersCount == 0)
|| (isPublisherBuilder(returnType) && parametersCount == 0)
|| (isMulti(returnType) && parametersCount == 0)
|| (isMultiSplitter(returnType) && parametersCount == 0)
Expand All @@ -476,11 +484,11 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {

// @Incoming @Outgoing
if (method.hasAnnotation(DotNames.INCOMING) || method.hasAnnotation(DotNames.INCOMINGS)) {
if ((isCompletionStage(returnType) && parametersCount == 1)
|| (isUni(returnType) && parametersCount == 1)
|| (isPublisher(returnType) && parametersCount == 1)
if (isCompletionStage(returnType) || isUni(returnType) || isMulti(returnType)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isPublisher(returnType) && parametersCount == 1)
|| (isFlowPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isMulti(returnType) && parametersCount == 1)
|| (isMultiSplitter(returnType) && parametersCount == 1)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isProcessor(returnType) && parametersCount == 0)
Expand All @@ -494,7 +502,10 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {

// @Incoming @Outgoing stream manipulation
if (outgoingType != null
&& (isPublisher(outgoingType) || isPublisherBuilder(outgoingType) || isMulti(outgoingType))) {
&& (isPublisher(outgoingType)
|| isFlowPublisher(outgoingType)
|| isPublisherBuilder(outgoingType)
|| isMulti(outgoingType))) {
outgoingType = outgoingType.asParameterizedType().arguments().get(0);
}
}
Expand Down Expand Up @@ -548,6 +559,11 @@ private void extractKeyValueType(Type type, TriConsumer<Type, Type, Boolean> key
return;
}

if (isGenericPayload(type)) {
extractKeyValueType(type.asParameterizedType().arguments().get(0), keyValueTypeAcceptor);
return;
}

if (isMessage(type)) {
List<Type> typeArguments = type.asParameterizedType().arguments();
Type messageTypeParameter = typeArguments.get(0);
Expand Down Expand Up @@ -627,6 +643,13 @@ private static boolean isPublisher(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isFlowPublisher(Type type) {
// raw type Flow.Publisher is wrong, must be Flow.Publisher<Something>
return DotNames.FLOW_PUBLISHER.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isPublisherBuilder(Type type) {
// raw type PublisherBuilder is wrong, must be PublisherBuilder<Something, SomethingElse>
return DotNames.PUBLISHER_BUILDER.equals(type.name())
Expand Down Expand Up @@ -687,6 +710,13 @@ private static boolean isMessage(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isGenericPayload(Type type) {
// raw type Message is wrong, must be Message<Something>
return DotNames.GENERIC_PAYLOAD.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isKafkaRecord(Type type) {
// raw type KafkaRecord is wrong, must be KafkaRecord<Something, SomethingElse>
return DotNames.KAFKA_RECORD.equals(type.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.split.MultiSplitter;
import io.smallrye.reactive.messaging.GenericPayload;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.Targeted;
import io.smallrye.reactive.messaging.TargetedMessages;
Expand Down Expand Up @@ -2960,5 +2961,43 @@ private static class RequestReply {

}

@Test
void kafkaGenericPayload() {
Tuple[] expectations = {
tuple("mp.messaging.incoming.channel1.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"),
tuple("mp.messaging.outgoing.out1.key.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
tuple("mp.messaging.outgoing.out1.value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"),
tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"),
tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"),
tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"),
tuple("mp.messaging.outgoing.channel4.key.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
tuple("mp.messaging.outgoing.channel4.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
};
doTest(expectations, GenericPayloadProducer.class);
}

private static class GenericPayloadProducer {
@Incoming("channel1")
@Outgoing("out1")
GenericPayload<ProducerRecord<Long, JsonObject>> method1(String msg) {
return null;
}

@Incoming("channel2")
void method2(GenericPayload<Record<Long, JsonObject>> msg) {
}

@Outgoing("channel3")
GenericPayload<Integer> method3() {
return null;
}


@Outgoing("channel4")
Multi<GenericPayload<ProducerRecord<String, Long>>> method4() {
return null;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ final class DotNames {
static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());

static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());

static final DotName GENERIC_PAYLOAD = DotName.createSimple(io.smallrye.reactive.messaging.GenericPayload.class.getName());
static final DotName PULSAR_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarMessage.class.getName());
static final DotName PULSAR_BATCH_MESSAGE = DotName.createSimple(io.smallrye.reactive.messaging.pulsar.PulsarBatchMessage.class.getName());
static final DotName PULSAR_API_MESSAGE = DotName.createSimple(org.apache.pulsar.client.api.Message.class.getName());
Expand All @@ -35,6 +37,8 @@ final class DotNames {
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
static final DotName FLOW_PUBLISHER = DotName.createSimple(java.util.concurrent.Flow.Publisher.class.getName());
static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName());
static final DotName MULTI_SPLITTER = DotName.createSimple(io.smallrye.mutiny.operators.multi.split.MultiSplitter.class.getName());

static final DotName PULSAR_GENERIC_RECORD = DotName.createSimple(org.apache.pulsar.client.api.schema.GenericRecord.class.getName());

static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,12 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {

// @Incoming @Outgoing
if (method.hasAnnotation(DotNames.INCOMING) || method.hasAnnotation(DotNames.INCOMINGS)) {
if ((isCompletionStage(returnType) && parametersCount == 1)
|| (isUni(returnType) && parametersCount == 1)
|| (isPublisher(returnType) && parametersCount == 1)
if (isCompletionStage(returnType) || isUni(returnType) || isMulti(returnType)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isFlowPublisher(returnType) && parametersCount == 1)
|| (isMulti(returnType) && parametersCount == 1)) {
|| (isMultiSplitter(returnType) && parametersCount == 1)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isProcessor(returnType) && parametersCount == 0)
|| (isProcessorBuilder(returnType) && parametersCount == 0)) {
Expand Down Expand Up @@ -328,6 +328,11 @@ private void extractValueType(Type type, BiConsumer<Type, Boolean> schemaAccepto
return;
}

if (isGenericPayload(type)) {
extractValueType(type.asParameterizedType().arguments().get(0), schemaAcceptor);
return;
}

if (isMessage(type)) {
List<Type> typeArguments = type.asParameterizedType().arguments();
Type messageTypeParameter = typeArguments.get(0);
Expand Down Expand Up @@ -381,6 +386,13 @@ private static boolean isMulti(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isMultiSplitter(Type type) {
// raw type MultiSplitter is wrong, must be MultiSplitter<Something, KeyEnum>
return DotNames.MULTI_SPLITTER.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 2;
}

private static boolean isTargeted(Type type) {
return DotNames.TARGETED.equals(type.name())
|| DotNames.TARGETED_MESSAGES.equals(type.name());
Expand Down Expand Up @@ -467,6 +479,13 @@ private static boolean isMessage(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isGenericPayload(Type type) {
// raw type Message is wrong, must be Message<Something>
return DotNames.GENERIC_PAYLOAD.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isPulsarMessage(Type type) {
// raw type PulsarMessage is wrong, must be PulsarMessage<SomethingElse>
return DotNames.PULSAR_MESSAGE.equals(type.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.smallrye.config.common.MapBackedConfigSource;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.GenericPayload;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.Targeted;
import io.smallrye.reactive.messaging.TargetedMessages;
Expand Down Expand Up @@ -2070,4 +2071,40 @@ TargetedMessages method2(String msg) {

}

@Test
void pulsarGenericPayload() {
Tuple[] expectations = {
tuple("mp.messaging.incoming.channel1.schema", "STRING"),
tuple("mp.messaging.outgoing.out1.schema", "JsonObjectJSON_OBJECTSchema"),
tuple("mp.messaging.incoming.channel2.schema", "STRING"),
tuple("mp.messaging.outgoing.channel3.schema", "INT32"),
tuple("mp.messaging.outgoing.channel4.schema", "INT64"),
};
var generatedSchemas = Map.of("io.vertx.core.json.JsonObject", "JsonObjectJSON_OBJECTSchema");
doTest(expectations, generatedSchemas, GenericPayloadProducer.class);
}

private static class GenericPayloadProducer {
@Incoming("channel1")
@Outgoing("out1")
GenericPayload<JsonObject> method1(String msg) {
return null;
}

@Incoming("channel2")
void method2(GenericPayload<String> msg) {
}

@Outgoing("channel3")
GenericPayload<Integer> method3() {
return null;
}

@Outgoing("channel4")
Multi<GenericPayload<Long>> method4() {
return null;
}
}


}

0 comments on commit 8fbc2bc

Please sign in to comment.