From 85ac88b26a5623960f01de03fbb285a889268af1 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Sat, 17 Apr 2021 23:34:30 +0200 Subject: [PATCH] Cache the item emitted by a Uni instead of the Uni itself --- extensions/cache/deployment/pom.xml | 4 + .../cache/test/runtime/UniValueTest.java | 103 ++++++++++++++++++ extensions/cache/runtime/pom.xml | 4 + .../quarkus/cache/runtime/AbstractCache.java | 14 +++ .../cache/runtime/CacheResultInterceptor.java | 32 +++++- .../cache/runtime/UnresolvedUniValue.java | 16 +++ .../cache/runtime/caffeine/CaffeineCache.java | 42 ++++++- .../quarkus/cache/runtime/noop/NoOpCache.java | 12 ++ 8 files changed, 222 insertions(+), 5 deletions(-) create mode 100644 extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java create mode 100644 extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java diff --git a/extensions/cache/deployment/pom.xml b/extensions/cache/deployment/pom.xml index 7f5f0197b7714a..a1c35c5f644c43 100644 --- a/extensions/cache/deployment/pom.xml +++ b/extensions/cache/deployment/pom.xml @@ -26,6 +26,10 @@ io.quarkus quarkus-caffeine-deployment + + io.quarkus + quarkus-mutiny-deployment + diff --git a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java new file mode 100644 index 00000000000000..4af8bd3a7a24ca --- /dev/null +++ b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java @@ -0,0 +1,103 @@ +package io.quarkus.cache.test.runtime; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.cache.CacheResult; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; + +/** + * Tests the {@link CacheResult} annotation on methods returning {@link Uni}. + */ +public class UniValueTest { + + private static final String KEY = "key"; + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClass(CachedService.class).addAsResource( + new StringAsset("quarkus.log.category.\"io.quarkus.cache\".level=DEBUG"), "application.properties")); + + @Inject + CachedService cachedService; + + @Test + public void test() { + // STEP 1 + // Action: a method annotated with @CacheResult and returning a Uni is called. + // Expected effect: the method is invoked and its result is wrapped into an UnresolvedUniValue which is cached. + // Verified by: invocations counter and CacheResultInterceptor log. + Uni uni1 = cachedService.cachedMethod(KEY); + assertEquals(1, cachedService.getInvocations()); + + // STEP 2 + // Action: same call as STEP 1. + // Expected effect: the method is NOT invoked and the cached UnresolvedUniValue from STEP 1 is used to produce a result. + // Verified by: invocations counter and CacheResultInterceptor log. + Uni uni2 = cachedService.cachedMethod(KEY); + assertEquals(1, cachedService.getInvocations()); + + // STEP 3 + // Action: the Uni returned in STEP 1 is subscribed to and we wait for an item event to be fired. + // Expected effect: the UnresolvedUniValue wrapper cached during STEP 1 is replaced with the emitted item from this step in the cache. + // Verified by: subscriptions counter and CaffeineCache log. + String emittedItem1 = uni1.await().indefinitely(); + assertEquals("1", emittedItem1); // This checks the subscriptions counter value. + + // STEP 4 + // Action: the Uni returned in STEP 2 is subscribed to and we wait for an item event to be fired. + // Expected effect: the emitted item from STEP 3 is replaced with the emitted item from this step in the cache. + // Verified by: subscriptions counter, CaffeineCache log and different objects references between STEPS 3 and 4 emitted items. + String emittedItem2 = uni2.await().indefinitely(); + assertTrue(emittedItem1 != emittedItem2); + assertEquals("2", emittedItem2); // This checks the subscriptions counter value. + + // STEP 5 + // Action: same call as STEP 2 but we immediately subscribe to the returned Uni and wait for an item event to be fired. + // Expected effect: the method is not invoked and the emitted item cached during STEP 4 is returned. + // Verified by: invocations and subscriptions counters, same object reference between STEPS 4 and 5 emitted items. + String emittedItem3 = cachedService.cachedMethod(KEY).await().indefinitely(); + assertEquals(1, cachedService.getInvocations()); + assertEquals("2", emittedItem3); // This checks the subscriptions counter value. + assertTrue(emittedItem2 == emittedItem3); + + // STEP 6 + // Action: same call as STEP 5 with a different key. + // Expected effect: the method is invoked, its result is wrapped into an UnresolvedUniValue which is cached. + // Verified by: invocations and subscriptions counters, CacheResultInterceptor log and different objects references between STEPS 5 and 6 emitted items. + String emittedItem4 = cachedService.cachedMethod("another-key").await().indefinitely(); + assertEquals(2, cachedService.getInvocations()); + assertEquals("3", emittedItem4); // This checks the subscriptions counter value. + assertTrue(emittedItem3 != emittedItem4); + } + + @ApplicationScoped + static class CachedService { + + private int invocations; + private int subscriptions; + + @CacheResult(cacheName = "test-cache") + public Uni cachedMethod(String key) { + invocations++; + return Uni.createFrom().item(() -> { + subscriptions++; + return "" + subscriptions; + }); + } + + public int getInvocations() { + return invocations; + } + } +} diff --git a/extensions/cache/runtime/pom.xml b/extensions/cache/runtime/pom.xml index 73dd9f19a37441..4c2bddc8d4948a 100644 --- a/extensions/cache/runtime/pom.xml +++ b/extensions/cache/runtime/pom.xml @@ -23,6 +23,10 @@ io.quarkus quarkus-caffeine + + io.quarkus + quarkus-mutiny + io.vertx vertx-web diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java index 8545d93c1b95a0..091f2236ccac44 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java @@ -4,6 +4,7 @@ import java.util.function.Function; import io.quarkus.cache.Cache; +import io.smallrye.mutiny.Uni; public abstract class AbstractCache implements Cache { @@ -32,4 +33,17 @@ public Object getDefaultKey() { public abstract void invalidate(Object key); public abstract void invalidateAll(); + + /** + * Replaces the cache value associated with the given key by an item emitted by a {@link Uni}. This method can be called + * several times for the same key, each call will then always replace the existing cache entry with the given emitted + * value. If the key no longer identifies a cache entry, this method must not put the emitted item into the cache. + */ + public abstract Uni replaceUniValue(Object key, Object emittedValue); + + /** + * Removes the cache entry identified by the given key only if the cache value is the given {@link UnresolvedUniValue}. + * This method is called in case of failure during the computation of a cached {@link Uni}. + */ + public abstract Uni removeUnresolvedUniValue(Object key, UnresolvedUniValue unresolvedUniValue); } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index 60d19de8fd61df..999fea544169ef 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -14,6 +14,7 @@ import org.jboss.logging.Logger; import io.quarkus.cache.CacheResult; +import io.smallrye.mutiny.Uni; @CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding. @Interceptor @@ -47,28 +48,42 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { @Override public Object apply(Object k) { try { - return invocationContext.proceed(); + Object invocationResult = invocationContext.proceed(); + if (invocationResult instanceof Uni) { + LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]", + UnresolvedUniValue.class.getSimpleName(), key, cache.getName()); + return new UnresolvedUniValue((Uni) invocationResult); + } else { + return invocationResult; + } } catch (Exception e) { throw new CacheException(e); } } }); + Object value; if (binding.lockTimeout() <= 0) { - return cacheValue.get(); + value = cacheValue.get(); } else { try { /* * If the current thread started the cache value computation, then the computation is already finished since * it was done synchronously and the following call will never time out. */ - return cacheValue.get(binding.lockTimeout(), TimeUnit.MILLISECONDS); + value = cacheValue.get(binding.lockTimeout(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // TODO: Add statistics here to monitor the timeout. return invocationContext.proceed(); } } + if (Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType())) { + return unwrapUniValue(cache, key, value); + } else { + return value; + } + } catch (ExecutionException e) { /* * Any exception raised during a cache computation will be encapsulated into an ExecutionException because it is @@ -93,4 +108,15 @@ public Object apply(Object k) { } } } + + private Object unwrapUniValue(AbstractCache cache, Object key, Object value) { + if (value instanceof UnresolvedUniValue) { + UnresolvedUniValue unresolvedUniValue = (UnresolvedUniValue) value; + return unresolvedUniValue.getUni() + .onItem().call(emittedValue -> cache.replaceUniValue(key, emittedValue)) + .onFailure().call(() -> cache.removeUnresolvedUniValue(key, unresolvedUniValue)); + } else { + return Uni.createFrom().item(value); + } + } } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java new file mode 100644 index 00000000000000..e3decb3e4b27b0 --- /dev/null +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java @@ -0,0 +1,16 @@ +package io.quarkus.cache.runtime; + +import io.smallrye.mutiny.Uni; + +public class UnresolvedUniValue { + + private final Uni uni; + + public UnresolvedUniValue(Uni uni) { + this.uni = uni; + } + + public Uni getUni() { + return uni; + } +} diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java index e4e2ba8aa11d3b..d9d2442352ccc1 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCache.java @@ -4,12 +4,16 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import org.jboss.logging.Logger; + import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Caffeine; import io.quarkus.cache.runtime.AbstractCache; import io.quarkus.cache.runtime.CacheException; import io.quarkus.cache.runtime.NullValueConverter; +import io.quarkus.cache.runtime.UnresolvedUniValue; +import io.smallrye.mutiny.Uni; /** * This class is an internal Quarkus cache implementation. Do not use it explicitly from your Quarkus application. The public @@ -17,6 +21,8 @@ */ public class CaffeineCache extends AbstractCache { + private static final Logger LOGGER = Logger.getLogger(CaffeineCache.class); + private AsyncCache cache; private String name; @@ -71,7 +77,7 @@ public CompletableFuture get(Object key, Function valueL if (key == null) { throw new NullPointerException(NULL_KEYS_NOT_SUPPORTED_MSG); } - CompletableFuture newCacheValue = new CompletableFuture(); + CompletableFuture newCacheValue = new CompletableFuture<>(); CompletableFuture existingCacheValue = cache.asMap().putIfAbsent(key, newCacheValue); if (existingCacheValue == null) { try { @@ -88,7 +94,7 @@ public CompletableFuture get(Object key, Function valueL } private CompletableFuture unwrapCacheValueOrThrowable(CompletableFuture cacheValue) { - return cacheValue.thenApply(new Function() { + return cacheValue.thenApply(new Function<>() { @Override public Object apply(Object value) { // If there's a throwable encapsulated into a CaffeineComputationThrowable, it must be rethrown. @@ -119,6 +125,38 @@ public void invalidateAll() { cache.synchronous().invalidateAll(); } + @Override + public Uni replaceUniValue(Object key, Object emittedValue) { + return Uni.createFrom().item(() -> { + // If the cache no longer contains the key because it was removed, we don't want to put it back. + cache.asMap().computeIfPresent(key, (k, currentValue) -> { + LOGGER.debugf("Replacing Uni value entry with key [%s] into cache [%s]", key, name); + /* + * The following computed value will always replace the current cache value (whether it is an + * UncomputedUniValue or not) if this method is called multiple times with the same key. + */ + return CompletableFuture.completedFuture(NullValueConverter.toCacheValue(emittedValue)); + }); + return null; + }); + } + + @Override + public Uni removeUnresolvedUniValue(Object key, UnresolvedUniValue unresolvedUniValue) { + /* + * The cache value associated with the given key will be removed only it is the given UnresolvedUniValue. + * Emitted Uni items that would be cached will never be removed by this method. + */ + return Uni.createFrom().item(() -> { + boolean removed = cache.asMap().remove(key, unresolvedUniValue); + if (removed) { + LOGGER.debugf("%s entry with key [%s] removed from cache [%s]", UnresolvedUniValue.class.getSimpleName(), key, + name); + } + return null; + }); + } + // For testing purposes only. public Integer getInitialCapacity() { return initialCapacity; diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java index 2bd7ebdb0fb07b..ab7311f073043c 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java @@ -4,6 +4,8 @@ import java.util.function.Function; import io.quarkus.cache.runtime.AbstractCache; +import io.quarkus.cache.runtime.UnresolvedUniValue; +import io.smallrye.mutiny.Uni; /** * This class is an internal Quarkus cache implementation. Do not use it explicitly from your Quarkus application. The public @@ -37,4 +39,14 @@ public void invalidate(Object key) { @Override public void invalidateAll() { } + + @Override + public Uni replaceUniValue(Object key, Object emittedValue) { + return Uni.createFrom().voidItem(); + } + + @Override + public Uni removeUnresolvedUniValue(Object key, UnresolvedUniValue unresolvedUniValue) { + return Uni.createFrom().voidItem(); + } }