Skip to content

Commit

Permalink
Merged PR 42925: Merged PR 41476: Fix Http3 Pipe complete
Browse files Browse the repository at this point in the history
Don't complete the `PipeWriter` when it still might be used by Application code.

----
#### AI description  (iteration 1)
#### PR Classification
Bug fix

#### PR Summary
This pull request addresses a bug in the HTTP/3 implementation related to memory management and pipe completion.
- `Http3RequestTests.cs`: Added a new test to ensure memory is preserved when the connection closes.
- `Http3OutputProducer.cs`: Refactored `Dispose` method and added `Complete` method to properly handle pipe completion.
- `DiagnosticMemoryPool.cs`: Added `ContainsMemory` method to check if memory is still rented from the pool.
- `Http3Stream.cs`: Ensured `Complete` is called on `_http3Output` when the stream is closed.
- `Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.csproj`: Added `InternalsVisibleTo` for `Interop.FunctionalTests`.
  • Loading branch information
BrennanConroy authored and wtgodbe committed Sep 22, 2024
1 parent b77c79e commit ef65a77
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 10 deletions.
33 changes: 23 additions & 10 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3OutputProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,21 @@ public void StreamReset()
_dataWriteProcessingTask = ProcessDataWrites().Preserve();
}

public void Dispose()
// Called once Application code has exited
// Or on Dispose which also would occur after Application code finished
public void Complete()
{
lock (_dataWriterLock)
{
if (_disposed)
{
return;
}

_disposed = true;

Stop();

_pipeWriter.Complete();

if (_fakeMemoryOwner != null)
{
_fakeMemoryOwner.Dispose();
_fakeMemoryOwner = null;
}

if (_fakeMemory != null)
{
ArrayPool<byte>.Shared.Return(_fakeMemory);
Expand All @@ -95,6 +91,21 @@ public void Dispose()
}
}

public void Dispose()
{
lock (_dataWriterLock)
{
if (_disposed)
{
return;
}

_disposed = true;

Complete();
}
}

// In HTTP/1.x, this aborts the entire connection. For HTTP/3 we abort the stream.
void IHttpOutputAborter.Abort(ConnectionAbortedException abortReason, ConnectionEndReason reason)
{
Expand Down Expand Up @@ -288,7 +299,9 @@ public void Stop()

_streamCompleted = true;

_pipeWriter.Complete(new OperationCanceledException());
// Application code could be using this PipeWriter, we cancel the next (or in progress) flush so they can observe this Stop
// Additionally, _streamCompleted will cause any future PipeWriter operations to noop
_pipeWriter.CancelPendingFlush();
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ private void CompleteStream(bool errored)
TryClose();
}

_http3Output.Complete();

// Stream will be pooled after app completed.
// Wait to signal app completed after any potential aborts on the stream.
_appCompletedTaskSource.SetResult(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@
<InternalsVisibleTo Include="IIS.Http.FunctionalTests" />
<InternalsVisibleTo Include="IIS.LongTests" />
<InternalsVisibleTo Include="Microsoft.AspNetCore.Server.Kestrel.Tests" />
<InternalsVisibleTo Include="Interop.FunctionalTests" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Net;
Expand Down Expand Up @@ -1145,6 +1146,137 @@ public async Task POST_Bidirectional_LargeData_Cancellation_Error(HttpProtocols
}
}

internal class MemoryPoolFeature : IMemoryPoolFeature
{
public MemoryPool<byte> MemoryPool { get; set; }
}

[ConditionalTheory]
[MsQuicSupported]
[InlineData(HttpProtocols.Http3)]
[InlineData(HttpProtocols.Http2)]
public async Task ApplicationWriteWhenConnectionClosesPreservesMemory(HttpProtocols protocol)
{
// Arrange
var memoryPool = new DiagnosticMemoryPool(new PinnedBlockMemoryPool(), allowLateReturn: true);

var writingTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var cancelTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var completionTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

var builder = CreateHostBuilder(async context =>
{
try
{
var requestBody = context.Request.Body;

await context.Response.BodyWriter.FlushAsync();

// Test relies on Htt2Stream/Http3Stream aborting the token after stopping Http2OutputProducer/Http3OutputProducer
// It's very fragile but it is sort of a best effort test anyways
// Additionally, Http2 schedules it's stopping, so doesn't directly do anything to the PipeWriter when calling stop on Http2OutputProducer
context.RequestAborted.Register(() =>
{
cancelTcs.SetResult();
});

while (true)
{
var memory = context.Response.BodyWriter.GetMemory();

// Unblock client-side to close the connection
writingTcs.TrySetResult();

await cancelTcs.Task;

// Verify memory is still rented from the memory pool after the producer has been stopped
Assert.True(memoryPool.ContainsMemory(memory));

context.Response.BodyWriter.Advance(memory.Length);
var flushResult = await context.Response.BodyWriter.FlushAsync();

if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}

completionTcs.SetResult();
}
catch (Exception ex)
{
writingTcs.TrySetException(ex);
// Exceptions annoyingly don't show up on the client side when doing E2E + cancellation testing
// so we need to use a TCS to observe any unexpected errors
completionTcs.TrySetException(ex);
throw;
}
}, protocol: protocol,
configureKestrel: o =>
{
o.Listen(IPAddress.Parse("127.0.0.1"), 0, listenOptions =>
{
listenOptions.Protocols = protocol;
listenOptions.UseHttps(TestResources.GetTestCertificate()).Use(@delegate =>
{
// Connection middleware for Http/1.1 and Http/2
return (context) =>
{
// Set the memory pool used by the connection so we can observe if memory from the PipeWriter is still rented from the pool
context.Features.Set<IMemoryPoolFeature>(new MemoryPoolFeature() { MemoryPool = memoryPool });
return @delegate(context);
};
});

IMultiplexedConnectionBuilder multiplexedConnectionBuilder = listenOptions;
multiplexedConnectionBuilder.Use(@delegate =>
{
// Connection middleware for Http/3
return (context) =>
{
// Set the memory pool used by the connection so we can observe if memory from the PipeWriter is still rented from the pool
context.Features.Set<IMemoryPoolFeature>(new MemoryPoolFeature() { MemoryPool = memoryPool });
return @delegate(context);
};
});
});
});

var httpClientHandler = new HttpClientHandler();
httpClientHandler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;

using (var host = builder.Build())
using (var client = new HttpClient(httpClientHandler))
{
await host.StartAsync().DefaultTimeout();

var cts = new CancellationTokenSource();

var request = new HttpRequestMessage(HttpMethod.Post, $"https://127.0.0.1:{host.GetPort()}/");
request.Version = GetProtocol(protocol);
request.VersionPolicy = HttpVersionPolicy.RequestVersionExact;

// Act
var responseTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);

Logger.LogInformation("Client waiting for headers.");
var response = await responseTask.DefaultTimeout();
await writingTcs.Task;

Logger.LogInformation("Client canceled request.");
response.Dispose();

// Assert
await host.StopAsync().DefaultTimeout();

await completionTcs.Task;

memoryPool.Dispose();

await memoryPool.WhenAllBlocksReturnedAsync(TimeSpan.FromSeconds(15));
}
}

// Verify HTTP/2 and HTTP/3 match behavior
[ConditionalTheory]
[MsQuicSupported]
Expand Down
23 changes: 23 additions & 0 deletions src/Shared/Buffers.MemoryPool/DiagnosticMemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,27 @@ public async Task WhenAllBlocksReturnedAsync(TimeSpan timeout)

await task;
}

public bool ContainsMemory(Memory<byte> memory)
{
lock (_syncObj)
{
foreach (var block in _blocks)
{
unsafe
{
fixed (byte* inUseMemoryPtr = memory.Span)
fixed (byte* beginPooledMemoryPtr = block.Memory.Span)
{
byte* endPooledMemoryPtr = beginPooledMemoryPtr + block.Memory.Length;
if (inUseMemoryPtr >= beginPooledMemoryPtr && inUseMemoryPtr < endPooledMemoryPtr)
{
return true;
}
}
}
}
return false;
}
}
}

0 comments on commit ef65a77

Please sign in to comment.