From 4e93b57d16c1f9c2daec619a865afd4beb887b2d Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 14 Nov 2024 13:09:35 -0500 Subject: [PATCH 1/2] Transfer ThreadPool local queue to high-pri queue on Task blocking Added for now under the same config flag as is being used for other work prioritization experimentation. --- .../src/System/Threading/Tasks/Task.cs | 21 ++++++++++++++++ .../System/Threading/ThreadPoolWorkQueue.cs | 21 ++++++++++++++++ .../tests/ThreadPoolTests.cs | 24 ++++++++++++++++++- 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs index 9211dd6e0fd88..0ab2d859c0b96 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @@ -3059,6 +3059,27 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can bool returnValue = SpinWait(millisecondsTimeout); if (!returnValue) { +#if CORECLR + if (ThreadPoolWorkQueue.s_prioritizationExperiment) + { + // We're about to block waiting for the task to complete, which is expensive, and if + // the task being waited on depends on some other work to run, this thread could end up + // waiting for some other thread to do work. If the two threads are part of the same scheduler, + // such as the thread pool, that could lead to a (temporary) deadlock. This is made worse by + // it also leading to a possible priority inversion on previously queued work. Each thread in + // the thread pool has a local queue. A key motivator for this local queue is it allows this + // thread to create work items that it will then prioritize above all other work in the + // pool. However, while this thread makes its own local queue the top priority, that queue is + // every other thread's lowest priority. If this thread blocks, all of its created work that's + // supposed to be high priority becomes low priority, and work that's typically part of a + // currently in-flight operation gets deprioritized relative to new requests coming into the + // pool, which can lead to the whole system slowing down or even deadlocking. To address that, + // just before we block, we move all local work into a global queue, so that it's at least + // prioritized by other threads more fairly with respect to other work. + ThreadPoolWorkQueue.TransferAllLocalWorkItemsToHighPriorityGlobalQueue(); + } +#endif + var mres = new SetOnInvokeMres(); try { diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index ebca16fb7ec49..66aba5ef175b2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -691,6 +691,27 @@ public void EnqueueAtHighPriority(object workItem) EnsureThreadRequested(); } + internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue() + { + // If there's no local queue, there's nothing to transfer. + if (ThreadPoolWorkQueueThreadLocals.threadLocals is not ThreadPoolWorkQueueThreadLocals tl) + { + return; + } + + // Pop each work item off the local queue and push it onto the global. This is a + // bounded loop as no other thread is allowed to push into this thread's queue. + ThreadPoolWorkQueue queue = ThreadPool.s_workQueue; + while (tl.workStealingQueue.LocalPop() is object workItem) + { + queue.highPriorityWorkItems.Enqueue(workItem); + } + + Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, true); + + queue.EnsureThreadRequested(); + } + internal static bool LocalFindAndPop(object callback) { ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals; diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index 46e4c5e097542..59764268029fa 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -1172,12 +1172,13 @@ public static void PrioritizationExperimentConfigVarTest() RemoteExecutor.Invoke(() => { const int WorkItemCountPerKind = 100; + const int Kinds = 3; int completedWorkItemCount = 0; var allWorkItemsCompleted = new AutoResetEvent(false); Action workItem = _ => { - if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * 3) + if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * Kinds) { allWorkItemsCompleted.Set(); } @@ -1214,6 +1215,27 @@ public static void PrioritizationExperimentConfigVarTest() 0, preferLocal: false); + ThreadPool.UnsafeQueueUserWorkItem( + _ => + { + // Enqueue tasks from a thread pool thread into the local queue, + // then block this thread until a queued task completes. + + startTest.CheckedWait(); + + Task queued = null; + for (int i = 0; i < WorkItemCountPerKind; i++) + { + queued = Task.Run(() => workItem(0)); + } + + queued + .ContinueWith(_ => { }) // prevent wait inlining + .Wait(); + }, + 0, + preferLocal: false); + t = new Thread(() => { // Enqueue local work from thread pool worker threads From 619f0f69f5d4dde33fd1b2681c58f18331544d75 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Wed, 20 Nov 2024 23:29:01 -0500 Subject: [PATCH 2/2] Update src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs --- .../src/System/Threading/ThreadPoolWorkQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 66aba5ef175b2..696c2960b05b5 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -707,7 +707,7 @@ internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue() queue.highPriorityWorkItems.Enqueue(workItem); } - Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, true); + Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, 1); queue.EnsureThreadRequested(); }