Skip to content

Commit

Permalink
Handle duplicated context in the CacheResultInterceptor
Browse files Browse the repository at this point in the history
- Capture the context when calling the interceptor
- Make sure the item is emitted on the captured context
  • Loading branch information
cescoffier committed Nov 15, 2023
1 parent c1af049 commit f98313b
Showing 1 changed file with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.cache.runtime;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -16,6 +17,9 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.TimeoutException;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

@CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding.
@Interceptor
Expand Down Expand Up @@ -53,6 +57,7 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
try {
ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
if (returnType != ReturnType.NonAsync) {
Context context = Vertx.currentContext();
Uni<Object> cacheValue = cache.getAsync(key, new Function<Object, Uni<Object>>() {
@SuppressWarnings("unchecked")
@Override
Expand All @@ -65,11 +70,32 @@ public Uni<Object> apply(Object key) {
throw new CacheException(e);
}
}
}).emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
Context ctx = Vertx.currentContext();
if (ctx == context) {
// We are already on the right context, execute immediately.
command.run();
} else {
// Jump back to the captured context.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
});

if (binding.lockTimeout() <= 0) {
return createAsyncResult(cacheValue, returnType);
}
// IMPORTANT: The item/failure are emitted on the captured context.
cacheValue = cacheValue.ifNoItem().after(Duration.ofMillis(binding.lockTimeout()))
.recoverWithUni(new Supplier<Uni<?>>() {
@Override
Expand Down

0 comments on commit f98313b

Please sign in to comment.