From 162d9af7bb138458f95d13f767edd5da6da0dff7 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Fri, 22 Sep 2023 10:22:26 +0200 Subject: [PATCH] VertxContextSupport: add utility methods for Multi - subscribe() and subscribeWith() do not block the current thread --- docs/src/main/asciidoc/vertx.adoc | 12 ++++++ .../vertx/VertxContextSupportTest.java | 28 ++++++++++++-- .../io/quarkus/vertx/VertxContextSupport.java | 38 +++++++++++++++++++ 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/docs/src/main/asciidoc/vertx.adoc b/docs/src/main/asciidoc/vertx.adoc index e685e6b0a4fb5..c5e08b8f5e43d 100644 --- a/docs/src/main/asciidoc/vertx.adoc +++ b/docs/src/main/asciidoc/vertx.adoc @@ -445,6 +445,18 @@ NOTE: If necessary, the CDI request context is activated during execution of the CAUTION: `VertxContextSupport#subscribeAndAwait()` must not be called on an event loop! +It is also possible to subscribe to a supplied `io.smallrye.mutiny.Multi` on a Vert.x duplicated context. +In this case, the current thread is not blocked and the supplied subscription logic is used to consume the events. + +[source, java] +---- +void onStart(@Observes StartupEvent event, ExternalService service) { + VertxContextSupport.subscribeWith(() -> service.getFoos(), foo -> { + // do something useful with foo + }); +} +---- + == Going further diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java index b34d631d355f8..f005acefa7a52 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/VertxContextSupportTest.java @@ -4,6 +4,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import jakarta.enterprise.event.Observes; @@ -18,6 +22,7 @@ import io.quarkus.test.QuarkusUnitTest; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; public class VertxContextSupportTest { @@ -31,8 +36,11 @@ public class VertxContextSupportTest { Alpha alpha; @Test - public void testRunner() { + public void testRunner() throws InterruptedException { assertEquals("foo", alpha.val); + assertTrue(alpha.latch.await(5, TimeUnit.SECONDS)); + assertEquals(5, alpha.vals.size()); + assertEquals(1, alpha.vals.get(0)); } @Singleton @@ -40,8 +48,11 @@ public static class Alpha { String val; + final List vals = new CopyOnWriteArrayList<>(); + final CountDownLatch latch = new CountDownLatch(1); + void onStart(@Observes StartupEvent event) { - Supplier> supplier = new Supplier>() { + Supplier> uniSupplier = new Supplier>() { @Override public Uni get() { assertTrue(VertxContext.isOnDuplicatedContext()); @@ -51,10 +62,21 @@ public Uni get() { } }; try { - val = VertxContextSupport.subscribeAndAwait(supplier); + val = VertxContextSupport.subscribeAndAwait(uniSupplier); } catch (Throwable e) { fail(); } + + Supplier> multiSupplier = new Supplier>() { + + @Override + public Multi get() { + assertTrue(VertxContext.isOnDuplicatedContext()); + VertxContextSafetyToggle.validateContextIfExists("Error", "Error"); + return Multi.createFrom().items(1, 2, 3, 4, 5); + } + }; + VertxContextSupport.subscribe(multiSupplier, ms -> ms.with(vals::add, latch::countDown)); } } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java index db19e799d0ce9..f349be8d0f8f8 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/VertxContextSupport.java @@ -1,5 +1,6 @@ package io.quarkus.vertx; +import java.util.function.Consumer; import java.util.function.Supplier; import io.quarkus.arc.Arc; @@ -7,7 +8,9 @@ import io.quarkus.vertx.core.runtime.VertxCoreRecorder; import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle; import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.groups.MultiSubscribe; import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -58,6 +61,41 @@ public void handle(Void event) { }).await().indefinitely(); } + /** + * Subscribes to the supplied {@link Multi} on a Vertx duplicated context; does not block the current thread. + * + * @param + * @param multiSupplier + * @param subscribeConsumer + */ + public static void subscribe(Supplier> multiSupplier, Consumer> subscribeConsumer) { + Context context = getContext(); + VertxContextSafetyToggle.setContextSafe(context, true); + context.runOnContext(new Handler() { + + @Override + public void handle(Void event) { + subscribeConsumer.accept(multiSupplier.get().subscribe()); + } + }); + } + + /** + * Subscribes to the supplied {@link Multi} on a Vertx duplicated context; does not block the current thread. + * + * @param + * @param multiSupplier + * @param onItem + */ + public static void subscribeWith(Supplier> multiSupplier, Consumer onItem) { + subscribe(multiSupplier, new Consumer>() { + @Override + public void accept(MultiSubscribe ms) { + ms.with(onItem); + } + }); + } + private static Context getContext() { Context context = Vertx.currentContext(); if (context == null) {