Skip to content

Commit

Permalink
Merge pull request #24246 from cescoffier/mark-context-safe-in-non-bl…
Browse files Browse the repository at this point in the history
…ocking-consume-event

Mark duplicated context created by Vert.x as safe in non-blocking @ConsumeEvent
  • Loading branch information
cescoffier authored Mar 11, 2022
2 parents 7aa0ab3 + 34514aa commit 3097830
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.vertx.ContextLocals;
import io.smallrye.common.vertx.VertxContext;
Expand Down Expand Up @@ -147,12 +148,14 @@ public static class MyConsumers {
@ConsumeEvent(value = "context")
Uni<String> receive(String data) {
Assertions.assertTrue(Thread.currentThread().getName().contains("vert.x-eventloop"));
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");
return process(data);
}

private Uni<String> process(String id) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");

String val = ContextLocals.get("key", null);
Assertions.assertNull(val);
Expand All @@ -168,6 +171,7 @@ private Uni<String> process(String id) {
Assertions.assertEquals("hey!", msg);
Assertions.assertEquals(id, ContextLocals.get("key", null));
Assertions.assertSame(Vertx.currentContext(), context);
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");
return "OK-" + ContextLocals.get("key", null);
}).toCompletionStage());
}
Expand All @@ -176,13 +180,16 @@ private Uni<String> process(String id) {
@Blocking
String receiveBlocking(String data) {
Assertions.assertFalse(Thread.currentThread().getName().contains("vert.x-eventloop"));
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");
return process(data).await().atMost(Duration.ofSeconds(4));
}

@ConsumeEvent(value = "context-send")
public void consumeSend(String s) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");

String val = ContextLocals.get("key", null);
Assertions.assertNull(val);
Expand All @@ -195,6 +202,7 @@ public void consumeSend(String s) {
public void consumeSendBlocking(String s) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");

String val = ContextLocals.get("key", null);
Assertions.assertNull(val);
Expand All @@ -207,6 +215,7 @@ public void consumeSendBlocking(String s) {
public void consumePublish1(String s) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");

String val = ContextLocals.get("key", null);
Assertions.assertNull(val);
Expand All @@ -219,6 +228,7 @@ public void consumePublish1(String s) {
public void consumePublish2(String s) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");

String val = ContextLocals.get("key", null);
Assertions.assertNull(val);
Expand All @@ -232,6 +242,7 @@ public void consumePublish2(String s) {
public void consumePublishBlocking1(String s) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");

String val = ContextLocals.get("key", null);
Assertions.assertNull(val);
Expand All @@ -245,6 +256,7 @@ public void consumePublishBlocking1(String s) {
public void consumePublishBlocking2(String s) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("Not marked as safe", "Not marked as safe");

String val = ContextLocals.get("key", null);
Assertions.assertNull(val);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.vertx.runtime;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setCurrentContextSafe;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
Expand Down Expand Up @@ -120,7 +121,11 @@ public void handle(Promise<Object> event) {
}
}, invoker.isOrdered(), null);
} else {
// Will run on the context used for the consumer registration
// Will run on the context used for the consumer registration.
// It's a duplicated context, but we need to mark it as safe.
// The safety comes from the fact that it's instantiated by Vert.x for every
// message.
setCurrentContextSafe(true);
try {
invoker.invoke(m);
} catch (Exception e) {
Expand Down

0 comments on commit 3097830

Please sign in to comment.