Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PlaintextStreamFilter for sync requests #45388

Merged
merged 2 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/libraries/System.Net.Http/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -591,12 +591,6 @@
<data name="net_http_requested_version_server_refused" xml:space="preserve">
<value>Requesting HTTP version {0} with version policy {1} while server offers only version fallback.</value>
</data>
<data name="net_http_sync_operations_not_allowed_with_connect_callback" xml:space="preserve">
<value>Synchronous operation is not supported when a ConnectCallback is specified on the SocketsHttpHandler instance.</value>
</data>
<data name="net_http_sync_operations_not_allowed_with_plaintext_filter" xml:space="preserve">
<value>Synchronous operation is not supported when a PlaintextStreamFilter is specified on the SocketsHttpHandler instance.</value>
</data>
<data name="net_http_exception_during_plaintext_filter" xml:space="preserve">
<value>An exception occurred while invoking the PlaintextStreamFilter.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@ internal static bool IsGloballyEnabled()

// SendAsyncCore returns already completed ValueTask for when async: false is passed.
// Internally, it calls the synchronous Send method of the base class.
protected internal override HttpResponseMessage Send(HttpRequestMessage request,
CancellationToken cancellationToken)
{
ValueTask<HttpResponseMessage> sendTask = SendAsyncCore(request, async: false, cancellationToken);
Debug.Assert(sendTask.IsCompleted);
return sendTask.GetAwaiter().GetResult();
}
protected internal override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) =>
SendAsyncCore(request, async: false, cancellationToken).AsTask().GetAwaiter().GetResult();

protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) =>
SendAsyncCore(request, async: true, cancellationToken).AsTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,7 @@ private enum SettingId : ushort

public sealed override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
{
Debug.Assert(async);
if (NetEventSource.Log.IsEnabled()) Trace($"{request}");

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ private void CheckForShutdown()

public override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
{
Debug.Assert(async);

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.

TaskCompletionSourceWithCancellation<bool>? waitForAvailableStreamTcs = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,26 @@ public async Task<HttpResponseMessage> SendAsyncCore(HttpRequestMessage request,
if (t != null)
{
// Handle the pre-emptive read. For the async==false case, hopefully the read has
// already completed and this will be a nop, but if it hasn't, we're forced to block
// already completed and this will be a nop, but if it hasn't, the caller will be forced to block
// waiting for the async operation to complete. We will only hit this case for proxied HTTPS
// requests that use a pooled connection, as in that case we don't have a Socket we
// can poll and are forced to issue an async read.
ValueTask<int> vt = t.GetValueOrDefault();
int bytesRead =
vt.IsCompletedSuccessfully ? vt.Result :
async ? await vt.ConfigureAwait(false) :
vt.AsTask().GetAwaiter().GetResult();
int bytesRead;
if (vt.IsCompleted)
{
bytesRead = vt.Result;
}
else
{
if (!async)
{
Trace($"Pre-emptive read completed asynchronously for a synchronous request.");
}

bytesRead = await vt.ConfigureAwait(false);
}

if (NetEventSource.Log.IsEnabled()) Trace($"Received {bytesRead} bytes.");

if (bytesRead == 0)
Expand Down Expand Up @@ -635,14 +646,7 @@ public async Task<HttpResponseMessage> SendAsyncCore(HttpRequestMessage request,
{
Task sendTask = sendRequestContentTask;
sendRequestContentTask = null;
if (async)
{
await sendTask.ConfigureAwait(false);
}
else
{
sendTask.GetAwaiter().GetResult();
}
await sendTask.ConfigureAwait(false);
}

// Now we are sure that the request was fully sent.
Expand Down Expand Up @@ -735,15 +739,7 @@ public async Task<HttpResponseMessage> SendAsyncCore(HttpRequestMessage request,
{
try
{
if (async)
{
await sendRequestContentTask.ConfigureAwait(false);
}
else
{
// No way around it here if we want to get the exception from the task.
sendRequestContentTask.GetAwaiter().GetResult();
}
await sendRequestContentTask.ConfigureAwait(false);
}
// Map the exception the same way as we normally do.
catch (Exception ex) when (MapSendException(ex, cancellationToken, out Exception mappedEx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,33 +486,23 @@ public byte[] Http2AltSvcOriginUri
conn.Dispose();
}

// We are at the connection limit. Wait for an available connection or connection count (indicated by null).
if (NetEventSource.Log.IsEnabled()) Trace("Connection limit reached, waiting for available connection.");
// We are at the connection limit. Wait for an available connection or connection count.
if (NetEventSource.Log.IsEnabled()) Trace($"{(async ? "As" : "S")}ynchronous request. Connection limit reached, waiting for available connection.");

if (HttpTelemetry.Log.IsEnabled())
{
return WaitOnWaiterWithTelemetryAsync(waiter, async, cancellationToken);
}
else
{
return async ?
waiter.WaitWithCancellationAsync(cancellationToken) :
new ValueTask<HttpConnection?>(waiter.Task.GetAwaiter().GetResult());
return waiter.WaitWithCancellationAsync(cancellationToken);
}

static async ValueTask<HttpConnection?> WaitOnWaiterWithTelemetryAsync(TaskCompletionSourceWithCancellation<HttpConnection?> waiter, bool async, CancellationToken cancellationToken)
{
ValueStopwatch stopwatch = ValueStopwatch.StartNew();
HttpConnection? connection;

if (async)
{
connection = await waiter.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false);
}
else
{
connection = waiter.Task.GetAwaiter().GetResult();
}
HttpConnection? connection = await waiter.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false);

HttpTelemetry.Log.Http11RequestLeftQueue(stopwatch.GetElapsedTime().TotalMilliseconds);
return connection;
Expand Down Expand Up @@ -615,7 +605,7 @@ public byte[] Http2AltSvcOriginUri

if (_kind == HttpConnectionKind.Http)
{
http2Connection = await ConstructHttp2Connection(stream, request, cancellationToken).ConfigureAwait(false);
http2Connection = await ConstructHttp2ConnectionAsync(stream, request, cancellationToken).ConfigureAwait(false);

if (NetEventSource.Log.IsEnabled())
{
Expand All @@ -637,7 +627,7 @@ public byte[] Http2AltSvcOriginUri
throw new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol));
}

http2Connection = await ConstructHttp2Connection(stream, request, cancellationToken).ConfigureAwait(false);
http2Connection = await ConstructHttp2ConnectionAsync(stream, request, cancellationToken).ConfigureAwait(false);

if (NetEventSource.Log.IsEnabled())
{
Expand Down Expand Up @@ -691,7 +681,7 @@ public byte[] Http2AltSvcOriginUri

if (canUse)
{
return (await ConstructHttp11Connection(stream!, transportContext, request, cancellationToken).ConfigureAwait(false), true, null);
return (await ConstructHttp11ConnectionAsync(async, stream!, transportContext, request, cancellationToken).ConfigureAwait(false), true, null);
}
else
{
Expand Down Expand Up @@ -1254,7 +1244,7 @@ public ValueTask<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool
case HttpConnectionKind.ProxyTunnel:
case HttpConnectionKind.SslProxyTunnel:
HttpResponseMessage? response;
(stream, response) = await EstablishProxyTunnel(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false);
(stream, response) = await EstablishProxyTunnelAsync(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false);
if (response != null)
{
// Return non-success response from proxy.
Expand Down Expand Up @@ -1295,22 +1285,16 @@ private async ValueTask<Stream> ConnectToTcpHostAsync(string host, int port, Htt
{
ValueTask<Stream> streamTask = Settings._connectCallback(new SocketsHttpConnectionContext(endPoint, initialRequest), cancellationToken);

Stream stream;
if (async || streamTask.IsCompleted)
{
stream = await streamTask.ConfigureAwait(false);
}
else
if (!async && !streamTask.IsCompleted)
{
// User-provided ConnectCallback is completing asynchronously but the user is making a synchronous request; if the user cares, they should
// set it up so that synchronous requests are made on a handler with a synchronously-completing ConnectCallback supplied. If in the future,
// we could add a Boolean to SocketsHttpConnectionContext (https://github.com/dotnet/runtime/issues/44876) to let the callback know whether
// this request is sync or async. For now, log it and block.
// this request is sync or async.
Trace($"{nameof(SocketsHttpHandler.ConnectCallback)} completing asynchronously for a synchronous request.");
stream = streamTask.AsTask().GetAwaiter().GetResult();
}

return stream ?? throw new HttpRequestException(SR.net_http_null_from_connect_callback);
return await streamTask.ConfigureAwait(false) ?? throw new HttpRequestException(SR.net_http_null_from_connect_callback);
}
else
{
Expand Down Expand Up @@ -1351,7 +1335,7 @@ private async ValueTask<Stream> ConnectToTcpHostAsync(string host, int port, Htt
return (null, failureResponse);
}

return (await ConstructHttp11Connection(stream!, transportContext, request, cancellationToken).ConfigureAwait(false), null);
return (await ConstructHttp11ConnectionAsync(async, stream!, transportContext, request, cancellationToken).ConfigureAwait(false), null);
}

private SslClientAuthenticationOptions GetSslOptionsForRequest(HttpRequestMessage request)
Expand All @@ -1371,7 +1355,7 @@ private SslClientAuthenticationOptions GetSslOptionsForRequest(HttpRequestMessag
return _sslOptionsHttp11!;
}

private async ValueTask<Stream> ApplyPlaintextFilter(Stream stream, Version httpVersion, HttpRequestMessage request, CancellationToken cancellationToken)
private async ValueTask<Stream> ApplyPlaintextFilterAsync(bool async, Stream stream, Version httpVersion, HttpRequestMessage request, CancellationToken cancellationToken)
{
if (Settings._plaintextStreamFilter is null)
{
Expand All @@ -1381,7 +1365,18 @@ private async ValueTask<Stream> ApplyPlaintextFilter(Stream stream, Version http
Stream newStream;
try
{
newStream = await Settings._plaintextStreamFilter(new SocketsHttpPlaintextStreamFilterContext(stream, httpVersion, request), cancellationToken).ConfigureAwait(false);
ValueTask<Stream> streamTask = Settings._plaintextStreamFilter(new SocketsHttpPlaintextStreamFilterContext(stream, httpVersion, request), cancellationToken);

if (!async && !streamTask.IsCompleted)
{
// User-provided PlaintextStreamFilter is completing asynchronously but the user is making a synchronous request; if the user cares, they should
// set it up so that synchronous requests are made on a handler with a synchronously-completing PlaintextStreamFilter supplied. If in the future,
// we could add a Boolean to SocketsHttpPlaintextStreamFilterContext (https://github.com/dotnet/runtime/issues/44876) to let the callback know whether
// this request is sync or async.
Trace($"{nameof(SocketsHttpHandler.PlaintextStreamFilter)} completing asynchronously for a synchronous request.");
}

newStream = await streamTask.ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -1398,15 +1393,15 @@ private async ValueTask<Stream> ApplyPlaintextFilter(Stream stream, Version http
return newStream;
}

private async ValueTask<HttpConnection> ConstructHttp11Connection(Stream stream, TransportContext? transportContext, HttpRequestMessage request, CancellationToken cancellationToken)
private async ValueTask<HttpConnection> ConstructHttp11ConnectionAsync(bool async, Stream stream, TransportContext? transportContext, HttpRequestMessage request, CancellationToken cancellationToken)
{
stream = await ApplyPlaintextFilter(stream, HttpVersion.Version11, request, cancellationToken).ConfigureAwait(false);
stream = await ApplyPlaintextFilterAsync(async, stream, HttpVersion.Version11, request, cancellationToken).ConfigureAwait(false);
return new HttpConnection(this, stream, transportContext);
}

private async ValueTask<Http2Connection> ConstructHttp2Connection(Stream stream, HttpRequestMessage request, CancellationToken cancellationToken)
private async ValueTask<Http2Connection> ConstructHttp2ConnectionAsync(Stream stream, HttpRequestMessage request, CancellationToken cancellationToken)
{
stream = await ApplyPlaintextFilter(stream, HttpVersion.Version20, request, cancellationToken).ConfigureAwait(false);
stream = await ApplyPlaintextFilterAsync(async: true, stream, HttpVersion.Version20, request, cancellationToken).ConfigureAwait(false);

Http2Connection http2Connection = new Http2Connection(this, stream);
await http2Connection.SetupAsync().ConfigureAwait(false);
Expand All @@ -1418,7 +1413,7 @@ private async ValueTask<Http2Connection> ConstructHttp2Connection(Stream stream,


// Returns the established stream or an HttpResponseMessage from the proxy indicating failure.
private async ValueTask<(Stream?, HttpResponseMessage?)> EstablishProxyTunnel(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken)
private async ValueTask<(Stream?, HttpResponseMessage?)> EstablishProxyTunnelAsync(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken)
{
Debug.Assert(_originAuthority != null);
// Send a CONNECT request to the proxy server to establish a tunnel.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -12,9 +11,10 @@ internal abstract class HttpMessageHandlerStage : HttpMessageHandler
protected internal sealed override HttpResponseMessage Send(HttpRequestMessage request,
CancellationToken cancellationToken)
{
ValueTask<HttpResponseMessage> sendTask = SendAsync(request, async:false, cancellationToken);
Debug.Assert(sendTask.IsCompleted);
return sendTask.GetAwaiter().GetResult();
ValueTask<HttpResponseMessage> sendTask = SendAsync(request, async: false, cancellationToken);
return sendTask.IsCompleted ?
sendTask.Result :
sendTask.AsTask().GetAwaiter().GetResult();
}

protected internal sealed override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,6 @@ protected internal override HttpResponseMessage Send(HttpRequestMessage request,
CheckDisposed();
HttpMessageHandlerStage handler = _handler ?? SetupHandlerChain();

if (_settings._plaintextStreamFilter is not null)
{
throw new NotSupportedException(SR.net_http_sync_operations_not_allowed_with_plaintext_filter);
}

Exception? error = ValidateAndNormalizeRequest(request);
if (error != null)
{
Expand Down
Loading