Skip to content

Commit

Permalink
Merge pull request #13244 from gwenneg/issue-13158-cache-with-context…
Browse files Browse the repository at this point in the history
…-propagation-deadlock

Compute cache value on calling thread to fix thread starvation
  • Loading branch information
gsmet authored Nov 13, 2020
2 parents 3933e9d + 72ea4e8 commit f163e37
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 138 deletions.
11 changes: 0 additions & 11 deletions docs/src/main/asciidoc/cache.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -398,17 +398,6 @@ quarkus.cache.caffeine."bar".maximum-size=1000 <2>
<1> The `foo` cache is being configured.
<2> The `bar` cache is being configured.

== Context propagation

This extension relies on non-blocking calls internally for cache values computations.
By default, there's no context propagation between the calling thread (from your application) and a thread that performs such a computation.

The context propagation can be enabled for this extension by simply adding the `quarkus-smallrye-context-propagation` extension to your project.

If you see a `javax.enterprise.context.ContextNotActiveException` in your application log during a cache computation, then you probably need to enable the context propagation.

You can find more information about context propagation in Quarkus in the link:context-propagation[dedicated guide].

== Annotated beans examples

=== Implicit simple cache key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static io.quarkus.cache.deployment.CacheDeploymentConstants.API_METHODS_ANNOTATIONS;
import static io.quarkus.cache.deployment.CacheDeploymentConstants.API_METHODS_ANNOTATIONS_LISTS;
import static io.quarkus.cache.deployment.CacheDeploymentConstants.CACHE_NAME_PARAM;
import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
import static org.jboss.jandex.AnnotationTarget.Kind.METHOD;

import java.util.ArrayList;
Expand Down Expand Up @@ -76,7 +76,7 @@ ValidationErrorBuildItem validateBeanDeployment(ValidationPhaseBuildItem validat
}

@BuildStep(onlyIf = CacheEnabled.class)
@Record(RUNTIME_INIT)
@Record(STATIC_INIT)
void recordCachesBuild(CombinedIndexBuildItem combinedIndex, BeanContainerBuildItem beanContainer, CacheConfig config,
CaffeineCacheBuildRecorder caffeineRecorder,
List<AdditionalCacheNameBuildItem> additionalCacheNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void testDefaultKey() {
// We need a CaffeineCache instance to test the default key logic.
CaffeineCacheInfo cacheInfo = new CaffeineCacheInfo();
cacheInfo.name = "test-cache";
CaffeineCache cache = new CaffeineCache(cacheInfo, null);
CaffeineCache cache = new CaffeineCache(cacheInfo);

DefaultCacheKey expectedKey = new DefaultCacheKey(cache.getName());
Object actualKey = getCacheKey(cache, new short[] {}, new Object[] {});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package io.quarkus.cache.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.function.Function;

import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
Expand Down Expand Up @@ -34,90 +31,40 @@ public Object intercept(InvocationContext context) throws Exception {
LOGGER.debugf("Loading entry with key [%s] from cache [%s]", key, cache.getName());
}

if (binding.lockTimeout() <= 0) {
CompletableFuture<Object> cacheValue = cache.get(key,
new BiFunction<Object, Executor, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object k, Executor executor) {
return getValueLoader(context, executor);
}
});
try {
return cacheValue.get();
} catch (ExecutionException e) {
throw getExceptionToThrow(e);
}
} else {

// The lock timeout logic starts here.

/*
* If the current key is not already associated with a value in the Caffeine cache, there's no way to know if the
* current thread or another one started the missing value computation. The following variable will be used to
* determine whether or not a timeout should be triggered during the computation depending on which thread started
* it.
*/
boolean[] isCurrentThreadComputation = { false };
try {

CompletableFuture<Object> cacheValue = cache.get(key,
new BiFunction<Object, Executor, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object k, Executor executor) {
isCurrentThreadComputation[0] = true;
return getValueLoader(context, executor);
}
});

if (isCurrentThreadComputation[0]) {
// The value is missing and its computation was started from the current thread.
// We'll wait for the result no matter how long it takes.
try {
return cacheValue.get();
} catch (ExecutionException e) {
throw getExceptionToThrow(e);
CompletableFuture<Object> cacheValue = cache.get(key, new Function<Object, Object>() {
@Override
public Object apply(Object k) {
try {
return context.proceed();
} catch (Exception e) {
throw new CacheException(e);
}
}
});

if (binding.lockTimeout() <= 0) {
return cacheValue.get();
} else {
// The value is either already present in the cache or missing and its computation was started from another thread.
// We want to retrieve it from the cache within the lock timeout delay.
try {
/*
* If the current thread started the cache value computation, then the computation is already finished since
* it was done synchronously and the following call will never time out.
*/
return cacheValue.get(binding.lockTimeout(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw getExceptionToThrow(e);
} catch (TimeoutException e) {
// Timeout triggered! We don't want to wait any longer for the value computation and we'll simply invoke the
// cached method and return its result without caching it.
// TODO: Add statistics here to monitor the timeout.
return context.proceed();
}
}
}
}

private CompletableFuture<Object> getValueLoader(InvocationContext context, Executor executor) {
return CompletableFuture.supplyAsync(new Supplier<Object>() {
@Override
public Object get() {
try {
return context.proceed();
} catch (Exception e) {
throw new CacheException(e);
}
} catch (CacheException e) {
if (e.getCause() instanceof Exception) {
throw (Exception) e.getCause();
} else {
throw e;
}
}, executor);
}

private Exception getExceptionToThrow(ExecutionException e) {
if (e.getCause() instanceof CacheException && e.getCause().getCause() instanceof Exception) {
return (Exception) e.getCause().getCause();
} else {
/*
* If:
* - the cause is not a CacheException
* - the cause is a CacheException which doesn't have a cause itself
* - the cause is a CacheException which was caused itself by an Error
* ... then we'll throw the original ExecutionException.
*/
return e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;

import com.github.benmanes.caffeine.cache.AsyncCache;
Expand Down Expand Up @@ -31,12 +29,9 @@ public class CaffeineCache {

private Object defaultKey;

public CaffeineCache(CaffeineCacheInfo cacheInfo, Executor executor) {
public CaffeineCache(CaffeineCacheInfo cacheInfo) {
this.name = cacheInfo.name;
Caffeine<Object, Object> builder = Caffeine.newBuilder();
if (executor != null) {
builder.executor(executor);
}
if (cacheInfo.initialCapacity != null) {
this.initialCapacity = cacheInfo.initialCapacity;
builder.initialCapacity(cacheInfo.initialCapacity);
Expand All @@ -56,47 +51,36 @@ public CaffeineCache(CaffeineCacheInfo cacheInfo, Executor executor) {
cache = builder.buildAsync();
}

public CompletableFuture<Object> get(Object key, BiFunction<Object, Executor, CompletableFuture<Object>> valueLoader) {
/**
* Returns a {@link CompletableFuture} holding the cache value identified by {@code key}, obtaining that value from
* {@code valueLoader} if necessary. The value computation is done synchronously on the calling thread and the
* {@link CompletableFuture} is immediately completed before being returned.
*
* @param key cache key
* @param valueLoader function used to compute the cache value if {@code key} is not already associated with a value
* @return a {@link CompletableFuture} holding the cache value
* @throws CacheException if an exception is thrown during the cache value computation
*/
public CompletableFuture<Object> get(Object key, Function<Object, Object> valueLoader) {
if (key == null) {
throw new NullPointerException(NULL_KEYS_NOT_SUPPORTED_MSG);
}
CompletableFuture<Object> cacheValue = cache.get(key, new BiFunction<Object, Executor, CompletableFuture<Object>>() {
@Override
public CompletableFuture<Object> apply(Object k, Executor executor) {
return valueLoader.apply(k, executor).exceptionally(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable cause) {
// This is required to prevent Caffeine from logging unwanted warnings.
return new CaffeineComputationThrowable(cause);
}
}).thenApply(new Function<Object, Object>() {
@Override
public Object apply(Object value) {
return NullValueConverter.toCacheValue(value);
}
});
}
});
CompletableFuture<Object> newCacheValue = new CompletableFuture<Object>();
CompletableFuture<Object> existingCacheValue = cache.asMap().putIfAbsent(key, newCacheValue);
if (existingCacheValue == null) {
Object value = valueLoader.apply(key);
newCacheValue.complete(NullValueConverter.toCacheValue(value));
return unwrapCacheValue(newCacheValue);
} else {
return unwrapCacheValue(existingCacheValue);
}
}

private CompletableFuture<Object> unwrapCacheValue(CompletableFuture<Object> cacheValue) {
return cacheValue.thenApply(new Function<Object, Object>() {
@Override
@SuppressWarnings("finally")
public Object apply(Object value) {
// If there's a throwable encapsulated into a CaffeineComputationThrowable, it must be rethrown.
if (value instanceof CaffeineComputationThrowable) {
try {
// The cache entry needs to be removed from Caffeine explicitly (this would usually happen automatically).
cache.asMap().remove(key, cacheValue);
} finally {
Throwable cause = ((CaffeineComputationThrowable) value).getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new CacheException(cause);
}
}
} else {
return NullValueConverter.fromCacheValue(value);
}
return NullValueConverter.fromCacheValue(value);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
import java.util.Map;
import java.util.Set;

import org.eclipse.microprofile.context.ManagedExecutor;
import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.cache.runtime.CacheRepository;
import io.quarkus.runtime.annotations.Recorder;
Expand All @@ -17,21 +15,18 @@ public class CaffeineCacheBuildRecorder {

private static final Logger LOGGER = Logger.getLogger(CaffeineCacheBuildRecorder.class);

public void buildCaches(BeanContainer beanContainer,
Set<CaffeineCacheInfo> cacheInfos) {
public void buildCaches(BeanContainer beanContainer, Set<CaffeineCacheInfo> cacheInfos) {
// The number of caches is known at build time so we can use fixed initialCapacity and loadFactor for the caches map.
Map<String, CaffeineCache> caches = new HashMap<>(cacheInfos.size() + 1, 1.0F);

ManagedExecutor managedExecutor = Arc.container().instance(ManagedExecutor.class).orElse(null);

for (CaffeineCacheInfo cacheInfo : cacheInfos) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf(
"Building Caffeine cache [%s] with [initialCapacity=%s], [maximumSize=%s], [expireAfterWrite=%s] and [expireAfterAccess=%s]",
cacheInfo.name, cacheInfo.initialCapacity, cacheInfo.maximumSize, cacheInfo.expireAfterWrite,
cacheInfo.expireAfterAccess);
}
CaffeineCache cache = new CaffeineCache(cacheInfo, managedExecutor);
CaffeineCache cache = new CaffeineCache(cacheInfo);
caches.put(cacheInfo.name, cache);
}

Expand Down

0 comments on commit f163e37

Please sign in to comment.