diff --git a/bom/application/pom.xml b/bom/application/pom.xml index ccdd60aa46d1e..c489fd801fccd 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -1907,6 +1907,11 @@ quarkus-vertx-deployment ${project.version} + + io.quarkus + quarkus-vertx-deployment-spi + ${project.version} + io.quarkus quarkus-vertx-latebound-mdc-provider @@ -1952,6 +1957,16 @@ quarkus-vertx-http-dev-ui-resources ${project.version} + + io.quarkus + quarkus-vertx-kotlin + ${project.version} + + + io.quarkus + quarkus-vertx-kotlin-deployment + ${project.version} + io.quarkus diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index 260b88d194a5f..a63c400e29e3d 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -2943,6 +2943,19 @@ + + io.quarkus + quarkus-vertx-kotlin + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-virtual-threads diff --git a/docs/pom.xml b/docs/pom.xml index 9f7fbb3430f5c..6b6731adb1901 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -2955,6 +2955,19 @@ + + io.quarkus + quarkus-vertx-kotlin-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-virtual-threads-deployment diff --git a/docs/src/main/asciidoc/kotlin.adoc b/docs/src/main/asciidoc/kotlin.adoc index eb2710ebd3347..84e2e61be86fd 100644 --- a/docs/src/main/asciidoc/kotlin.adoc +++ b/docs/src/main/asciidoc/kotlin.adoc @@ -501,6 +501,9 @@ The following extensions provide support for Kotlin Coroutines by allowing the u |`quarkus-smallrye-fault-tolerance` |Support is provided for the declarative annotation-based API +|`quarkus-vertx` +|Support is provided for `@ConsumeEvent` methods + |=== === Kotlin coroutines and Mutiny diff --git a/extensions/kotlin/deployment/pom.xml b/extensions/kotlin/deployment/pom.xml index 6e931d801d627..9515f8f899c83 100644 --- a/extensions/kotlin/deployment/pom.xml +++ b/extensions/kotlin/deployment/pom.xml @@ -28,6 +28,11 @@ kotlin-compiler + + io.quarkus + quarkus-vertx-kotlin-deployment + true + diff --git a/extensions/kotlin/runtime/pom.xml b/extensions/kotlin/runtime/pom.xml index a92dd5751b376..7af7abdf3c66c 100644 --- a/extensions/kotlin/runtime/pom.xml +++ b/extensions/kotlin/runtime/pom.xml @@ -78,5 +78,11 @@ org.jetbrains.kotlinx kotlinx-coroutines-jdk8 + + + io.quarkus + quarkus-vertx-kotlin + true + diff --git a/extensions/vertx/deployment-spi/pom.xml b/extensions/vertx/deployment-spi/pom.xml new file mode 100644 index 0000000000000..719736a3d4e08 --- /dev/null +++ b/extensions/vertx/deployment-spi/pom.xml @@ -0,0 +1,26 @@ + + + + quarkus-vertx-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-vertx-deployment-spi + Quarkus - Vert.x - Deployment - SPI + + + + io.quarkus + quarkus-core-deployment + + + io.quarkus.arc + arc-processor + + + + diff --git a/extensions/vertx/deployment-spi/src/main/java/io/quarkus/vertx/deployment/spi/EventConsumerInvokerCustomizerBuildItem.java b/extensions/vertx/deployment-spi/src/main/java/io/quarkus/vertx/deployment/spi/EventConsumerInvokerCustomizerBuildItem.java new file mode 100644 index 0000000000000..20f4f993409d2 --- /dev/null +++ b/extensions/vertx/deployment-spi/src/main/java/io/quarkus/vertx/deployment/spi/EventConsumerInvokerCustomizerBuildItem.java @@ -0,0 +1,24 @@ +package io.quarkus.vertx.deployment.spi; + +import java.util.function.BiConsumer; + +import org.jboss.jandex.MethodInfo; + +import io.quarkus.arc.processor.InvokerBuilder; +import io.quarkus.builder.item.MultiBuildItem; + +/** + * This build item should be considered private and should not be used outside of core Quarkus. + * It can be changed without notice. + */ +public final class EventConsumerInvokerCustomizerBuildItem extends MultiBuildItem { + private final BiConsumer invokerCustomizer; + + public EventConsumerInvokerCustomizerBuildItem(BiConsumer invokerCustomizer) { + this.invokerCustomizer = invokerCustomizer; + } + + public BiConsumer getInvokerCustomizer() { + return invokerCustomizer; + } +} diff --git a/extensions/vertx/deployment/pom.xml b/extensions/vertx/deployment/pom.xml index fd96dfafdae57..3caa22b6d250b 100644 --- a/extensions/vertx/deployment/pom.xml +++ b/extensions/vertx/deployment/pom.xml @@ -37,6 +37,10 @@ io.quarkus quarkus-jackson-spi + + io.quarkus + quarkus-vertx-deployment-spi + io.quarkus quarkus-junit5-internal diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index 28592e9d9839f..1c3d7fb2e9915 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -35,6 +35,7 @@ import io.quarkus.arc.processor.BuiltinScope; import io.quarkus.arc.processor.InvokerBuilder; import io.quarkus.arc.processor.InvokerInfo; +import io.quarkus.arc.processor.KotlinUtils; import io.quarkus.bootstrap.classloading.QuarkusClassLoader; import io.quarkus.deployment.Capabilities; import io.quarkus.deployment.Capability; @@ -59,6 +60,7 @@ import io.quarkus.gizmo.ClassOutput; import io.quarkus.vertx.ConsumeEvent; import io.quarkus.vertx.core.deployment.CoreVertxBuildItem; +import io.quarkus.vertx.deployment.spi.EventConsumerInvokerCustomizerBuildItem; import io.quarkus.vertx.runtime.EventConsumerInfo; import io.quarkus.vertx.runtime.VertxEventBusConsumerRecorder; import io.quarkus.vertx.runtime.VertxProducer; @@ -137,6 +139,7 @@ public UnremovableBeanBuildItem unremovableBeans() { void collectEventConsumers( BeanRegistrationPhaseBuildItem beanRegistrationPhase, InvokerFactoryBuildItem invokerFactory, + List invokerCustomizers, BuildProducer messageConsumerBusinessMethods, BuildProducer errors) { // We need to collect all business methods annotated with @ConsumeEvent first @@ -149,8 +152,13 @@ void collectEventConsumers( AnnotationInstance consumeEvent = annotationStore.getAnnotation(method, CONSUME_EVENT); if (consumeEvent != null) { // Validate method params and return type + int parametersCount = method.parametersCount(); + if (KotlinUtils.isKotlinSuspendMethod(method)) { + parametersCount--; + } + List params = method.parameterTypes(); - if (params.size() == 2) { + if (parametersCount == 2) { if (!isMessageHeaders(params.get(0).name())) { // If there are two parameters, the first must be message headers. throw new IllegalStateException(String.format( @@ -161,12 +169,13 @@ void collectEventConsumers( "An event consumer business method with two parameters must not accept io.vertx.core.eventbus.Message or io.vertx.mutiny.core.eventbus.Message: %s [method: %s, bean:%s]", params, method, bean)); } - } else if (params.size() != 1) { + } else if (parametersCount != 1) { throw new IllegalStateException(String.format( "An event consumer business method must accept exactly one parameter: %s [method: %s, bean:%s]", params, method, bean)); } - if (method.returnType().kind() != Kind.VOID && VertxConstants.isMessage(params.get(0).name())) { + if (method.returnType().kind() != Kind.VOID && VertxConstants.isMessage(params.get(0).name()) + && !KotlinUtils.isKotlinSuspendMethod(method)) { throw new IllegalStateException(String.format( "An event consumer business method that accepts io.vertx.core.eventbus.Message or io.vertx.mutiny.core.eventbus.Message must return void [method: %s, bean:%s]", method, bean)); @@ -181,16 +190,16 @@ void collectEventConsumers( InvokerBuilder builder = invokerFactory.createInvoker(bean, method) .withInstanceLookup(); - if (method.parametersCount() == 1 && method.parameterType(0).name().equals(MESSAGE)) { + if (parametersCount == 1 && method.parameterType(0).name().equals(MESSAGE)) { // io.vertx.core.eventbus.Message // no transformation required - } else if (method.parametersCount() == 1 && method.parameterType(0).name().equals(MUTINY_MESSAGE)) { + } else if (parametersCount == 1 && method.parameterType(0).name().equals(MUTINY_MESSAGE)) { // io.vertx.mutiny.core.eventbus.Message builder.withArgumentTransformer(0, io.vertx.mutiny.core.eventbus.Message.class, "newInstance"); - } else if (method.parametersCount() == 1) { + } else if (parametersCount == 1) { // parameter is payload builder.withArgumentTransformer(0, io.vertx.core.eventbus.Message.class, "body"); - } else if (method.parametersCount() == 2 && method.parameterType(0).name().equals(MUTINY_MESSAGE_HEADERS)) { + } else if (parametersCount == 2 && method.parameterType(0).name().equals(MUTINY_MESSAGE_HEADERS)) { // if the method expects Mutiny MultiMap, wrap the Vert.x MultiMap builder.withArgumentTransformer(0, io.vertx.mutiny.core.MultiMap.class, "newInstance"); } @@ -199,11 +208,16 @@ void collectEventConsumers( builder.withReturnValueTransformer(Uni.class, "subscribeAsCompletionStage"); } + // the rest of Kotlin suspend function support is in the `vertx-kotlin` extension + for (EventConsumerInvokerCustomizerBuildItem invokerCustomizer : invokerCustomizers) { + invokerCustomizer.getInvokerCustomizer().accept(method, builder); + } + InvokerInfo invoker = builder.build(); messageConsumerBusinessMethods.produce(new EventConsumerBusinessMethodItem(bean, consumeEvent, method.hasAnnotation(Blocking.class), method.hasAnnotation(RunOnVirtualThread.class), - params.size() == 2, invoker)); + parametersCount == 2, invoker)); LOGGER.debugf("Found event consumer business method %s declared on %s", method, bean); } } diff --git a/extensions/vertx/kotlin/deployment/pom.xml b/extensions/vertx/kotlin/deployment/pom.xml new file mode 100644 index 0000000000000..45cb0e8d86ae9 --- /dev/null +++ b/extensions/vertx/kotlin/deployment/pom.xml @@ -0,0 +1,52 @@ + + + + quarkus-vertx-kotlin-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-vertx-kotlin-deployment + Quarkus - Vert.x - Kotlin - Deployment + + + + io.quarkus + quarkus-vertx-kotlin + + + + io.quarkus + quarkus-arc-deployment + + + io.quarkus + quarkus-vertx-deployment-spi + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + de.thetaphi + forbiddenapis + + + + + diff --git a/extensions/vertx/kotlin/deployment/src/main/java/io/quarkus/vertx/kotlin/deployment/VertxKotlinProcessor.java b/extensions/vertx/kotlin/deployment/src/main/java/io/quarkus/vertx/kotlin/deployment/VertxKotlinProcessor.java new file mode 100644 index 0000000000000..8c7dbbb22c778 --- /dev/null +++ b/extensions/vertx/kotlin/deployment/src/main/java/io/quarkus/vertx/kotlin/deployment/VertxKotlinProcessor.java @@ -0,0 +1,48 @@ +package io.quarkus.vertx.kotlin.deployment; + +import java.util.function.BiConsumer; + +import org.jboss.jandex.MethodInfo; + +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.arc.processor.InvokerBuilder; +import io.quarkus.arc.processor.KotlinUtils; +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.vertx.deployment.spi.EventConsumerInvokerCustomizerBuildItem; +import io.quarkus.vertx.kotlin.runtime.ApplicationCoroutineScope; +import io.quarkus.vertx.kotlin.runtime.CoroutineInvoker; + +public class VertxKotlinProcessor { + private static final String KOTLIN_COROUTINE_SCOPE = "kotlinx.coroutines.CoroutineScope"; + + @BuildStep + void produceCoroutineScope(BuildProducer additionalBean) { + if (!QuarkusClassLoader.isClassPresentAtRuntime(KOTLIN_COROUTINE_SCOPE)) { + return; + } + + additionalBean.produce(AdditionalBeanBuildItem.builder() + .addBeanClass(ApplicationCoroutineScope.class) + .setUnremovable() + .build()); + } + + @BuildStep + void produceInvokerCustomizerForSuspendConsumeEventMethods( + BuildProducer customizers) { + if (!QuarkusClassLoader.isClassPresentAtRuntime(KOTLIN_COROUTINE_SCOPE)) { + return; + } + + customizers.produce(new EventConsumerInvokerCustomizerBuildItem(new BiConsumer() { + @Override + public void accept(MethodInfo method, InvokerBuilder invokerBuilder) { + if (KotlinUtils.isKotlinSuspendMethod(method)) { + invokerBuilder.withInvocationWrapper(CoroutineInvoker.class, "inNewCoroutine"); + } + } + })); + } +} diff --git a/extensions/vertx/kotlin/pom.xml b/extensions/vertx/kotlin/pom.xml new file mode 100644 index 0000000000000..92dfa833c09bc --- /dev/null +++ b/extensions/vertx/kotlin/pom.xml @@ -0,0 +1,19 @@ + + + + quarkus-vertx-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-vertx-kotlin-parent + Quarkus - Vert.x - Kotlin + pom + + deployment + runtime + + diff --git a/extensions/vertx/kotlin/runtime/pom.xml b/extensions/vertx/kotlin/runtime/pom.xml new file mode 100644 index 0000000000000..ec1e2e7f975b3 --- /dev/null +++ b/extensions/vertx/kotlin/runtime/pom.xml @@ -0,0 +1,121 @@ + + + + quarkus-vertx-kotlin-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-vertx-kotlin + Quarkus - Vert.x - Kotlin - Runtime + + + + io.quarkus + quarkus-arc + + + io.vertx + vertx-core + + + org.jetbrains.kotlin + kotlin-stdlib + true + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + true + + + + + ${project.basedir}/src/main/kotlin + ${project.basedir}/src/test/kotlin + + + + io.quarkus + quarkus-extension-maven-plugin + + + io.quarkus:quarkus-vertx + + + + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + + compile + + compile + + + + test-compile + + test-compile + + + + + ${maven.compiler.target} + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + default-compile + none + + + + default-testCompile + none + + + java-compile + compile + + compile + + + + java-test-compile + test-compile + + testCompile + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + false + + + + + diff --git a/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/ApplicationCoroutineScope.kt b/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/ApplicationCoroutineScope.kt new file mode 100644 index 0000000000000..7a9719fdbeb86 --- /dev/null +++ b/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/ApplicationCoroutineScope.kt @@ -0,0 +1,18 @@ +package io.quarkus.vertx.kotlin.runtime + +import jakarta.annotation.PreDestroy +import jakarta.inject.Singleton +import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel + +@Singleton +class ApplicationCoroutineScope : CoroutineScope, AutoCloseable { + override val coroutineContext: CoroutineContext = SupervisorJob() + + @PreDestroy + override fun close() { + coroutineContext.cancel() + } +} diff --git a/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/CoroutineInvoker.kt b/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/CoroutineInvoker.kt new file mode 100644 index 0000000000000..48e5a26f084f0 --- /dev/null +++ b/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/CoroutineInvoker.kt @@ -0,0 +1,36 @@ +package io.quarkus.vertx.kotlin.runtime + +import io.quarkus.arc.Arc +import io.vertx.core.Vertx +import jakarta.enterprise.invoke.Invoker +import java.util.concurrent.CompletionStage +import kotlin.coroutines.suspendCoroutine +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.future.asCompletableFuture + +object CoroutineInvoker { + @JvmStatic + fun inNewCoroutine( + instance: T, + arguments: Array, + invoker: Invoker + ): CompletionStage { + val coroutineScope = Arc.container().instance(ApplicationCoroutineScope::class.java).get() + val dispatcher: CoroutineDispatcher = + Vertx.currentContext()?.let(::VertxDispatcher) + ?: throw IllegalStateException("No Vertx context found") + + return coroutineScope + .async(context = dispatcher) { + suspendCoroutine { continuation -> + val newArguments = arrayOfNulls(arguments.size + 1) + arguments.copyInto(newArguments) + newArguments[arguments.size] = continuation + invoker.invoke(instance, newArguments) + } + null + } + .asCompletableFuture() + } +} diff --git a/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/VertxDispatcher.kt b/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/VertxDispatcher.kt new file mode 100644 index 0000000000000..67cee1bfab8b8 --- /dev/null +++ b/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/VertxDispatcher.kt @@ -0,0 +1,24 @@ +package io.quarkus.vertx.kotlin.runtime + +import io.quarkus.arc.Arc +import io.vertx.core.Context +import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.CoroutineDispatcher + +class VertxDispatcher(private val vertxContext: Context) : CoroutineDispatcher() { + override fun dispatch(context: CoroutineContext, block: Runnable) { + val requestContext = Arc.container().requestContext() + vertxContext.runOnContext { + if (requestContext.isActive) { + block.run() + } else { + try { + requestContext.activate() + block.run() + } finally { + requestContext.terminate() + } + } + } + } +} diff --git a/extensions/vertx/kotlin/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/vertx/kotlin/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000000..42269a5f6e853 --- /dev/null +++ b/extensions/vertx/kotlin/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,5 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Vert.x Kotlin integration" +metadata: + unlisted: true diff --git a/extensions/vertx/pom.xml b/extensions/vertx/pom.xml index e58674406920e..6c19b787b8145 100644 --- a/extensions/vertx/pom.xml +++ b/extensions/vertx/pom.xml @@ -15,6 +15,8 @@ pom deployment + deployment-spi + kotlin latebound-mdc-provider runtime diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 88f1308775726..52a88ee23af2c 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -230,6 +230,7 @@ vertx-web vertx-web-jackson vertx + vertx-kotlin websockets spring-di spring-web diff --git a/integration-tests/vertx-kotlin/pom.xml b/integration-tests/vertx-kotlin/pom.xml new file mode 100644 index 0000000000000..14a08e162a825 --- /dev/null +++ b/integration-tests/vertx-kotlin/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-vertx-kotlin + Quarkus - Integration Tests - Vert.x Kotlin + + + + io.quarkus + quarkus-kotlin + + + io.quarkus + quarkus-vertx + + + io.quarkus + quarkus-junit5 + test + + + org.assertj + assertj-core + test + + + + + io.quarkus + quarkus-kotlin-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-vertx-deployment + ${project.version} + pom + test + + + * + * + + + + + + + src/main/kotlin + src/test/kotlin + + + org.jetbrains.kotlin + kotlin-maven-plugin + + + compile + compile + + compile + + + + test-compile + test-compile + + test-compile + + + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + + diff --git a/integration-tests/vertx-kotlin/src/main/kotlin/io/quarkus/it/vertx/kotlin/MessageConsumers.kt b/integration-tests/vertx-kotlin/src/main/kotlin/io/quarkus/it/vertx/kotlin/MessageConsumers.kt new file mode 100644 index 0000000000000..d6ec06eedc5e4 --- /dev/null +++ b/integration-tests/vertx-kotlin/src/main/kotlin/io/quarkus/it/vertx/kotlin/MessageConsumers.kt @@ -0,0 +1,49 @@ +package io.quarkus.it.vertx.kotlin + +import io.quarkus.vertx.ConsumeEvent +import jakarta.inject.Singleton +import java.util.concurrent.CountDownLatch + +@Singleton +class MessageConsumers { + companion object { + @Volatile var latch: CountDownLatch = CountDownLatch(1) + + var message: String? = null + + fun reset() { + latch = CountDownLatch(1) + message = null + } + } + + @ConsumeEvent("message") + fun message(msg: io.vertx.core.eventbus.Message) { + message = msg.body() + latch.countDown() + } + + @ConsumeEvent("mutiny-message") + fun mutinyMessage(msg: io.vertx.mutiny.core.eventbus.Message) { + message = msg.body() + latch.countDown() + } + + @ConsumeEvent("payload") + fun payload(msg: String) { + message = msg + latch.countDown() + } + + @ConsumeEvent("headers-payload") + fun headersPayload(headers: io.vertx.core.MultiMap, msg: String) { + message = "${headers["header"]} - $msg" + latch.countDown() + } + + @ConsumeEvent("mutiny-headers-payload") + fun mutinyHeadersPayload(headers: io.vertx.mutiny.core.MultiMap, msg: String) { + message = "${headers["header"]} - $msg" + latch.countDown() + } +} diff --git a/integration-tests/vertx-kotlin/src/main/kotlin/io/quarkus/it/vertx/kotlin/SuspendMessageConsumers.kt b/integration-tests/vertx-kotlin/src/main/kotlin/io/quarkus/it/vertx/kotlin/SuspendMessageConsumers.kt new file mode 100644 index 0000000000000..58e5b76788e96 --- /dev/null +++ b/integration-tests/vertx-kotlin/src/main/kotlin/io/quarkus/it/vertx/kotlin/SuspendMessageConsumers.kt @@ -0,0 +1,55 @@ +package io.quarkus.it.vertx.kotlin + +import io.quarkus.vertx.ConsumeEvent +import jakarta.inject.Singleton +import java.util.concurrent.CountDownLatch +import kotlinx.coroutines.delay + +@Singleton +class SuspendMessageConsumers { + companion object { + @Volatile var latch: CountDownLatch = CountDownLatch(1) + + var message: String? = null + + fun reset() { + latch = CountDownLatch(1) + message = null + } + } + + @ConsumeEvent("suspend-message") + suspend fun message(msg: io.vertx.core.eventbus.Message) { + delay(100) + message = msg.body() + latch.countDown() + } + + @ConsumeEvent("suspend-mutiny-message") + suspend fun mutinyMessage(msg: io.vertx.mutiny.core.eventbus.Message) { + delay(100) + message = msg.body() + latch.countDown() + } + + @ConsumeEvent("suspend-payload") + suspend fun payload(msg: String) { + delay(100) + message = msg + latch.countDown() + } + + @ConsumeEvent("suspend-headers-payload") + suspend fun headersPayload(headers: io.vertx.core.MultiMap, msg: String) { + delay(100) + message = "${headers["header"]} - $msg" + latch.countDown() + } + + @ConsumeEvent("suspend-mutiny-headers-payload") + suspend fun mutinyHeadersPayload(headers: io.vertx.mutiny.core.MultiMap, msg: String) { + delay(100) + message = "${headers["header"]} - $msg" + latch.countDown() + } +} diff --git a/integration-tests/vertx-kotlin/src/test/kotlin/io/quarkus/it/vertx/kotlin/KotlinConsumeEventTest.kt b/integration-tests/vertx-kotlin/src/test/kotlin/io/quarkus/it/vertx/kotlin/KotlinConsumeEventTest.kt new file mode 100644 index 0000000000000..5b5034bd4f918 --- /dev/null +++ b/integration-tests/vertx-kotlin/src/test/kotlin/io/quarkus/it/vertx/kotlin/KotlinConsumeEventTest.kt @@ -0,0 +1,62 @@ +package io.quarkus.it.vertx.kotlin + +import io.quarkus.test.junit.QuarkusTest +import io.vertx.core.eventbus.DeliveryOptions +import io.vertx.core.eventbus.EventBus +import jakarta.inject.Inject +import java.util.concurrent.TimeUnit +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +@QuarkusTest +class KotlinConsumeEventTest { + @Inject lateinit var bus: EventBus + + @BeforeEach + fun setup() { + MessageConsumers.reset() + } + + @Test + fun message() { + bus.send("message", "message") + + MessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("message", MessageConsumers.message) + } + + @Test + fun mutinyMessage() { + bus.send("mutiny-message", "mutiny message") + + MessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("mutiny message", MessageConsumers.message) + } + + @Test + fun payload() { + bus.send("payload", "just payload") + + MessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("just payload", MessageConsumers.message) + } + + @Test + fun headersPayload() { + val options = DeliveryOptions().addHeader("header", "test header") + bus.send("headers-payload", "payload", options) + + MessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("test header - payload", MessageConsumers.message) + } + + @Test + fun mutinyHeadersPayload() { + val options = DeliveryOptions().addHeader("header", "test mutiny header") + bus.send("mutiny-headers-payload", "payload", options) + + MessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("test mutiny header - payload", MessageConsumers.message) + } +} diff --git a/integration-tests/vertx-kotlin/src/test/kotlin/io/quarkus/it/vertx/kotlin/KotlinSuspendConsumeEventTest.kt b/integration-tests/vertx-kotlin/src/test/kotlin/io/quarkus/it/vertx/kotlin/KotlinSuspendConsumeEventTest.kt new file mode 100644 index 0000000000000..975c80e428426 --- /dev/null +++ b/integration-tests/vertx-kotlin/src/test/kotlin/io/quarkus/it/vertx/kotlin/KotlinSuspendConsumeEventTest.kt @@ -0,0 +1,62 @@ +package io.quarkus.it.vertx.kotlin + +import io.quarkus.test.junit.QuarkusTest +import io.vertx.core.eventbus.DeliveryOptions +import io.vertx.core.eventbus.EventBus +import jakarta.inject.Inject +import java.util.concurrent.TimeUnit +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +@QuarkusTest +class KotlinSuspendConsumeEventTest { + @Inject lateinit var bus: EventBus + + @BeforeEach + fun setup() { + SuspendMessageConsumers.reset() + } + + @Test + fun message() { + bus.send("suspend-message", "message") + + SuspendMessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("message", SuspendMessageConsumers.message) + } + + @Test + fun mutinyMessage() { + bus.send("suspend-mutiny-message", "mutiny message") + + SuspendMessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("mutiny message", SuspendMessageConsumers.message) + } + + @Test + fun payload() { + bus.send("suspend-payload", "just payload") + + SuspendMessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("just payload", SuspendMessageConsumers.message) + } + + @Test + fun headersPayload() { + val options = DeliveryOptions().addHeader("header", "test header") + bus.send("suspend-headers-payload", "payload", options) + + SuspendMessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("test header - payload", SuspendMessageConsumers.message) + } + + @Test + fun mutinyHeadersPayload() { + val options = DeliveryOptions().addHeader("header", "test mutiny header") + bus.send("suspend-mutiny-headers-payload", "payload", options) + + SuspendMessageConsumers.latch.await(2, TimeUnit.SECONDS) + assertEquals("test mutiny header - payload", SuspendMessageConsumers.message) + } +}