Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[H/3] Remove waiting on tasks from H3 request stream dispose #92410

Merged
merged 2 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ static int EncodeHttpInteger(long longToEncode, Span<byte> buffer)
}
}

public async Task<byte[]> ReadRequestBodyAsync()
public async Task<byte[]> ReadRequestBodyAsync(int minimumBytes = -1)
{
var buffer = new MemoryStream();

Expand All @@ -212,6 +212,10 @@ public async Task<byte[]> ReadRequestBodyAsync()
case null:
return buffer.ToArray();
}
if (minimumBytes >= 0 && buffer.Length >= minimumBytes)
{
return buffer.ToArray();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ internal sealed class Http3RequestStream : IHttpStreamHeadersHandler, IAsyncDisp
private TaskCompletionSource<bool>? _expect100ContinueCompletionSource; // True indicates we should send content (e.g. received 100 Continue).
private bool _disposed;
private readonly CancellationTokenSource _requestBodyCancellationSource;
private Task? _sendRequestTask; // Set with SendContentAsync, must be awaited before QuicStream.DisposeAsync();
private Task? _readResponseTask; // Set with ReadResponseAsync, must be awaited before QuicStream.DisposeAsync();

// Allocated when we receive a :status header.
private HttpResponseMessage? _response;
Expand Down Expand Up @@ -90,25 +88,9 @@ public void Dispose()
{
_disposed = true;
AbortStream();
// We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream.
WaitUnfinished(_sendRequestTask);
WaitUnfinished(_readResponseTask);
_stream.Dispose();
DisposeSyncHelper();
}

static void WaitUnfinished(Task? task)
{
if (task is not null && !task.IsCompleted)
{
try
{
task.GetAwaiter().GetResult();
}
catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here.
{ }
}
}
}

private void RemoveFromConnectionIfDone()
Expand All @@ -125,25 +107,9 @@ public async ValueTask DisposeAsync()
{
_disposed = true;
AbortStream();
// We aborted both sides, thus both task should unblock and should be finished before disposing the QuicStream.
await AwaitUnfinished(_sendRequestTask).ConfigureAwait(false);
await AwaitUnfinished(_readResponseTask).ConfigureAwait(false);
await _stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper();
}

static async ValueTask AwaitUnfinished(Task? task)
{
if (task is not null && !task.IsCompleted)
{
try
{
await task.ConfigureAwait(false);
}
catch // Exceptions from both tasks are logged via _connection.LogException() in case they're not awaited in SendAsync, so the exception can be ignored here.
{ }
}
}
}

private void DisposeSyncHelper()
Expand Down Expand Up @@ -192,51 +158,52 @@ public async Task<HttpResponseMessage> SendAsync(CancellationToken cancellationT
await FlushSendBufferAsync(endStream: _request.Content == null, _requestBodyCancellationSource.Token).ConfigureAwait(false);
}

Task sendRequestTask;
if (_request.Content != null)
{
_sendRequestTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token);
sendRequestTask = SendContentAsync(_request.Content!, _requestBodyCancellationSource.Token);
}
else
{
_sendRequestTask = Task.CompletedTask;
sendRequestTask = Task.CompletedTask;
}

// In parallel, send content and read response.
// Depending on Expect 100 Continue usage, one will depend on the other making progress.
_readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token);
Task readResponseTask = ReadResponseAsync(_requestBodyCancellationSource.Token);
bool sendContentObserved = false;

// If we're not doing duplex, wait for content to finish sending here.
// If we are doing duplex and have the unlikely event that it completes here, observe the result.
// See Http2Connection.SendAsync for a full comment on this logic -- it is identical behavior.
if (_sendRequestTask.IsCompleted ||
if (sendRequestTask.IsCompleted ||
_request.Content?.AllowDuplex != true ||
await Task.WhenAny(_sendRequestTask, _readResponseTask).ConfigureAwait(false) == _sendRequestTask ||
_sendRequestTask.IsCompleted)
await Task.WhenAny(sendRequestTask, readResponseTask).ConfigureAwait(false) == sendRequestTask ||
sendRequestTask.IsCompleted)
{
try
{
await _sendRequestTask.ConfigureAwait(false);
await sendRequestTask.ConfigureAwait(false);
sendContentObserved = true;
}
catch
{
// Exceptions will be bubbled up from _sendRequestTask here,
// which means the result of _readResponseTask won't be observed directly:
// Exceptions will be bubbled up from sendRequestTask here,
// which means the result of readResponseTask won't be observed directly:
// Do a background await to log any exceptions.
_connection.LogExceptions(_readResponseTask);
_connection.LogExceptions(readResponseTask);
throw;
}
}
else
{
// Duplex is being used, so we can't wait for content to finish sending.
// Do a background await to log any exceptions.
_connection.LogExceptions(_sendRequestTask);
_connection.LogExceptions(sendRequestTask);
}

// Wait for the response headers to be read.
await _readResponseTask.ConfigureAwait(false);
await readResponseTask.ConfigureAwait(false);

Debug.Assert(_response != null && _response.Content != null);
// Set our content stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,53 @@ public async Task RequestSentResponseDisposed_ThrowsOnServer()
await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}

[Fact]
public async Task RequestStreamingResponseFinishes_ClientFinishes()
{
byte[] data = Encoding.UTF8.GetBytes(new string('a', 1024));

using Http3LoopbackServer server = CreateHttp3LoopbackServer();

Task serverTask = Task.Run(async () =>
{
await using Http3LoopbackConnection connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
await using Http3LoopbackStream stream = await connection.AcceptRequestStreamAsync();
HttpRequestData request = await stream.ReadRequestDataAsync(false);
var body = await stream.ReadRequestBodyAsync(data.Length);
await stream.SendResponseHeadersAsync();
await stream.SendResponseBodyAsync(body);
});

Task clientTask = Task.Run(async () =>
{
StreamingHttpContent requestContent = new StreamingHttpContent();
using HttpClient client = CreateHttpClient();
using HttpRequestMessage request = new()
{
Method = HttpMethod.Get,
RequestUri = server.Address,
Version = HttpVersion30,
VersionPolicy = HttpVersionPolicy.RequestVersionExact,
Content = requestContent
};

var responseTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);

var requestStream = await requestContent.GetStreamAsync();
await requestStream.FlushAsync();
await requestStream.WriteAsync(data);

var response = await responseTask;
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync();

await serverTask;
});

await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}


[Fact]
public async Task RequestSendingResponseDisposed_ThrowsOnServer()
{
Expand Down
Loading