Skip to content

Commit

Permalink
Detect JDK-8274349 and fallback to a same-thread executor
Browse files Browse the repository at this point in the history
In certain scenarios the common pool is broken and will never execute
a subitted task. This causes behavior like async caches, evictions,
removal notifications, and refresh after write to not function. The
application's own usage will not work either so this only fails less.

Affected users are encouraged to use one of the following workarounds:
- set the property `java.util.concurrent.ForkJoinPool.common.parallelism`
  to 1
- do not use `-XX:ActiveProcessorCount=1` as a startup flag
- do not use a container with 1 cpu

For more details see:
  ForkJoinPool.commonPool() does not work with 1 CPU
  https://bugs.openjdk.java.net/browse/JDK-8274349
  • Loading branch information
ben-manes committed Nov 28, 2021
1 parent 99542c2 commit 0fd820f
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 21 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ jobs:
:guava:test
:jcache:test
:simulator:test
:caffeine:junitTests
:caffeine:junitTest
:caffeine:slowCaffeineTest
no_output_timeout: 1h
- run:
name: Run isolated tests
command: ./gradlew :caffeine:isolatedTests
command: ./gradlew :caffeine:isolatedTest

workflows:
version: 2
Expand Down
2 changes: 1 addition & 1 deletion .github/scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
set -eux

./gradlew check
./gradlew :caffeine:isolatedTests
./gradlew :caffeine:isolatedTest
./gradlew :caffeine:slowGuavaTest
./gradlew :caffeine:slowCaffeineTest
9 changes: 9 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ jobs:
restore-keys: ${{ runner.os }}-gradle
- name: Run tests
run: ./.github/scripts/test.sh
- name: Set up JDK 17.0.1
if: matrix.java == env.MAX_JVM
uses: actions/setup-java@v2
with:
distribution: 'zulu'
java-version: 17.0.1
- name: Run JDK-8274349 tests
if: matrix.java == env.MAX_JVM
run: ./gradlew jdk8274349Test
- name: Publish Coverage
if: >
matrix.java == env.MIN_JVM
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lincheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ jobs:
- name: Run lincheck
run: |
set -eux
./gradlew lincheckTests
./gradlew lincheckTest
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ int getInitialCapacity() {
* with {@link #removalListener} or utilize asynchronous computations. A test may instead prefer
* to configure the cache to execute tasks directly on the same thread.
* <p>
* Beware that configuring a cache with an executor that discards tasks and never runs them may
* Beware that configuring a cache with an executor that discards tasks or never runs them may
* experience non-deterministic behavior.
*
* @param executor the executor to use for asynchronous execution
Expand All @@ -300,7 +300,14 @@ public Caffeine<K, V> executor(Executor executor) {
}

Executor getExecutor() {
return (executor == null) ? ForkJoinPool.commonPool() : executor;
if ((executor != null)
&& !((executor instanceof ForkJoinPool) && (executor == ForkJoinPool.commonPool()))) {
return executor;
} else if ((ForkJoinPool.getCommonPoolParallelism() == 1) && (Runtime.version().feature() == 17)
&& (Runtime.version().interim() == 0) && (Runtime.version().update() <= 1)) {
return Runnable::run; // JDK-8274349
}
return ForkJoinPool.commonPool();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public void rescheduleDrainBuffers() {
}
};
var cache = asBoundedLocalCache(Caffeine.newBuilder()
.executor(CacheExecutor.THREADED.create())
.evictionListener(evictionListener)
.maximumSize(0)
.build());
Expand All @@ -169,7 +170,7 @@ public void rescheduleDrainBuffers() {
assertThat(cache.drainStatus).isEqualTo(PROCESSING_TO_REQUIRED);

done.set(true);
await().untilAsserted(() -> assertThat(cache.drainStatus).isEqualTo(IDLE));
await().untilAsserted(() -> assertThat(cache.drainStatus).isAnyOf(REQUIRED, IDLE));
}

@Test(dataProvider = "caches")
Expand Down Expand Up @@ -373,9 +374,9 @@ public void evict_update() {
(k, v, cause) -> removedValues.accumulateAndGet(v, Int::add);

var cache = Caffeine.newBuilder()
.executor(CacheExecutor.DIRECT.create())
.evictionListener(evictionListener)
.removalListener(removalListener)
.executor(Runnable::run)
.maximumSize(100)
.build();
var localCache = asBoundedLocalCache(cache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.mockito.Mockito.verify;

import java.lang.Runtime.Version;
import java.time.Duration;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand All @@ -31,6 +33,7 @@
import org.testng.annotations.Test;

import com.github.benmanes.caffeine.cache.stats.StatsCounter;
import com.google.common.collect.Range;
import com.google.common.testing.FakeTicker;

/**
Expand Down Expand Up @@ -621,6 +624,17 @@ public void executor() {
builder.build();
}

@Test(groups = "jdk8274349")
public void executor_jdk8274349() throws ReflectiveOperationException {
// This test requires either JDK 17 or 17.0.1 using a single cpu (-XX:ActiveProcessorCount=1)
assertThat(Runtime.version()).isIn(Range.closedOpen(
Version.parse("17"), Version.parse("17.0.2")));
assertThat(Runtime.getRuntime().availableProcessors()).isEqualTo(1);

// JDK-8274349: ForkJoinPool.commonPool() does not work with 1 cpu
assertThat(Caffeine.newBuilder().getExecutor()).isNotSameInstanceAs(ForkJoinPool.commonPool());
}

/* --------------- ticker --------------- */

@Test(expectedExceptions = NullPointerException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.common.testing.FakeTicker;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.testing.TestingExecutors;

/**
* Issue #193: Invalidate before Refresh completes still stores value
Expand Down Expand Up @@ -75,8 +74,8 @@ public void invalidateDuringRefreshRemovalCheck() throws Exception {
var removed = new ArrayList<Long>();
AsyncLoadingCache<String, Long> cache = Caffeine.newBuilder()
.removalListener((String key, Long value, RemovalCause reason) -> removed.add(value))
.executor(TestingExecutors.sameThreadScheduledExecutor())
.refreshAfterWrite(10, TimeUnit.NANOSECONDS)
.executor(Runnable::run)
.ticker(ticker::read)
.buildAsync(loader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Policy.VarExpiration;
import com.github.benmanes.caffeine.testing.ConcurrentTestHarness;

/**
* Issue #298: Stale data when using Expiry
Expand Down Expand Up @@ -116,6 +117,7 @@ public void readDuringCreate() {

private AsyncLoadingCache<String, String> makeAsyncCache() {
return Caffeine.newBuilder()
.executor(ConcurrentTestHarness.executor)
.expireAfter(new Expiry<String, String>() {
@Override public long expireAfterCreate(@Nonnull String key,
@Nonnull String value, long currentTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.testing.ConcurrentTestHarness;

/**
* Issue #568: Incorrect handling of weak/soft reference caching.
Expand All @@ -41,7 +42,10 @@ public class Issue568Test {
*/
@Test
public void intermittentNull() throws InterruptedException {
Cache<String, String> cache = Caffeine.newBuilder().weakValues().build();
Cache<String, String> cache = Caffeine.newBuilder()
.executor(ConcurrentTestHarness.executor)
.weakValues()
.build();

String key = "key";
String val = "val";
Expand Down Expand Up @@ -87,6 +91,7 @@ public void resurrect() throws InterruptedException {
var error = new AtomicReference<RuntimeException>();
Cache<String, Object> cache = Caffeine.newBuilder()
.weakValues()
.executor(ConcurrentTestHarness.executor)
.removalListener((k, v, cause) -> {
if (cause == RemovalCause.COLLECTED && (v != null)) {
error.compareAndSet(null, new IllegalStateException("Evicted a live value: " + v));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void eviction() throws Exception {
};

Cache<Long, Val> cache = Caffeine.newBuilder()
.executor(ConcurrentTestHarness.executor)
.removalListener(listener)
.maximumSize(maxEntries)
.build();
Expand Down
19 changes: 14 additions & 5 deletions caffeine/testing.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ testNames.each { testName ->
}
}

tasks.register('isolatedTests', Test) {
tasks.register('isolatedTest', Test) {
group = 'Cache tests'
description = 'Tests that must be run in isolation'
useTestNG()
}

tasks.register('lincheckTests', Test) {
tasks.register('lincheckTest', Test) {
group = 'Cache tests'
description = 'Tests that assert linearizability'
useTestNG()
}

tasks.register('junitTests', Test) {
tasks.register('junitTest', Test) {
group = 'Cache tests'
description = 'JUnit tests'
useJUnit()
Expand All @@ -74,6 +74,12 @@ tasks.register('junitTests', Test) {
systemProperty 'caffeine.osgi.jar', project(':caffeine').jar.archivePath.path
}

tasks.register('jdk8274349Test', Test) {
group = 'Cache tests'
description = 'JDK-8274349 tests'
useTestNG()
}

tasks.withType(Test).configureEach {
if (options instanceof TestNGOptions) {
if (name.startsWith('slow')) {
Expand All @@ -91,13 +97,16 @@ tasks.withType(Test).configureEach {
testLogging.events 'started'
maxHeapSize = '2g'
failFast = true
} else if (name.startsWith('jdk8274349')) {
options.includeGroups = ['jdk8274349']
jvmArgs '-XX:ActiveProcessorCount=1'
} else {
options {
threadCount = Math.max(6, Runtime.getRuntime().availableProcessors() - 1)
jvmArgs '-XX:+UseG1GC', '-XX:+ParallelRefProcEnabled'
excludeGroups = ['slow', 'isolated', 'lincheck']
excludeGroups = ['slow', 'isolated', 'lincheck', 'jdk8274349']
parallel = 'methods'
}
jvmArgs '-XX:+UseG1GC', '-XX:+ParallelRefProcEnabled'
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ ext {
jmhReport: '0.9.0',
nexusPublish: '1.1.0',
nullaway: '1.2.0',
pmd: '6.40.0',
pmd: '6.41.0',
semanticVersioning: '1.1.0',
shadow: '7.1.0',
sonarqube: '3.3',
Expand Down
4 changes: 2 additions & 2 deletions guava/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ tasks.named('test').configure {
}
}

def osgiTests = tasks.register('osgiTests', Test) {
def osgiTest = tasks.register('osgiTest', Test) {
group = 'Cache tests'
description = 'Isolated OSGi tests'

Expand All @@ -50,7 +50,7 @@ def osgiTests = tasks.register('osgiTests', Test) {
}
}
tasks.named('test').configure {
dependsOn(osgiTests)
dependsOn(osgiTest)
}

tasks.withType(Test) {
Expand Down
4 changes: 2 additions & 2 deletions jcache/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def testCompatibilityKit = tasks.register('testCompatibilityKit', Test) {
systemProperty 'javax.management.builder.initial', "${pkg}.management.JCacheMBeanServerBuilder"
}

def osgiTests = tasks.register('osgiTests', Test) {
def osgiTest = tasks.register('osgiTest', Test) {
group = 'Build'
description = 'Isolated OSGi tests'
useJUnit()
Expand All @@ -112,5 +112,5 @@ def osgiTests = tasks.register('osgiTests', Test) {

tasks.named('test').configure {
dependsOn(testCompatibilityKit)
dependsOn(osgiTests)
dependsOn(osgiTest)
}

0 comments on commit 0fd820f

Please sign in to comment.