Skip to content

Commit

Permalink
Merge pull request #32015 from ozangunalp/fix_reactive_messaging_inco…
Browse files Browse the repository at this point in the history
…mings

Support repeatable Incomings annotation for reactive messaging
  • Loading branch information
gsmet authored Mar 22, 2023
2 parents e34625b + ab62972 commit 7d7bc87
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ List<AnnotationInstance> findAnnotationsOnMethods(DotName annotation) {
.collect(Collectors.toList());
}

List<AnnotationInstance> findRepeatableAnnotationsOnMethods(DotName annotation) {
return index.getAnnotationsWithRepeatable(annotation, index)
.stream()
.filter(it -> it.target().kind() == AnnotationTarget.Kind.METHOD)
.collect(Collectors.toList());
}

List<AnnotationInstance> findAnnotationsOnInjectionPoints(DotName annotation) {
return index.getAnnotations(annotation)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void defaultChannelConfiguration(

if (launchMode.getLaunchMode().isDevOrTest()) {
if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) {
List<AnnotationInstance> incomings = discoveryState.findAnnotationsOnMethods(DotNames.INCOMING);
List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING);
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
List<AnnotationInstance> annotations = new ArrayList<>();
annotations.addAll(incomings);
Expand All @@ -188,7 +188,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
BuildProducer<ReflectiveClassBuildItem> reflection) {
Map<String, String> alreadyGeneratedSerializers = new HashMap<>();
Map<String, String> alreadyGeneratedDeserializers = new HashMap<>();
for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.INCOMING)) {
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.INCOMING)) {
String channelName = annotation.value().asString();
if (!discovery.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,7 +64,9 @@ private static void doTest(Tuple[] expectations, Class<?>... classesToIndex) {
private static void doTest(Config customConfig, Tuple[] expectations, Class<?>... classesToIndex) {
List<RunTimeConfigurationDefaultBuildItem> configs = new ArrayList<>();

DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classesToIndex)) {
List<Class<?>> classes = new ArrayList<>(Arrays.asList(classesToIndex));
classes.add(Incoming.class);
DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classes)) {
@Override
Config getConfig() {
return customConfig != null ? customConfig : super.getConfig();
Expand All @@ -89,7 +92,7 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
}
}

private static IndexView index(Class<?>... classes) {
private static IndexView index(List<Class<?>> classes) {
Indexer indexer = new Indexer();
for (Class<?> clazz : classes) {
try {
Expand Down Expand Up @@ -2696,4 +2699,33 @@ private static class TransactionalProducer {

}

@Test
void repeatableIncomings() {
Tuple[] expectations = {
tuple("mp.messaging.incoming.channel1.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"),
tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"),
tuple("mp.messaging.incoming.channel3.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"),
tuple("mp.messaging.incoming.channel4.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"),
};
doTest(expectations, RepeatableIncomingsChannels.class);
}


private static class RepeatableIncomingsChannels {

@Incoming("channel1")
@Incoming("channel2")
void method1(String msg) {

}

@Incoming("channel3")
@Incoming("channel4")
void method2(JsonObject msg) {

}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ public List<UnremovableBeanBuildItem> removalExclusions() {
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.INCOMING)),
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.INCOMINGS)),
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.OUTGOING)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.it.kafka;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class KafkaRepeatableReceivers {

private final List<Double> prices = new CopyOnWriteArrayList<>();

@Incoming("prices-in")
@Incoming("prices-in2")
public CompletionStage<Void> consume(Message<Double> msg) {
prices.add(msg.getPayload());
return msg.ack();
}

public List<Double> getPrices() {
return prices;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.it.kafka;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PricesProducer {

@Outgoing("prices-out")
public Multi<Double> generatePrices() {
return Multi.createFrom().items(1.2, 2.2, 3.4);
}

@Outgoing("prices-out2")
public Multi<Double> generatePrices2() {
return Multi.createFrom().items(4.5, 5.6, 6.7);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ mp.messaging.outgoing.pets-out.topic=pets
mp.messaging.incoming.pets-in.topic=pets

quarkus.redis.my-redis.hosts=${quarkus.redis.hosts}

mp.messaging.outgoing.prices-out.topic=prices
mp.messaging.outgoing.prices-out2.topic=prices2

mp.messaging.incoming.prices-in.topic=prices
mp.messaging.incoming.prices-in2.topic=prices2
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.arc.Arc;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.DisabledOnIntegrationTest;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.restassured.common.mapper.TypeRef;
Expand Down Expand Up @@ -48,4 +50,11 @@ public void testFruits() {
await().untilAsserted(() -> Assertions.assertEquals(get("/kafka/fruits").as(TYPE_REF).size(), 4));
}

@Test
@DisabledOnIntegrationTest
public void testPrices() {
KafkaRepeatableReceivers repeatableReceivers = Arc.container().instance(KafkaRepeatableReceivers.class).get();
await().untilAsserted(() -> Assertions.assertEquals(repeatableReceivers.getPrices().size(), 6));
}

}

0 comments on commit 7d7bc87

Please sign in to comment.