Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache programmatic API - support asynchronous value loader #30314

Merged
merged 1 commit into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import javax.enterprise.context.Dependent;
Expand All @@ -30,6 +31,7 @@
import io.quarkus.cache.CaffeineCache;
import io.quarkus.cache.runtime.caffeine.CaffeineCacheImpl;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class ProgrammaticApiTest {

Expand Down Expand Up @@ -225,6 +227,32 @@ public void testInvalidatePredicate() throws Exception {
assertNull(cache.as(CaffeineCache.class).getIfPresent(key));
}

@Test
public void testAsyncLoader() throws Exception {
// Action: value retrieval from the cache.
// Expected effect: asyncvalue loader function used lazily.
// Verified by: same object reference returned from the cache.
String key = "alpha";
String expectedValue = "foo";
AtomicInteger loaded = new AtomicInteger();
Function<String, Uni<String>> loader = new Function<String, Uni<String>>() {

@Override
public Uni<String> apply(String t) {
return Uni.createFrom().item(expectedValue + loaded.incrementAndGet());
}
};
Uni<String> resultUni = cache.getAsync(key, loader);
assertEquals(0, loaded.get());
String value = resultUni.await().indefinitely();
assertEquals(1, loaded.get());
assertEquals(expectedValue + "1", value);
assertEquals(expectedValue + "1", cache.getAsync(key, loader).await().indefinitely());
assertEquals(1, loaded.get());
assertKeySetContains(key);
assertGetIfPresent(key, value);
}

private void assertKeySetContains(Object... expectedKeys) {
Set<Object> expectedKeySet = new HashSet<>(Arrays.asList(expectedKeys));
Set<Object> actualKeySet = cache.as(CaffeineCache.class).keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {
throw new UnsupportedOperationException("This method is not tested here");
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
throw new UnsupportedOperationException("This method is not tested here");
}

@Override
public Uni<Void> invalidate(Object key) {
throw new UnsupportedOperationException("This method is not tested here");
Expand Down
14 changes: 14 additions & 0 deletions extensions/cache/runtime/src/main/java/io/quarkus/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ public interface Cache {
*/
<K, V> Uni<V> get(K key, Function<K, V> valueLoader);

/**
* Returns a lazy asynchronous action that will emit the cache value identified by {@code key}, obtaining that value from
* {@code valueLoader} if necessary.
*
* @param <K>
* @param <V>
* @param key
* @param valueLoader
* @return a lazy asynchronous action that will emit a cache value
* @throws NullPointerException if the key is {@code null}
*/
<K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader);

/**
* Removes the cache entry identified by {@code key} from the cache. If the key does not identify any cache entry, nothing
* will happen.
Expand Down Expand Up @@ -69,4 +82,5 @@ public interface Cache {
* @throws IllegalStateException if this cache is not an instance of {@code type}
*/
<T extends Cache> T as(Class<T> type);

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.quarkus.cache.Cache;
import io.quarkus.cache.DefaultCacheKey;
import io.smallrye.mutiny.Uni;

public abstract class AbstractCache implements Cache {

Expand All @@ -28,10 +27,4 @@ public <T extends Cache> T as(Class<T> type) {
}
}

/**
* Replaces the cache value associated with the given key by an item emitted by a {@link Uni}. This method can be called
* several times for the same key, each call will then always replace the existing cache entry with the given emitted
* value. If the key no longer identifies a cache entry, this method must not put the emitted item into the cache.
*/
public abstract Uni<Void> replaceUniValue(Object key, Object emittedValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,20 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
try {
ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
if (returnType != ReturnType.NonAsync) {
Uni<Object> cacheValue = cache.get(key, new Function<Object, Object>() {
Uni<Object> cacheValue = cache.getAsync(key, new Function<Object, Uni<Object>>() {
@SuppressWarnings("unchecked")
@Override
public Object apply(Object k) {
LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]",
UnresolvedUniValue.class.getSimpleName(), key, binding.cacheName());
return UnresolvedUniValue.INSTANCE;
}
}).onItem().transformToUni(new Function<Object, Uni<?>>() {
@Override
public Uni<?> apply(Object value) {
if (value == UnresolvedUniValue.INSTANCE) {
try {
return asyncInvocationResultToUni(invocationContext.proceed(), returnType)
.call(new Function<Object, Uni<?>>() {
@Override
public Uni<?> apply(Object emittedValue) {
return cache.replaceUniValue(key, emittedValue);
mkouba marked this conversation as resolved.
Show resolved Hide resolved
}
});
} catch (CacheException e) {
throw e;
} catch (Exception e) {
throw new CacheException(e);
}
} else {
return Uni.createFrom().item(value);
public Uni<Object> apply(Object key) {
try {
return (Uni<Object>) asyncInvocationResultToUni(invocationContext.proceed(), returnType);
} catch (CacheException e) {
throw e;
} catch (Exception e) {
throw new CacheException(e);
}
}
});

if (binding.lockTimeout() <= 0) {
return createAsyncResult(cacheValue, returnType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
/**
* This value acts as a placeholder in the cache. It will be eventually replaced by the item emitted by the
* {@link io.smallrye.mutiny.Uni Uni} when it has been resolved.
*
* @deprecated This placeholder is not used anymore and will be removed at some time after Quarkus 3.0.
*/
@Deprecated
public class UnresolvedUniValue {

public static final UnresolvedUniValue INSTANCE = new UnresolvedUniValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CaffeineCacheImpl extends AbstractCache implements CaffeineCache {

private final CaffeineCacheInfo cacheInfo;
private final StatsCounter statsCounter;
private final boolean recordStats;

public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) {
this.cacheInfo = cacheInfo;
Expand All @@ -56,6 +57,7 @@ public CaffeineCacheImpl(CaffeineCacheInfo cacheInfo, boolean recordStats) {
if (cacheInfo.expireAfterAccess != null) {
builder.expireAfterAccess(cacheInfo.expireAfterAccess);
}
this.recordStats = recordStats;
if (recordStats) {
LOGGER.tracef("Recording Caffeine stats for cache [%s]", cacheInfo.name);
statsCounter = new ConcurrentStatsCounter();
Expand Down Expand Up @@ -94,6 +96,20 @@ public CompletionStage<V> get() {
});
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG);
return Uni.createFrom()
.completionStage(new Supplier<CompletionStage<V>>() {
@Override
public CompletionStage<V> get() {
// When stats are enabled we need to use Map.compute() in order to call statsCounter.recordHits(1)
// Map.compute() is more costly compared to Map.computeIfAbsent() because the remapping function is always called and the returned value is replaced
return recordStats ? computeWithStats(key, valueLoader) : computeWithoutStats(key, valueLoader);
}
}).map(fromCacheValue());
}

@Override
public <V> CompletableFuture<V> getIfPresent(Object key) {
Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG);
Expand Down Expand Up @@ -206,29 +222,6 @@ public Void get() {
});
}

@Override
public Uni<Void> replaceUniValue(Object key, Object emittedValue) {
return Uni.createFrom().item(new Supplier<Void>() {
@Override
public Void get() {
// If the cache no longer contains the key because it was removed, we don't want to put it back.
cache.asMap().computeIfPresent(key,
new BiFunction<Object, CompletableFuture<Object>, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object k, CompletableFuture<Object> currentValue) {
LOGGER.debugf("Replacing Uni value entry with key [%s] into cache [%s]", key, cacheInfo.name);
/*
* The following computed value will always replace the current cache value (whether it is an
* UnresolvedUniValue or not) if this method is called multiple times with the same key.
*/
return CompletableFuture.completedFuture(NullValueConverter.toCacheValue(emittedValue));
}
});
return null;
}
});
}

@Override
public Set<Object> keySet() {
return Collections.unmodifiableSet(new HashSet<>(cache.asMap().keySet()));
Expand Down Expand Up @@ -293,4 +286,59 @@ private <T> T cast(Object value) {
"An existing cached value type does not match the type returned by the value loading function", e);
}
}

@SuppressWarnings("unchecked")
private <K, V> CompletionStage<V> computeWithStats(K key, Function<K, Uni<V>> valueLoader) {
return (CompletionStage<V>) cache.asMap().compute(key,
new BiFunction<Object, CompletableFuture<Object>, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object key, CompletableFuture<Object> value) {
if (value == null) {
statsCounter.recordMisses(1);
return valueLoader.apply((K) key)
.map(TO_CACHE_VALUE)
.subscribeAsCompletionStage();
} else {
LOGGER.tracef("Key [%s] found in cache [%s]", key, cacheInfo.name);
statsCounter.recordHits(1);
return value;
}
}
});
}

@SuppressWarnings("unchecked")
private <K, V> CompletionStage<V> computeWithoutStats(K key, Function<K, Uni<V>> valueLoader) {
return (CompletionStage<V>) cache.asMap().computeIfAbsent(key,
new Function<Object, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object key) {
return valueLoader.apply((K) key)
.map(TO_CACHE_VALUE)
.subscribeAsCompletionStage();
}
});
}

@SuppressWarnings("unchecked")
private <V> Function<V, V> fromCacheValue() {
return (Function<V, V>) FROM_CACHE_VALUE;
}

private static final Function<Object, Object> FROM_CACHE_VALUE = new Function<Object, Object>() {

@Override
public Object apply(Object value) {
return NullValueConverter.fromCacheValue(value);
}
};

private static final Function<Object, Object> TO_CACHE_VALUE = new Function<Object, Object>() {

@Override
public Object apply(Object value) {
return NullValueConverter.toCacheValue(value);
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public V get() {
});
}

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
return valueLoader.apply(key);
}

@Override
public Uni<Void> invalidate(Object key) {
return Uni.createFrom().voidItem();
Expand All @@ -45,8 +50,4 @@ public Uni<Void> invalidateIf(Predicate<Object> predicate) {
return Uni.createFrom().voidItem();
}

@Override
public Uni<Void> replaceUniValue(Object key, Object emittedValue) {
return Uni.createFrom().voidItem();
}
}