From 4ab8a326b13a06ad962dc1c8a2c90387f2db7882 Mon Sep 17 00:00:00 2001 From: Tobi Ajila Date: Thu, 22 Feb 2024 15:03:59 -0500 Subject: [PATCH] loom locking stuff Signed-off-by: Tobi Ajila --- .../classes/java/lang/VirtualThread.java | 78 +++- .../share/native/libjava/VirtualThread.c | 1 + .../lang/Thread/virtual/MonitorsTest.java | 408 ++++++++++++++++++ 3 files changed, 484 insertions(+), 3 deletions(-) create mode 100644 test/jdk/java/lang/Thread/virtual/MonitorsTest.java diff --git a/src/java.base/share/classes/java/lang/VirtualThread.java b/src/java.base/share/classes/java/lang/VirtualThread.java index 19635db8bbd..4df3b32670c 100644 --- a/src/java.base/share/classes/java/lang/VirtualThread.java +++ b/src/java.base/share/classes/java/lang/VirtualThread.java @@ -81,6 +81,7 @@ final class VirtualThread extends BaseVirtualThread { private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); + private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList"); // scheduler and continuation private final Executor scheduler; @@ -132,6 +133,9 @@ final class VirtualThread extends BaseVirtualThread { // Thread.yield private static final int YIELDING = 10; private static final int YIELDED = 11; // unmounted but runnable + // monitor enter + private static final int BLOCKING = 12; + private static final int BLOCKED = 13; private static final int TERMINATED = 99; // final state @@ -141,12 +145,20 @@ final class VirtualThread extends BaseVirtualThread { // parking permit private volatile boolean parkPermit; + + // unblocked + private volatile boolean unblocked; + // carrier thread when mounted, accessed by VM private volatile Thread carrierThread; // termination object when joining, created lazily if needed private volatile CountDownLatch termination; + // Next waiting vthread to unpark() + private VirtualThread next; + private byte onWaitingList; + /** * Returns the continuation scope used for virtual threads. */ @@ -499,6 +511,16 @@ private void afterYield() { return; } + // blocking on monitorenter + if (s == BLOCKING) { + setState(BLOCKED); + if (unblocked && compareAndSetState(BLOCKED, UNPARKED)) { + unblocked = false; + submitRunContinuation(); + } + return; + } + assert false; } @@ -772,6 +794,19 @@ void unpark() { } } + /** + * Re-enables this virtual thread for scheduling after blocking on monitor enter. + * @throws RejectedExecutionException if the scheduler cannot accept a task + */ + private void unblock() { + assert !Thread.currentThread().isVirtual(); + unblocked = true; + if (state() == BLOCKED && compareAndSetState(BLOCKED, UNPARKED)) { + unblocked = false; + submitRunContinuation(); + } + } + /** * Attempts to yield the current virtual thread (Thread.yield). */ @@ -924,6 +959,7 @@ Thread.State threadState() { } case UNPARKED: case YIELDED: + case BLOCKING: // runnable, not mounted return Thread.State.RUNNABLE; case RUNNING: @@ -952,6 +988,8 @@ Thread.State threadState() { case TIMED_PARKED: case TIMED_PINNED: return State.TIMED_WAITING; + case BLOCKED: + return State.BLOCKED; case TERMINATED: return Thread.State.TERMINATED; default: @@ -997,7 +1035,7 @@ private StackTraceElement[] tryGetStackTrace() { case RUNNING, PINNED, TIMED_PINNED -> { return null; // mounted } - case PARKED, TIMED_PARKED -> { + case PARKED, BLOCKED, TIMED_PARKED -> { // unmounted, not runnable } case UNPARKED, YIELDED -> { @@ -1028,9 +1066,9 @@ private StackTraceElement[] tryGetStackTrace() { // resubmit as task may have run while suspended yield true; } - case PARKED, TIMED_PARKED -> { + case PARKED, BLOCKED, TIMED_PARKED -> { // resubmit if unparked while suspended - yield parkPermit && compareAndSetState(initialState, UNPARKED); + yield (parkPermit || unblocked) && compareAndSetState(initialState, UNPARKED); } default -> throw new InternalError(); }; @@ -1119,6 +1157,16 @@ private void setState(int newValue) { state = newValue; // volatile write } + private VirtualThread next() { + return next; + } + + private void removeFromWaitingList() { + next = null; + boolean res = U.compareAndSetByte(this, ON_WAITING_LIST, (byte)0x01, (byte)0x00); + assert res; + } + private boolean compareAndSetState(int expectedValue, int newValue) { return U.compareAndSetInt(this, STATE, expectedValue, newValue); } @@ -1244,4 +1292,28 @@ private static int tracePinningMode() { } return 0; } + + /** + * Unblock virtual threads that are ready to scheduled again. + */ + private static void processPendingList() { + while (true) { + VirtualThread currentWaitingVThread = waitForPendingList(); + VirtualThread nextWaitingVThread = null; + while (currentWaitingVThread != null) { + nextWaitingVThread = currentWaitingVThread.next(); + currentWaitingVThread.removeFromWaitingList(); + currentWaitingVThread.unpark(); + currentWaitingVThread = nextWaitingVThread; + } + } + } + + private static native VirtualThread waitForPendingList(); + + static { + var unblocker = InnocuousThread.newThread("VirtualThread-unblocker", + VirtualThread::processPendingList); + unblocker.start(); + } } diff --git a/src/java.base/share/native/libjava/VirtualThread.c b/src/java.base/share/native/libjava/VirtualThread.c index 94dbe0b7e37..1ef29270ddd 100644 --- a/src/java.base/share/native/libjava/VirtualThread.c +++ b/src/java.base/share/native/libjava/VirtualThread.c @@ -38,6 +38,7 @@ static JNINativeMethod methods[] = { { "notifyJvmtiUnmount", "(Z)V", (void *)&JVM_VirtualThreadUnmount }, { "notifyJvmtiHideFrames", "(Z)V", (void *)&JVM_VirtualThreadHideFrames }, { "notifyJvmtiDisableSuspend", "(Z)V", (void *)&JVM_VirtualThreadDisableSuspend }, + { "waitForPendingList", "()" VIRTUAL_THREAD, (void *)&JVM_VirtualThreadWaitForPendingList }, }; JNIEXPORT void JNICALL diff --git a/test/jdk/java/lang/Thread/virtual/MonitorsTest.java b/test/jdk/java/lang/Thread/virtual/MonitorsTest.java new file mode 100644 index 00000000000..23391a3918c --- /dev/null +++ b/test/jdk/java/lang/Thread/virtual/MonitorsTest.java @@ -0,0 +1,408 @@ +/* + * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/** + * @test + * @summary Test virtual threads using synchronized + * @library /test/lib + * @modules java.base/java.lang:+open + * + * @run junit/othervm/timeout=10 -Xint MonitorsTest + * @run junit/othervm/timeout=50 -Xcomp MonitorsTest + * @run junit/othervm/timeout=50 MonitorsTest + * @run junit/othervm/timeout=50 -XX:+FullGCALot -XX:FullGCALotInterval=1000 MonitorsTest + */ + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +class MonitorsTest { + final int CARRIER_COUNT = 8; + ExecutorService scheduler = Executors.newFixedThreadPool(CARRIER_COUNT); + + static final Object globalLock = new Object(); + static volatile boolean finish = false; + static volatile int counter = 0; + + ///////////////////////////////////////////////////////////////////// + //////////////////////////// BASIC TESTS //////////////////////////// + ///////////////////////////////////////////////////////////////////// + + static final Runnable FOO = () -> { + Object lock = new Object(); + synchronized(lock) { + while(!finish) { + Thread.yield(); + } + } + System.out.println("Exiting FOO from thread " + Thread.currentThread().getName()); + }; + + static final Runnable BAR = () -> { + synchronized(globalLock) { + counter++; + } + System.out.println("Exiting BAR from thread " + Thread.currentThread().getName()); + }; + + /** + * Test yield while holding monitor. + */ + @Test + void testBasic() throws Exception { + final int VT_COUNT = CARRIER_COUNT; + + // Create first batch of VT threads. + Thread firstBatch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + firstBatch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("FirstBatchVT-" + i).start(FOO); + } + + // Give time for all threads to reach Thread.yield + Thread.sleep(1000); + + // Create second batch of VT threads. + Thread secondBatch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + secondBatch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("SecondBatchVT-" + i).start(BAR); + } + + while(counter != VT_COUNT) {} + + finish = true; + + for (int i = 0; i < VT_COUNT; i++) { + firstBatch[i].join(); + } + for (int i = 0; i < VT_COUNT; i++) { + secondBatch[i].join(); + } + } + + static final Runnable BAR2 = () -> { + synchronized(globalLock) { + counter++; + } + recursive2(10); + System.out.println("Exiting BAR2 from thread " + Thread.currentThread().getName() + "with counter=" + counter); + }; + + static void recursive2(int count) { + synchronized(Thread.currentThread()) { + if (count > 0) { + recursive2(count - 1); + } else { + synchronized(globalLock) { + counter++; + Thread.yield(); + } + } + } + } + + /** + * Test yield while holding monitor with recursive locking. + */ + @Test + void testRecursive() throws Exception { + final int VT_COUNT = CARRIER_COUNT; + counter = 0; + finish = false; + + // Create first batch of VT threads. + Thread firstBatch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + firstBatch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("FirstBatchVT-" + i).start(FOO); + } + + // Give time for all threads to reach Thread.yield + Thread.sleep(1000); + + // Create second batch of VT threads. + Thread secondBatch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + secondBatch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("SecondBatchVT-" + i).start(BAR2); + } + + while(counter != 2*VT_COUNT) {} + + finish = true; + + for (int i = 0; i < VT_COUNT; i++) { + firstBatch[i].join(); + } + for (int i = 0; i < VT_COUNT; i++) { + secondBatch[i].join(); + } + } + + static final Runnable FOO3 = () -> { + synchronized(globalLock) { + while(!finish) { + Thread.yield(); + } + } + System.out.println("Exiting FOO3 from thread " + Thread.currentThread().getName()); + }; + + /** + * Test contention on monitorenter. + */ + @Test + void testContention() throws Exception { + final int VT_COUNT = CARRIER_COUNT * 8; + counter = 0; + finish = false; + + // Create batch of VT threads. + Thread batch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + batch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("BatchVT-" + i).start(FOO3); + } + + // Give time for all threads to reach synchronized(globalLock) + Thread.sleep(2000); + + finish = true; + + for (int i = 0; i < VT_COUNT; i++) { + batch[i].join(); + } + } + + ///////////////////////////////////////////////////////////////////// + //////////////////////////// MAIN TESTS ///////////////////////////// + ///////////////////////////////////////////////////////////////////// + + static final int MONITORS_CNT = 12; + static Object[] globalLockArray; + static AtomicInteger workerCount = new AtomicInteger(0); + + static void recursive4_1(int depth, int lockNumber) { + if (depth > 0) { + recursive4_1(depth - 1, lockNumber); + } else { + if (Math.random() < 0.5) { + Thread.yield(); + } + recursive4_2(lockNumber); + } + } + + static void recursive4_2(int lockNumber) { + if (lockNumber + 2 <= MONITORS_CNT - 1) { + lockNumber += 2; + synchronized(globalLockArray[lockNumber]) { + Thread.yield(); + recursive4_2(lockNumber); + } + } + } + + static final Runnable FOO4 = () -> { + while (!finish) { + int lockNumber = ThreadLocalRandom.current().nextInt(0, MONITORS_CNT - 1); + synchronized(globalLockArray[lockNumber]) { + recursive4_1(lockNumber, lockNumber); + } + } + workerCount.getAndIncrement(); + System.out.println("Exiting FOO4 from thread " + Thread.currentThread().getName()); + }; + + /** + * Test contention on monitorenter with extra monitors on stack shared by all threads. + */ + @Test + void testContentionMultipleMonitors() throws Exception { + final int VT_COUNT = CARRIER_COUNT * 8; + workerCount.getAndSet(0); + finish = false; + + globalLockArray = new Object[MONITORS_CNT]; + for (int i = 0; i < MONITORS_CNT; i++) { + globalLockArray[i] = new Object(); + } + + Thread batch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + batch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("BatchVT-" + i).start(FOO4); + } + + Thread.sleep(10000); + finish = true; + + for (int i = 0; i < VT_COUNT; i++) { + batch[i].join(); + } + + if (workerCount.get() != VT_COUNT) { + throw new RuntimeException("testContentionMultipleMonitors2 failed. Expected " + VT_COUNT + "but found " + workerCount.get()); + } + } + + + static void recursive5_1(int depth, int lockNumber, Object[] myLockArray) { + if (depth > 0) { + recursive5_1(depth - 1, lockNumber, myLockArray); + } else { + if (Math.random() < 0.5) { + Thread.yield(); + } + recursive5_2(lockNumber, myLockArray); + } + } + + static void recursive5_2(int lockNumber, Object[] myLockArray) { + if (lockNumber + 2 <= MONITORS_CNT - 1) { + lockNumber += 2; + synchronized (myLockArray[lockNumber]) { + if (Math.random() < 0.5) { + Thread.yield(); + } + synchronized (globalLockArray[lockNumber]) { + Thread.yield(); + recursive5_2(lockNumber, myLockArray); + } + } + } + } + + static final Runnable FOO5 = () -> { + Object[] myLockArray = new Object[MONITORS_CNT]; + for (int i = 0; i < MONITORS_CNT; i++) { + myLockArray[i] = new Object(); + } + + while (!finish) { + int lockNumber = ThreadLocalRandom.current().nextInt(0, MONITORS_CNT - 1); + synchronized (myLockArray[lockNumber]) { + synchronized (globalLockArray[lockNumber]) { + recursive5_1(lockNumber, lockNumber, myLockArray); + } + } + } + workerCount.getAndIncrement(); + System.out.println("Exiting FOO5 from thread " + Thread.currentThread().getName()); + }; + + /** + * Test contention on monitorenter with extra monitors on stack both local only and shared by all threads. + */ + @Test + void testContentionMultipleMonitors2() throws Exception { + final int VT_COUNT = CARRIER_COUNT * 8; + workerCount.getAndSet(0); + finish = false; + + globalLockArray = new Object[MONITORS_CNT]; + for (int i = 0; i < MONITORS_CNT; i++) { + globalLockArray[i] = new Object(); + } + + // Create batch of VT threads. + Thread batch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + //Thread.ofVirtual().name("FirstBatchVT-" + i).start(FOO); + batch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("BatchVT-" + i).start(FOO5); + } + + Thread.sleep(10000); + + finish = true; + + for (int i = 0; i < VT_COUNT; i++) { + batch[i].join(); + } + + if (workerCount.get() != VT_COUNT) { + throw new RuntimeException("testContentionMultipleMonitors2 failed. Expected " + VT_COUNT + "but found " + workerCount.get()); + } + } + + static synchronized void recursive6(int depth, Object myLock) { + if (depth > 0) { + recursive6(depth - 1, myLock); + } else { + if (Math.random() < 0.5) { + Thread.yield(); + } else { + synchronized (myLock) { + Thread.yield(); + } + } + } + } + + static final Runnable FOO6 = () -> { + Object myLock = new Object(); + + while (!finish) { + int lockNumber = ThreadLocalRandom.current().nextInt(0, MONITORS_CNT - 1); + synchronized (myLock) { + synchronized (globalLockArray[lockNumber]) { + recursive6(lockNumber, myLock); + } + } + } + workerCount.getAndIncrement(); + System.out.println("Exiting FOO5 from thread " + Thread.currentThread().getName()); + }; + + /** + * Test contention on monitorenter with synchronized methods. + */ + @Test + void testContentionMultipleMonitors3() throws Exception { + final int VT_COUNT = CARRIER_COUNT * 8; + workerCount.getAndSet(0); + finish = false; + + + globalLockArray = new Object[MONITORS_CNT]; + for (int i = 0; i < MONITORS_CNT; i++) { + globalLockArray[i] = new Object(); + } + + // Create batch of VT threads. + Thread batch[] = new Thread[VT_COUNT]; + for (int i = 0; i < VT_COUNT; i++) { + batch[i] = ThreadBuilders.virtualThreadBuilder(scheduler).name("BatchVT-" + i).start(FOO6); + } + + Thread.sleep(10000); + + finish = true; + + for (int i = 0; i < VT_COUNT; i++) { + batch[i].join(); + } + + if (workerCount.get() != VT_COUNT) { + throw new RuntimeException("testContentionMultipleMonitors2 failed. Expected " + VT_COUNT + "but found " + workerCount.get()); + } + } +} \ No newline at end of file