Skip to content

Commit

Permalink
Merge pull request #30314 from mkouba/issue-30311
Browse files Browse the repository at this point in the history
Cache programmatic API - support asynchronous value loader
  • Loading branch information
mkouba authored Jan 19, 2023
2 parents 746634a + 36b0c68 commit 3a4b19d
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import javax.enterprise.context.Dependent;
Expand All @@ -30,6 +31,7 @@
import io.quarkus.cache.CaffeineCache;
import io.quarkus.cache.runtime.caffeine.CaffeineCacheImpl;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class ProgrammaticApiTest {

Expand Down Expand Up @@ -225,6 +227,32 @@ public void testInvalidatePredicate() throws Exception {
assertNull(cache.as(CaffeineCache.class).getIfPresent(key));
}

@Test
public void testAsyncLoader() throws Exception {
// Action: value retrieval from the cache.
// Expected effect: asyncvalue loader function used lazily.
// Verified by: same object reference returned from the cache.
String key = "alpha";
String expectedValue = "foo";
AtomicInteger loaded = new AtomicInteger();
Function<String, Uni<String>> loader = new Function<String, Uni<String>>() {

@Override
public Uni<String> apply(String t) {
return Uni.createFrom().item(expectedValue + loaded.incrementAndGet());
}
};
Uni<String> resultUni = cache.getAsync(key, loader);
assertEquals(0, loaded.get());
String value = resultUni.await().indefinitely();
assertEquals(1, loaded.get());
assertEquals(expectedValue + "1", value);
assertEquals(expectedValue + "1", cache.getAsync(key, loader).await().indefinitely());
assertEquals(1, loaded.get());
assertKeySetContains(key);
assertGetIfPresent(key, value);
}

private void assertKeySetContains(Object... expectedKeys) {
Set<Object> expectedKeySet = new HashSet<>(Arrays.asList(expectedKeys));
Set<Object> actualKeySet = cache.as(CaffeineCache.class).keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {
throw new UnsupportedOperationException("This method is not tested here");
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
throw new UnsupportedOperationException("This method is not tested here");
}

@Override
public Uni<Void> invalidate(Object key) {
throw new UnsupportedOperationException("This method is not tested here");
Expand Down
14 changes: 14 additions & 0 deletions extensions/cache/runtime/src/main/java/io/quarkus/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ public interface Cache {
*/
<K, V> Uni<V> get(K key, Function<K, V> valueLoader);

/**
* Returns a lazy asynchronous action that will emit the cache value identified by {@code key}, obtaining that value from
* {@code valueLoader} if necessary.
*
* @param <K>
* @param <V>
* @param key
* @param valueLoader
* @return a lazy asynchronous action that will emit a cache value
* @throws NullPointerException if the key is {@code null}
*/
<K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader);

/**
* Removes the cache entry identified by {@code key} from the cache. If the key does not identify any cache entry, nothing
* will happen.
Expand Down Expand Up @@ -69,4 +82,5 @@ public interface Cache {
* @throws IllegalStateException if this cache is not an instance of {@code type}
*/
<T extends Cache> T as(Class<T> type);

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.quarkus.cache.Cache;
import io.quarkus.cache.DefaultCacheKey;
import io.smallrye.mutiny.Uni;

public abstract class AbstractCache implements Cache {

Expand All @@ -28,10 +27,4 @@ public <T extends Cache> T as(Class<T> type) {
}
}

/**
* Replaces the cache value associated with the given key by an item emitted by a {@link Uni}. This method can be called
* several times for the same key, each call will then always replace the existing cache entry with the given emitted
* value. If the key no longer identifies a cache entry, this method must not put the emitted item into the cache.
*/
public abstract Uni<Void> replaceUniValue(Object key, Object emittedValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,20 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
try {
ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
if (returnType != ReturnType.NonAsync) {
Uni<Object> cacheValue = cache.get(key, new Function<Object, Object>() {
Uni<Object> cacheValue = cache.getAsync(key, new Function<Object, Uni<Object>>() {
@SuppressWarnings("unchecked")
@Override
public Object apply(Object k) {
LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]",
UnresolvedUniValue.class.getSimpleName(), key, binding.cacheName());
return UnresolvedUniValue.INSTANCE;
}
}).onItem().transformToUni(new Function<Object, Uni<?>>() {
@Override
public Uni<?> apply(Object value) {
if (value == UnresolvedUniValue.INSTANCE) {
try {
return asyncInvocationResultToUni(invocationContext.proceed(), returnType)
.call(new Function<Object, Uni<?>>() {
@Override
public Uni<?> apply(Object emittedValue) {
return cache.replaceUniValue(key, emittedValue);
}
});
} catch (CacheException e) {
throw e;
} catch (Exception e) {
throw new CacheException(e);
}
} else {
return Uni.createFrom().item(value);
public Uni<Object> apply(Object key) {
try {
return (Uni<Object>) asyncInvocationResultToUni(invocationContext.proceed(), returnType);
} catch (CacheException e) {
throw e;
} catch (Exception e) {
throw new CacheException(e);
}
}
});

if (binding.lockTimeout() <= 0) {
return createAsyncResult(cacheValue, returnType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
/**
* This value acts as a placeholder in the cache. It will be eventually replaced by the item emitted by the
* {@link io.smallrye.mutiny.Uni Uni} when it has been resolved.
*
* @deprecated This placeholder is not used anymore and will be removed at some time after Quarkus 3.0.
*/
@Deprecated
public class UnresolvedUniValue {

public static final UnresolvedUniValue INSTANCE = new UnresolvedUniValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CaffeineCacheImpl extends AbstractCache implements CaffeineCache {

private final CaffeineCacheInfo cacheInfo;
private final StatsCounter statsCounter;
private final boolean recordStats;

public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) {
this.cacheInfo = cacheInfo;
Expand All @@ -56,6 +57,7 @@ public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) {
if (cacheInfo.expireAfterAccess != null) {
builder.expireAfterAccess(cacheInfo.expireAfterAccess);
}
this.recordStats = recordStats;
if (recordStats) {
LOGGER.tracef("Recording Caffeine stats for cache [%s]", cacheInfo.name);
statsCounter = new ConcurrentStatsCounter();
Expand Down Expand Up @@ -94,6 +96,20 @@ public CompletionStage<V> get() {
});
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG);
return Uni.createFrom()
.completionStage(new Supplier<CompletionStage<V>>() {
@Override
public CompletionStage<V> get() {
// When stats are enabled we need to use Map.compute() in order to call statsCounter.recordHits(1)
// Map.compute() is more costly compared to Map.computeIfAbsent() because the remapping function is always called and the returned value is replaced
return recordStats ? computeWithStats(key, valueLoader) : computeWithoutStats(key, valueLoader);
}
}).map(fromCacheValue());
}

@Override
public <V> CompletableFuture<V> getIfPresent(Object key) {
Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG);
Expand Down Expand Up @@ -206,29 +222,6 @@ public Void get() {
});
}

@Override
public Uni<Void> replaceUniValue(Object key, Object emittedValue) {
return Uni.createFrom().item(new Supplier<Void>() {
@Override
public Void get() {
// If the cache no longer contains the key because it was removed, we don't want to put it back.
cache.asMap().computeIfPresent(key,
new BiFunction<Object, CompletableFuture<Object>, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object k, CompletableFuture<Object> currentValue) {
LOGGER.debugf("Replacing Uni value entry with key [%s] into cache [%s]", key, cacheInfo.name);
/*
* The following computed value will always replace the current cache value (whether it is an
* UnresolvedUniValue or not) if this method is called multiple times with the same key.
*/
return CompletableFuture.completedFuture(NullValueConverter.toCacheValue(emittedValue));
}
});
return null;
}
});
}

@Override
public Set<Object> keySet() {
return Collections.unmodifiableSet(new HashSet<>(cache.asMap().keySet()));
Expand Down Expand Up @@ -293,4 +286,59 @@ private <T> T cast(Object value) {
"An existing cached value type does not match the type returned by the value loading function", e);
}
}

@SuppressWarnings("unchecked")
private <K, V> CompletionStage<V> computeWithStats(K key, Function<K, Uni<V>> valueLoader) {
return (CompletionStage<V>) cache.asMap().compute(key,
new BiFunction<Object, CompletableFuture<Object>, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object key, CompletableFuture<Object> value) {
if (value == null) {
statsCounter.recordMisses(1);
return valueLoader.apply((K) key)
.map(TO_CACHE_VALUE)
.subscribeAsCompletionStage();
} else {
LOGGER.tracef("Key [%s] found in cache [%s]", key, cacheInfo.name);
statsCounter.recordHits(1);
return value;
}
}
});
}

@SuppressWarnings("unchecked")
private <K, V> CompletionStage<V> computeWithoutStats(K key, Function<K, Uni<V>> valueLoader) {
return (CompletionStage<V>) cache.asMap().computeIfAbsent(key,
new Function<Object, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object key) {
return valueLoader.apply((K) key)
.map(TO_CACHE_VALUE)
.subscribeAsCompletionStage();
}
});
}

@SuppressWarnings("unchecked")
private <V> Function<V, V> fromCacheValue() {
return (Function<V, V>) FROM_CACHE_VALUE;
}

private static final Function<Object, Object> FROM_CACHE_VALUE = new Function<Object, Object>() {

@Override
public Object apply(Object value) {
return NullValueConverter.fromCacheValue(value);
}
};

private static final Function<Object, Object> TO_CACHE_VALUE = new Function<Object, Object>() {

@Override
public Object apply(Object value) {
return NullValueConverter.toCacheValue(value);
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public V get() {
});
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
return valueLoader.apply(key);
}

@Override
public Uni<Void> invalidate(Object key) {
return Uni.createFrom().voidItem();
Expand All @@ -45,8 +50,4 @@ public Uni<Void> invalidateIf(Predicate<Object> predicate) {
return Uni.createFrom().voidItem();
}

@Override
public Uni<Void> replaceUniValue(Object key, Object emittedValue) {
return Uni.createFrom().voidItem();
}
}

0 comments on commit 3a4b19d

Please sign in to comment.