From f781b7fd106fd2d90656ac78d21f8e01374f0b68 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 14 Nov 2023 14:09:44 +0100 Subject: [PATCH] Handle duplicated context in the CacheResultInterceptor - Capture the context when calling the interceptor - Make sure the item is emitted on the captured context --- .../cache/runtime/CacheResultInterceptor.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index dadc40e07d033..e6e1397b53dd4 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.cache.runtime; import java.time.Duration; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.Supplier; @@ -16,6 +17,10 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.TimeoutException; import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; @CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding. @Interceptor @@ -53,6 +58,7 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { try { ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType()); if (returnType != ReturnType.NonAsync) { + Context context = Vertx.currentContext(); Uni cacheValue = cache.getAsync(key, new Function>() { @SuppressWarnings("unchecked") @Override @@ -65,11 +71,54 @@ public Uni apply(Object key) { throw new CacheException(e); } } + }).emitOn(new Executor() { + // We need make sure we go back to the original context when the cache value is computed. + // Otherwise, we would always emit on the context having computed the value, which could + // break the duplicated context isolation. + @Override + public void execute(Runnable command) { + Context ctx = Vertx.currentContext(); + if (context == null) { + // We didn't capture a context + if (ctx == null) { + // We are not on a context => we can execute immediately. + command.run(); + } else { + // We are on a context. + // We cannot continue on the current context as we may share a duplicated context. + // We need a new one. Note that duplicate() does not duplicate the duplicated context, + // but the root context. + ((ContextInternal) ctx).duplicate() + .runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } else { + // We captured a context. + if (ctx == context) { + // We are on the same context => we can execute immediately + command.run(); + } else { + // 1) We are not on a context (ctx == null) => we need to switch to the captured context. + // 2) We are on a different context (ctx != null) => we need to switch to the captured context. + context.runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } + } }); if (binding.lockTimeout() <= 0) { return createAsyncResult(cacheValue, returnType); } + // IMPORTANT: The item/failure are emitted on the captured context. cacheValue = cacheValue.ifNoItem().after(Duration.ofMillis(binding.lockTimeout())) .recoverWithUni(new Supplier>() { @Override