Skip to content

Commit

Permalink
VertxContextSupport: add utility methods for Multi
Browse files Browse the repository at this point in the history
- subscribe() and subscribeWith() do not block the current thread
  • Loading branch information
mkouba committed Sep 22, 2023
1 parent c19faac commit 162d9af
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 3 deletions.
12 changes: 12 additions & 0 deletions docs/src/main/asciidoc/vertx.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,18 @@ NOTE: If necessary, the CDI request context is activated during execution of the
CAUTION: `VertxContextSupport#subscribeAndAwait()` must not be called on an event loop!
It is also possible to subscribe to a supplied `io.smallrye.mutiny.Multi` on a Vert.x duplicated context.
In this case, the current thread is not blocked and the supplied subscription logic is used to consume the events.
[source, java]
----
void onStart(@Observes StartupEvent event, ExternalService service) {
VertxContextSupport.subscribeWith(() -> service.getFoos(), foo -> {
// do something useful with foo
});
}
----
== Going further
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import jakarta.enterprise.event.Observes;
Expand All @@ -18,6 +22,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class VertxContextSupportTest {
Expand All @@ -31,17 +36,23 @@ public class VertxContextSupportTest {
Alpha alpha;

@Test
public void testRunner() {
public void testRunner() throws InterruptedException {
assertEquals("foo", alpha.val);
assertTrue(alpha.latch.await(5, TimeUnit.SECONDS));
assertEquals(5, alpha.vals.size());
assertEquals(1, alpha.vals.get(0));
}

@Singleton
public static class Alpha {

String val;

final List<Integer> vals = new CopyOnWriteArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);

void onStart(@Observes StartupEvent event) {
Supplier<Uni<String>> supplier = new Supplier<Uni<String>>() {
Supplier<Uni<String>> uniSupplier = new Supplier<Uni<String>>() {
@Override
public Uni<String> get() {
assertTrue(VertxContext.isOnDuplicatedContext());
Expand All @@ -51,10 +62,21 @@ public Uni<String> get() {
}
};
try {
val = VertxContextSupport.subscribeAndAwait(supplier);
val = VertxContextSupport.subscribeAndAwait(uniSupplier);
} catch (Throwable e) {
fail();
}

Supplier<Multi<Integer>> multiSupplier = new Supplier<Multi<Integer>>() {

@Override
public Multi<Integer> get() {
assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Error", "Error");
return Multi.createFrom().items(1, 2, 3, 4, 5);
}
};
VertxContextSupport.subscribe(multiSupplier, ms -> ms.with(vals::add, latch::countDown));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.quarkus.vertx;

import java.util.function.Consumer;
import java.util.function.Supplier;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ManagedContext;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiSubscribe;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -58,6 +61,41 @@ public void handle(Void event) {
}).await().indefinitely();
}

/**
* Subscribes to the supplied {@link Multi} on a Vertx duplicated context; does not block the current thread.
*
* @param <T>
* @param multiSupplier
* @param subscribeConsumer
*/
public static <T> void subscribe(Supplier<Multi<T>> multiSupplier, Consumer<MultiSubscribe<T>> subscribeConsumer) {
Context context = getContext();
VertxContextSafetyToggle.setContextSafe(context, true);
context.runOnContext(new Handler<Void>() {

@Override
public void handle(Void event) {
subscribeConsumer.accept(multiSupplier.get().subscribe());
}
});
}

/**
* Subscribes to the supplied {@link Multi} on a Vertx duplicated context; does not block the current thread.
*
* @param <T>
* @param multiSupplier
* @param onItem
*/
public static <T> void subscribeWith(Supplier<Multi<T>> multiSupplier, Consumer<? super T> onItem) {
subscribe(multiSupplier, new Consumer<MultiSubscribe<T>>() {
@Override
public void accept(MultiSubscribe<T> ms) {
ms.with(onItem);
}
});
}

private static Context getContext() {
Context context = Vertx.currentContext();
if (context == null) {
Expand Down

0 comments on commit 162d9af

Please sign in to comment.