diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs index bb71f82c339e7..63332c26a2595 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs @@ -10,43 +10,85 @@ namespace System.Threading { internal sealed partial class PortableThreadPool { - // Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids - // continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows - // continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for - // those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The - // config value is named for consistency with SocketAsyncEngine.Unix.cs. - private static readonly bool UnsafeInlineIOCompletionCallbacks = - Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1"; + private readonly nint[] _ioPorts = new nint[IOCompletionPortCount]; + private uint _ioPortSelectorForRegister = unchecked((uint)-1); // both selectors start at uint.MaxValue so the first Interlocked.Increment wraps to index 0 + private uint _ioPortSelectorForQueue = unchecked((uint)-1); + private IOCompletionPoller[]? _ioCompletionPollers; - private static readonly int IOCompletionPollerCount = GetIOCompletionPollerCount(); + private static short DetermineIOCompletionPortCount() + { + const short DefaultIOPortCount = 1; + const short MaxIOPortCount = 1 << 10; + + short ioPortCount = + AppContextConfigHelper.GetInt16Config( + "System.Threading.ThreadPool.IOCompletionPortCount", + "DOTNET_ThreadPool_IOCompletionPortCount", + DefaultIOPortCount, + allowNegative: false); + return ioPortCount == 0 ? 
DefaultIOPortCount : Math.Min(ioPortCount, MaxIOPortCount); + } - private static int GetIOCompletionPollerCount() + private static int DetermineIOCompletionPollerCount() { // Named for consistency with SocketAsyncEngine.Unix.cs, this environment variable is checked to override the exact // number of IO completion poller threads to use. See the comment in SocketAsyncEngine.Unix.cs about its potential // uses. For this implementation, the ProcessorsPerIOPollerThread config option below may be preferable as it may be // less machine-specific. - if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count)) + int ioPollerCount; + if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) && + count != 0) { - return Math.Min((int)count, MaxPossibleThreadCount); + ioPollerCount = (int)Math.Min(count, (uint)MaxPossibleThreadCount); } - - if (UnsafeInlineIOCompletionCallbacks) + else if (UnsafeInlineIOCompletionCallbacks) { // In this mode, default to ProcessorCount pollers to ensure that all processors can be utilized if more work // happens on the poller threads - return Environment.ProcessorCount; + ioPollerCount = Environment.ProcessorCount; + } + else + { + int processorsPerPoller = + AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false); + ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1; } - int processorsPerPoller = - AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false); - return (Environment.ProcessorCount - 1) / processorsPerPoller + 1; + if (IOCompletionPortCount == 1) + { + return ioPollerCount; + } + + // Use at least one IO poller per port + if (ioPollerCount <= IOCompletionPortCount) + { + return IOCompletionPortCount; + } + + // Use the same number of IO pollers per port, rounding the total up to a multiple of the port count if necessary + int rem = 
ioPollerCount % IOCompletionPortCount; + if (rem != 0) + { + ioPollerCount += IOCompletionPortCount - rem; + } + + return ioPollerCount; + } + + private void InitializeIOOnWindows() + { + Debug.Assert(IOCompletionPollerCount % IOCompletionPortCount == 0); + int numConcurrentThreads = IOCompletionPollerCount / IOCompletionPortCount; + for (int i = 0; i < IOCompletionPortCount; i++) + { + _ioPorts[i] = CreateIOCompletionPort(numConcurrentThreads); + } } - private static nint CreateIOCompletionPort() + private static nint CreateIOCompletionPort(int numConcurrentThreads) { nint port = - Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, IOCompletionPollerCount); + Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, numConcurrentThreads); if (port == 0) { int hr = Marshal.GetHRForLastWin32Error(); @@ -58,26 +100,32 @@ private static nint CreateIOCompletionPort() public void RegisterForIOCompletionNotifications(nint handle) { - Debug.Assert(_ioPort != 0); + Debug.Assert(_ioPorts != null); if (_ioCompletionPollers == null) { EnsureIOCompletionPollers(); } - nint port = Interop.Kernel32.CreateIoCompletionPort(handle, _ioPort, UIntPtr.Zero, 0); + uint selectedPortIndex = + IOCompletionPortCount == 1 + ? 
0 + : Interlocked.Increment(ref _ioPortSelectorForRegister) % (uint)IOCompletionPortCount; + nint selectedPort = _ioPorts[selectedPortIndex]; + Debug.Assert(selectedPort != 0); + nint port = Interop.Kernel32.CreateIoCompletionPort(handle, selectedPort, UIntPtr.Zero, 0); if (port == 0) { ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error()); } - Debug.Assert(port == _ioPort); + Debug.Assert(port == selectedPort); } public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped) { Debug.Assert(nativeOverlapped != null); - Debug.Assert(_ioPort != 0); + Debug.Assert(_ioPorts != null); if (_ioCompletionPollers == null) { @@ -89,7 +137,13 @@ public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped) NativeRuntimeEventSource.Log.ThreadPoolIOEnqueue(nativeOverlapped); } - if (!Interop.Kernel32.PostQueuedCompletionStatus(_ioPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped)) + uint selectedPortIndex = + IOCompletionPortCount == 1 + ? 0 + : Interlocked.Increment(ref _ioPortSelectorForQueue) % (uint)IOCompletionPortCount; + nint selectedPort = _ioPorts[selectedPortIndex]; + Debug.Assert(selectedPort != 0); + if (!Interop.Kernel32.PostQueuedCompletionStatus(selectedPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped)) { ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error()); } @@ -109,7 +163,7 @@ private void EnsureIOCompletionPollers() IOCompletionPoller[] pollers = new IOCompletionPoller[IOCompletionPollerCount]; for (int i = 0; i < IOCompletionPollerCount; ++i) { - pollers[i] = new IOCompletionPoller(_ioPort); + pollers[i] = new IOCompletionPoller(_ioPorts[i % IOCompletionPortCount]); } _ioCompletionPollers = pollers; diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index 2523213110bae..8dc875f7bea96 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs 
+++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -41,6 +41,19 @@ internal sealed partial class PortableThreadPool private static readonly short ForcedMaxWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false); +#if TARGET_WINDOWS + // Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids + // continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows + // continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for + // those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The + // config value is named for consistency with SocketAsyncEngine.Unix.cs. + private static readonly bool UnsafeInlineIOCompletionCallbacks = + Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1"; + + private static readonly short IOCompletionPortCount = DetermineIOCompletionPortCount(); + private static readonly int IOCompletionPollerCount = DetermineIOCompletionPollerCount(); +#endif + private static readonly int ThreadPoolThreadTimeoutMs = DetermineThreadPoolThreadTimeoutMs(); private static int DetermineThreadPoolThreadTimeoutMs() @@ -107,11 +120,6 @@ private struct CacheLineSeparated private long _memoryUsageBytes; private long _memoryLimitBytes; -#if TARGET_WINDOWS - private readonly nint _ioPort; - private IOCompletionPoller[]? 
_ioCompletionPollers; -#endif - private readonly LowLevelLock _threadAdjustmentLock = new LowLevelLock(); private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name @@ -149,7 +157,7 @@ private PortableThreadPool() _separated.counts.NumThreadsGoal = _minThreads; #if TARGET_WINDOWS - _ioPort = CreateIOCompletionPort(); + InitializeIOOnWindows(); #endif } diff --git a/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj b/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj index 55b876cd3bedf..02c47cb4e4153 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj +++ b/src/libraries/System.Threading.ThreadPool/tests/System.Threading.ThreadPool.Tests.csproj @@ -2,6 +2,7 @@ true $(NetCoreAppCurrent) + true true diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index 1f2ad97489a6b..45133de212ad3 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -1334,6 +1334,106 @@ public static void PrioritizationExperimentConfigVarTest() }).Dispose(); } + public static IEnumerable IOCompletionPortCountConfigVarTest_Args = + from x in Enumerable.Range(0, 9) + select new object[] { x }; + + // Just verifies that some basic IO operations work with different IOCP counts + [ConditionalTheory(nameof(IsThreadingAndRemoteExecutorSupported), nameof(UsePortableThreadPool))] + [MemberData(nameof(IOCompletionPortCountConfigVarTest_Args))] + [PlatformSpecific(TestPlatforms.Windows)] + public static void IOCompletionPortCountConfigVarTest(int ioCompletionPortCount) + { + // Avoid contaminating the main process' environment + RemoteExecutor.Invoke(ioCompletionPortCountStr => + { + int ioCompletionPortCount = int.Parse(ioCompletionPortCountStr); + + const 
int PretendProcessorCount = 80; + + // The actual test process below will inherit the config vars + Environment.SetEnvironmentVariable("DOTNET_PROCESSOR_COUNT", PretendProcessorCount.ToString()); + Environment.SetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT", "7"); + if (ioCompletionPortCount != 0) + { + Environment.SetEnvironmentVariable( + "DOTNET_ThreadPool_IOCompletionPortCount", + ioCompletionPortCount.ToString()); + } + + RemoteExecutor.Invoke(() => + { + RunQueueNativeOverlappedTest(); + RunAsyncIOTest().Wait(); + + static unsafe void RunQueueNativeOverlappedTest() + { + var done = new AutoResetEvent(false); + for (int i = 0; i < PretendProcessorCount; i++) + { + // Queue a NativeOverlapped, wait for the callback to run + var overlapped = new Overlapped(); + NativeOverlapped* nativeOverlapped = overlapped.Pack((_, _, _) => done.Set(), null); + try + { + ThreadPool.UnsafeQueueNativeOverlapped(nativeOverlapped); + done.CheckedWait(); + } + finally + { + if (nativeOverlapped != null) + { + Overlapped.Free(nativeOverlapped); + } + } + } + } + + static async Task RunAsyncIOTest() + { + var done = new AutoResetEvent(false); + + // Receiver + var t = ThreadTestHelpers.CreateGuardedThread( + out Action checkForThreadErrors, + out Action waitForThread, + async () => + { + using var listener = new TcpListener(IPAddress.Loopback, 55555); // NOTE(review): hard-coded port; may fail if 55555 is already in use on the test machine + var receiveBuffer = new byte[1]; + listener.Start(); + done.Set(); // indicate listener started + while (true) + { + // Accept a connection, receive a byte + using var socket = await listener.AcceptSocketAsync(); + int bytesRead = + await socket.ReceiveAsync(new ArraySegment(receiveBuffer), SocketFlags.None); + Assert.Equal(1, bytesRead); + done.Set(); // indicate byte received + } + }); + t.IsBackground = true; + t.Start(); + done.CheckedWait(); // wait for listener to start + + // Sender + var sendBuffer = new byte[1]; + for (int i = 0; i < PretendProcessorCount / 2; i++) + { + // Connect, send a byte + using var 
client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, 55555); + int bytesSent = + await client.Client.SendAsync(new ArraySegment(sendBuffer), SocketFlags.None); + Assert.Equal(1, bytesSent); + done.CheckedWait(); // wait for the byte to be received + } + } + }).Dispose(); + }, ioCompletionPortCount.ToString()).Dispose(); + } + public static bool IsThreadingAndRemoteExecutorSupported => PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported; diff --git a/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj b/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj index f39bea77d0fbc..b4925716d96e2 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj +++ b/src/libraries/System.Threading.ThreadPool/tests/WindowsThreadPool/System.Threading.ThreadPool.WindowsThreadPool.Tests.csproj @@ -3,6 +3,7 @@ true $(NetCoreAppCurrent)-windows + true true