Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/8.0-staging] Transfer ThreadPool local queue to high-pri queue on Task blocking #109990

Open
wants to merge 2 commits into
base: release/8.0-staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3059,6 +3059,27 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can
bool returnValue = SpinWait(millisecondsTimeout);
if (!returnValue)
{
#if CORECLR
if (ThreadPoolWorkQueue.s_prioritizationExperiment)
{
// We're about to block waiting for the task to complete, which is expensive, and if
// the task being waited on depends on some other work to run, this thread could end up
// waiting for some other thread to do work. If the two threads are part of the same scheduler,
// such as the thread pool, that could lead to a (temporary) deadlock. This is made worse by
// it also leading to a possible priority inversion on previously queued work. Each thread in
// the thread pool has a local queue. A key motivator for this local queue is it allows this
// thread to create work items that it will then prioritize above all other work in the
// pool. However, while this thread makes its own local queue the top priority, that queue is
// every other thread's lowest priority. If this thread blocks, all of its created work that's
// supposed to be high priority becomes low priority, and work that's typically part of a
// currently in-flight operation gets deprioritized relative to new requests coming into the
// pool, which can lead to the whole system slowing down or even deadlocking. To address that,
// just before we block, we move all local work into a global queue, so that it's at least
// prioritized by other threads more fairly with respect to other work.
ThreadPoolWorkQueue.TransferAllLocalWorkItemsToHighPriorityGlobalQueue();
}
#endif

var mres = new SetOnInvokeMres();
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,27 @@ public void EnqueueAtHighPriority(object workItem)
EnsureThreadRequested();
}

internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue()
{
// If there's no local queue, there's nothing to transfer.
if (ThreadPoolWorkQueueThreadLocals.threadLocals is not ThreadPoolWorkQueueThreadLocals tl)
{
return;
}

// Pop each work item off the local queue and push it onto the global. This is a
// bounded loop as no other thread is allowed to push into this thread's queue.
ThreadPoolWorkQueue queue = ThreadPool.s_workQueue;
while (tl.workStealingQueue.LocalPop() is object workItem)
{
queue.highPriorityWorkItems.Enqueue(workItem);
}

Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, 1);

queue.EnsureThreadRequested();
}

internal static bool LocalFindAndPop(object callback)
{
ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,12 +1172,13 @@ public static void PrioritizationExperimentConfigVarTest()
RemoteExecutor.Invoke(() =>
{
const int WorkItemCountPerKind = 100;
const int Kinds = 3;

int completedWorkItemCount = 0;
var allWorkItemsCompleted = new AutoResetEvent(false);
Action<int> workItem = _ =>
{
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * 3)
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * Kinds)
{
allWorkItemsCompleted.Set();
}
Expand Down Expand Up @@ -1214,6 +1215,27 @@ public static void PrioritizationExperimentConfigVarTest()
0,
preferLocal: false);

ThreadPool.UnsafeQueueUserWorkItem(
_ =>
{
// Enqueue tasks from a thread pool thread into the local queue,
// then block this thread until a queued task completes.

startTest.CheckedWait();

Task queued = null;
for (int i = 0; i < WorkItemCountPerKind; i++)
{
queued = Task.Run(() => workItem(0));
}

queued
.ContinueWith(_ => { }) // prevent wait inlining
.Wait();
},
0,
preferLocal: false);

t = new Thread(() =>
{
// Enqueue local work from thread pool worker threads
Expand Down
Loading