Skip to content

Commit

Permalink
Cache the item emitted by a Uni instead of the Uni itself
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed Apr 18, 2021
1 parent bf6e069 commit 85ac88b
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 5 deletions.
4 changes: 4 additions & 0 deletions extensions/cache/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-caffeine-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny-deployment</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.quarkus.cache.test.runtime;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.cache.CacheResult;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

/**
* Tests the {@link CacheResult} annotation on methods returning {@link Uni}.
*/
public class UniValueTest {

private static final String KEY = "key";

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClass(CachedService.class).addAsResource(
new StringAsset("quarkus.log.category.\"io.quarkus.cache\".level=DEBUG"), "application.properties"));

@Inject
CachedService cachedService;

@Test
public void test() {
// STEP 1
// Action: a method annotated with @CacheResult and returning a Uni is called.
// Expected effect: the method is invoked and its result is wrapped into an UnresolvedUniValue which is cached.
// Verified by: invocations counter and CacheResultInterceptor log.
Uni<String> uni1 = cachedService.cachedMethod(KEY);
assertEquals(1, cachedService.getInvocations());

// STEP 2
// Action: same call as STEP 1.
// Expected effect: the method is NOT invoked and the cached UnresolvedUniValue from STEP 1 is used to produce a result.
// Verified by: invocations counter and CacheResultInterceptor log.
Uni<String> uni2 = cachedService.cachedMethod(KEY);
assertEquals(1, cachedService.getInvocations());

// STEP 3
// Action: the Uni returned in STEP 1 is subscribed to and we wait for an item event to be fired.
// Expected effect: the UnresolvedUniValue wrapper cached during STEP 1 is replaced with the emitted item from this step in the cache.
// Verified by: subscriptions counter and CaffeineCache log.
String emittedItem1 = uni1.await().indefinitely();
assertEquals("1", emittedItem1); // This checks the subscriptions counter value.

// STEP 4
// Action: the Uni returned in STEP 2 is subscribed to and we wait for an item event to be fired.
// Expected effect: the emitted item from STEP 3 is replaced with the emitted item from this step in the cache.
// Verified by: subscriptions counter, CaffeineCache log and different objects references between STEPS 3 and 4 emitted items.
String emittedItem2 = uni2.await().indefinitely();
assertTrue(emittedItem1 != emittedItem2);
assertEquals("2", emittedItem2); // This checks the subscriptions counter value.

// STEP 5
// Action: same call as STEP 2 but we immediately subscribe to the returned Uni and wait for an item event to be fired.
// Expected effect: the method is not invoked and the emitted item cached during STEP 4 is returned.
// Verified by: invocations and subscriptions counters, same object reference between STEPS 4 and 5 emitted items.
String emittedItem3 = cachedService.cachedMethod(KEY).await().indefinitely();
assertEquals(1, cachedService.getInvocations());
assertEquals("2", emittedItem3); // This checks the subscriptions counter value.
assertTrue(emittedItem2 == emittedItem3);

// STEP 6
// Action: same call as STEP 5 with a different key.
// Expected effect: the method is invoked, its result is wrapped into an UnresolvedUniValue which is cached.
// Verified by: invocations and subscriptions counters, CacheResultInterceptor log and different objects references between STEPS 5 and 6 emitted items.
String emittedItem4 = cachedService.cachedMethod("another-key").await().indefinitely();
assertEquals(2, cachedService.getInvocations());
assertEquals("3", emittedItem4); // This checks the subscriptions counter value.
assertTrue(emittedItem3 != emittedItem4);
}

@ApplicationScoped
static class CachedService {

private int invocations;
private int subscriptions;

@CacheResult(cacheName = "test-cache")
public Uni<String> cachedMethod(String key) {
invocations++;
return Uni.createFrom().item(() -> {
subscriptions++;
return "" + subscriptions;
});
}

public int getInvocations() {
return invocations;
}
}
}
4 changes: 4 additions & 0 deletions extensions/cache/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-caffeine</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.function.Function;

import io.quarkus.cache.Cache;
import io.smallrye.mutiny.Uni;

public abstract class AbstractCache implements Cache {

Expand Down Expand Up @@ -32,4 +33,17 @@ public Object getDefaultKey() {
public abstract void invalidate(Object key);

public abstract void invalidateAll();

/**
* Replaces the cache value associated with the given key by an item emitted by a {@link Uni}. This method can be called
* several times for the same key, each call will then always replace the existing cache entry with the given emitted
* value. If the key no longer identifies a cache entry, this method must not put the emitted item into the cache.
*/
public abstract Uni<Void> replaceUniValue(Object key, Object emittedValue);

/**
* Removes the cache entry identified by the given key only if the cache value is the given {@link UnresolvedUniValue}.
* This method is called in case of failure during the computation of a cached {@link Uni}.
*/
public abstract Uni<Void> removeUnresolvedUniValue(Object key, UnresolvedUniValue unresolvedUniValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.jboss.logging.Logger;

import io.quarkus.cache.CacheResult;
import io.smallrye.mutiny.Uni;

@CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding.
@Interceptor
Expand Down Expand Up @@ -47,28 +48,42 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
@Override
public Object apply(Object k) {
try {
return invocationContext.proceed();
Object invocationResult = invocationContext.proceed();
if (invocationResult instanceof Uni) {
LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]",
UnresolvedUniValue.class.getSimpleName(), key, cache.getName());
return new UnresolvedUniValue((Uni<Object>) invocationResult);
} else {
return invocationResult;
}
} catch (Exception e) {
throw new CacheException(e);
}
}
});

Object value;
if (binding.lockTimeout() <= 0) {
return cacheValue.get();
value = cacheValue.get();
} else {
try {
/*
* If the current thread started the cache value computation, then the computation is already finished since
* it was done synchronously and the following call will never time out.
*/
return cacheValue.get(binding.lockTimeout(), TimeUnit.MILLISECONDS);
value = cacheValue.get(binding.lockTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// TODO: Add statistics here to monitor the timeout.
return invocationContext.proceed();
}
}

if (Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType())) {
return unwrapUniValue(cache, key, value);
} else {
return value;
}

} catch (ExecutionException e) {
/*
* Any exception raised during a cache computation will be encapsulated into an ExecutionException because it is
Expand All @@ -93,4 +108,15 @@ public Object apply(Object k) {
}
}
}

private Object unwrapUniValue(AbstractCache cache, Object key, Object value) {
if (value instanceof UnresolvedUniValue) {
UnresolvedUniValue unresolvedUniValue = (UnresolvedUniValue) value;
return unresolvedUniValue.getUni()
.onItem().call(emittedValue -> cache.replaceUniValue(key, emittedValue))
.onFailure().call(() -> cache.removeUnresolvedUniValue(key, unresolvedUniValue));
} else {
return Uni.createFrom().item(value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.cache.runtime;

import io.smallrye.mutiny.Uni;

public class UnresolvedUniValue {

private final Uni<Object> uni;

public UnresolvedUniValue(Uni<Object> uni) {
this.uni = uni;
}

public Uni<Object> getUni() {
return uni;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import org.jboss.logging.Logger;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.quarkus.cache.runtime.AbstractCache;
import io.quarkus.cache.runtime.CacheException;
import io.quarkus.cache.runtime.NullValueConverter;
import io.quarkus.cache.runtime.UnresolvedUniValue;
import io.smallrye.mutiny.Uni;

/**
* This class is an internal Quarkus cache implementation. Do not use it explicitly from your Quarkus application. The public
* methods signatures may change without prior notice.
*/
public class CaffeineCache extends AbstractCache {

private static final Logger LOGGER = Logger.getLogger(CaffeineCache.class);

private AsyncCache<Object, Object> cache;

private String name;
Expand Down Expand Up @@ -71,7 +77,7 @@ public CompletableFuture<Object> get(Object key, Function<Object, Object> valueL
if (key == null) {
throw new NullPointerException(NULL_KEYS_NOT_SUPPORTED_MSG);
}
CompletableFuture<Object> newCacheValue = new CompletableFuture<Object>();
CompletableFuture<Object> newCacheValue = new CompletableFuture<>();
CompletableFuture<Object> existingCacheValue = cache.asMap().putIfAbsent(key, newCacheValue);
if (existingCacheValue == null) {
try {
Expand All @@ -88,7 +94,7 @@ public CompletableFuture<Object> get(Object key, Function<Object, Object> valueL
}

private CompletableFuture<Object> unwrapCacheValueOrThrowable(CompletableFuture<Object> cacheValue) {
return cacheValue.thenApply(new Function<Object, Object>() {
return cacheValue.thenApply(new Function<>() {
@Override
public Object apply(Object value) {
// If there's a throwable encapsulated into a CaffeineComputationThrowable, it must be rethrown.
Expand Down Expand Up @@ -119,6 +125,38 @@ public void invalidateAll() {
cache.synchronous().invalidateAll();
}

@Override
public Uni<Void> replaceUniValue(Object key, Object emittedValue) {
return Uni.createFrom().item(() -> {
// If the cache no longer contains the key because it was removed, we don't want to put it back.
cache.asMap().computeIfPresent(key, (k, currentValue) -> {
LOGGER.debugf("Replacing Uni value entry with key [%s] into cache [%s]", key, name);
/*
* The following computed value will always replace the current cache value (whether it is an
* UncomputedUniValue or not) if this method is called multiple times with the same key.
*/
return CompletableFuture.completedFuture(NullValueConverter.toCacheValue(emittedValue));
});
return null;
});
}

@Override
public Uni<Void> removeUnresolvedUniValue(Object key, UnresolvedUniValue unresolvedUniValue) {
/*
* The cache value associated with the given key will be removed only it is the given UnresolvedUniValue.
* Emitted Uni items that would be cached will never be removed by this method.
*/
return Uni.createFrom().item(() -> {
boolean removed = cache.asMap().remove(key, unresolvedUniValue);
if (removed) {
LOGGER.debugf("%s entry with key [%s] removed from cache [%s]", UnresolvedUniValue.class.getSimpleName(), key,
name);
}
return null;
});
}

// For testing purposes only.
public Integer getInitialCapacity() {
return initialCapacity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.function.Function;

import io.quarkus.cache.runtime.AbstractCache;
import io.quarkus.cache.runtime.UnresolvedUniValue;
import io.smallrye.mutiny.Uni;

/**
* This class is an internal Quarkus cache implementation. Do not use it explicitly from your Quarkus application. The public
Expand Down Expand Up @@ -37,4 +39,14 @@ public void invalidate(Object key) {
@Override
public void invalidateAll() {
}

@Override
public Uni<Void> replaceUniValue(Object key, Object emittedValue) {
return Uni.createFrom().voidItem();
}

@Override
public Uni<Void> removeUnresolvedUniValue(Object key, UnresolvedUniValue unresolvedUniValue) {
return Uni.createFrom().voidItem();
}
}

0 comments on commit 85ac88b

Please sign in to comment.