Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed memory leak on websocket transport. #6476

Merged
merged 1 commit into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,16 @@ await _session.Protocol.SendErrorMessageAsync(
break;

case IResponseStream responseStream:
await foreach (var item in
responseStream.ReadResultsAsync().WithCancellation(ct))
await foreach (var item in responseStream.ReadResultsAsync().WithCancellation(ct))
{
await SendResultMessageAsync(item, ct);
try
{
await SendResultMessageAsync(item, ct);
}
finally
{
await item.DisposeAsync();
}
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,18 @@ public void Reset()
}

ref var mem = ref MemoryMarshal.GetReference(_buffer.AsSpan());
ref var end = ref Unsafe.Add(ref mem, _index);

for (var i = 0; i < _index; i++)
while (Unsafe.IsAddressLessThan(ref mem, ref end))
{
ref var element = ref Unsafe.Add(ref mem, i);

if (element is not null && !_policy.Return(element))
if (mem is not null && !_policy.Return(mem))
{
element = default;
mem = default;
}

mem = ref Unsafe.Add(ref mem, 1);
}

_index = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ private async Task ProcessResponseStreamAsync(
Stream outputStream,
CancellationToken ct)
{
await foreach (var queryResult in responseStream.ReadResultsAsync()
.WithCancellation(ct)
.ConfigureAwait(false))
await foreach (var queryResult in responseStream.ReadResultsAsync().WithCancellation(ct).ConfigureAwait(false))
{
// we do not need try-finally here because we dispose the semaphore in the parent
// method.
Expand All @@ -138,8 +136,7 @@ private async Task ProcessResponseStreamAsync(

try
{
await WriteNextMessageAsync(queryResult, outputStream, ct)
.ConfigureAwait(false);
await WriteNextMessageAsync(queryResult, outputStream, ct).ConfigureAwait(false);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private async IAsyncEnumerable<GraphQLResponse> SubscribeInternalAsync(
{
var request = new GraphQLHttpRequest(subgraphRequest, _config.EndpointUri);
using var response = await _client.SendAsync(request, cancellationToken).ConfigureAwait(false);

await foreach (var result in response.ReadAsResultStreamAsync(cancellationToken).ConfigureAwait(false))
{
yield return new GraphQLResponse(result);
Expand All @@ -54,4 +54,4 @@ public ValueTask DisposeAsync()
_client.Dispose();
return default;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static FusionGatewayBuilder AddFusionGatewayServer(
throw new ArgumentNullException(nameof(services));
}

services.AddTransient<IWebSocketConnectionFactory>(
services.TryAddSingleton<IWebSocketConnectionFactory>(
_ => new DefaultWebSocketConnectionFactory());
services.TryAddSingleton<IGraphQLClientFactory>(
sp => new DefaultHttpGraphQLClientFactory(
Expand Down
62 changes: 31 additions & 31 deletions src/HotChocolate/Fusion/test/Core.Tests/DemoIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ query GetUser {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -140,7 +140,7 @@ query GetUser {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -185,7 +185,7 @@ query GetUser {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand All @@ -200,11 +200,11 @@ query GetUser {
Assert.NotEmpty(result.ExpectQueryResult().Errors!);
}

[Fact(Skip = "The message order is not guaranteed, this needs to be fixed.")]
[Fact(Skip = "Fix stream order")]
public async Task Authors_And_Reviews_Subscription_OnNewReview()
{
// arrange
using var cts = new CancellationTokenSource(10_000);
using var cts = new CancellationTokenSource(500_10_000);
using var demoProject = await DemoProject.CreateAsync(cts.Token);

// act
Expand Down Expand Up @@ -237,7 +237,7 @@ subscription OnNewReview {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand All @@ -250,7 +250,7 @@ subscription OnNewReview {
await snapshot.MatchAsync(cts.Token);
}

[Fact(Skip = "The message order is not guaranteed, this needs to be fixed.")]
[Fact(Skip = "Fix stream order")]
public async Task Authors_And_Reviews_Subscription_OnNewReview_Two_Graphs()
{
// arrange
Expand Down Expand Up @@ -288,7 +288,7 @@ subscription OnNewReview {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -350,7 +350,7 @@ query GetUser {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -400,7 +400,7 @@ query ReformatIds {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -450,7 +450,7 @@ query ReformatIds {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -501,7 +501,7 @@ query GetUser {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -553,7 +553,7 @@ query TopProducts {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -606,7 +606,7 @@ query TopProducts {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -652,7 +652,7 @@ query TopProducts($first: Int!) {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -709,7 +709,7 @@ query Introspect {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -761,7 +761,7 @@ ... on User {
var id = idSerializer.Serialize("User", 1);

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -814,7 +814,7 @@ ... on User {
var id = idSerializer.Serialize("Review", 1);

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -867,7 +867,7 @@ ... on User {
var id = idSerializer.Serialize("Unknown", 1);

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -921,7 +921,7 @@ ... on User {
var id = idSerializer.Serialize("User", 1);

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1039,7 +1039,7 @@ query Introspect {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1090,7 +1090,7 @@ query ProductReviews(
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1143,7 +1143,7 @@ public async Task Forward_Nested_Variables_No_OpName()
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1200,7 +1200,7 @@ public async Task Forward_Nested_Variables_No_OpName_Two_RootSelections()
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1255,7 +1255,7 @@ ... on Product {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1312,7 +1312,7 @@ query ProductReviews(
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1373,7 +1373,7 @@ query Requires {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1433,7 +1433,7 @@ query Requires {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1499,7 +1499,7 @@ query Large {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1547,7 +1547,7 @@ query AfterNull($after: String) {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down Expand Up @@ -1603,7 +1603,7 @@ query TopProducts {
""");

// act
var result = await executor.ExecuteAsync(
await using var result = await executor.ExecuteAsync(
QueryRequestBuilder
.New()
.SetQuery(request)
Expand Down
21 changes: 14 additions & 7 deletions src/HotChocolate/Fusion/test/Core.Tests/TestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,22 @@ public static async Task CollectStreamSnapshotData(
await foreach (var item in result.ExpectResponseStream()
.ReadResultsAsync().WithCancellation(cancellationToken))
{
if (item.ContextData is not null &&
item.ContextData.TryGetValue("queryPlan", out var value) &&
value is QueryPlan queryPlan)
try
{
snapshot.Add(queryPlan, "QueryPlan");
snapshot.Add(queryPlan.Hash, "QueryPlan Hash");
}
if (item.ContextData is not null &&
item.ContextData.TryGetValue("queryPlan", out var value) &&
value is QueryPlan queryPlan)
{
snapshot.Add(queryPlan, "QueryPlan");
snapshot.Add(queryPlan.Hash, "QueryPlan Hash");
}

snapshot.Add(item, $"Result {++i}");
snapshot.Add(item, $"Result {++i}");
}
finally
{
await item.DisposeAsync();
}
}

snapshot.Add(SchemaFormatter.FormatAsString(fusionGraph), "Fusion Graph");
Expand Down
Loading