Skip to content

Commit

Permalink
Support non-blocking cache invalidation
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed Jan 10, 2022
1 parent 15ed504 commit 0bc0c69
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 157 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package io.quarkus.cache.test.runtime;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.cache.CacheInvalidate;
import io.quarkus.cache.CacheInvalidateAll;
import io.quarkus.cache.CacheResult;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

/**
* Tests the caching annotations on methods returning {@link Uni}.
*/
public class UniReturnTypeTest {

private static final String CACHE_NAME_1 = "test-cache-1";
private static final String CACHE_NAME_2 = "test-cache-2";
private static final String KEY_1 = "key-1";
private static final String KEY_2 = "key-2";

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest().withApplicationRoot((jar) -> jar.addClass(CachedService.class));

@Inject
CachedService cachedService;

@Test
void testCacheResult() {
// STEP 1
// Action: a method annotated with @CacheResult and returning a Uni is called.
// Expected effect: the method is not invoked, as Uni is lazy.
// Verified by: invocations counter.
Uni<String> uni1 = cachedService.cacheResult1(KEY_1);
assertEquals(0, cachedService.getCacheResultInvocations());

// STEP 2
// Action: same call as STEP 1.
// Expected effect: same as STEP 1 with a different Uni instance returned.
// Verified by: invocations counter and different objects references between STEPS 1 AND 2 results.
Uni<String> uni2 = cachedService.cacheResult1(KEY_1);
assertEquals(0, cachedService.getCacheResultInvocations());
assertNotSame(uni1, uni2);

// STEP 3
// Action: the Uni returned in STEP 1 is subscribed to and we wait for an item event to be fired.
// Expected effect: the method from STEP 1 is invoked and its result is cached.
// Verified by: invocations counter and STEP 4.
String emittedItem1 = uni1.await().indefinitely();
assertEquals(1, cachedService.getCacheResultInvocations());

// STEP 4
// Action: the Uni returned in STEP 2 is subscribed to and we wait for an item event to be fired.
// Expected effect: the method from STEP 2 is not invoked and the value cached in STEP 3 is returned.
// Verified by: invocations counter and same object reference between STEPS 3 and 4 emitted items.
String emittedItem2 = uni2.await().indefinitely();
assertEquals(1, cachedService.getCacheResultInvocations());
assertSame(emittedItem1, emittedItem2);

// STEP 5
// Action: same call as STEP 2 with a different key and an immediate subscription.
// Expected effect: the method is invoked and a new item is emitted (also cached).
// Verified by: invocations counter and different objects references between STEPS 2 and 3 emitted items.
String emittedItem3 = cachedService.cacheResult1("another-key").await().indefinitely();
assertEquals(2, cachedService.getCacheResultInvocations());
assertNotSame(emittedItem2, emittedItem3);
}

@Test
void testCacheInvalidate() {
// First, let's put some data into the caches.
String value1 = cachedService.cacheResult1(KEY_1).await().indefinitely();
Object value2 = cachedService.cacheResult2(KEY_1).await().indefinitely();
Object value3 = cachedService.cacheResult2(KEY_2).await().indefinitely();

// We will invalidate some data (only KEY_1) in all caches later.
Uni<Void> invalidateUni = cachedService.cacheInvalidate(KEY_1);
// For now, the method that will invalidate the data should not be invoked, as Uni is lazy.
assertEquals(0, cachedService.getCacheInvalidateInvocations());

// The data should still be cached at this point.
String value4 = cachedService.cacheResult1(KEY_1).await().indefinitely();
Object value5 = cachedService.cacheResult2(KEY_1).await().indefinitely();
Object value6 = cachedService.cacheResult2(KEY_2).await().indefinitely();
assertSame(value1, value4);
assertSame(value2, value5);
assertSame(value3, value6);

// It's time to perform the data invalidation.
invalidateUni.await().indefinitely();
// The method annotated with @CacheInvalidate should have been invoked now.
assertEquals(1, cachedService.getCacheInvalidateInvocations());

// Let's call the methods annotated with @CacheResult again.
String value7 = cachedService.cacheResult1(KEY_1).await().indefinitely();
Object value8 = cachedService.cacheResult2(KEY_1).await().indefinitely();
Object value9 = cachedService.cacheResult2(KEY_2).await().indefinitely();

// The objects references should be different for the invalidated key.
assertNotSame(value4, value7);
assertNotSame(value5, value8);
// The object reference should remain unchanged for the key that was not invalidated.
assertSame(value6, value9);
}

@Test
void testCacheInvalidateAll() {
// First, let's put some data into the caches.
String value1 = cachedService.cacheResult1(KEY_1).await().indefinitely();
Object value2 = cachedService.cacheResult2(KEY_2).await().indefinitely();

// We will invalidate all the data in all caches later.
Uni<Void> invalidateAllUni = cachedService.cacheInvalidateAll();
// For now, the method that will invalidate the data should not be invoked, as Uni is lazy.
assertEquals(0, cachedService.getCacheInvalidateAllInvocations());

// The data should still be cached at this point.
String value3 = cachedService.cacheResult1(KEY_1).await().indefinitely();
Object value4 = cachedService.cacheResult2(KEY_2).await().indefinitely();
assertSame(value1, value3);
assertSame(value2, value4);

// It's time to perform the data invalidation.
invalidateAllUni.await().indefinitely();
// The method annotated with @CacheInvalidateAll should have been invoked now.
assertEquals(1, cachedService.getCacheInvalidateAllInvocations());

// Let's call the methods annotated with @CacheResult again.
String value5 = cachedService.cacheResult1(KEY_1).await().indefinitely();
Object value6 = cachedService.cacheResult2(KEY_2).await().indefinitely();

// All objects references should be different.
assertNotSame(value1, value5);
assertNotSame(value2, value6);
}

@ApplicationScoped
static class CachedService {

private volatile int cacheResultInvocations;
private volatile int cacheInvalidateInvocations;
private volatile int cacheInvalidateAllInvocations;

@CacheResult(cacheName = CACHE_NAME_1)
public Uni<String> cacheResult1(String key) {
cacheResultInvocations++;
return Uni.createFrom().item(() -> new String());
}

@CacheResult(cacheName = CACHE_NAME_2)
public Uni<Object> cacheResult2(String key) {
return Uni.createFrom().item(() -> new Object());
}

@CacheInvalidate(cacheName = CACHE_NAME_1)
@CacheInvalidate(cacheName = CACHE_NAME_2)
public Uni<Void> cacheInvalidate(String key) {
cacheInvalidateInvocations++;
return Uni.createFrom().nullItem();
}

@CacheInvalidateAll(cacheName = CACHE_NAME_1)
@CacheInvalidateAll(cacheName = CACHE_NAME_2)
public Uni<Void> cacheInvalidateAll() {
cacheInvalidateAllInvocations++;
return Uni.createFrom().nullItem();
}

public int getCacheResultInvocations() {
return cacheResultInvocations;
}

public int getCacheInvalidateInvocations() {
return cacheInvalidateInvocations;
}

public int getCacheInvalidateAllInvocations() {
return cacheInvalidateAllInvocations;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.quarkus.cache.CacheKey;
import io.quarkus.cache.CacheManager;
import io.quarkus.cache.CompositeCacheKey;
import io.smallrye.mutiny.Uni;

public abstract class CacheInterceptor {

Expand Down Expand Up @@ -131,4 +132,8 @@ protected Object getCacheKey(Cache cache, List<Short> cacheKeyParameterPositions
return new CompositeCacheKey(methodParameterValues);
}
}

protected static boolean isUniReturnType(InvocationContext invocationContext) {
return Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.cache.runtime;

import java.util.function.Function;

import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
Expand All @@ -8,7 +10,10 @@
import org.jboss.logging.Logger;

import io.quarkus.cache.Cache;
import io.quarkus.cache.CacheException;
import io.quarkus.cache.CacheInvalidateAll;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@CacheInvalidateAll(cacheName = "") // The `cacheName` attribute is @Nonbinding.
@Interceptor
Expand All @@ -25,15 +30,49 @@ public Object intercept(InvocationContext invocationContext) throws Exception {
if (interceptionContext.getInterceptorBindings().isEmpty()) {
// This should never happen.
LOGGER.warn(INTERCEPTOR_BINDINGS_ERROR_MSG);
return invocationContext.proceed();
} else if (isUniReturnType(invocationContext)) {
return invalidateAllNonBlocking(invocationContext, interceptionContext);
} else {
for (CacheInvalidateAll binding : interceptionContext.getInterceptorBindings()) {
Cache cache = cacheManager.getCache(binding.cacheName()).get();
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("Invalidating all entries from cache [%s]", binding.cacheName());
}
cache.invalidateAll().await().indefinitely();
}
return invalidateAllBlocking(invocationContext, interceptionContext);
}
}

private Object invalidateAllNonBlocking(InvocationContext invocationContext,
CacheInterceptionContext<CacheInvalidateAll> interceptionContext) {
return Multi.createFrom().iterable(interceptionContext.getInterceptorBindings())
.onItem().transformToUniAndMerge(new Function<CacheInvalidateAll, Uni<? extends Void>>() {
@Override
public Uni<Void> apply(CacheInvalidateAll binding) {
return invalidateAll(binding);
}
})
.onItem().ignoreAsUni()
.onItem().transformToUni(new Function<Object, Uni<?>>() {
@Override
public Uni<?> apply(Object ignored) {
try {
return (Uni<Object>) invocationContext.proceed();
} catch (Exception e) {
throw new CacheException(e);
}
}
});
}

private Object invalidateAllBlocking(InvocationContext invocationContext,
CacheInterceptionContext<CacheInvalidateAll> interceptionContext) throws Exception {
for (CacheInvalidateAll binding : interceptionContext.getInterceptorBindings()) {
invalidateAll(binding).await().indefinitely();
}
return invocationContext.proceed();
}

private Uni<Void> invalidateAll(CacheInvalidateAll binding) {
Cache cache = cacheManager.getCache(binding.cacheName()).get();
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("Invalidating all entries from cache [%s]", binding.cacheName());
}
return cache.invalidateAll();
}
}
Loading

0 comments on commit 0bc0c69

Please sign in to comment.