Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a RunInSafeDuplicatedContext annotation #34636

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/main/asciidoc/duplicated-context.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,21 @@ Other extensions should follow a similar pattern when they are setting up a new
In other cases, it might be helpful to mark the current context as not safe instead explicitly; for example, if an existing context needs to be shared across multiple workers to process some operations in parallel: by marking and un-marking appropriately the same
context can have spans in which it's safe, followed by spans in which it's not safe.

To mark a context as safe, you can:

1. Use the `io.quarkus.vertx.core.runtime.context.SafeVertxContext` annotation
2. Use the `io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle` class

By using the `io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle` class, the current context can be explicitly marked as `safe`, or it can be explicitly marked as `unsafe`; there's a third state which is the default of any new context: `unmarked`.
The default is to consider any unmarked context to be `unsafe`, unless the system property `io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.UNRESTRICTED_BY_DEFAULT` is set to `true`;

The `SafeVertxContext` annotation marks the current duplicated context as safe and invokes the annotated method if the context is `unmarked` or already marked as `safe`.
If the context is marked as `unsafe`, you can force it to be `safe` using the `force=true` parameter.
However, this possibility must be used carefully.

IMPORTANT: The `@SafeVertxContext` annotation is a CDI interceptor binding annotation.
Therefore, it only works for CDI beans and on non-private methods.

== Extensions supporting duplicated contexts

In general, Quarkus invokes reactive code on duplicated contexts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.objectweb.asm.Opcodes;
import org.objectweb.asm.Type;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
Expand Down Expand Up @@ -56,6 +57,7 @@
import io.quarkus.vertx.core.runtime.VertxLocalsHelper;
import io.quarkus.vertx.core.runtime.VertxLogDelegateFactory;
import io.quarkus.vertx.core.runtime.config.VertxConfiguration;
import io.quarkus.vertx.core.runtime.context.SafeVertxContextInterceptor;
import io.quarkus.vertx.mdc.provider.LateBoundMDCProvider;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
Expand All @@ -71,6 +73,11 @@ class VertxCoreProcessor {
"io.vertx.core.impl.btc.BlockedThreadChecker" // Vert.x 4.3+
);

@BuildStep
AdditionalBeanBuildItem registerSafeDuplicatedContextInterceptor() {
return new AdditionalBeanBuildItem(SafeVertxContextInterceptor.class.getName());
}

@BuildStep
NativeImageConfigBuildItem build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<NativeImageResourceBuildItem> nativeImageResources) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package io.quarkus.vertx.locals;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.assertj.core.api.Assertions;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.core.runtime.context.SafeVertxContext;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.mutiny.core.Vertx;

/**
* Verify the behavior of the interceptor handling {@link SafeVertxContext}
*/
public class SafeVertxContextTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap
.create(JavaArchive.class).addClasses(MyBean.class));

@Inject
MyBean bean;

@Inject
Vertx vertx;

@Test
void testWhenRunningFromUnmarkedDuplicatedContext() throws InterruptedException {
Context dc = VertxContext.getOrCreateDuplicatedContext(vertx.getDelegate());
CountDownLatch latch = new CountDownLatch(1);
dc.runOnContext(ignored -> {
bean.run();
bean.runWithForce();
latch.countDown();
});

Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
}

@Test
void testWhenRunningFromSafeDuplicatedContext() throws InterruptedException {
Context dc = VertxContext.getOrCreateDuplicatedContext(vertx.getDelegate());
VertxContextSafetyToggle.setContextSafe(dc, true);
CountDownLatch latch = new CountDownLatch(1);
dc.runOnContext(ignored -> {
bean.run();
bean.runWithForce();
latch.countDown();
});

Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
}

@Test
void testWhenRunningFromUnsafeDuplicatedContext() throws InterruptedException {
Context dc = VertxContext.getOrCreateDuplicatedContext(vertx.getDelegate());
VertxContextSafetyToggle.setContextSafe(dc, false);
CountDownLatch latch = new CountDownLatch(1);
dc.runOnContext(ignored -> {
try {
bean.run();
fail("The interceptor should have failed.");
} catch (IllegalStateException e) {
// Expected
}
bean.runWithForce();
latch.countDown();
});

Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
}

@Test
void testWhenRunningOnARootContext() throws InterruptedException {
Context dc = vertx.getDelegate().getOrCreateContext();
CountDownLatch latch = new CountDownLatch(1);
dc.runOnContext(ignored -> {
try {
bean.run();
fail("The interceptor should have failed.");
} catch (IllegalStateException e) {
// Expected
}
try {
bean.run();
fail("The interceptor should have failed.");
} catch (IllegalStateException e) {
// Expected
}
latch.countDown();
});

Assertions.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();
}

@Test
void testWhenRunningWithoutAContext() {
try {
bean.run();
fail("The interceptor should have failed.");
} catch (IllegalStateException e) {
// Expected
}
try {
bean.run();
fail("The interceptor should have failed.");
} catch (IllegalStateException e) {
// Expected
}
}

@ApplicationScoped
public static class MyBean {

@SafeVertxContext
public void run() {
assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("ErrorVeto", "ErrorDoubt");
}

@SafeVertxContext(force = true)
public void runWithForce() {
assertTrue(VertxContext.isOnDuplicatedContext());
VertxContextSafetyToggle.validateContextIfExists("ErrorVeto", "ErrorDoubt");
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.vertx.core.runtime.context;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import jakarta.enterprise.util.Nonbinding;
import jakarta.interceptor.InterceptorBinding;

/**
* Indicates that the annotated method should be invoked on a safe duplicated context.
* This interceptor binding is a declarative alternative to the {@link VertxContextSafetyToggle}.
* <p>
* <strong>Important:</strong> You must only use this annotation if the annotated method does not access the context
* concurrently.
* <p>
* If the method is not run on a duplicated context, the interceptor fails.
* If the method is invoked on an unmarked duplicated context, the context is marked as `safe` and the method is called.
* If the method is invoked on a `safe` duplicated context, the method is called.
* If the method is invoked on an `unsafe` duplicated context, the interceptor fails, except if {@link #force()} is
* set to {@code true}. In this case, the duplicated context is marked as `safe` and the method is called.
*/
@InterceptorBinding
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR })
@Inherited
public @interface SafeVertxContext {

/**
* @return whether the current safety flag of the current context must be overridden.
*/
@Nonbinding
boolean force() default false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.quarkus.vertx.core.runtime.context;

import jakarta.inject.Inject;
import jakarta.interceptor.AroundInvoke;
import jakarta.interceptor.Interceptor;

import org.jboss.logging.Logger;

import io.quarkus.arc.ArcInvocationContext;
import io.vertx.core.Vertx;

@SafeVertxContext
@Interceptor
public class SafeVertxContextInterceptor {

@Inject
Vertx vertx;

private final static Logger LOGGER = Logger.getLogger(SafeVertxContextInterceptor.class);

@AroundInvoke
public Object markTheContextSafe(ArcInvocationContext ic) throws Exception {
final io.vertx.core.Context current = vertx.getOrCreateContext();
if (VertxContextSafetyToggle.isExplicitlyMarkedAsSafe(current)) {
return ic.proceed();
}

var annotation = ic.findIterceptorBinding(SafeVertxContext.class);
boolean unsafe = VertxContextSafetyToggle.isExplicitlyMarkedAsUnsafe(current);
if (unsafe && annotation.force()) {
LOGGER.debugf("Force the duplicated context as `safe` while is was explicitly marked as `unsafe` in %s.%s",
ic.getMethod().getDeclaringClass().getName(), ic.getMethod().getName());
} else if (unsafe) {
throw new IllegalStateException(
"Unable to mark the context as safe, as the current context is explicitly marked as unsafe");
}
VertxContextSafetyToggle.setContextSafe(current, true);
cescoffier marked this conversation as resolved.
Show resolved Hide resolved
return ic.proceed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,28 @@ public static void setContextSafe(final Context context, final boolean safe) {
}
}

public static boolean isExplicitlyMarkedAsSafe(final Context context) {
if (!VertxContext.isDuplicatedContext(context)) {
throw new IllegalStateException(
"Can't get the context safety flag: the current context is not a duplicated context");
}
final Object safeFlag = context.getLocal(ACCESS_TOGGLE_KEY);
if (safeFlag == Boolean.TRUE) {
return true;
}
return false;
}

public static boolean isExplicitlyMarkedAsUnsafe(final Context context) {
if (!VertxContext.isDuplicatedContext(context)) {
throw new IllegalStateException(
"Can't get the context safety flag: the current context is not a duplicated context");
}
final Object safeFlag = context.getLocal(ACCESS_TOGGLE_KEY);
if (safeFlag == Boolean.FALSE) {
return true;
}
return false;
}

}