Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vertx - activate the request context for blocking ConsumeEvent methods #14143

Merged
merged 1 commit into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.quarkus.vertx.deployment;

import static io.quarkus.vertx.ConsumeEvent.FAILURE_CODE;
import static io.quarkus.vertx.deployment.VertxConstants.AXLE_MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.COMPLETION_STAGE;
import static io.quarkus.vertx.deployment.VertxConstants.MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.MUTINY_MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.RX_MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.UNI;

import java.lang.annotation.Annotation;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
Expand All @@ -30,40 +28,27 @@
import io.quarkus.gizmo.AssignableResultHandle;
import io.quarkus.gizmo.BranchResult;
import io.quarkus.gizmo.BytecodeCreator;
import io.quarkus.gizmo.CatchBlockCreator;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.gizmo.FunctionCreator;
import io.quarkus.gizmo.MethodCreator;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.gizmo.TryBlock;
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.runtime.EventConsumerInvoker;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;

class EventBusConsumer {

private static final String INVOKER_SUFFIX = "_VertxInvoker";

private static final MethodDescriptor ARC_CONTAINER = MethodDescriptor
.ofMethod(Arc.class, "container", ArcContainer.class);
private static final MethodDescriptor INSTANCE_HANDLE_GET = MethodDescriptor.ofMethod(InstanceHandle.class, "get",
Object.class);
private static final MethodDescriptor ARC_CONTAINER_INSTANCE_FOR_TYPE = MethodDescriptor
.ofMethod(ArcContainer.class,
"instance", InstanceHandle.class,
Class.class, Annotation[].class);
private static final MethodDescriptor VERTX_EXECUTE_BLOCKING = MethodDescriptor.ofMethod(Vertx.class,
"executeBlocking", void.class, Handler.class, boolean.class, Handler.class);
private static final MethodDescriptor FUTURE_COMPLETE = MethodDescriptor.ofMethod(Future.class,
"complete", void.class, Object.class);
private static final MethodDescriptor FUTURE_FAIL = MethodDescriptor.ofMethod(Future.class,
"fail", void.class, Throwable.class);
private static final MethodDescriptor ARC_CONTAINER_BEAN = MethodDescriptor.ofMethod(ArcContainer.class, "bean",
InjectableBean.class, String.class);
private static final MethodDescriptor ARC_CONTAINER_INSTANCE_FOR_BEAN = MethodDescriptor
Expand Down Expand Up @@ -118,51 +103,23 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
String generatedName = targetPackage.replace('.', '/') + "/" + baseName + INVOKER_SUFFIX + "_" + method.name() + "_"
+ HashUtil.sha1(sigBuilder.toString());

boolean blocking;
AnnotationValue blockingValue = consumeEvent.value("blocking");
blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean());

ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName)
.interfaces(EventConsumerInvoker.class).build();

// The method descriptor is: void invokeBean(Object message)
MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", void.class, Object.class);
ResultHandle containerHandle = invoke.invokeStaticMethod(ARC_CONTAINER);

AnnotationValue blocking = consumeEvent.value("blocking");
boolean blockingAnnotation = method.hasAnnotation(BLOCKING);
if ((blocking != null && blocking.asBoolean()) || blockingAnnotation) {
// Blocking operation must be performed on a worker thread
ResultHandle vertxHandle = invoke
.invokeInterfaceMethod(INSTANCE_HANDLE_GET,
invoke.invokeInterfaceMethod(ARC_CONTAINER_INSTANCE_FOR_TYPE, containerHandle,
invoke.loadClass(Vertx.class),
invoke.newArray(Annotation.class.getName(), invoke.load(0))));

FunctionCreator func = invoke.createFunction(Handler.class);
BytecodeCreator funcBytecode = func.getBytecode();
AssignableResultHandle messageHandle = funcBytecode.createVariable(Message.class);
funcBytecode.assign(messageHandle, invoke.getMethodParam(0));
TryBlock tryBlock = funcBytecode.tryBlock();
invoke(bean, method, messageHandle, tryBlock);
tryBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), tryBlock.loadNull());

CatchBlockCreator catchBlock = tryBlock.addCatch(Exception.class);
// Need to reply with the caught exception - using Throwable.toString on purpose to get the class name.
ResultHandle failureMessage = catchBlock
.invokeVirtualMethod(THROWABLE_TO_STRING, catchBlock.getCaughtException());
ResultHandle failureStatus = catchBlock.load(FAILURE_CODE);
catchBlock.invokeInterfaceMethod(
MESSAGE_FAIL,
messageHandle,
failureStatus,
failureMessage);
// Completing successfully, the failure has been sent to the sender.
catchBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), catchBlock.loadNull());
if (blocking) {
MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class);
isBlocking.returnValue(isBlocking.load(true));
}

funcBytecode.returnValue(null);
invoke(bean, method, invoke.getMethodParam(0), invoke);

invoke.invokeInterfaceMethod(VERTX_EXECUTE_BLOCKING, vertxHandle, func.getInstance(), invoke.load(false),
invoke.loadNull());
} else {
invoke(bean, method, invoke.getMethodParam(0), invoke);
}
invoke.returnValue(null);
invokerCreator.close();
return generatedName.replace('/', '.');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.arc.Arc;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.common.annotation.Blocking;
Expand All @@ -39,9 +38,11 @@ public class MessageConsumerMethodTest {
@Inject
SimpleBean simpleBean;

@Inject
EventBus eventBus;

@Test
public void testSend() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo", "hello", ar -> {
if (ar.succeeded()) {
Expand All @@ -59,7 +60,6 @@ public void testSend() throws InterruptedException {

@Test
public void testSendAsync() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo-async", "hello", ar -> {
if (ar.succeeded()) {
Expand All @@ -77,7 +77,6 @@ public void testSendAsync() throws InterruptedException {

@Test
public void testSendAsyncUni() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("foo-async-uni", "hello-uni", ar -> {
if (ar.succeeded()) {
Expand All @@ -95,7 +94,6 @@ public void testSendAsyncUni() throws InterruptedException {

@Test
public void testSendDefaultAddress() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("io.quarkus.vertx.deployment.MessageConsumerMethodTest$SimpleBean", "Hello", ar -> {
if (ar.succeeded()) {
Expand All @@ -113,7 +111,6 @@ public void testSendDefaultAddress() throws InterruptedException {

@Test
public void testRequestContext() throws InterruptedException {
EventBus eventBus = Arc.container().instance(EventBus.class).get();
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("request", "Martin", ar -> {
if (ar.succeeded()) {
Expand All @@ -129,10 +126,26 @@ public void testRequestContext() throws InterruptedException {
assertEquals("MArtin", synchronizer.poll(2, TimeUnit.SECONDS));
}

@Test
public void testBlockingRequestContext() throws InterruptedException {
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request("blocking-request", "Lu", ar -> {
if (ar.succeeded()) {
try {
synchronizer.put(ar.result().body());
} catch (InterruptedException e) {
fail(e);
}
} else {
fail(ar.cause());
}
});
assertEquals("Lu", synchronizer.poll(2, TimeUnit.SECONDS));
}

@Test
public void testPublish() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(2);
eventBus.publish("pub", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -143,7 +156,6 @@ public void testPublish() throws InterruptedException {
@Test
public void testBlockingConsumer() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("blocking", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -155,7 +167,6 @@ public void testBlockingConsumer() throws InterruptedException {
@Test
public void testPublishRx() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("pub-rx", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -165,7 +176,6 @@ public void testPublishRx() throws InterruptedException {
@Test
public void testPublishAxle() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("pub-axle", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -175,7 +185,6 @@ public void testPublishAxle() throws InterruptedException {
@Test
public void testPublishMutiny() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("pub-mutiny", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand All @@ -185,7 +194,6 @@ public void testPublishMutiny() throws InterruptedException {
@Test
public void testBlockingConsumerUsingSmallRyeBlocking() throws InterruptedException {
SimpleBean.MESSAGES.clear();
EventBus eventBus = Arc.container().instance(EventBus.class).get();
SimpleBean.latch = new CountDownLatch(1);
eventBus.publish("worker", "Hello");
SimpleBean.latch.await(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -270,6 +278,12 @@ void consumeBlockingUsingRunOnWorkerThread(String message) {
MESSAGES.add(message.toLowerCase() + "::" + Context.isOnWorkerThread());
latch.countDown();
}

@Blocking
@ConsumeEvent("blocking-request")
String blockingRequestContextActive(String message) {
return transformer.transform(message);
}
}

@RequestScoped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@
*/
public interface EventConsumerInvoker extends BeanInvoker<Message<Object>> {

default boolean isBlocking() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkus.vertx.ConsumeEvent;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
Expand Down Expand Up @@ -81,10 +82,24 @@ void registerMessageConsumers(Map<String, ConsumeEvent> messageConsumerConfigura
consumer.handler(new Handler<Message<Object>>() {
@Override
public void handle(Message<Object> m) {
try {
invoker.invoke(m);
} catch (Throwable e) {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
if (invoker.isBlocking()) {
vertx.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> event) {
try {
invoker.invoke(m);
} catch (Throwable e) {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
event.complete();
}
}, null);
} else {
try {
invoker.invoke(m);
} catch (Throwable e) {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
}
}
});
Expand Down