From 1729ca6696c3ea6048a64fddaad7ec1cfec3822d Mon Sep 17 00:00:00 2001 From: Anton Firszov Date: Wed, 27 Jul 2022 00:11:56 +0200 Subject: [PATCH] Tear down pending HTTP connection when the originating request completes (#71785) Resolves #66297 --- .../TaskCompletionSourceWithCancellation.cs | 2 +- .../System/Net/Http/GenericLoopbackServer.cs | 3 + .../tests/System/Net/Http/LoopbackServer.cs | 2 +- .../src/System.Net.Http.csproj | 4 +- .../src/System/Net/Http/GlobalHttpSettings.cs | 4 + .../Http/SocketsHttpHandler/HttpConnection.cs | 2 +- .../SocketsHttpHandler/HttpConnectionPool.cs | 346 ++++++++++++------ ...ttpHandlerTest.Cancellation.NonParallel.cs | 140 +++++++ .../System.Net.Http.Functional.Tests.csproj | 3 +- 9 files changed, 381 insertions(+), 125 deletions(-) create mode 100644 src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.NonParallel.cs diff --git a/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs b/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs index 8964fdba9a720..cd1cfb6e8893d 100644 --- a/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs +++ b/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs @@ -8,7 +8,7 @@ namespace System.Threading.Tasks /// s contain the relevant , /// while also avoiding unnecessary allocations for closure captures. /// - internal sealed class TaskCompletionSourceWithCancellation : TaskCompletionSource + internal class TaskCompletionSourceWithCancellation : TaskCompletionSource { public TaskCompletionSourceWithCancellation() : base(TaskCreationOptions.RunContinuationsAsynchronously) { diff --git a/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs b/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs index c3fb1512fbce8..adc60fdf4a89a 100644 --- a/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs +++ b/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs @@ -152,6 +152,9 @@ public abstract class GenericLoopbackConnection : IAsyncDisposable /// Waits for the client to signal cancellation. public abstract Task WaitForCloseAsync(CancellationToken cancellationToken); + /// Reset the connection's internal state so it can process further requests. + public virtual void CompleteRequestProcessing() { } + /// Helper function to make it easier to convert old test with strings. public async Task SendResponseBodyAsync(string content, bool isFinal = true) { diff --git a/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs b/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs index 07a039da95349..4879258fab2c4 100644 --- a/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs +++ b/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs @@ -873,7 +873,7 @@ public override async Task ReadRequestBodyAsync() return buffer; } - public void CompleteRequestProcessing() + public override void CompleteRequestProcessing() { _contentLength = 0; _bodyRead = false; diff --git a/src/libraries/System.Net.Http/src/System.Net.Http.csproj b/src/libraries/System.Net.Http/src/System.Net.Http.csproj index 28a2fffffe9b8..e84a7e1c46627 100644 --- a/src/libraries/System.Net.Http/src/System.Net.Http.csproj +++ b/src/libraries/System.Net.Http/src/System.Net.Http.csproj @@ -138,8 +138,6 @@ Link="Common\System\Net\HttpDateParser.cs" /> - + Common\System\Net\Http\aspnetcore\IHttpStreamHeadersHandler.cs diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/GlobalHttpSettings.cs b/src/libraries/System.Net.Http/src/System/Net/Http/GlobalHttpSettings.cs index 0dd87f86745a1..072fbbd0ec16b 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/GlobalHttpSettings.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/GlobalHttpSettings.cs @@ -45,6 +45,10 @@ internal static class SocketsHttpHandler // Defaults to 1.0. Higher values result in shorter window, but slower downloads. public static double Http2StreamWindowScaleThresholdMultiplier { get; } = GetHttp2StreamWindowScaleThresholdMultiplier(); + public static int PendingConnectionTimeoutOnRequestCompletion { get; } = RuntimeSettingParser.QueryRuntimeSettingInt32( + "System.Net.SocketsHttpHandler.PendingConnectionTimeoutOnRequestCompletion", + "DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_PENDINGCONNECTIONTIMEOUTONREQUESTCOMPLETION", 5000); + public const int DefaultHttp2MaxStreamWindowSize = 16 * 1024 * 1024; public const double DefaultHttp2StreamWindowScaleThresholdMultiplier = 1.0; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs index e42c83bbd60be..2d92a7b74e9a1 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs @@ -2074,7 +2074,7 @@ private void ReturnConnectionToPool() _idleSinceTickCount = Environment.TickCount64; // Put connection back in the pool. - _pool.ReturnHttp11Connection(this, isNewConnection: false); + _pool.RecycleHttp11Connection(this); } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs index 7a4767d1e13df..d909eef019de6 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs @@ -451,27 +451,41 @@ private async Task AddHttp11ConnectionAsync(RequestQueue.QueueIt { if (NetEventSource.Log.IsEnabled()) Trace("Creating new HTTP/1.1 connection for pool."); - HttpConnection connection; - using (CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource()) + HttpConnectionWaiter waiter = queueItem.Waiter; + HttpConnection? connection = null; + Exception? connectionException = null; + + CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(); + waiter.ConnectionCancellationTokenSource = cts; + try { - try - { - connection = await CreateHttp11ConnectionAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false); - } - catch (OperationCanceledException oce) when (oce.CancellationToken == cts.Token) - { - HandleHttp11ConnectionFailure(queueItem.Waiter, CreateConnectTimeoutException(oce)); - return; - } - catch (Exception e) + connection = await CreateHttp11ConnectionAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false); + } + catch (Exception e) + { + connectionException = e is OperationCanceledException oce && oce.CancellationToken == cts.Token && !waiter.CancelledByOriginatingRequestCompletion ? + CreateConnectTimeoutException(oce) : + e; + } + finally + { + lock (waiter) { - HandleHttp11ConnectionFailure(queueItem.Waiter, e); - return; + waiter.ConnectionCancellationTokenSource = null; + cts.Dispose(); } } - // Add the established connection to the pool. - ReturnHttp11Connection(connection, isNewConnection: true, queueItem.Waiter); + if (connection is not null) + { + // Add the established connection to the pool. + ReturnHttp11Connection(connection, isNewConnection: true, queueItem.Waiter); + } + else + { + Debug.Assert(connectionException is not null); + HandleHttp11ConnectionFailure(waiter, connectionException); + } } private void CheckForHttp11ConnectionInjection() @@ -480,19 +494,22 @@ private void CheckForHttp11ConnectionInjection() _http11RequestQueue.PruneCompletedRequestsFromHeadOfQueue(this); + // Determine if we can and should add a new connection to the pool. + bool willInject = _availableHttp11Connections.Count == 0 && // No available connections + _http11RequestQueue.Count > _pendingHttp11ConnectionCount && // More requests queued than pending connections + _associatedHttp11ConnectionCount < _maxHttp11Connections && // Under the connection limit + _http11RequestQueue.RequestsWithoutAConnectionAttempt > 0; // There are requests we haven't issued a connection attempt for + if (NetEventSource.Log.IsEnabled()) { Trace($"Available HTTP/1.1 connections: {_availableHttp11Connections.Count}, Requests in the queue: {_http11RequestQueue.Count}, " + $"Requests without a connection attempt: {_http11RequestQueue.RequestsWithoutAConnectionAttempt}, " + $"Pending HTTP/1.1 connections: {_pendingHttp11ConnectionCount}, Total associated HTTP/1.1 connections: {_associatedHttp11ConnectionCount}, " + - $"Max HTTP/1.1 connection limit: {_maxHttp11Connections}."); + $"Max HTTP/1.1 connection limit: {_maxHttp11Connections}, " + + $"Will inject connection: {willInject}."); } - // Determine if we can and should add a new connection to the pool. - if (_availableHttp11Connections.Count == 0 && // No available connections - _http11RequestQueue.Count > _pendingHttp11ConnectionCount && // More requests queued than pending connections - _associatedHttp11ConnectionCount < _maxHttp11Connections && // Under the connection limit - _http11RequestQueue.RequestsWithoutAConnectionAttempt > 0) // There are requests we haven't issued a connection attempt for + if (willInject) { _associatedHttp11ConnectionCount++; _pendingHttp11ConnectionCount++; @@ -507,13 +524,10 @@ private void CheckForHttp11ConnectionInjection() } } - private async ValueTask GetHttp11ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) + private bool TryGetPooledHttp11Connection(HttpRequestMessage request, bool async, [NotNullWhen(true)] out HttpConnection? connection, [NotNullWhen(false)] out HttpConnectionWaiter? waiter) { - // Look for a usable idle connection. - TaskCompletionSourceWithCancellation waiter; while (true) { - HttpConnection? connection = null; lock (SyncObj) { _usedSinceLastCleanup = true; @@ -533,8 +547,10 @@ private async ValueTask GetHttp11ConnectionAsync(HttpRequestMess CheckForHttp11ConnectionInjection(); - // Break out of the loop and continue processing below. - break; + // There were no available idle connections. This request has been added to the request queue. + if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/1.1 connections; request queued."); + connection = null; + return false; } } @@ -553,23 +569,8 @@ private async ValueTask GetHttp11ConnectionAsync(HttpRequestMess } if (NetEventSource.Log.IsEnabled()) connection.Trace("Found usable HTTP/1.1 connection in pool."); - return connection; - } - - // There were no available idle connections. This request has been added to the request queue. - if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/1.1 connections; request queued."); - - long startingTimestamp = Stopwatch.GetTimestamp(); - try - { - return await waiter.WaitWithCancellationAsync(async, cancellationToken).ConfigureAwait(false); - } - finally - { - if (HttpTelemetry.Log.IsEnabled()) - { - HttpTelemetry.Log.Http11RequestLeftQueue(Stopwatch.GetElapsedTime(startingTimestamp).TotalMilliseconds); - } + waiter = null; + return true; } } @@ -578,7 +579,7 @@ private async Task HandleHttp11Downgrade(HttpRequestMessage request, Stream stre if (NetEventSource.Log.IsEnabled()) Trace("Server does not support HTTP2; disabling HTTP2 use and proceeding with HTTP/1.1 connection"); bool canUse = true; - TaskCompletionSourceWithCancellation? waiter = null; + HttpConnectionWaiter? waiter = null; lock (SyncObj) { Debug.Assert(_pendingHttp2Connection); @@ -608,7 +609,10 @@ private async Task HandleHttp11Downgrade(HttpRequestMessage request, Stream stre { if (NetEventSource.Log.IsEnabled()) Trace("Downgrading queued HTTP2 request to HTTP/1.1"); - // We don't care if this fails; that means the request was previously canceled or handled by a different connection. + // We are done with the HTTP2 connection attempt, no point to cancel it. + Volatile.Write(ref waiter.ConnectionCancellationTokenSource, null); + + // We don't care if this fails; that means the request was previously canceled or handeled by a different connection. waiter.TrySetResult(null); lock (SyncObj) @@ -647,64 +651,80 @@ private async Task AddHttp2ConnectionAsync(RequestQueue.QueueI { if (NetEventSource.Log.IsEnabled()) Trace("Creating new HTTP/2 connection for pool."); - Http2Connection connection; - using (CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource()) + Http2Connection? connection = null; + Exception? connectionException = null; + HttpConnectionWaiter waiter = queueItem.Waiter; + + CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(); + waiter.ConnectionCancellationTokenSource = cts; + try { - try + (Stream stream, TransportContext? transportContext) = await ConnectAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false); + + if (IsSecure) { - (Stream stream, TransportContext? transportContext) = await ConnectAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false); + SslStream sslStream = (SslStream)stream; - if (IsSecure) + if (sslStream.NegotiatedApplicationProtocol == SslApplicationProtocol.Http2) { - SslStream sslStream = (SslStream)stream; + // The server accepted our request for HTTP2. - if (sslStream.NegotiatedApplicationProtocol == SslApplicationProtocol.Http2) + if (sslStream.SslProtocol < SslProtocols.Tls12) { - // The server accepted our request for HTTP2. - - if (sslStream.SslProtocol < SslProtocols.Tls12) - { - stream.Dispose(); - throw new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol)); - } - - connection = await ConstructHttp2ConnectionAsync(stream, queueItem.Request, cts.Token).ConfigureAwait(false); + stream.Dispose(); + connectionException = new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol)); } else { - // We established an SSL connection, but the server denied our request for HTTP2. - await HandleHttp11Downgrade(queueItem.Request, stream, transportContext, cts.Token).ConfigureAwait(false); - return; + connection = await ConstructHttp2ConnectionAsync(stream, queueItem.Request, cts.Token).ConfigureAwait(false); } } else { - connection = await ConstructHttp2ConnectionAsync(stream, queueItem.Request, cts.Token).ConfigureAwait(false); + // We established an SSL connection, but the server denied our request for HTTP2. + await HandleHttp11Downgrade(queueItem.Request, stream, transportContext, cts.Token).ConfigureAwait(false); + return; } } - catch (OperationCanceledException oce) when (oce.CancellationToken == cts.Token) + else { - HandleHttp2ConnectionFailure(queueItem.Waiter, CreateConnectTimeoutException(oce)); - return; + connection = await ConstructHttp2ConnectionAsync(stream, queueItem.Request, cts.Token).ConfigureAwait(false); } - catch (Exception e) + } + catch (Exception e) + { + connectionException = e is OperationCanceledException oce && oce.CancellationToken == cts.Token && !waiter.CancelledByOriginatingRequestCompletion ? + CreateConnectTimeoutException(oce) : + e; + } + finally + { + lock (waiter) { - HandleHttp2ConnectionFailure(queueItem.Waiter, e); - return; + waiter.ConnectionCancellationTokenSource = null; + cts.Dispose(); } } - // Register for shutdown notification. - // Do this before we return the connection to the pool, because that may result in it being disposed. - ValueTask shutdownTask = connection.WaitForShutdownAsync(); + if (connection is not null) + { + // Register for shutdown notification. + // Do this before we return the connection to the pool, because that may result in it being disposed. + ValueTask shutdownTask = connection.WaitForShutdownAsync(); - // Add the new connection to the pool. - ReturnHttp2Connection(connection, isNewConnection: true, queueItem.Waiter); + // Add the new connection to the pool. + ReturnHttp2Connection(connection, isNewConnection: true, queueItem.Waiter); - // Wait for connection shutdown. - await shutdownTask.ConfigureAwait(false); + // Wait for connection shutdown. + await shutdownTask.ConfigureAwait(false); - InvalidateHttp2Connection(connection); + InvalidateHttp2Connection(connection); + } + else + { + Debug.Assert(connectionException is not null); + HandleHttp2ConnectionFailure(waiter, connectionException); + } } private void CheckForHttp2ConnectionInjection() @@ -714,11 +734,24 @@ private void CheckForHttp2ConnectionInjection() _http2RequestQueue.PruneCompletedRequestsFromHeadOfQueue(this); // Determine if we can and should add a new connection to the pool. - if ((_availableHttp2Connections?.Count ?? 0) == 0 && // No available connections + int availableHttp2ConnectionCount = _availableHttp2Connections?.Count ?? 0; + bool willInject = availableHttp2ConnectionCount == 0 && // No available connections !_pendingHttp2Connection && // Only allow one pending HTTP2 connection at a time _http2RequestQueue.Count > 0 && // There are requests left on the queue (_associatedHttp2ConnectionCount == 0 || EnableMultipleHttp2Connections) && // We allow multiple connections, or don't have a connection currently - _http2RequestQueue.RequestsWithoutAConnectionAttempt > 0) // There are requests we haven't issued a connection attempt for + _http2RequestQueue.RequestsWithoutAConnectionAttempt > 0; // There are requests we haven't issued a connection attempt for + + if (NetEventSource.Log.IsEnabled()) + { + Trace($"Available HTTP/2.0 connections: {availableHttp2ConnectionCount}, " + + $"Pending HTTP/2.0 connection: {_pendingHttp2Connection}" + + $"Requests in the queue: {_http2RequestQueue.Count}, " + + $"Requests without a connection attempt: {_http2RequestQueue.RequestsWithoutAConnectionAttempt}, " + + $"Total associated HTTP/2.0 connections: {_associatedHttp2ConnectionCount}, " + + $"Will inject connection: {willInject}."); + } + + if (willInject) { _associatedHttp2ConnectionCount++; _pendingHttp2Connection = true; @@ -733,22 +766,22 @@ private void CheckForHttp2ConnectionInjection() } } - private async ValueTask GetHttp2ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) + private bool TryGetPooledHttp2Connection(HttpRequestMessage request, bool async, [NotNullWhen(true)] out Http2Connection? connection, out HttpConnectionWaiter? waiter) { Debug.Assert(_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel || _kind == HttpConnectionKind.Http || _kind == HttpConnectionKind.SocksTunnel || _kind == HttpConnectionKind.SslSocksTunnel); // Look for a usable connection. - TaskCompletionSourceWithCancellation waiter; while (true) { - Http2Connection connection; lock (SyncObj) { _usedSinceLastCleanup = true; if (!_http2Enabled) { - return null; + waiter = null; + connection = null; + return false; } int availableConnectionCount = _availableHttp2Connections?.Count ?? 0; @@ -765,8 +798,10 @@ private void CheckForHttp2ConnectionInjection() CheckForHttp2ConnectionInjection(); - // Break out of the loop and continue processing below. - break; + // There were no available connections. This request has been added to the request queue. + if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/2 connections; request queued."); + connection = null; + return false; } } @@ -802,23 +837,8 @@ private void CheckForHttp2ConnectionInjection() } if (NetEventSource.Log.IsEnabled()) connection.Trace("Found usable HTTP/2 connection in pool."); - return connection; - } - - // There were no available connections. This request has been added to the request queue. - if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/2 connections; request queued."); - - long startingTimestamp = Stopwatch.GetTimestamp(); - try - { - return await waiter.WaitWithCancellationAsync(async, cancellationToken).ConfigureAwait(false); - } - finally - { - if (HttpTelemetry.Log.IsEnabled()) - { - HttpTelemetry.Log.Http20RequestLeftQueue(Stopwatch.GetElapsedTime(startingTimestamp).TotalMilliseconds); - } + waiter = null; + return true; } } @@ -989,6 +1009,8 @@ public async ValueTask SendWithVersionDetectionAndRetryAsyn int retryCount = 0; while (true) { + HttpConnectionWaiter? http11ConnectionWaiter = null; + HttpConnectionWaiter? http2ConnectionWaiter = null; try { HttpResponseMessage? response = null; @@ -1016,7 +1038,12 @@ public async ValueTask SendWithVersionDetectionAndRetryAsyn (request.Version.Major >= 2 || (request.VersionPolicy == HttpVersionPolicy.RequestVersionOrHigher && IsSecure)) && (request.VersionPolicy != HttpVersionPolicy.RequestVersionOrLower || IsSecure)) // prefer HTTP/1.1 if connection is not secured and downgrade is possible { - Http2Connection? connection = await GetHttp2ConnectionAsync(request, async, cancellationToken).ConfigureAwait(false); + if (!TryGetPooledHttp2Connection(request, async, out Http2Connection? connection, out http2ConnectionWaiter) && + http2ConnectionWaiter != null) + { + connection = await http2ConnectionWaiter.WaitForConnectionAsync(async, cancellationToken).ConfigureAwait(false); + } + Debug.Assert(connection is not null || !_http2Enabled); if (connection is not null) { @@ -1044,7 +1071,11 @@ public async ValueTask SendWithVersionDetectionAndRetryAsyn } // Use HTTP/1.x. - HttpConnection connection = await GetHttp11ConnectionAsync(request, async, cancellationToken).ConfigureAwait(false); + if (!TryGetPooledHttp11Connection(request, async, out HttpConnection? connection, out http11ConnectionWaiter)) + { + connection = await http11ConnectionWaiter.WaitForConnectionAsync(async, cancellationToken).ConfigureAwait(false); + } + connection.Acquire(); // In case we are doing Windows (i.e. connection-based) auth, we need to ensure that we hold on to this specific connection while auth is underway. try { @@ -1113,6 +1144,52 @@ public async ValueTask SendWithVersionDetectionAndRetryAsyn // Eat exception and try again. } + finally + { + // We never cancel both attempts at the same time. When downgrade happens, it's possible that both waiters are non-null, + // but in that case http2ConnectionWaiter.ConnectionCancellationTokenSource shall be null. + Debug.Assert(http11ConnectionWaiter is null || http2ConnectionWaiter?.ConnectionCancellationTokenSource is null); + CancelIfNecessary(http11ConnectionWaiter, cancellationToken.IsCancellationRequested); + CancelIfNecessary(http2ConnectionWaiter, cancellationToken.IsCancellationRequested); + } + } + } + + private void CancelIfNecessary(HttpConnectionWaiter? waiter, bool requestCancelled) + { + int timeout = GlobalHttpSettings.SocketsHttpHandler.PendingConnectionTimeoutOnRequestCompletion; + if (waiter?.ConnectionCancellationTokenSource is null || + timeout == Timeout.Infinite || + Settings._connectTimeout != Timeout.InfiniteTimeSpan && timeout > (int)Settings._connectTimeout.TotalMilliseconds) // Do not override shorter ConnectTimeout + { + return; + } + + lock (waiter) + { + if (waiter.ConnectionCancellationTokenSource is null) + { + return; + } + + if (NetEventSource.Log.IsEnabled()) + { + Trace($"Initiating cancellation of a pending connection attempt with delay of {timeout} ms, " + + $"Reason: {(requestCancelled ? "Request cancelled" : "Request served by another connection")}."); + } + + waiter.CancelledByOriginatingRequestCompletion = true; + if (timeout > 0) + { + // Cancel after the specified timeout. This cancellation will not fire if the connection + // succeeds within the delay and the CTS becomes disposed. + waiter.ConnectionCancellationTokenSource.CancelAfter(timeout); + } + else + { + // Cancel immediately if no timeout specified. + waiter.ConnectionCancellationTokenSource.Cancel(); + } } } @@ -1675,7 +1752,7 @@ private async ValueTask EstablishSocksTunnel(HttpRequestMessage request, return stream; } - private void HandleHttp11ConnectionFailure(TaskCompletionSourceWithCancellation? requestWaiter, Exception e) + private void HandleHttp11ConnectionFailure(HttpConnectionWaiter? requestWaiter, Exception e) { if (NetEventSource.Log.IsEnabled()) Trace($"HTTP/1.1 connection failed: {e}"); @@ -1695,7 +1772,7 @@ private void HandleHttp11ConnectionFailure(TaskCompletionSourceWithCancellation< } } - private void HandleHttp2ConnectionFailure(TaskCompletionSourceWithCancellation requestWaiter, Exception e) + private void HandleHttp2ConnectionFailure(HttpConnectionWaiter requestWaiter, Exception e) { if (NetEventSource.Log.IsEnabled()) Trace($"HTTP2 connection failed: {e}"); @@ -1776,7 +1853,9 @@ private bool CheckExpirationOnReturn(HttpConnectionBase connection) return false; } - public void ReturnHttp11Connection(HttpConnection connection, bool isNewConnection, TaskCompletionSourceWithCancellation? initialRequestWaiter = null) + public void RecycleHttp11Connection(HttpConnection connection) => ReturnHttp11Connection(connection, false); + + private void ReturnHttp11Connection(HttpConnection connection, bool isNewConnection, HttpConnectionWaiter? initialRequestWaiter = null) { if (NetEventSource.Log.IsEnabled()) connection.Trace($"{nameof(isNewConnection)}={isNewConnection}"); @@ -1792,7 +1871,7 @@ public void ReturnHttp11Connection(HttpConnection connection, bool isNewConnecti // Loop in case we get a request that has already been canceled or handled by a different connection. while (true) { - TaskCompletionSourceWithCancellation? waiter = null; + HttpConnectionWaiter? waiter = null; bool added = false; lock (SyncObj) { @@ -1869,7 +1948,7 @@ public void ReturnHttp11Connection(HttpConnection connection, bool isNewConnecti } } - public void ReturnHttp2Connection(Http2Connection connection, bool isNewConnection, TaskCompletionSourceWithCancellation? initialRequestWaiter = null) + private void ReturnHttp2Connection(Http2Connection connection, bool isNewConnection, HttpConnectionWaiter? initialRequestWaiter = null) { if (NetEventSource.Log.IsEnabled()) connection.Trace($"{nameof(isNewConnection)}={isNewConnection}"); @@ -1894,7 +1973,7 @@ public void ReturnHttp2Connection(Http2Connection connection, bool isNewConnecti // Loop in case we get a request that has already been canceled or handled by a different connection. while (true) { - TaskCompletionSourceWithCancellation? waiter = null; + HttpConnectionWaiter? waiter = null; bool added = false; lock (SyncObj) { @@ -2307,7 +2386,7 @@ private struct RequestQueue public struct QueueItem { public HttpRequestMessage Request; - public TaskCompletionSourceWithCancellation Waiter; + public HttpConnectionWaiter Waiter; } // This implementation mimics that of Queue, but without version checks and with an extra head pointer @@ -2406,9 +2485,9 @@ private void Grow() } - public TaskCompletionSourceWithCancellation EnqueueRequest(HttpRequestMessage request) + public HttpConnectionWaiter EnqueueRequest(HttpRequestMessage request) { - var waiter = new TaskCompletionSourceWithCancellation(); + var waiter = new HttpConnectionWaiter(); Enqueue(new QueueItem { Request = request, Waiter = waiter }); return waiter; } @@ -2428,7 +2507,7 @@ public void PruneCompletedRequestsFromHeadOfQueue(HttpConnectionPool pool) } } - public bool TryDequeueWaiter(HttpConnectionPool pool, [MaybeNullWhen(false)] out TaskCompletionSourceWithCancellation waiter) + public bool TryDequeueWaiter(HttpConnectionPool pool, [MaybeNullWhen(false)] out HttpConnectionWaiter waiter) { PruneCompletedRequestsFromHeadOfQueue(pool); @@ -2442,7 +2521,7 @@ public bool TryDequeueWaiter(HttpConnectionPool pool, [MaybeNullWhen(false)] out return false; } - public void TryDequeueSpecificWaiter(TaskCompletionSourceWithCancellation waiter) + public void TryDequeueSpecificWaiter(HttpConnectionWaiter waiter) { if (TryPeek(out QueueItem queueItem) && queueItem.Waiter == waiter) { @@ -2470,5 +2549,34 @@ public QueueItem PeekNextRequestForConnectionAttempt() public int RequestsWithoutAConnectionAttempt => _size - _attemptedConnectionsOffset; } + + private sealed class HttpConnectionWaiter : TaskCompletionSourceWithCancellation + { + // When a connection attempt is pending, reference the connection's CTS, so we can tear it down if the initiating request is cancelled + // or completes on a different connection. + public CancellationTokenSource? ConnectionCancellationTokenSource; + + // Distinguish connection cancellation that happens because the initiating request is cancelled or completed on a different connection. + public bool CancelledByOriginatingRequestCompletion { get; set; } + + public async ValueTask WaitForConnectionAsync(bool async, CancellationToken requestCancellationToken) + { + long startingTimestamp = Stopwatch.GetTimestamp(); + try + { + return await WaitWithCancellationAsync(async, requestCancellationToken).ConfigureAwait(false); + } + finally + { + if (HttpTelemetry.Log.IsEnabled()) + { + if (typeof(T) == typeof(HttpConnection)) + HttpTelemetry.Log.Http11RequestLeftQueue(Stopwatch.GetElapsedTime(startingTimestamp).TotalMilliseconds); + else if (typeof(T) == typeof(Http2Connection)) + HttpTelemetry.Log.Http20RequestLeftQueue(Stopwatch.GetElapsedTime(startingTimestamp).TotalMilliseconds); + } + } + } + } } } diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.NonParallel.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.NonParallel.cs new file mode 100644 index 0000000000000..1f8e681cab25c --- /dev/null +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.NonParallel.cs @@ -0,0 +1,140 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO; +using System.Net.Sockets; +using System.Net.Test.Common; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.DotNet.RemoteExecutor; +using Xunit; +using Xunit.Abstractions; + +namespace System.Net.Http.Functional.Tests +{ + [Collection(nameof(DisableParallelization))] // Reduces chance of timing-related issues + [ConditionalClass(typeof(SocketsHttpHandler), nameof(SocketsHttpHandler.IsSupported))] + public class SocketsHttpHandler_Cancellation_Test_NonParallel : HttpClientHandlerTestBase + { + public SocketsHttpHandler_Cancellation_Test_NonParallel(ITestOutputHelper output) : base(output) + { + } + + [OuterLoop("Incurs significant delay.")] + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData("1.1", 10_000, 1_000, 100)] + [InlineData("2.0", 10_000, 1_000, 100)] + [InlineData("1.1", 20_000, 10_000, null)] + [InlineData("2.0", 20_000, 10_000, null)] + public static void CancelPendingRequest_DropsStalledConnectionAttempt(string versionString, int firstConnectionDelayMs, int requestTimeoutMs, int? pendingConnectionTimeoutOnRequestCompletion) + { + RemoteInvokeOptions options = new RemoteInvokeOptions(); + if (pendingConnectionTimeoutOnRequestCompletion is not null) + { + options.StartInfo.EnvironmentVariables["DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_PENDINGCONNECTIONTIMEOUTONREQUESTCOMPLETION"] = pendingConnectionTimeoutOnRequestCompletion.ToString(); + } + + RemoteExecutor.Invoke(CancelPendingRequest_DropsStalledConnectionAttempt_Impl, versionString, firstConnectionDelayMs.ToString(), requestTimeoutMs.ToString(), options).Dispose(); + } + + private static async Task CancelPendingRequest_DropsStalledConnectionAttempt_Impl(string versionString, string firstConnectionDelayMsString, string requestTimeoutMsString) + { + var version = Version.Parse(versionString); + LoopbackServerFactory factory = GetFactoryForVersion(version); + + const int AttemptCount = 3; + int firstConnectionDelayMs = int.Parse(firstConnectionDelayMsString); + int requestTimeoutMs = int.Parse(requestTimeoutMsString); + bool firstConnection = true; + + using CancellationTokenSource cts0 = new CancellationTokenSource(requestTimeoutMs); + + await factory.CreateClientAndServerAsync(async uri => + { + using var handler = CreateHttpClientHandler(version); + GetUnderlyingSocketsHttpHandler(handler).ConnectCallback = DoConnect; + using var client = new HttpClient(handler) { DefaultRequestVersion = version }; + + await Assert.ThrowsAnyAsync(async () => + { + await client.GetAsync(uri, cts0.Token); + }); + + for (int i = 0; i < AttemptCount; i++) + { + using var cts1 = new CancellationTokenSource(requestTimeoutMs); + using var response = await client.GetAsync(uri, cts1.Token); + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + } + }, async server => + { + await server.AcceptConnectionAsync(async connection => + { + for (int i = 0; i < AttemptCount; i++) + { + await connection.ReadRequestDataAsync(); + await connection.SendResponseAsync(); + connection.CompleteRequestProcessing(); + } + }); + }); + + async ValueTask DoConnect(SocketsHttpConnectionContext ctx, CancellationToken cancellationToken) + { + if (firstConnection) + { + firstConnection = false; + await Task.Delay(100, cancellationToken); // Wait for the request to be pushed to the queue + cts0.Cancel(); // cancel the first request faster than RequestTimeoutMs + await Task.Delay(firstConnectionDelayMs, cancellationToken); // Simulate stalled connection + } + var s = new Socket(SocketType.Stream, ProtocolType.Tcp) { NoDelay = true }; + await s.ConnectAsync(ctx.DnsEndPoint, cancellationToken); + + return new NetworkStream(s, ownsSocket: true); + } + } + + [OuterLoop("Incurs significant delay.")] + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData(20_000)] + [InlineData(Timeout.Infinite)] + public void PendingConnectionTimeout_HighValue_PendingConnectionIsNotCancelled(int timeout) + { + RemoteExecutor.Invoke(async timoutStr => + { + // Setup "infinite" timeout of int.MaxValue milliseconds + AppContext.SetData("System.Net.SocketsHttpHandler.PendingConnectionTimeoutOnRequestCompletion", int.Parse(timoutStr)); + + bool connected = false; + CancellationTokenSource cts = new CancellationTokenSource(); + + await new Http11LoopbackServerFactory().CreateClientAndServerAsync(async uri => + { + using var handler = CreateHttpClientHandler(HttpVersion.Version11); + GetUnderlyingSocketsHttpHandler(handler).ConnectCallback = DoConnect; + using var client = new HttpClient(handler) { DefaultRequestVersion = HttpVersion.Version11 }; + + await Assert.ThrowsAnyAsync(() => client.GetAsync(uri, cts.Token)); + }, + async server => { + await server.AcceptConnectionAsync(_ => Task.CompletedTask).WaitAsync(30_000); + }); + + async ValueTask DoConnect(SocketsHttpConnectionContext ctx, CancellationToken cancellationToken) + { + var s = new Socket(SocketType.Stream, ProtocolType.Tcp) { NoDelay = true }; + await Task.Delay(100, cancellationToken); // Wait for the request to be pushed to the queue + cts.Cancel(); + + await Task.Delay(10_000, cancellationToken); + await s.ConnectAsync(ctx.DnsEndPoint, cancellationToken); + connected = true; + return new NetworkStream(s, ownsSocket: true); + } + + Assert.True(connected); + }, timeout.ToString()).Dispose(); + } + } +} diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj b/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj index 57b3d2065fce4..6c05b3cc0f35b 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj @@ -1,4 +1,4 @@ - + ../../src/Resources/Strings.resx $(DefineConstants);SYSNETHTTP_NO_OPENSSL;HTTP3 @@ -159,6 +159,7 @@ Link="Common\TestUtilities\System\DisableParallelization.cs" /> +