diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java index 75059fde21d1e..195a2581fc70e 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusConsumer.java @@ -1,6 +1,5 @@ package io.quarkus.vertx.deployment; -import static io.quarkus.vertx.ConsumeEvent.FAILURE_CODE; import static io.quarkus.vertx.deployment.VertxConstants.AXLE_MESSAGE; import static io.quarkus.vertx.deployment.VertxConstants.COMPLETION_STAGE; import static io.quarkus.vertx.deployment.VertxConstants.MESSAGE; @@ -8,7 +7,6 @@ import static io.quarkus.vertx.deployment.VertxConstants.RX_MESSAGE; import static io.quarkus.vertx.deployment.VertxConstants.UNI; -import java.lang.annotation.Annotation; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; @@ -30,40 +28,27 @@ import io.quarkus.gizmo.AssignableResultHandle; import io.quarkus.gizmo.BranchResult; import io.quarkus.gizmo.BytecodeCreator; -import io.quarkus.gizmo.CatchBlockCreator; import io.quarkus.gizmo.ClassCreator; import io.quarkus.gizmo.ClassOutput; import io.quarkus.gizmo.FunctionCreator; import io.quarkus.gizmo.MethodCreator; import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.gizmo.ResultHandle; -import io.quarkus.gizmo.TryBlock; import io.quarkus.runtime.util.HashUtil; import io.quarkus.vertx.ConsumeEvent; import io.quarkus.vertx.runtime.EventConsumerInvoker; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; class EventBusConsumer { + private static final String INVOKER_SUFFIX = "_VertxInvoker"; + private static final MethodDescriptor ARC_CONTAINER = MethodDescriptor .ofMethod(Arc.class, "container", ArcContainer.class); private static final MethodDescriptor INSTANCE_HANDLE_GET = MethodDescriptor.ofMethod(InstanceHandle.class, "get", Object.class); - private static final MethodDescriptor ARC_CONTAINER_INSTANCE_FOR_TYPE = MethodDescriptor - .ofMethod(ArcContainer.class, - "instance", InstanceHandle.class, - Class.class, Annotation[].class); - private static final MethodDescriptor VERTX_EXECUTE_BLOCKING = MethodDescriptor.ofMethod(Vertx.class, - "executeBlocking", void.class, Handler.class, boolean.class, Handler.class); - private static final MethodDescriptor FUTURE_COMPLETE = MethodDescriptor.ofMethod(Future.class, - "complete", void.class, Object.class); - private static final MethodDescriptor FUTURE_FAIL = MethodDescriptor.ofMethod(Future.class, - "fail", void.class, Throwable.class); private static final MethodDescriptor ARC_CONTAINER_BEAN = MethodDescriptor.ofMethod(ArcContainer.class, "bean", InjectableBean.class, String.class); private static final MethodDescriptor ARC_CONTAINER_INSTANCE_FOR_BEAN = MethodDescriptor @@ -118,51 +103,23 @@ static String generateInvoker(BeanInfo bean, MethodInfo method, String generatedName = targetPackage.replace('.', '/') + "/" + baseName + INVOKER_SUFFIX + "_" + method.name() + "_" + HashUtil.sha1(sigBuilder.toString()); + boolean blocking; + AnnotationValue blockingValue = consumeEvent.value("blocking"); + blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean()); + ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName) .interfaces(EventConsumerInvoker.class).build(); // The method descriptor is: void invokeBean(Object message) MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", void.class, Object.class); - ResultHandle containerHandle = invoke.invokeStaticMethod(ARC_CONTAINER); - AnnotationValue blocking = consumeEvent.value("blocking"); - boolean blockingAnnotation = method.hasAnnotation(BLOCKING); - if ((blocking != null && blocking.asBoolean()) || blockingAnnotation) { - // Blocking operation must be performed on a worker thread - ResultHandle vertxHandle = invoke - .invokeInterfaceMethod(INSTANCE_HANDLE_GET, - invoke.invokeInterfaceMethod(ARC_CONTAINER_INSTANCE_FOR_TYPE, containerHandle, - invoke.loadClass(Vertx.class), - invoke.newArray(Annotation.class.getName(), invoke.load(0)))); - - FunctionCreator func = invoke.createFunction(Handler.class); - BytecodeCreator funcBytecode = func.getBytecode(); - AssignableResultHandle messageHandle = funcBytecode.createVariable(Message.class); - funcBytecode.assign(messageHandle, invoke.getMethodParam(0)); - TryBlock tryBlock = funcBytecode.tryBlock(); - invoke(bean, method, messageHandle, tryBlock); - tryBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), tryBlock.loadNull()); - - CatchBlockCreator catchBlock = tryBlock.addCatch(Exception.class); - // Need to reply with the caught exception - using Throwable.toString on purpose to get the class name. - ResultHandle failureMessage = catchBlock - .invokeVirtualMethod(THROWABLE_TO_STRING, catchBlock.getCaughtException()); - ResultHandle failureStatus = catchBlock.load(FAILURE_CODE); - catchBlock.invokeInterfaceMethod( - MESSAGE_FAIL, - messageHandle, - failureStatus, - failureMessage); - // Completing successfully, the failure has been sent to the sender. - catchBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), catchBlock.loadNull()); + if (blocking) { + MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class); + isBlocking.returnValue(isBlocking.load(true)); + } - funcBytecode.returnValue(null); + invoke(bean, method, invoke.getMethodParam(0), invoke); - invoke.invokeInterfaceMethod(VERTX_EXECUTE_BLOCKING, vertxHandle, func.getInstance(), invoke.load(false), - invoke.loadNull()); - } else { - invoke(bean, method, invoke.getMethodParam(0), invoke); - } invoke.returnValue(null); invokerCreator.close(); return generatedName.replace('/', '.'); diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java index b3d1e5eaebed3..3182380d96068 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerMethodTest.java @@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import io.quarkus.arc.Arc; import io.quarkus.test.QuarkusUnitTest; import io.quarkus.vertx.ConsumeEvent; import io.smallrye.common.annotation.Blocking; @@ -39,9 +38,11 @@ public class MessageConsumerMethodTest { @Inject SimpleBean simpleBean; + @Inject + EventBus eventBus; + @Test public void testSend() throws InterruptedException { - EventBus eventBus = Arc.container().instance(EventBus.class).get(); BlockingQueue synchronizer = new LinkedBlockingQueue<>(); eventBus.request("foo", "hello", ar -> { if (ar.succeeded()) { @@ -59,7 +60,6 @@ public void testSend() throws InterruptedException { @Test public void testSendAsync() throws InterruptedException { - EventBus eventBus = Arc.container().instance(EventBus.class).get(); BlockingQueue synchronizer = new LinkedBlockingQueue<>(); eventBus.request("foo-async", "hello", ar -> { if (ar.succeeded()) { @@ -77,7 +77,6 @@ public void testSendAsync() throws InterruptedException { @Test public void testSendAsyncUni() throws InterruptedException { - EventBus eventBus = Arc.container().instance(EventBus.class).get(); BlockingQueue synchronizer = new LinkedBlockingQueue<>(); eventBus.request("foo-async-uni", "hello-uni", ar -> { if (ar.succeeded()) { @@ -95,7 +94,6 @@ public void testSendAsyncUni() throws InterruptedException { @Test public void testSendDefaultAddress() throws InterruptedException { - EventBus eventBus = Arc.container().instance(EventBus.class).get(); BlockingQueue synchronizer = new LinkedBlockingQueue<>(); eventBus.request("io.quarkus.vertx.deployment.MessageConsumerMethodTest$SimpleBean", "Hello", ar -> { if (ar.succeeded()) { @@ -113,7 +111,6 @@ public void testSendDefaultAddress() throws InterruptedException { @Test public void testRequestContext() throws InterruptedException { - EventBus eventBus = Arc.container().instance(EventBus.class).get(); BlockingQueue synchronizer = new LinkedBlockingQueue<>(); eventBus.request("request", "Martin", ar -> { if (ar.succeeded()) { @@ -129,10 +126,26 @@ public void testRequestContext() throws InterruptedException { assertEquals("MArtin", synchronizer.poll(2, TimeUnit.SECONDS)); } + @Test + public void testBlockingRequestContext() throws InterruptedException { + BlockingQueue synchronizer = new LinkedBlockingQueue<>(); + eventBus.request("blocking-request", "Lu", ar -> { + if (ar.succeeded()) { + try { + synchronizer.put(ar.result().body()); + } catch (InterruptedException e) { + fail(e); + } + } else { + fail(ar.cause()); + } + }); + assertEquals("Lu", synchronizer.poll(2, TimeUnit.SECONDS)); + } + @Test public void testPublish() throws InterruptedException { SimpleBean.MESSAGES.clear(); - EventBus eventBus = Arc.container().instance(EventBus.class).get(); SimpleBean.latch = new CountDownLatch(2); eventBus.publish("pub", "Hello"); SimpleBean.latch.await(2, TimeUnit.SECONDS); @@ -143,7 +156,6 @@ public void testPublish() throws InterruptedException { @Test public void testBlockingConsumer() throws InterruptedException { SimpleBean.MESSAGES.clear(); - EventBus eventBus = Arc.container().instance(EventBus.class).get(); SimpleBean.latch = new CountDownLatch(1); eventBus.publish("blocking", "Hello"); SimpleBean.latch.await(2, TimeUnit.SECONDS); @@ -155,7 +167,6 @@ public void testBlockingConsumer() throws InterruptedException { @Test public void testPublishRx() throws InterruptedException { SimpleBean.MESSAGES.clear(); - EventBus eventBus = Arc.container().instance(EventBus.class).get(); SimpleBean.latch = new CountDownLatch(1); eventBus.publish("pub-rx", "Hello"); SimpleBean.latch.await(2, TimeUnit.SECONDS); @@ -165,7 +176,6 @@ public void testPublishRx() throws InterruptedException { @Test public void testPublishAxle() throws InterruptedException { SimpleBean.MESSAGES.clear(); - EventBus eventBus = Arc.container().instance(EventBus.class).get(); SimpleBean.latch = new CountDownLatch(1); eventBus.publish("pub-axle", "Hello"); SimpleBean.latch.await(2, TimeUnit.SECONDS); @@ -175,7 +185,6 @@ public void testPublishAxle() throws InterruptedException { @Test public void testPublishMutiny() throws InterruptedException { SimpleBean.MESSAGES.clear(); - EventBus eventBus = Arc.container().instance(EventBus.class).get(); SimpleBean.latch = new CountDownLatch(1); eventBus.publish("pub-mutiny", "Hello"); SimpleBean.latch.await(2, TimeUnit.SECONDS); @@ -185,7 +194,6 @@ public void testPublishMutiny() throws InterruptedException { @Test public void testBlockingConsumerUsingSmallRyeBlocking() throws InterruptedException { SimpleBean.MESSAGES.clear(); - EventBus eventBus = Arc.container().instance(EventBus.class).get(); SimpleBean.latch = new CountDownLatch(1); eventBus.publish("worker", "Hello"); SimpleBean.latch.await(2, TimeUnit.SECONDS); @@ -270,6 +278,12 @@ void consumeBlockingUsingRunOnWorkerThread(String message) { MESSAGES.add(message.toLowerCase() + "::" + Context.isOnWorkerThread()); latch.countDown(); } + + @Blocking + @ConsumeEvent("blocking-request") + String blockingRequestContextActive(String message) { + return transformer.transform(message); + } } @RequestScoped diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java index 234012d266f7b..beeede5b396b8 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/EventConsumerInvoker.java @@ -9,4 +9,8 @@ */ public interface EventConsumerInvoker extends BeanInvoker> { + default boolean isBlocking() { + return false; + } + } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java index 466b423bcbf8e..d656fad5029c0 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxRecorder.java @@ -18,6 +18,7 @@ import io.quarkus.vertx.ConsumeEvent; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; @@ -81,10 +82,24 @@ void registerMessageConsumers(Map messageConsumerConfigura consumer.handler(new Handler>() { @Override public void handle(Message m) { - try { - invoker.invoke(m); - } catch (Throwable e) { - m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); + if (invoker.isBlocking()) { + vertx.executeBlocking(new Handler>() { + @Override + public void handle(Promise event) { + try { + invoker.invoke(m); + } catch (Throwable e) { + m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); + } + event.complete(); + } + }, null); + } else { + try { + invoker.invoke(m); + } catch (Throwable e) { + m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); + } } } });