Skip to content

Commit

Permalink
Handle duplicated context in the CacheResultInterceptor
Browse files Browse the repository at this point in the history
Backport of quarkusio#37077

* Capture the context when calling the interceptor
* Make sure the item is emitted on the captured context
  • Loading branch information
cescoffier committed Nov 20, 2023
1 parent 3e50616 commit f052cc8
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package io.quarkus.cache.test.runtime;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.control.ActivateRequestContext;
import javax.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.cache.CacheResult;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

public class DuplicatedContextHandlingTest {

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest().withApplicationRoot(jar -> jar
.addClass(CachedService.class));

@Inject
CachedService cachedService;

@Inject
Vertx vertx;

@Test
@ActivateRequestContext
void testDuplicatedContextHandlingWhenCalledFromNoContext() {
cachedService.direct(false).await().indefinitely();
cachedService.direct(true).await().indefinitely();
}

@Test
@ActivateRequestContext
void testDuplicatedContextHandlingWhenCalledOnContext() throws InterruptedException {
ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
if (context.isDuplicate()) {
context = context.duplicate();
}

CountDownLatch latch = new CountDownLatch(1);
Context tmp = context;
context.runOnContext(x -> {
cachedService.direct(false)
.invoke(() -> {
if (!tmp.equals(Vertx.currentContext())) {
throw new AssertionError("Expected to go back on the caller context");
}
})
.subscribe().with(y -> latch.countDown());
});
Assertions.assertTrue(latch.await(1, TimeUnit.SECONDS));

CountDownLatch latch2 = new CountDownLatch(1);
context.runOnContext(x -> {
cachedService.direct(true)
.invoke(() -> {
if (!tmp.equals(Vertx.currentContext())) {
throw new AssertionError("Expected to go back on the caller context");
}
})
.subscribe().with(y -> latch2.countDown());
});
Assertions.assertTrue(latch2.await(1, TimeUnit.SECONDS));

CountDownLatch latch3 = new CountDownLatch(1);
context.runOnContext(x -> {
cachedService.direct(false)
.invoke(() -> {
if (!tmp.equals(Vertx.currentContext())) {
throw new AssertionError("Expected to go back on the caller context");
}
})
.subscribe().with(y -> latch3.countDown());
});
Assertions.assertTrue(latch3.await(1, TimeUnit.SECONDS));

}

@Test
@ActivateRequestContext
void testDuplicatedContextHandlingWhenCalledOnDifferentContexts() throws InterruptedException {
ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
context = context.duplicate();
var context2 = context.duplicate();

CountDownLatch latch = new CountDownLatch(1);
Context tmp = context;
context.runOnContext(x -> {
cachedService.direct(false)
.invoke(() -> {
if (!tmp.equals(Vertx.currentContext())) {
throw new AssertionError("Expected to go back on the caller context");
}
})
.subscribe().with(y -> latch.countDown());
});
Assertions.assertTrue(latch.await(1, TimeUnit.SECONDS));

CountDownLatch latch2 = new CountDownLatch(1);
context2.runOnContext(x -> {
cachedService.direct(false)
.invoke(() -> {
if (!context2.equals(Vertx.currentContext())) {
throw new AssertionError("Expected to go back on the caller context");
}
})
.subscribe().with(y -> latch2.countDown());
});
Assertions.assertTrue(latch2.await(1, TimeUnit.SECONDS));
}

@Test
@ActivateRequestContext
void testDuplicatedContextHandlingWhenCalledContextAndAnsweredFromAnotherContext() throws InterruptedException {
ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
context = context.duplicate();
var context2 = context.duplicate();

CountDownLatch latch = new CountDownLatch(1);
Context tmp = context;
context.runOnContext(x -> {
cachedService.directOnAnotherContext(false)
.invoke(() -> {
if (!tmp.equals(Vertx.currentContext())) {
throw new AssertionError("Expected to go back on the caller context");
}
})
.subscribe().with(y -> latch.countDown());
});
Assertions.assertTrue(latch.await(1, TimeUnit.SECONDS));

CountDownLatch latch2 = new CountDownLatch(1);
context2.runOnContext(x -> {
cachedService.directOnAnotherContext(false)
.invoke(() -> {
if (!context2.equals(Vertx.currentContext())) {
throw new AssertionError("Expected to go back on the caller context");
}
})
.subscribe().with(y -> latch2.countDown());
});
Assertions.assertTrue(latch2.await(1, TimeUnit.SECONDS));
}

@ApplicationScoped
public static class CachedService {

volatile boolean timedout = false;

@CacheResult(cacheName = "duplicated-context-cache", lockTimeout = 100)
public Uni<String> direct(boolean timeout) {
if (!timeout || timedout) {
return Uni.createFrom().item("foo");
}
timedout = true;
return Uni.createFrom().nothing();
}

@CacheResult(cacheName = "duplicated-context-cache", lockTimeout = 100)
public Uni<String> directOnAnotherContext(boolean timeout) {
if (!timeout || timedout) {
return Uni.createFrom().item("foo")
.emitOn(c -> ((ContextInternal) Vertx.currentContext().owner()).duplicate().runOnContext(x -> c.run()));
}
timedout = true;
return Uni.createFrom().nothing();
}
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.cache.runtime;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -16,6 +17,10 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.TimeoutException;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

@CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding.
@Interceptor
Expand Down Expand Up @@ -53,6 +58,7 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
try {
ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
if (returnType != ReturnType.NonAsync) {
Context context = Vertx.currentContext();
Uni<Object> cacheValue = cache.get(key, new Function<Object, Object>() {
@Override
public Object apply(Object k) {
Expand Down Expand Up @@ -81,10 +87,19 @@ public Uni<?> apply(Object emittedValue) {
return Uni.createFrom().item(value);
}
}
}).emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
invokeOnContext(command, context);
}
});
if (binding.lockTimeout() <= 0) {
return createAsyncResult(cacheValue, returnType);
}
// IMPORTANT: The item/failure are emitted on the captured context.
cacheValue = cacheValue.ifNoItem().after(Duration.ofMillis(binding.lockTimeout()))
.recoverWithUni(new Supplier<Uni<?>>() {
@Override
Expand All @@ -97,6 +112,15 @@ public Uni<?> get() {
throw new CacheException(e);
}
}
})
.emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
invokeOnContext(command, context);
}
});
return createAsyncResult(cacheValue, returnType);
} else {
Expand Down Expand Up @@ -142,4 +166,42 @@ public Object apply(Object k) {
}
}

private static void invokeOnContext(Runnable command, Context context) {
Context ctx = Vertx.currentContext();
if (context == null) {
// We didn't capture a context
if (ctx == null) {
// We are not on a context => we can execute immediately.
command.run();
} else {
// We are on a context.
// We cannot continue on the current context as we may share a duplicated context.
// We need a new one. Note that duplicate() does not duplicate the duplicated context,
// but the root context.
((ContextInternal) ctx).duplicate()
.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
} else {
// We captured a context.
if (ctx == context) {
// We are on the same context => we can execute immediately
command.run();
} else {
// 1) We are not on a context (ctx == null) => we need to switch to the captured context.
// 2) We are on a different context (ctx != null) => we need to switch to the captured context.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
}

}

0 comments on commit f052cc8

Please sign in to comment.