-
-
Notifications
You must be signed in to change notification settings - Fork 956
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Dispatcher] improve api, reduce overhead, improve performances for items > 1k #2083
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
using Stride.Core.Diagnostics; | ||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Reflection; | ||
using System.Runtime.InteropServices; | ||
using System.Threading; | ||
|
||
namespace Stride.Core.Threading | ||
|
@@ -15,6 +17,8 @@ namespace Stride.Core.Threading | |
/// </summary> | ||
public sealed partial class ThreadPool : IDisposable | ||
{ | ||
private static readonly Logger Logger = GlobalLogger.GetLogger(nameof(ThreadPool)); | ||
|
||
/// <summary> | ||
/// The default instance that the whole process shares, use this one to avoid wasting process memory. | ||
/// </summary> | ||
|
@@ -28,9 +32,9 @@ public sealed partial class ThreadPool : IDisposable | |
|
||
private static readonly ProfilingKey ProcessWorkItemKey = new ProfilingKey($"{nameof(ThreadPool)}.ProcessWorkItem"); | ||
|
||
private readonly ConcurrentQueue<Action> workItems = new ConcurrentQueue<Action>(); | ||
private readonly SemaphoreW semaphore; | ||
private readonly ConcurrentQueue<Work> workItems = new ConcurrentQueue<Work>(); | ||
private readonly ISemaphore semaphore; | ||
|
||
private long completionCounter; | ||
private int workScheduled, threadsBusy; | ||
private int disposing; | ||
|
@@ -47,8 +51,30 @@ public sealed partial class ThreadPool : IDisposable | |
|
||
public ThreadPool(int? threadCount = null) | ||
{ | ||
semaphore = new SemaphoreW(spinCountParam:70); | ||
|
||
int spinCount = 70; | ||
|
||
if(RuntimeInformation.ProcessArchitecture is Architecture.Arm or Architecture.Arm64) | ||
{ | ||
// Dotnet: | ||
// On systems with ARM processors, more spin-waiting seems to be necessary to avoid perf regressions from incurring | ||
// the full wait when work becomes available soon enough. This is more noticeable after reducing the number of | ||
// thread requests made to the thread pool because otherwise the extra thread requests cause threads to do more | ||
// busy-waiting instead and adding to contention in trying to look for work items, which is less preferable. | ||
spinCount *= 4; | ||
} | ||
try | ||
{ | ||
semaphore = new DotnetLifoSemaphore(spinCount); | ||
} | ||
catch(Exception e) | ||
{ | ||
// For net6+ this should not happen, logging instead of throwing as this is just a performance regression | ||
if(Environment.Version.Major >= 6) | ||
Logger.Warning($"Could not bind to dotnet's Lifo Semaphore, falling back to suboptimal semaphore:\n{e}"); | ||
|
||
semaphore = new SemaphoreW(spinCountParam:70); | ||
} | ||
|
||
WorkerThreadsCount = threadCount ?? (Environment.ProcessorCount == 1 ? 1 : Environment.ProcessorCount - 1); | ||
leftToDispose = WorkerThreadsCount; | ||
for (int i = 0; i < WorkerThreadsCount; i++) | ||
|
@@ -66,7 +92,7 @@ static ThreadPool() | |
/// Queue an action to run on one of the available threads, | ||
/// it is strongly recommended that the action takes less than a millisecond. | ||
/// </summary> | ||
public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | ||
public unsafe void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | ||
{ | ||
// Throw right here to help debugging | ||
if (workItem == null) | ||
|
@@ -85,10 +111,55 @@ public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | |
} | ||
|
||
Interlocked.Add(ref workScheduled, amount); | ||
var work = new Work { WorkHandler = &ActionHandler, Data = workItem }; | ||
for (int i = 0; i < amount; i++) | ||
{ | ||
PooledDelegateHelper.AddReference(workItem); | ||
workItems.Enqueue(workItem); | ||
workItems.Enqueue(work); | ||
} | ||
semaphore.Release(amount); | ||
} | ||
|
||
static void ActionHandler(object param) | ||
{ | ||
Action action = (Action)param; | ||
try | ||
{ | ||
action(); | ||
} | ||
finally | ||
{ | ||
PooledDelegateHelper.Release(action); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Queue some work item to run on one of the available threads, | ||
/// it is strongly recommended that the action takes less than a millisecond. | ||
/// Additionally, the parameter provided must be fixed from this call onward until the action has finished executing | ||
/// </summary> | ||
public unsafe void QueueUnsafeWorkItem(object parameter, delegate*<object, void> obj, int amount = 1) | ||
{ | ||
if (parameter == null) | ||
{ | ||
throw new NullReferenceException(nameof(parameter)); | ||
} | ||
|
||
if (amount < 1) | ||
{ | ||
throw new ArgumentOutOfRangeException(nameof(amount)); | ||
} | ||
|
||
if (disposing > 0) | ||
{ | ||
throw new ObjectDisposedException(ToString()); | ||
} | ||
|
||
Interlocked.Add(ref workScheduled, amount); | ||
var work = new Work { WorkHandler = obj, Data = parameter }; | ||
for (int i = 0; i < amount; i++) | ||
{ | ||
workItems.Enqueue(work); | ||
} | ||
semaphore.Release(amount); | ||
} | ||
|
@@ -98,20 +169,19 @@ public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1) | |
/// If you absolutely have to block inside one of the threadpool's thread for whatever | ||
/// reason do a busy loop over this function. | ||
/// </summary> | ||
public bool TryCooperate() | ||
public unsafe bool TryCooperate() | ||
{ | ||
if (workItems.TryDequeue(out var workItem)) | ||
{ | ||
Interlocked.Increment(ref threadsBusy); | ||
Interlocked.Decrement(ref workScheduled); | ||
try | ||
{ | ||
using var _ = Profiler.Begin(ProcessWorkItemKey); | ||
workItem.Invoke(); | ||
using (Profiler.Begin(ProcessWorkItemKey)) | ||
workItem.WorkHandler(workItem.Data); | ||
} | ||
finally | ||
{ | ||
PooledDelegateHelper.Release(workItem); | ||
Interlocked.Decrement(ref threadsBusy); | ||
Interlocked.Increment(ref completionCounter); | ||
} | ||
|
@@ -172,14 +242,11 @@ public void Dispose() | |
{ | ||
return; | ||
} | ||
|
||
semaphore.Release(WorkerThreadsCount); | ||
semaphore.Dispose(); | ||
while (Volatile.Read(ref leftToDispose) != 0) | ||
{ | ||
if (semaphore.SignalCount == 0) | ||
{ | ||
semaphore.Release(1); | ||
} | ||
Thread.Yield(); | ||
} | ||
|
||
|
@@ -189,5 +256,38 @@ public void Dispose() | |
|
||
} | ||
} | ||
|
||
unsafe struct Work | ||
{ | ||
public object Data; | ||
public delegate*<object, void> WorkHandler; | ||
} | ||
|
||
private interface ISemaphore : IDisposable | ||
{ | ||
public void Release(int count); | ||
public void Wait(int timeout = -1); | ||
} | ||
|
||
private sealed class DotnetLifoSemaphore : ISemaphore | ||
{ | ||
private readonly IDisposable semaphore; | ||
private readonly Func<int, bool, bool> wait; | ||
private readonly Action<int> release; | ||
|
||
public DotnetLifoSemaphore(int spinCount) | ||
{ | ||
// The semaphore Dotnet uses for its own threadpool is more efficient than what's publicly available, | ||
// but sadly it is internal - we'll hijack it through reflection | ||
Type lifoType = Type.GetType("System.Threading.LowLevelLifoSemaphore"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be good to add a comment here that we use reflection to access an internal type |
||
semaphore = Activator.CreateInstance(lifoType, new object[]{ 0, short.MaxValue, spinCount, new Action( () => {} ) }) as IDisposable; | ||
wait = lifoType.GetMethod("Wait", BindingFlags.Instance | BindingFlags.Public).CreateDelegate<Func<int, bool, bool>>(semaphore); | ||
release = lifoType.GetMethod("Release", BindingFlags.Instance | BindingFlags.Public).CreateDelegate<Action<int>>(semaphore); | ||
} | ||
|
||
public void Dispose() => semaphore.Dispose(); | ||
public void Release(int count) => release(count); | ||
public void Wait(int timeout = -1) => wait(timeout, true); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, can C# 9.0 function pointers (and GetFunctionPointer() be useful in this scenario?
(not sure it would make an actual perf difference though, but curious as if usable enough for this use case, as this would mean I can probably use it in some other places)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my limited testing, yes, although I do not know the implications this has for JIT and such; I do remember that the address static function pointers lay on when taking its address is not fixed. If the method is 'moved' after JIT took care of the method, then we might have to retrieve the function pointer from its runtime method handle on every call to make sure we run the optimal version ...