From 36b0c68f21d6b233f596772428e308aee6fc5262 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Wed, 11 Jan 2023 16:59:19 +0100 Subject: [PATCH] Cache programmatic API - support asynchronous value loader - resolves #30311 - CacheResultInterceptor - use Cache#getAsync() for async return types - deprecate UnresolvedUniValue --- .../test/runtime/ProgrammaticApiTest.java | 28 ++++++ .../test/runtime/SpecializedCacheTest.java | 5 + .../src/main/java/io/quarkus/cache/Cache.java | 14 +++ .../quarkus/cache/runtime/AbstractCache.java | 7 -- .../cache/runtime/CacheResultInterceptor.java | 35 ++----- .../cache/runtime/UnresolvedUniValue.java | 3 + .../runtime/caffeine/CaffeineCacheImpl.java | 94 ++++++++++++++----- .../quarkus/cache/runtime/noop/NoOpCache.java | 9 +- 8 files changed, 136 insertions(+), 59 deletions(-) diff --git a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/ProgrammaticApiTest.java b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/ProgrammaticApiTest.java index d4041ee2323a3..c88078ca58a5c 100644 --- a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/ProgrammaticApiTest.java +++ b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/ProgrammaticApiTest.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import javax.enterprise.context.Dependent; @@ -30,6 +31,7 @@ import io.quarkus.cache.CaffeineCache; import io.quarkus.cache.runtime.caffeine.CaffeineCacheImpl; import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; public class ProgrammaticApiTest { @@ -225,6 +227,32 @@ public void testInvalidatePredicate() throws Exception { assertNull(cache.as(CaffeineCache.class).getIfPresent(key)); } + @Test + public void testAsyncLoader() throws Exception { + // Action: value retrieval from the cache. + // Expected effect: asyncvalue loader function used lazily. + // Verified by: same object reference returned from the cache. + String key = "alpha"; + String expectedValue = "foo"; + AtomicInteger loaded = new AtomicInteger(); + Function> loader = new Function>() { + + @Override + public Uni apply(String t) { + return Uni.createFrom().item(expectedValue + loaded.incrementAndGet()); + } + }; + Uni resultUni = cache.getAsync(key, loader); + assertEquals(0, loaded.get()); + String value = resultUni.await().indefinitely(); + assertEquals(1, loaded.get()); + assertEquals(expectedValue + "1", value); + assertEquals(expectedValue + "1", cache.getAsync(key, loader).await().indefinitely()); + assertEquals(1, loaded.get()); + assertKeySetContains(key); + assertGetIfPresent(key, value); + } + private void assertKeySetContains(Object... expectedKeys) { Set expectedKeySet = new HashSet<>(Arrays.asList(expectedKeys)); Set actualKeySet = cache.as(CaffeineCache.class).keySet(); diff --git a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/SpecializedCacheTest.java b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/SpecializedCacheTest.java index 756aebc9610bc..5bcbee0a5b0dc 100644 --- a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/SpecializedCacheTest.java +++ b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/SpecializedCacheTest.java @@ -66,6 +66,11 @@ public Uni get(K key, Function valueLoader) { throw new UnsupportedOperationException("This method is not tested here"); } + @Override + public Uni getAsync(K key, Function> valueLoader) { + throw new UnsupportedOperationException("This method is not tested here"); + } + @Override public Uni invalidate(Object key) { throw new UnsupportedOperationException("This method is not tested here"); diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/Cache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/Cache.java index 264978e4f5f01..a7d7b3e9b2637 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/Cache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/Cache.java @@ -41,6 +41,19 @@ public interface Cache { */ Uni get(K key, Function valueLoader); + /** + * Returns a lazy asynchronous action that will emit the cache value identified by {@code key}, obtaining that value from + * {@code valueLoader} if necessary. + * + * @param + * @param + * @param key + * @param valueLoader + * @return a lazy asynchronous action that will emit a cache value + * @throws NullPointerException if the key is {@code null} + */ + Uni getAsync(K key, Function> valueLoader); + /** * Removes the cache entry identified by {@code key} from the cache. If the key does not identify any cache entry, nothing * will happen. @@ -69,4 +82,5 @@ public interface Cache { * @throws IllegalStateException if this cache is not an instance of {@code type} */ T as(Class type); + } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java index f3a468287ac07..811ce67003571 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/AbstractCache.java @@ -2,7 +2,6 @@ import io.quarkus.cache.Cache; import io.quarkus.cache.DefaultCacheKey; -import io.smallrye.mutiny.Uni; public abstract class AbstractCache implements Cache { @@ -28,10 +27,4 @@ public T as(Class type) { } } - /** - * Replaces the cache value associated with the given key by an item emitted by a {@link Uni}. This method can be called - * several times for the same key, each call will then always replace the existing cache entry with the given emitted - * value. If the key no longer identifies a cache entry, this method must not put the emitted item into the cache. - */ - public abstract Uni replaceUniValue(Object key, Object emittedValue); } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index e1c6e94bcbd51..b16691a6ecb7f 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -53,35 +53,20 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { try { ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType()); if (returnType != ReturnType.NonAsync) { - Uni cacheValue = cache.get(key, new Function() { + Uni cacheValue = cache.getAsync(key, new Function>() { + @SuppressWarnings("unchecked") @Override - public Object apply(Object k) { - LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]", - UnresolvedUniValue.class.getSimpleName(), key, binding.cacheName()); - return UnresolvedUniValue.INSTANCE; - } - }).onItem().transformToUni(new Function>() { - @Override - public Uni apply(Object value) { - if (value == UnresolvedUniValue.INSTANCE) { - try { - return asyncInvocationResultToUni(invocationContext.proceed(), returnType) - .call(new Function>() { - @Override - public Uni apply(Object emittedValue) { - return cache.replaceUniValue(key, emittedValue); - } - }); - } catch (CacheException e) { - throw e; - } catch (Exception e) { - throw new CacheException(e); - } - } else { - return Uni.createFrom().item(value); + public Uni apply(Object key) { + try { + return (Uni) asyncInvocationResultToUni(invocationContext.proceed(), returnType); + } catch (CacheException e) { + throw e; + } catch (Exception e) { + throw new CacheException(e); } } }); + if (binding.lockTimeout() <= 0) { return createAsyncResult(cacheValue, returnType); } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java index e466cb39e3e5f..4373e319b41d4 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/UnresolvedUniValue.java @@ -3,7 +3,10 @@ /** * This value acts as a placeholder in the cache. It will be eventually replaced by the item emitted by the * {@link io.smallrye.mutiny.Uni Uni} when it has been resolved. + * + * @deprecated This placeholder is not used anymore and will be removed at some time after Quarkus 3.0. */ +@Deprecated public class UnresolvedUniValue { public static final UnresolvedUniValue INSTANCE = new UnresolvedUniValue(); diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java index acc5e0a6130c2..b4a19e2f58c34 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java @@ -40,6 +40,7 @@ public class CaffeineCacheImpl extends AbstractCache implements CaffeineCache { private final CaffeineCacheInfo cacheInfo; private final StatsCounter statsCounter; + private final boolean recordStats; public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) { this.cacheInfo = cacheInfo; @@ -56,6 +57,7 @@ public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) { if (cacheInfo.expireAfterAccess != null) { builder.expireAfterAccess(cacheInfo.expireAfterAccess); } + this.recordStats = recordStats; if (recordStats) { LOGGER.tracef("Recording Caffeine stats for cache [%s]", cacheInfo.name); statsCounter = new ConcurrentStatsCounter(); @@ -94,6 +96,20 @@ public CompletionStage get() { }); } + @Override + public Uni getAsync(K key, Function> valueLoader) { + Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG); + return Uni.createFrom() + .completionStage(new Supplier>() { + @Override + public CompletionStage get() { + // When stats are enabled we need to use Map.compute() in order to call statsCounter.recordHits(1) + // Map.compute() is more costly compared to Map.computeIfAbsent() because the remapping function is always called and the returned value is replaced + return recordStats ? computeWithStats(key, valueLoader) : computeWithoutStats(key, valueLoader); + } + }).map(fromCacheValue()); + } + @Override public CompletableFuture getIfPresent(Object key) { Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG); @@ -206,29 +222,6 @@ public Void get() { }); } - @Override - public Uni replaceUniValue(Object key, Object emittedValue) { - return Uni.createFrom().item(new Supplier() { - @Override - public Void get() { - // If the cache no longer contains the key because it was removed, we don't want to put it back. - cache.asMap().computeIfPresent(key, - new BiFunction, CompletableFuture>() { - @Override - public CompletableFuture apply(Object k, CompletableFuture currentValue) { - LOGGER.debugf("Replacing Uni value entry with key [%s] into cache [%s]", key, cacheInfo.name); - /* - * The following computed value will always replace the current cache value (whether it is an - * UnresolvedUniValue or not) if this method is called multiple times with the same key. - */ - return CompletableFuture.completedFuture(NullValueConverter.toCacheValue(emittedValue)); - } - }); - return null; - } - }); - } - @Override public Set keySet() { return Collections.unmodifiableSet(new HashSet<>(cache.asMap().keySet())); @@ -293,4 +286,59 @@ private T cast(Object value) { "An existing cached value type does not match the type returned by the value loading function", e); } } + + @SuppressWarnings("unchecked") + private CompletionStage computeWithStats(K key, Function> valueLoader) { + return (CompletionStage) cache.asMap().compute(key, + new BiFunction, CompletableFuture>() { + @Override + public CompletableFuture apply(Object key, CompletableFuture value) { + if (value == null) { + statsCounter.recordMisses(1); + return valueLoader.apply((K) key) + .map(TO_CACHE_VALUE) + .subscribeAsCompletionStage(); + } else { + LOGGER.tracef("Key [%s] found in cache [%s]", key, cacheInfo.name); + statsCounter.recordHits(1); + return value; + } + } + }); + } + + @SuppressWarnings("unchecked") + private CompletionStage computeWithoutStats(K key, Function> valueLoader) { + return (CompletionStage) cache.asMap().computeIfAbsent(key, + new Function>() { + @Override + public CompletableFuture apply(Object key) { + return valueLoader.apply((K) key) + .map(TO_CACHE_VALUE) + .subscribeAsCompletionStage(); + } + }); + } + + @SuppressWarnings("unchecked") + private Function fromCacheValue() { + return (Function) FROM_CACHE_VALUE; + } + + private static final Function FROM_CACHE_VALUE = new Function() { + + @Override + public Object apply(Object value) { + return NullValueConverter.fromCacheValue(value); + } + }; + + private static final Function TO_CACHE_VALUE = new Function() { + + @Override + public Object apply(Object value) { + return NullValueConverter.toCacheValue(value); + } + }; + } diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java index 3db04068ead48..fd22220bd79ea 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/noop/NoOpCache.java @@ -30,6 +30,11 @@ public V get() { }); } + @Override + public Uni getAsync(K key, Function> valueLoader) { + return valueLoader.apply(key); + } + @Override public Uni invalidate(Object key) { return Uni.createFrom().voidItem(); @@ -45,8 +50,4 @@ public Uni invalidateIf(Predicate predicate) { return Uni.createFrom().voidItem(); } - @Override - public Uni replaceUniValue(Object key, Object emittedValue) { - return Uni.createFrom().voidItem(); - } }