Skip to content

Commit

Permalink
When a message consumer fails and uses blocking=true, the caught ex…
Browse files Browse the repository at this point in the history
…ception must be captured and sent to the sender as reply exception.
  • Loading branch information
cescoffier committed Oct 9, 2020
1 parent e4686a7 commit ea87a7f
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.vertx.deployment;

import static io.quarkus.vertx.ConsumeEvent.FAILURE_CODE;
import static io.quarkus.vertx.deployment.VertxConstants.*;

import java.lang.annotation.Annotation;
Expand Down Expand Up @@ -85,6 +86,8 @@ class EventBusConsumer {
.ofMethod(Uni.class, "subscribeAsCompletionStage", CompletableFuture.class);
protected static final MethodDescriptor THROWABLE_GET_MESSAGE = MethodDescriptor
.ofMethod(Throwable.class, "getMessage", String.class);
protected static final MethodDescriptor THROWABLE_TO_STRING = MethodDescriptor
.ofMethod(Throwable.class, "toString", String.class);

static String generateInvoker(BeanInfo bean, MethodInfo method,
AnnotationInstance consumeEvent,
Expand Down Expand Up @@ -130,8 +133,20 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
TryBlock tryBlock = funcBytecode.tryBlock();
invoke(bean, method, messageHandle, tryBlock);
tryBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), tryBlock.loadNull());

CatchBlockCreator catchBlock = tryBlock.addCatch(Exception.class);
catchBlock.invokeInterfaceMethod(FUTURE_FAIL, funcBytecode.getMethodParam(0), catchBlock.getCaughtException());
// Need to reply with the caught exception - using Throwable.toString on purpose to get the class name.
ResultHandle failureMessage = catchBlock
.invokeVirtualMethod(THROWABLE_TO_STRING, catchBlock.getCaughtException());
ResultHandle failureStatus = catchBlock.load(FAILURE_CODE);
catchBlock.invokeInterfaceMethod(
MESSAGE_FAIL,
messageHandle,
failureStatus,
failureMessage);
// Completing successfully, the failure has been sent to the sender.
catchBlock.invokeInterfaceMethod(FUTURE_COMPLETE, funcBytecode.getMethodParam(0), catchBlock.loadNull());

funcBytecode.returnValue(null);

invoke.invokeInterfaceMethod(VERTX_EXECUTE_BLOCKING, vertxHandle, func.getInstance(), invoke.load(false),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.quarkus.vertx.deployment;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;

public class MessageConsumerFailureBlockingTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClasses(SimpleBean.class));

@Inject
SimpleBean simpleBean;

@Inject
EventBus eventBus;

@Test
public void testFailureWithBlocking() throws InterruptedException {
verifyFailure("foo", "java.lang.IllegalStateException: Red is dead", false);
verifyFailure("foo-message", "java.lang.NullPointerException", false);
verifyFailure("foo-completion-stage", "java.lang.NullPointerException: Something is null", false);
verifyFailure("foo-completion-stage-failure", "boom", true);
verifyFailure("foo-uni", "java.lang.NullPointerException: Something is null", false);
verifyFailure("foo-uni-failure", "boom", true);
}

void verifyFailure(String address, String expectedMessage, boolean explicit) throws InterruptedException {
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
eventBus.request(address, "hello", ar -> {
try {
if (ar.cause() != null) {
synchronizer.put(ar.cause());
} else {
synchronizer.put(false);
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
Object ret = synchronizer.poll(2, TimeUnit.SECONDS);
assertTrue(ret instanceof ReplyException);
ReplyException replyException = (ReplyException) ret;
if (!explicit) {
assertEquals(ConsumeEvent.FAILURE_CODE, replyException.failureCode());
} else {
assertEquals(ConsumeEvent.EXPLICIT_FAILURE_CODE, replyException.failureCode());
}
assertEquals(expectedMessage, replyException.getMessage());
}

static class SimpleBean {

@ConsumeEvent(value = "foo", blocking = true)
String fail(String message) {
throw new IllegalStateException("Red is dead");
}

@ConsumeEvent(value = "foo-message", blocking = true)
void failMessage(Message<String> message) {
throw new NullPointerException();
}

@ConsumeEvent(value = "foo-completion-stage", blocking = true)
CompletionStage<String> failCompletionStage(String message) {
throw new NullPointerException("Something is null");
}

@ConsumeEvent(value = "foo-completion-stage-failure", blocking = true)
CompletionStage<String> failedCompletionStage(String message) {
CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(new IOException("boom"));
return future;
}

@ConsumeEvent(value = "foo-uni", blocking = true)
Uni<String> failUni(String message) {
throw new NullPointerException("Something is null");
}

@ConsumeEvent(value = "foo-uni-failure", blocking = true)
Uni<String> failedUni(String message) {
return Uni.createFrom().failure(new IOException("boom"));
}

}

}

0 comments on commit ea87a7f

Please sign in to comment.