diff --git a/extensions/cache/deployment/pom.xml b/extensions/cache/deployment/pom.xml index 7f5f0197b7714..a1c35c5f644c4 100644 --- a/extensions/cache/deployment/pom.xml +++ b/extensions/cache/deployment/pom.xml @@ -26,6 +26,10 @@ io.quarkus quarkus-caffeine-deployment + + io.quarkus + quarkus-mutiny-deployment + diff --git a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java new file mode 100644 index 0000000000000..ff7e82583e1f4 --- /dev/null +++ b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java @@ -0,0 +1,95 @@ +package io.quarkus.cache.test.runtime; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.cache.CacheResult; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; + +/** + * Tests the {@link CacheResult} annotation on methods returning {@link Uni}. + */ +public class UniValueTest { + + private static final String KEY = "key"; + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClass(CachedService.class).addAsResource( + new StringAsset("quarkus.log.category.\"io.quarkus.cache\".level=DEBUG"), "application.properties")); + + @Inject + CachedService cachedService; + + @Test + public void test() { + // STEP 1 + // Action: a method annotated with @CacheResult and returning a Uni is called. + // Expected effect: the method is invoked and its result (a Uni) is wrapped into an UncomputedUniValue which is cached. + // Verified by: invocations counter and CacheResultInterceptor log. + Uni uni1 = cachedService.cachedMethod(KEY); + assertEquals(1, cachedService.getInvocations()); + + // STEP 2 + // Action: same call as STEP 1. + // Expected effect: the method is not invoked and the cached UncomputedUniValue from STEP 1 is used to produce a result. + // Verified by: invocations counter and CacheResultInterceptor log. + Uni uni2 = cachedService.cachedMethod(KEY); + assertEquals(1, cachedService.getInvocations()); + + // STEP 3 + // Action: the Uni returned in STEP 1 is subscribed to and we wait for an item event to be fired. + // Expected effect: the UncomputedUniValue wrapper cached during STEP 1 is replaced with the emitted item from this step in the cache. + // Verified by: CaffeineCache log. + String emittedItem1 = uni1.await().indefinitely(); + + // STEP 4 + // Action: the Uni returned in STEP 2 is subscribed to and we wait for an item event to be fired. + // Expected effect: the emitted item from STEP 3 is replaced with the emitted item from this step in the cache. + // Verified by: CaffeineCache log and different objects references between STEPS 3 and 4 emitted items. + String emittedItem2 = uni2.await().indefinitely(); + assertTrue(emittedItem1 != emittedItem2); + + // STEP 5 + // Action: same call as STEP 2 but we immediately subscribe to the returned Uni and wait for an item event to be fired. + // Expected effect: the method is not invoked and the emitted item cached during STEP 4 is returned. + // Verified by: invocations counter and same object reference between STEPS 4 and 5 emitted items. + String emittedItem3 = cachedService.cachedMethod(KEY).await().indefinitely(); + assertEquals(1, cachedService.getInvocations()); + assertTrue(emittedItem2 == emittedItem3); + + // STEP 6 + // Action: same call as STEP 5 with a different key. + // Expected effect: the method is invoked and its result (a Uni) is wrapped into an UncomputedUniValue which is cached. + // Verified by: invocations counter, CacheResultInterceptor log and different objects references between STEPS 5 and 6 emitted items. + String emittedItem4 = cachedService.cachedMethod("another-key").await().indefinitely(); + assertEquals(2, cachedService.getInvocations()); + assertTrue(emittedItem3 != emittedItem4); + } + + @ApplicationScoped + static class CachedService { + + private int invocations; + + @CacheResult(cacheName = "test-cache") + public Uni cachedMethod(String key) { + invocations++; + return Uni.createFrom().item(() -> new String()); + } + + public int getInvocations() { + return invocations; + } + } +} diff --git a/extensions/cache/runtime/pom.xml b/extensions/cache/runtime/pom.xml index 73dd9f19a3744..4c2bddc8d4948 100644 --- a/extensions/cache/runtime/pom.xml +++ b/extensions/cache/runtime/pom.xml @@ -23,6 +23,10 @@ io.quarkus quarkus-caffeine + + io.quarkus + quarkus-mutiny + io.vertx vertx-web diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java index 8545d93c1b95a..d489be3811151 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java @@ -4,6 +4,7 @@ import java.util.function.Function; import io.quarkus.cache.Cache; +import io.smallrye.mutiny.Uni; public abstract class AbstractCache implements Cache { @@ -32,4 +33,17 @@ public Object getDefaultKey() { public abstract void invalidate(Object key); public abstract void invalidateAll(); + + /** + * Replaces the cache value associated with the given key by an item emitted by a Uni. This method can be called several + * times for the same key, each call will then always replace the existing cache entry with the given emitted value. If the + * key no longer identifies a cache entry, this method must not put the emitted item into the cache. + */ + public abstract Uni replaceUniValue(Object key, Object emittedValue); + + /** + * Removes the cache entry identified by the given key only if the cache value is the given {@link UncomputedUniValue}. + * This method is called in case of failure during the computation of a cached {@link Uni}. + */ + public abstract Uni removeUncomputedUniValue(Object key, UncomputedUniValue uncomputedUniValue); } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index 60d19de8fd61d..85435a52ccea2 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -14,6 +14,7 @@ import org.jboss.logging.Logger; import io.quarkus.cache.CacheResult; +import io.smallrye.mutiny.Uni; @CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding. @Interceptor @@ -47,28 +48,42 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { @Override public Object apply(Object k) { try { - return invocationContext.proceed(); + Object invocationResult = invocationContext.proceed(); + if (invocationResult instanceof Uni) { + LOGGER.debugf("Adding UncomputedUniValue entry with key [%s] into cache [%s]", key, + cache.getName()); + return new UncomputedUniValue((Uni) invocationResult); + } else { + return invocationResult; + } } catch (Exception e) { throw new CacheException(e); } } }); + Object value; if (binding.lockTimeout() <= 0) { - return cacheValue.get(); + value = cacheValue.get(); } else { try { /* * If the current thread started the cache value computation, then the computation is already finished since * it was done synchronously and the following call will never time out. */ - return cacheValue.get(binding.lockTimeout(), TimeUnit.MILLISECONDS); + value = cacheValue.get(binding.lockTimeout(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // TODO: Add statistics here to monitor the timeout. return invocationContext.proceed(); } } + if (Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType())) { + return unwrapUniValue(cache, key, value); + } else { + return value; + } + } catch (ExecutionException e) { /* * Any exception raised during a cache computation will be encapsulated into an ExecutionException because it is @@ -93,4 +108,15 @@ public Object apply(Object k) { } } } + + private Object unwrapUniValue(AbstractCache cache, Object key, Object value) { + if (value instanceof UncomputedUniValue) { + UncomputedUniValue uncomputedUniValue = (UncomputedUniValue) value; + return uncomputedUniValue.getUni() + .onItem().call(emittedValue -> cache.replaceUniValue(key, emittedValue)) + .onFailure().call(() -> cache.removeUncomputedUniValue(key, uncomputedUniValue)); + } else { + return Uni.createFrom().item(value); + } + } } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UncomputedUniValue.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UncomputedUniValue.java new file mode 100644 index 0000000000000..a3a1d94603bf8 --- /dev/null +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UncomputedUniValue.java @@ -0,0 +1,16 @@ +package io.quarkus.cache.runtime; + +import io.smallrye.mutiny.Uni; + +public class UncomputedUniValue { + + private final Uni uni; + + public UncomputedUniValue(Uni uni) { + this.uni = uni; + } + + public Uni getUni() { + return uni; + } +} diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java index e4e2ba8aa11d3..b681950825e98 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java @@ -4,12 +4,16 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import org.jboss.logging.Logger; + import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Caffeine; import io.quarkus.cache.runtime.AbstractCache; import io.quarkus.cache.runtime.CacheException; import io.quarkus.cache.runtime.NullValueConverter; +import io.quarkus.cache.runtime.UncomputedUniValue; +import io.smallrye.mutiny.Uni; /** * This class is an internal Quarkus cache implementation. Do not use it explicitly from your Quarkus application. The public @@ -17,6 +21,8 @@ */ public class CaffeineCache extends AbstractCache { + private static final Logger LOGGER = Logger.getLogger(CaffeineCache.class); + private AsyncCache cache; private String name; @@ -71,7 +77,7 @@ public CompletableFuture get(Object key, Function valueL if (key == null) { throw new NullPointerException(NULL_KEYS_NOT_SUPPORTED_MSG); } - CompletableFuture newCacheValue = new CompletableFuture(); + CompletableFuture newCacheValue = new CompletableFuture<>(); CompletableFuture existingCacheValue = cache.asMap().putIfAbsent(key, newCacheValue); if (existingCacheValue == null) { try { @@ -88,7 +94,7 @@ public CompletableFuture get(Object key, Function valueL } private CompletableFuture unwrapCacheValueOrThrowable(CompletableFuture cacheValue) { - return cacheValue.thenApply(new Function() { + return cacheValue.thenApply(new Function<>() { @Override public Object apply(Object value) { // If there's a throwable encapsulated into a CaffeineComputationThrowable, it must be rethrown. @@ -119,6 +125,37 @@ public void invalidateAll() { cache.synchronous().invalidateAll(); } + @Override + public Uni replaceUniValue(Object key, Object emittedValue) { + return Uni.createFrom().item(() -> { + // If the cache no longer contains the key because it was removed, we don't want to put it back. + cache.asMap().computeIfPresent(key, (k, currentValue) -> { + LOGGER.debugf("Replacing Uni value entry with key [%s] into cache [%s]", key, name); + /* + * The following computed value will always replace the current cache value (whether it is an + * UncomputedUniValue or not) if this method is called multiple times with the same key. + */ + return CompletableFuture.completedFuture(NullValueConverter.toCacheValue(emittedValue)); + }); + return null; + }); + } + + @Override + public Uni removeUncomputedUniValue(Object key, UncomputedUniValue uncomputedUniValue) { + /* + * The cache value associated with the given key will be removed only it is the given UncomputedUniValue. + * Emitted Uni items that would be cached will never be removed by this method. + */ + return Uni.createFrom().item(() -> { + boolean removed = cache.asMap().remove(key, uncomputedUniValue); + if (removed) { + LOGGER.debugf("UncomputedUniValue entry with key [%s] removed from cache [%s]", key, name); + } + return null; + }); + } + // For testing purposes only. public Integer getInitialCapacity() { return initialCapacity; diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java index 2bd7ebdb0fb07..f835db1e60cda 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java @@ -4,6 +4,8 @@ import java.util.function.Function; import io.quarkus.cache.runtime.AbstractCache; +import io.quarkus.cache.runtime.UncomputedUniValue; +import io.smallrye.mutiny.Uni; /** * This class is an internal Quarkus cache implementation. Do not use it explicitly from your Quarkus application. The public @@ -37,4 +39,14 @@ public void invalidate(Object key) { @Override public void invalidateAll() { } + + @Override + public Uni replaceUniValue(Object key, Object emittedValue) { + return Uni.createFrom().voidItem(); + } + + @Override + public Uni removeUncomputedUniValue(Object key, UncomputedUniValue uncomputedUniValue) { + return Uni.createFrom().voidItem(); + } }