Skip to content

Commit

Permalink
Protect reference caches from a discarding executor
Browse files Browse the repository at this point in the history
The maintenance task is scheduled on an executor and disables further
scheduling until the submitted task runs. If the task is not run when
expected this could cause a memory leak or block writers due to
backpressure. This is detected when the write buffer is full, which
causes writers to provide assistance by barging the eviction lock and
performing the maintenance work themselves.

The write buffer was only enabled for the size or expiration policies
as their data structures require replaying activity. A weak or soft
reference cache disabled this as its data structures are managed by
the JVM. Instead that policy leveraged the read buffer as a cheap way
to trigger periodic maintenance which polls the Java ReferenceQueue.
This assumed that there were no failure scenarios that could halt
scheduling.

Unfortunately this was overly optimistic. If the executor silently
discards tasks then an out-of-memory error could occur because the
cache would not recover and restart eviction. This could happen if
using a discarding policy (such as the RejectedExecutionHandler
implementations offered by ThreadPoolExecutor), by ExecutorService's
shutdownNow(), or due to a bug in the executor. ForkJoinPool suffered
from missed submissions in JDK-8078490 (fixed in JDK 8u60), and a
similar issue was recently discovered in JDK 17
(see newrelic/newrelic-java-agent#558).

The cache should now recover and continue to evict in cases where
the maintenance task is not run promptly. Note that an executor
that discards tasks is inherently broken and can cause asynchronous
loads to never complete (such as refreshAfterWrite or AsyncCache).
In those cases it is left to the user to recover, for example by
using CompletableFuture.orTimeout(duration) with the computation.
  • Loading branch information
ben-manes committed Nov 21, 2021
1 parent 7d0d903 commit f143764
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ public static boolean usesAccessOrderMainDeque(Set<Feature> features) {
|| features.contains(Feature.MAXIMUM_WEIGHT);
}

/**
 * Returns whether the cache variant must buffer writes so that its eviction and
 * expiration policies can replay the activity under the eviction lock.
 */
public static boolean usesWriteQueue(Set<Feature> features) {
  // A size-bounded cache replays writes to maintain its eviction data structures
  if (features.contains(Feature.MAXIMUM_SIZE)
      || features.contains(Feature.MAXIMUM_WEIGHT)) {
    return true;
  }
  // Time-based policies replay writes to maintain their ordering and refresh state
  return features.contains(Feature.EXPIRE_ACCESS)
      || features.contains(Feature.EXPIRE_WRITE)
      || features.contains(Feature.REFRESH_WRITE);
}

public static boolean useWriteTime(Set<Feature> features) {
return features.contains(Feature.EXPIRE_WRITE)
|| features.contains(Feature.REFRESH_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import com.github.benmanes.caffeine.cache.local.AddRemovalListener;
import com.github.benmanes.caffeine.cache.local.AddStats;
import com.github.benmanes.caffeine.cache.local.AddSubtype;
import com.github.benmanes.caffeine.cache.local.AddWriteBuffer;
import com.github.benmanes.caffeine.cache.local.Finalize;
import com.github.benmanes.caffeine.cache.local.LocalCacheContext;
import com.github.benmanes.caffeine.cache.local.LocalCacheRule;
Expand Down Expand Up @@ -102,7 +101,7 @@ public final class LocalCacheFactoryGenerator {
new AddKeyValueStrength(), new AddRemovalListener(), new AddStats(),
new AddExpirationTicker(), new AddMaximum(), new AddFastPath(), new AddDeques(),
new AddExpireAfterAccess(), new AddExpireAfterWrite(), new AddRefreshAfterWrite(),
new AddWriteBuffer(), new AddPacer(), new Finalize());
new AddPacer(), new Finalize());
final ZoneId timeZone = ZoneId.of("America/Los_Angeles");
final Path directory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ public final class Specifications {
public static final TypeName WRITE_ORDER_DEQUE =
ParameterizedTypeName.get(ClassName.get(PACKAGE_NAME, "WriteOrderDeque"), NODE);

public static final ClassName WRITE_QUEUE_TYPE =
ClassName.get(PACKAGE_NAME, "MpscGrowableArrayQueue");
public static final TypeName WRITE_QUEUE =
ParameterizedTypeName.get(WRITE_QUEUE_TYPE, ClassName.get(Runnable.class));

public static final TypeName EXPIRY = ParameterizedTypeName.get(
ClassName.get(PACKAGE_NAME, "Expiry"), kTypeVar, vTypeVar);
public static final TypeName TIMER_WHEEL = ParameterizedTypeName.get(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
final @Nullable RemovalListener<K, V> evictionListener;
final @Nullable CacheLoader<K, V> cacheLoader;

final MpscGrowableArrayQueue<Runnable> writeBuffer;
final ConcurrentHashMap<Object, Node<K, V>> data;
final PerformCleanupTask drainBuffersTask;
final Consumer<Node<K, V>> accessPolicy;
Expand Down Expand Up @@ -245,6 +246,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
? new BoundedBuffer<>()
: Buffer.disabled();
accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};
writeBuffer = new MpscGrowableArrayQueue<>(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);

if (evicts()) {
setMaximumSize(builder.getMaximum());
Expand Down Expand Up @@ -292,15 +294,6 @@ protected WriteOrderDeque<Node<K, V>> writeOrderDeque() {
throw new UnsupportedOperationException();
}

/**
 * Returns whether the page replacement policy buffers writes for replay. The
 * base implementation never buffers; presumably overridden by generated cache
 * variants that maintain a write buffer — confirm against the code generator.
 */
protected boolean buffersWrites() {
  return false;
}

/**
 * Returns the queue of pending write operations awaiting replay. The base
 * implementation does not buffer writes and therefore always throws; callers
 * are expected to guard with {@code buffersWrites()} first.
 *
 * @throws UnsupportedOperationException if the policy does not buffer writes
 */
protected MpscGrowableArrayQueue<Runnable> writeBuffer() {
  throw new UnsupportedOperationException();
}

@Override
public final Executor executor() {
return executor;
Expand Down Expand Up @@ -1454,25 +1447,23 @@ void setAccessTime(Node<K, V> node, long now) {
* @param task the pending operation to be applied
*/
void afterWrite(Runnable task) {
if (buffersWrites()) {
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer().offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
}

// The maintenance task may be scheduled but not running due to all of the executor's threads
// being busy. If all of the threads are writing into the cache then no progress can be made
// without assistance.
try {
performCleanUp(task);
} catch (RuntimeException e) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
}
} else {
scheduleAfterWrite();
    // The maintenance task may be scheduled but not running. This might occur due to all of the
// executor's threads being busy (perhaps writing into this cache), the write rate greatly
// exceeds the consuming rate, priority inversion, or if the executor silently discarded the
// maintenance task. In these scenarios then the writing threads cannot make progress and
// instead writers provide assistance by performing this work directly.
try {
performCleanUp(task);
} catch (RuntimeException e) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
}
}

Expand Down Expand Up @@ -1686,12 +1677,8 @@ static <K, V> void reorder(LinkedDeque<Node<K, V>> deque, Node<K, V> node) {
/** Drains the write buffer. */
@GuardedBy("evictionLock")
void drainWriteBuffer() {
if (!buffersWrites()) {
return;
}

for (int i = 0; i < WRITE_BUFFER_MAX; i++) {
Runnable task = writeBuffer().poll();
for (int i = 0; i <= WRITE_BUFFER_MAX; i++) {
Runnable task = writeBuffer.poll();
if (task == null) {
return;
}
Expand Down Expand Up @@ -1905,7 +1892,7 @@ public void clear() {

// Apply all pending writes
Runnable task;
while (buffersWrites() && (task = writeBuffer().poll()) != null) {
while ((task = writeBuffer.poll()) != null) {
task.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -287,8 +286,8 @@ int getInitialCapacity() {
* with {@link #removalListener} or utilize asynchronous computations. A test may instead prefer
* to configure the cache to execute tasks directly on the same thread.
* <p>
* Beware that configuring a cache with an executor that throws {@link RejectedExecutionException}
* may experience non-deterministic behavior.
 * Beware that a cache configured with an executor that discards tasks and never runs them may
 * exhibit non-deterministic behavior.
*
* @param executor the executor to use for asynchronous execution
* @return this {@code Caffeine} instance (for chaining)
Expand Down
Loading

0 comments on commit f143764

Please sign in to comment.