Skip to content

Commit

Permalink
ConsumeEvent - defer request context destruction for async return types
Browse files Browse the repository at this point in the history
- resolves quarkusio#14595
  • Loading branch information
mkouba committed Feb 8, 2021
1 parent 0dd5182 commit c1934eb
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static io.quarkus.vertx.deployment.VertxConstants.MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.MUTINY_MESSAGE;
import static io.quarkus.vertx.deployment.VertxConstants.UNI;
import static org.objectweb.asm.Opcodes.ACC_FINAL;
import static org.objectweb.asm.Opcodes.ACC_PRIVATE;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -22,17 +24,14 @@
import io.quarkus.arc.processor.BeanInfo;
import io.quarkus.arc.processor.BuiltinScope;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.gizmo.AssignableResultHandle;
import io.quarkus.gizmo.BranchResult;
import io.quarkus.gizmo.BytecodeCreator;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.gizmo.FunctionCreator;
import io.quarkus.gizmo.FieldCreator;
import io.quarkus.gizmo.FieldDescriptor;
import io.quarkus.gizmo.MethodCreator;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.runtime.EventConsumerInvoker;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
Expand All @@ -42,6 +41,8 @@ class EventBusConsumer {

private static final String INVOKER_SUFFIX = "_VertxInvoker";

private static final MethodDescriptor INVOKER_CONSTRUCTOR = MethodDescriptor
.ofConstructor(EventConsumerInvoker.class);
private static final MethodDescriptor ARC_CONTAINER = MethodDescriptor
.ofMethod(Arc.class, "container", ArcContainer.class);
private static final MethodDescriptor INSTANCE_HANDLE_GET = MethodDescriptor.ofMethod(InstanceHandle.class, "get",
Expand All @@ -57,8 +58,6 @@ class EventBusConsumer {
"newInstance", io.vertx.mutiny.core.eventbus.Message.class, Message.class);
private static final MethodDescriptor MESSAGE_REPLY = MethodDescriptor.ofMethod(Message.class, "reply", void.class,
Object.class);
private static final MethodDescriptor MESSAGE_FAIL = MethodDescriptor.ofMethod(Message.class, "fail", void.class,
Integer.TYPE, String.class);
private static final MethodDescriptor MESSAGE_BODY = MethodDescriptor.ofMethod(Message.class, "body", Object.class);
private static final MethodDescriptor INSTANCE_HANDLE_DESTROY = MethodDescriptor
.ofMethod(InstanceHandle.class, "destroy",
Expand Down Expand Up @@ -99,32 +98,58 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean());

ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName)
.interfaces(EventConsumerInvoker.class).build();
.superClass(EventConsumerInvoker.class).build();

// The method descriptor is: void invokeBean(Object message)
MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", void.class, Object.class)
.addException(Exception.class);
// Initialized state
FieldCreator beanField = invokerCreator.getFieldCreator("bean", InjectableBean.class)
.setModifiers(ACC_PRIVATE | ACC_FINAL);
FieldCreator containerField = invokerCreator.getFieldCreator("container", ArcContainer.class)
.setModifiers(ACC_PRIVATE | ACC_FINAL);

if (blocking) {
MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class);
isBlocking.returnValue(isBlocking.load(true));
}

invoke(bean, method, invoke.getMethodParam(0), invoke);
implementConstructor(bean, invokerCreator, beanField, containerField);
implementInvoke(bean, method, invokerCreator, beanField.getFieldDescriptor(), containerField.getFieldDescriptor());

invoke.returnValue(null);
invokerCreator.close();
return generatedName.replace('/', '.');
}

private static void invoke(BeanInfo bean, MethodInfo method, ResultHandle messageHandle, BytecodeCreator invoke) {
ResultHandle containerHandle = invoke.invokeStaticMethod(ARC_CONTAINER);
ResultHandle beanHandle = invoke.invokeInterfaceMethod(ARC_CONTAINER_BEAN, containerHandle,
invoke.load(bean.getIdentifier()));
static void implementConstructor(BeanInfo bean, ClassCreator invokerCreator, FieldCreator beanField,
FieldCreator containerField) {
MethodCreator constructor = invokerCreator.getMethodCreator("<init>", void.class);
// Invoke super()
constructor.invokeSpecialMethod(INVOKER_CONSTRUCTOR, constructor.getThis());

ResultHandle containerHandle = constructor
.invokeStaticMethod(ARC_CONTAINER);
ResultHandle beanHandle = constructor.invokeInterfaceMethod(
ARC_CONTAINER_BEAN,
containerHandle, constructor.load(bean.getIdentifier()));
constructor.writeInstanceField(beanField.getFieldDescriptor(), constructor.getThis(), beanHandle);
constructor.writeInstanceField(containerField.getFieldDescriptor(), constructor.getThis(), containerHandle);
constructor.returnValue(null);
}

private static void implementInvoke(BeanInfo bean, MethodInfo method, ClassCreator invokerCreator,
FieldDescriptor beanField,
FieldDescriptor containerField) {

// The method descriptor is: CompletionStage invokeBean(Message message)
MethodCreator invoke = invokerCreator.getMethodCreator("invokeBean", CompletionStage.class, Message.class)
.addException(Exception.class);

ResultHandle containerHandle = invoke.readInstanceField(containerField, invoke.getThis());
ResultHandle beanHandle = invoke.readInstanceField(beanField, invoke.getThis());
ResultHandle instanceHandle = invoke.invokeInterfaceMethod(ARC_CONTAINER_INSTANCE_FOR_BEAN, containerHandle,
beanHandle);
ResultHandle beanInstanceHandle = invoke
.invokeInterfaceMethod(INSTANCE_HANDLE_GET, instanceHandle);
ResultHandle messageHandle = invoke.getMethodParam(0);
ResultHandle completionStage;

Type paramType = method.parameters().get(0);
if (paramType.name().equals(MESSAGE)) {
Expand All @@ -133,96 +158,44 @@ private static void invoke(BeanInfo bean, MethodInfo method, ResultHandle messag
MethodDescriptor
.ofMethod(bean.getImplClazz().name().toString(), method.name(), void.class, Message.class),
beanInstanceHandle, messageHandle);
completionStage = invoke.loadNull();
} else if (paramType.name().equals(MUTINY_MESSAGE)) {
// io.vertx.mutiny.core.eventbus.Message
ResultHandle mutinyMessageHandle = invoke.invokeStaticMethod(MUTINY_MESSAGE_NEW_INSTANCE, messageHandle);
invoke.invokeVirtualMethod(
MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(), void.class,
io.vertx.mutiny.core.eventbus.Message.class),
beanInstanceHandle, mutinyMessageHandle);
completionStage = invoke.loadNull();
} else {
// Parameter is payload
ResultHandle bodyHandle = invoke.invokeInterfaceMethod(MESSAGE_BODY, messageHandle);
ResultHandle replyHandle = invoke.invokeVirtualMethod(
ResultHandle returnHandle = invoke.invokeVirtualMethod(
MethodDescriptor.ofMethod(bean.getImplClazz().name().toString(), method.name(),
method.returnType().name().toString(), paramType.name().toString()),
beanInstanceHandle, bodyHandle);
if (replyHandle != null) {
if (returnHandle != null) {
if (method.returnType().name().equals(COMPLETION_STAGE)) {
FunctionCreator handler = generateWhenCompleteHandler(messageHandle, invoke);
invoke.invokeInterfaceMethod(
WHEN_COMPLETE,
replyHandle, handler.getInstance());
completionStage = returnHandle;
} else if (method.returnType().name().equals(UNI)) {
// If the return type is Uni use uni.subscribeAsCompletionStage().whenComplete(...)
FunctionCreator handler = generateWhenCompleteHandler(messageHandle, invoke);
ResultHandle subscribedCompletionStage = invoke.invokeInterfaceMethod(SUBSCRIBE_AS_COMPLETION_STAGE,
replyHandle);
invoke.invokeInterfaceMethod(WHEN_COMPLETE,
subscribedCompletionStage, handler.getInstance());
completionStage = invoke.invokeInterfaceMethod(SUBSCRIBE_AS_COMPLETION_STAGE,
returnHandle);
} else {
// Message.reply(returnValue)
invoke.invokeInterfaceMethod(MESSAGE_REPLY, messageHandle, replyHandle);
invoke.invokeInterfaceMethod(MESSAGE_REPLY, messageHandle, returnHandle);
completionStage = invoke.loadNull();
}
} else {
completionStage = invoke.loadNull();
}
}

// handle.destroy() - destroy dependent instance afterwards
if (BuiltinScope.DEPENDENT.is(bean.getScope())) {
invoke.invokeInterfaceMethod(INSTANCE_HANDLE_DESTROY, instanceHandle);
}
}

/**
* If the return type is CompletionStage use:
* <code><pre>
* cs.whenComplete((whenResult, whenFailure) -> {
* if (failure != null) {
* message.fail(status, whenFailure.getMessage());
* } else {
* message.reply(whenResult);
* }
* })
* </pre></code>
*
* @param messageHandle the message variable
* @param invoke the bytecode creator
* @return the function
*/
private static FunctionCreator generateWhenCompleteHandler(ResultHandle messageHandle, BytecodeCreator invoke) {
FunctionCreator handler = invoke.createFunction(BiConsumer.class);
BytecodeCreator bytecode = handler.getBytecode();

// This avoid having to check cast in the branches
AssignableResultHandle whenResult = bytecode.createVariable(Object.class);
bytecode.assign(whenResult, bytecode.getMethodParam(0));
AssignableResultHandle whenFailure = bytecode.createVariable(Exception.class);
bytecode.assign(whenFailure, bytecode.getMethodParam(1));
AssignableResultHandle message = bytecode.createVariable(Message.class);
bytecode.assign(message, messageHandle);

BranchResult ifFailureIfNull = bytecode.ifNull(whenFailure);
// failure is not null branch - message.fail(failureStatus, failure.getMessage())
// In this branch we use the EXPLICIT FAILURE CODE
BytecodeCreator failureIsNotNull = ifFailureIfNull.falseBranch();
ResultHandle failureStatus = failureIsNotNull.load(ConsumeEvent.EXPLICIT_FAILURE_CODE);
ResultHandle failureMessage = failureIsNotNull
.invokeVirtualMethod(THROWABLE_GET_MESSAGE, whenFailure);
failureIsNotNull.invokeInterfaceMethod(
MESSAGE_FAIL,
message,
failureStatus,
failureMessage);

// failure is null branch - message.reply(reply))
BytecodeCreator failureIsNull = ifFailureIfNull.trueBranch();
failureIsNull.invokeInterfaceMethod(
MESSAGE_REPLY,
messageHandle,
whenResult);

bytecode.returnValue(null);
return handler;
invoke.returnValue(completionStage);
}

private EventBusConsumer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package io.quarkus.vertx.deployment;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.core.eventbus.EventBus;

public class RequestContextTerminationTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class));

@Inject
EventBus eventBus;

@Test
public void testTermination() throws InterruptedException {
assertTerminated("foo");
assertTerminated("foo-cs");
assertTerminated("foo-uni");
}

void assertTerminated(String address) throws InterruptedException {
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
Converter.DESTROYED.set(false);
eventBus.request(address, "bongo", ar -> {
if (ar.succeeded()) {
try {
synchronizer.put(ar.result().body());
} catch (InterruptedException e) {
fail(e);
}
} else {
fail(ar.cause());
}
});
assertEquals("BONGO", synchronizer.poll(2, TimeUnit.SECONDS));
assertTrue(Converter.DESTROYED.get());
}

@Test
public void testFailureNoReplyHandler() throws InterruptedException {
}

static class SimpleBean {

@Inject
Converter converter;

@ConsumeEvent("foo")
String foo(String message) {
return converter.convert(message);
}

@ConsumeEvent("foo-cs")
CompletionStage<String> asyncFoo(String message) {
return CompletableFuture.completedFuture(converter.convert(message));
}

@ConsumeEvent("foo-uni")
Uni<String> asyncFooUni(String message) {
return Uni.createFrom().item(converter.convert(message));
}

}

@RequestScoped
static class Converter {

static final AtomicBoolean DESTROYED = new AtomicBoolean();

String convert(String val) {
return val.toUpperCase();
}

@PreDestroy
void destroy() {
DESTROYED.set(true);
}

}

}
Loading

0 comments on commit c1934eb

Please sign in to comment.