diff --git a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/DuplicatedContextHandlingTest.java b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/DuplicatedContextHandlingTest.java new file mode 100644 index 0000000000000..8f47f1770489f --- /dev/null +++ b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/DuplicatedContextHandlingTest.java @@ -0,0 +1,178 @@ +package io.quarkus.cache.test.runtime; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.control.ActivateRequestContext; +import javax.inject.Inject; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.cache.CacheResult; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +public class DuplicatedContextHandlingTest { + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest().withApplicationRoot(jar -> jar + .addClass(CachedService.class)); + + @Inject + CachedService cachedService; + + @Inject + Vertx vertx; + + @Test + @ActivateRequestContext + void testDuplicatedContextHandlingWhenCalledFromNoContext() { + cachedService.direct(false).await().indefinitely(); + cachedService.direct(true).await().indefinitely(); + } + + @Test + @ActivateRequestContext + void testDuplicatedContextHandlingWhenCalledOnContext() throws InterruptedException { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + if (context.isDuplicate()) { + context = context.duplicate(); + } + + CountDownLatch latch = new CountDownLatch(1); + Context tmp = context; + context.runOnContext(x -> { + cachedService.direct(false) + .invoke(() -> { + if (!tmp.equals(Vertx.currentContext())) { + throw new AssertionError("Expected to go back on the caller context"); + } + }) + .subscribe().with(y -> latch.countDown()); + }); + Assertions.assertTrue(latch.await(1, TimeUnit.SECONDS)); + + CountDownLatch latch2 = new CountDownLatch(1); + context.runOnContext(x -> { + cachedService.direct(true) + .invoke(() -> { + if (!tmp.equals(Vertx.currentContext())) { + throw new AssertionError("Expected to go back on the caller context"); + } + }) + .subscribe().with(y -> latch2.countDown()); + }); + Assertions.assertTrue(latch2.await(1, TimeUnit.SECONDS)); + + CountDownLatch latch3 = new CountDownLatch(1); + context.runOnContext(x -> { + cachedService.direct(false) + .invoke(() -> { + if (!tmp.equals(Vertx.currentContext())) { + throw new AssertionError("Expected to go back on the caller context"); + } + }) + .subscribe().with(y -> latch3.countDown()); + }); + Assertions.assertTrue(latch3.await(1, TimeUnit.SECONDS)); + + } + + @Test + @ActivateRequestContext + void testDuplicatedContextHandlingWhenCalledOnDifferentContexts() throws InterruptedException { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + context = context.duplicate(); + var context2 = context.duplicate(); + + CountDownLatch latch = new CountDownLatch(1); + Context tmp = context; + context.runOnContext(x -> { + cachedService.direct(false) + .invoke(() -> { + if (!tmp.equals(Vertx.currentContext())) { + throw new AssertionError("Expected to go back on the caller context"); + } + }) + .subscribe().with(y -> latch.countDown()); + }); + Assertions.assertTrue(latch.await(1, TimeUnit.SECONDS)); + + CountDownLatch latch2 = new CountDownLatch(1); + context2.runOnContext(x -> { + cachedService.direct(false) + .invoke(() -> { + if (!context2.equals(Vertx.currentContext())) { + throw new AssertionError("Expected to go back on the caller context"); + } + }) + .subscribe().with(y -> latch2.countDown()); + }); + Assertions.assertTrue(latch2.await(1, TimeUnit.SECONDS)); + } + + @Test + @ActivateRequestContext + void testDuplicatedContextHandlingWhenCalledContextAndAnsweredFromAnotherContext() throws InterruptedException { + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + context = context.duplicate(); + var context2 = context.duplicate(); + + CountDownLatch latch = new CountDownLatch(1); + Context tmp = context; + context.runOnContext(x -> { + cachedService.directOnAnotherContext(false) + .invoke(() -> { + if (!tmp.equals(Vertx.currentContext())) { + throw new AssertionError("Expected to go back on the caller context"); + } + }) + .subscribe().with(y -> latch.countDown()); + }); + Assertions.assertTrue(latch.await(1, TimeUnit.SECONDS)); + + CountDownLatch latch2 = new CountDownLatch(1); + context2.runOnContext(x -> { + cachedService.directOnAnotherContext(false) + .invoke(() -> { + if (!context2.equals(Vertx.currentContext())) { + throw new AssertionError("Expected to go back on the caller context"); + } + }) + .subscribe().with(y -> latch2.countDown()); + }); + Assertions.assertTrue(latch2.await(1, TimeUnit.SECONDS)); + } + + @ApplicationScoped + public static class CachedService { + + volatile boolean timedout = false; + + @CacheResult(cacheName = "duplicated-context-cache", lockTimeout = 100) + public Uni direct(boolean timeout) { + if (!timeout || timedout) { + return Uni.createFrom().item("foo"); + } + timedout = true; + return Uni.createFrom().nothing(); + } + + @CacheResult(cacheName = "duplicated-context-cache", lockTimeout = 100) + public Uni directOnAnotherContext(boolean timeout) { + if (!timeout || timedout) { + return Uni.createFrom().item("foo") + .emitOn(c -> ((ContextInternal) Vertx.currentContext().owner()).duplicate().runOnContext(x -> c.run())); + } + timedout = true; + return Uni.createFrom().nothing(); + } + } + +} diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index e1c6e94bcbd51..c6f0e96d71a42 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.cache.runtime; import java.time.Duration; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.Supplier; @@ -16,6 +17,10 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.TimeoutException; import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; @CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding. @Interceptor @@ -53,6 +58,7 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { try { ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType()); if (returnType != ReturnType.NonAsync) { + Context context = Vertx.currentContext(); Uni cacheValue = cache.get(key, new Function() { @Override public Object apply(Object k) { @@ -81,10 +87,19 @@ public Uni apply(Object emittedValue) { return Uni.createFrom().item(value); } } + }).emitOn(new Executor() { + // We need make sure we go back to the original context when the cache value is computed. + // Otherwise, we would always emit on the context having computed the value, which could + // break the duplicated context isolation. + @Override + public void execute(Runnable command) { + invokeOnContext(command, context); + } }); if (binding.lockTimeout() <= 0) { return createAsyncResult(cacheValue, returnType); } + // IMPORTANT: The item/failure are emitted on the captured context. cacheValue = cacheValue.ifNoItem().after(Duration.ofMillis(binding.lockTimeout())) .recoverWithUni(new Supplier>() { @Override @@ -97,6 +112,15 @@ public Uni get() { throw new CacheException(e); } } + }) + .emitOn(new Executor() { + // We need make sure we go back to the original context when the cache value is computed. + // Otherwise, we would always emit on the context having computed the value, which could + // break the duplicated context isolation. + @Override + public void execute(Runnable command) { + invokeOnContext(command, context); + } }); return createAsyncResult(cacheValue, returnType); } else { @@ -142,4 +166,42 @@ public Object apply(Object k) { } } + private static void invokeOnContext(Runnable command, Context context) { + Context ctx = Vertx.currentContext(); + if (context == null) { + // We didn't capture a context + if (ctx == null) { + // We are not on a context => we can execute immediately. + command.run(); + } else { + // We are on a context. + // We cannot continue on the current context as we may share a duplicated context. + // We need a new one. Note that duplicate() does not duplicate the duplicated context, + // but the root context. + ((ContextInternal) ctx).duplicate() + .runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } else { + // We captured a context. + if (ctx == context) { + // We are on the same context => we can execute immediately + command.run(); + } else { + // 1) We are not on a context (ctx == null) => we need to switch to the captured context. + // 2) We are on a different context (ctx != null) => we need to switch to the captured context. + context.runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } + } + }