From c723f067fddf39528384d65cd79e8a86197555e2 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Thu, 8 Aug 2024 12:58:22 -0700 Subject: [PATCH] Add a thread pool config var on Windows for choosing the number of IOCPs (#105145) There were cases where using more than one IOCP was beneficial along with some other changes. Being able to configure the number would be useful for folks to do further testing without having to use private binaries. --- .../PortableThreadPool.IO.Windows.cs | 104 +++++++++++++----- .../System/Threading/PortableThreadPool.cs | 20 +++- .../System.Threading.ThreadPool.Tests.csproj | 1 + .../tests/ThreadPoolTests.cs | 100 +++++++++++++++++ ....ThreadPool.WindowsThreadPool.Tests.csproj | 1 + 5 files changed, 195 insertions(+), 31 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs index bb71f82c339e7..63332c26a2595 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs @@ -10,43 +10,85 @@ namespace System.Threading { internal sealed partial class PortableThreadPool { - // Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids - // continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows - // continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for - // those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The - // config value is named for consistency with SocketAsyncEngine.Unix.cs. 
- private static readonly bool UnsafeInlineIOCompletionCallbacks = - Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1"; + private readonly nint[] _ioPorts = new nint[IOCompletionPortCount]; + private uint _ioPortSelectorForRegister = unchecked((uint)-1); + private uint _ioPortSelectorForQueue = unchecked((uint)-1); + private IOCompletionPoller[]? _ioCompletionPollers; - private static readonly int IOCompletionPollerCount = GetIOCompletionPollerCount(); + private static short DetermineIOCompletionPortCount() + { + const short DefaultIOPortCount = 1; + const short MaxIOPortCount = 1 << 10; + + short ioPortCount = + AppContextConfigHelper.GetInt16Config( + "System.Threading.ThreadPool.IOCompletionPortCount", + "DOTNET_ThreadPool_IOCompletionPortCount", + DefaultIOPortCount, + allowNegative: false); + return ioPortCount == 0 ? DefaultIOPortCount : Math.Min(ioPortCount, MaxIOPortCount); + } - private static int GetIOCompletionPollerCount() + private static int DetermineIOCompletionPollerCount() { // Named for consistency with SocketAsyncEngine.Unix.cs, this environment variable is checked to override the exact // number of IO completion poller threads to use. See the comment in SocketAsyncEngine.Unix.cs about its potential // uses. For this implementation, the ProcessorsPerIOPollerThread config option below may be preferable as it may be // less machine-specific. 
- if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count)) + int ioPollerCount; + if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) && + count != 0) { - return Math.Min((int)count, MaxPossibleThreadCount); + ioPollerCount = (int)Math.Min(count, (uint)MaxPossibleThreadCount); } - - if (UnsafeInlineIOCompletionCallbacks) + else if (UnsafeInlineIOCompletionCallbacks) { // In this mode, default to ProcessorCount pollers to ensure that all processors can be utilized if more work // happens on the poller threads - return Environment.ProcessorCount; + ioPollerCount = Environment.ProcessorCount; + } + else + { + int processorsPerPoller = + AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false); + ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1; } - int processorsPerPoller = - AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false); - return (Environment.ProcessorCount - 1) / processorsPerPoller + 1; + if (IOCompletionPortCount == 1) + { + return ioPollerCount; + } + + // Use at least one IO poller per port + if (ioPollerCount <= IOCompletionPortCount) + { + return IOCompletionPortCount; + } + + // Use the same number of IO pollers per port, align up if necessary to make it even + int rem = ioPollerCount % IOCompletionPortCount; + if (rem != 0) + { + ioPollerCount += IOCompletionPortCount - rem; + } + + return ioPollerCount; + } + + private void InitializeIOOnWindows() + { + Debug.Assert(IOCompletionPollerCount % IOCompletionPortCount == 0); + int numConcurrentThreads = IOCompletionPollerCount / IOCompletionPortCount; + for (int i = 0; i < IOCompletionPortCount; i++) + { + _ioPorts[i] = CreateIOCompletionPort(numConcurrentThreads); + } } - private static nint CreateIOCompletionPort() + private static nint 
CreateIOCompletionPort(int numConcurrentThreads) { nint port = - Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, IOCompletionPollerCount); + Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, numConcurrentThreads); if (port == 0) { int hr = Marshal.GetHRForLastWin32Error(); @@ -58,26 +100,32 @@ private static nint CreateIOCompletionPort() public void RegisterForIOCompletionNotifications(nint handle) { - Debug.Assert(_ioPort != 0); + Debug.Assert(_ioPorts != null); if (_ioCompletionPollers == null) { EnsureIOCompletionPollers(); } - nint port = Interop.Kernel32.CreateIoCompletionPort(handle, _ioPort, UIntPtr.Zero, 0); + uint selectedPortIndex = + IOCompletionPortCount == 1 + ? 0 + : Interlocked.Increment(ref _ioPortSelectorForRegister) % (uint)IOCompletionPortCount; + nint selectedPort = _ioPorts[selectedPortIndex]; + Debug.Assert(selectedPort != 0); + nint port = Interop.Kernel32.CreateIoCompletionPort(handle, selectedPort, UIntPtr.Zero, 0); if (port == 0) { ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error()); } - Debug.Assert(port == _ioPort); + Debug.Assert(port == selectedPort); } public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped) { Debug.Assert(nativeOverlapped != null); - Debug.Assert(_ioPort != 0); + Debug.Assert(_ioPorts != null); if (_ioCompletionPollers == null) { @@ -89,7 +137,13 @@ public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped) NativeRuntimeEventSource.Log.ThreadPoolIOEnqueue(nativeOverlapped); } - if (!Interop.Kernel32.PostQueuedCompletionStatus(_ioPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped)) + uint selectedPortIndex = + IOCompletionPortCount == 1 + ? 
0 + : Interlocked.Increment(ref _ioPortSelectorForQueue) % (uint)IOCompletionPortCount; + nint selectedPort = _ioPorts[selectedPortIndex]; + Debug.Assert(selectedPort != 0); + if (!Interop.Kernel32.PostQueuedCompletionStatus(selectedPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped)) { ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error()); } @@ -109,7 +163,7 @@ private void EnsureIOCompletionPollers() IOCompletionPoller[] pollers = new IOCompletionPoller[IOCompletionPollerCount]; for (int i = 0; i < IOCompletionPollerCount; ++i) { - pollers[i] = new IOCompletionPoller(_ioPort); + pollers[i] = new IOCompletionPoller(_ioPorts[i % IOCompletionPortCount]); } _ioCompletionPollers = pollers; diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index 2523213110bae..8dc875f7bea96 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -41,6 +41,19 @@ internal sealed partial class PortableThreadPool private static readonly short ForcedMaxWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false); +#if TARGET_WINDOWS + // Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids + // continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows + // continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for + // those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The + // config value is named for consistency with SocketAsyncEngine.Unix.cs. 
+ private static readonly bool UnsafeInlineIOCompletionCallbacks = + Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1"; + + private static readonly short IOCompletionPortCount = DetermineIOCompletionPortCount(); + private static readonly int IOCompletionPollerCount = DetermineIOCompletionPollerCount(); +#endif + private static readonly int ThreadPoolThreadTimeoutMs = DetermineThreadPoolThreadTimeoutMs(); private static int DetermineThreadPoolThreadTimeoutMs() @@ -107,11 +120,6 @@ private struct CacheLineSeparated private long _memoryUsageBytes; private long _memoryLimitBytes; -#if TARGET_WINDOWS - private readonly nint _ioPort; - private IOCompletionPoller[]? _ioCompletionPollers; -#endif - private readonly LowLevelLock _threadAdjustmentLock = new LowLevelLock(); private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name @@ -149,7 +157,7 @@ private PortableThreadPool() _separated.counts.NumThreadsGoal = _minThreads; #if TARGET_WINDOWS - _ioPort = CreateIOCompletionPort(); + InitializeIOOnWindows(); #endif } diff --git a/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj b/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj index 55b876cd3bedf..02c47cb4e4153 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj +++ b/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj @@ -2,6 +2,7 @@ true $(NetCoreAppCurrent) + true true diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index 1f2ad97489a6b..45133de212ad3 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -1334,6 +1334,106 @@ public static void PrioritizationExperimentConfigVarTest() 
}).Dispose(); } + public static IEnumerable IOCompletionPortCountConfigVarTest_Args = + from x in Enumerable.Range(0, 9) + select new object[] { x }; + + // Just verifies that some basic IO operations work with different IOCP counts + [ConditionalTheory(nameof(IsThreadingAndRemoteExecutorSupported), nameof(UsePortableThreadPool))] + [MemberData(nameof(IOCompletionPortCountConfigVarTest_Args))] + [PlatformSpecific(TestPlatforms.Windows)] + public static void IOCompletionPortCountConfigVarTest(int ioCompletionPortCount) + { + // Avoid contaminating the main process' environment + RemoteExecutor.Invoke(ioCompletionPortCountStr => + { + int ioCompletionPortCount = int.Parse(ioCompletionPortCountStr); + + const int PretendProcessorCount = 80; + + // The actual test process below will inherit the config vars + Environment.SetEnvironmentVariable("DOTNET_PROCESSOR_COUNT", PretendProcessorCount.ToString()); + Environment.SetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT", "7"); + if (ioCompletionPortCount != 0) + { + Environment.SetEnvironmentVariable( + "DOTNET_ThreadPool_IOCompletionPortCount", + ioCompletionPortCount.ToString()); + } + + RemoteExecutor.Invoke(() => + { + RunQueueNativeOverlappedTest(); + RunAsyncIOTest().Wait(); + + static unsafe void RunQueueNativeOverlappedTest() + { + var done = new AutoResetEvent(false); + for (int i = 0; i < PretendProcessorCount; i++) + { + // Queue a NativeOverlapped, wait for the callback to run + var overlapped = new Overlapped(); + NativeOverlapped* nativeOverlapped = overlapped.Pack((_, _, _) => done.Set(), null); + try + { + ThreadPool.UnsafeQueueNativeOverlapped(nativeOverlapped); + done.CheckedWait(); + } + finally + { + if (nativeOverlapped != null) + { + Overlapped.Free(nativeOverlapped); + } + } + } + } + + static async Task RunAsyncIOTest() + { + var done = new AutoResetEvent(false); + + // Receiver + var t = ThreadTestHelpers.CreateGuardedThread( + out Action checkForThreadErrors, + out Action 
waitForThread, + async () => + { + using var listener = new TcpListener(IPAddress.Loopback, 55555); + var receiveBuffer = new byte[1]; + listener.Start(); + done.Set(); // indicate listener started + while (true) + { + // Accept a connection, receive a byte + using var socket = await listener.AcceptSocketAsync(); + int bytesRead = + await socket.ReceiveAsync(new ArraySegment(receiveBuffer), SocketFlags.None); + Assert.Equal(1, bytesRead); + done.Set(); // indicate byte received + } + }); + t.IsBackground = true; + t.Start(); + done.CheckedWait(); // wait for listener to start + + // Sender + var sendBuffer = new byte[1]; + for (int i = 0; i < PretendProcessorCount / 2; i++) + { + // Connect, send a byte + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, 55555); + int bytesSent = + await client.Client.SendAsync(new ArraySegment(sendBuffer), SocketFlags.None); + Assert.Equal(1, bytesSent); + done.CheckedWait(); // wait for byte to be received + } + } + }).Dispose(); + }, ioCompletionPortCount.ToString()).Dispose(); + } + public static bool IsThreadingAndRemoteExecutorSupported => PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported; diff --git a/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj b/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj index f39bea77d0fbc..b4925716d96e2 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj +++ b/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj @@ -3,6 +3,7 @@ true $(NetCoreAppCurrent)-windows + true true