Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VertxContextSupport: add utility methods for Multi #36088

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/main/asciidoc/vertx.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,18 @@ NOTE: If necessary, the CDI request context is activated during execution of the

CAUTION: `VertxContextSupport#subscribeAndAwait()` must not be called on an event loop!

It is also possible to subscribe to a supplied `io.smallrye.mutiny.Multi` on a Vert.x duplicated context.
In this case, the current thread is not blocked and the supplied subscription logic is used to consume the events.

[source, java]
----
void onStart(@Observes StartupEvent event, ExternalService service) {
VertxContextSupport.subscribeWith(() -> service.getFoos(), foo -> {
// do something useful with foo
});
}
----


== Going further

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import jakarta.enterprise.event.Observes;
Expand All @@ -18,6 +22,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class VertxContextSupportTest {
Expand All @@ -31,17 +36,23 @@ public class VertxContextSupportTest {
Alpha alpha;

@Test
public void testRunner() {
public void testRunner() throws InterruptedException {
assertEquals("foo", alpha.val);
assertTrue(alpha.latch.await(5, TimeUnit.SECONDS));
assertEquals(5, alpha.vals.size());
assertEquals(1, alpha.vals.get(0));
}

@Singleton
public static class Alpha {

String val;

final List<Integer> vals = new CopyOnWriteArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);

void onStart(@Observes StartupEvent event) {
Supplier<Uni<String>> supplier = new Supplier<Uni<String>>() {
Supplier<Uni<String>> uniSupplier = new Supplier<Uni<String>>() {
@Override
public Uni<String> get() {
assertTrue(VertxContext.isOnDuplicatedContext());
Expand All @@ -51,10 +62,21 @@ public Uni<String> get() {
}
};
try {
val = VertxContextSupport.subscribeAndAwait(supplier);
val = VertxContextSupport.subscribeAndAwait(uniSupplier);
} catch (Throwable e) {
fail();
}

Supplier<Multi<Integer>> multiSupplier = new Supplier<Multi<Integer>>() {

@Override
public Multi<Integer> get() {
assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Error", "Error");
return Multi.createFrom().items(1, 2, 3, 4, 5);
}
};
VertxContextSupport.subscribe(multiSupplier, ms -> ms.with(vals::add, latch::countDown));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.quarkus.vertx;

import java.util.function.Consumer;
import java.util.function.Supplier;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ManagedContext;
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiSubscribe;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -58,6 +61,41 @@ public void handle(Void event) {
}).await().indefinitely();
}

/**
* Subscribes to the supplied {@link Multi} on a Vertx duplicated context; does not block the current thread.
*
* @param <T>
* @param multiSupplier
* @param subscribeConsumer
*/
public static <T> void subscribe(Supplier<Multi<T>> multiSupplier, Consumer<MultiSubscribe<T>> subscribeConsumer) {
Context context = getContext();
VertxContextSafetyToggle.setContextSafe(context, true);
context.runOnContext(new Handler<Void>() {

@Override
public void handle(Void event) {
subscribeConsumer.accept(multiSupplier.get().subscribe());
}
});
}

/**
* Subscribes to the supplied {@link Multi} on a Vertx duplicated context; does not block the current thread.
*
* @param <T>
* @param multiSupplier
* @param onItem
*/
public static <T> void subscribeWith(Supplier<Multi<T>> multiSupplier, Consumer<? super T> onItem) {
subscribe(multiSupplier, new Consumer<MultiSubscribe<T>>() {
@Override
public void accept(MultiSubscribe<T> ms) {
ms.with(onItem);
}
});
}

private static Context getContext() {
Context context = Vertx.currentContext();
if (context == null) {
Expand Down
Loading