Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka:Streams Resolve IAsyncEnumerator.DisposeAsync bug #6290

Conversation

Aaronontheweb
Copy link
Member

Fixes #6280

Changes

We no longer attempt to dispose IAsyncEnumerators inside Source stages due to the reasons outlined here: #6280 (comment)

This prevents the log from being filled with NotSupportedException warnings from failed DisposeAsync operations.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

@Aaronontheweb Aaronontheweb added logging akka-streams akka.net v1.4 Issues affecting Akka.NET v1.4 labels Dec 5, 2022
Copy link
Member Author

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detailed my changes

@@ -5,6 +5,7 @@
<PropertyGroup>
<AssemblyName>Akka.Streams.Tests</AssemblyName>
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<LangVersion>9</LangVersion>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this in order to natively support IAsyncEnumerable inside both Akka.Streams and its test suite. We can consider dropping the dependencies on the BCL NuGet package that pulls it in now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />

Is referenced in Akka.Streams - let me see if I can remove it in an additional commit.

foreach (var i in Enumerable.Range(0, 100))
{
if (i > 50)
await Task.Delay(1000);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is designed to force the ValueTask returned from IAsyncEnumerator.MoveNextAsync() to be still running at the moment we attempt to dispose the IAsyncEnumerator.

var source = Source.From(GenerateInts);
var subscriber = this.CreateManualSubscriberProbe<int>();

await EventFilter.Warning().ExpectAsync(0, async () =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion failed until I implemented my fix.

var subscription = subscriber.ExpectSubscription();
subscription.Request(50);
subscriber.ExpectNextN(Enumerable.Range(0, 50));
subscription.Request(10); // the iterator is going to start delaying 1000ms per item here
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Force the IAsyncEnumerable to emit the elements that start producing 1s delays.

subscription.Request(50);
subscriber.ExpectNextN(Enumerable.Range(0, 50));
subscription.Request(10); // the iterator is going to start delaying 1000ms per item here
subscription.Cancel();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And cancel immediately afterwards - this will trigger the exception before I made my changes.

@@ -3817,19 +3817,7 @@ public override void OnDownstreamFinish()
_completionCts.Cancel();
_completionCts.Dispose();

try
{
_enumerator.DisposeAsync().GetAwaiter().GetResult();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The right solution here is to not dispose the iterator - iteration should stop automatically as a result of the CancellationToken and the DisposeAsync method won't do anything in scenarios where we're still waiting on the Task anyway. GC should still clean this up afterwards.

The alternative, adding a bunch of accounting code to safely call DisposeAsync, is really infeasible since we can't pass the ValueTask around (because it's a struct) and only supports a single await-er - unless we want to wrap it in a Task each time.

None of our other stages which retain an IEnumerator<T> call Dispose on it, so I don't think it's an issue.

Copy link
Contributor

@Arkatufus Arkatufus Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, the _completionCts.Token is passed into the _enumerator sink during construction, this will gracefully shuts down the _enumerable on cancellation.
We don't need to call DisposeAsync here.

@Aaronontheweb
Copy link
Member Author

No idea why AzDo isn't running....

Copy link
Contributor

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -3817,19 +3817,7 @@ public override void OnDownstreamFinish()
_completionCts.Cancel();
_completionCts.Dispose();

try
{
_enumerator.DisposeAsync().GetAwaiter().GetResult();
Copy link
Contributor

@Arkatufus Arkatufus Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, the _completionCts.Token is passed into the _enumerator sink during construction, this will gracefully shuts down the _enumerable on cancellation.
We don't need to call DisposeAsync here.

@Arkatufus Arkatufus enabled auto-merge (squash) December 6, 2022 13:11
@Aaronontheweb Aaronontheweb merged commit 3156272 into akkadotnet:v1.4 Dec 6, 2022
@Aaronontheweb Aaronontheweb deleted the AkkaStreams-IAsyncEnumerable-disposal branch December 6, 2022 13:37
Arkatufus pushed a commit to Arkatufus/akka.net that referenced this pull request Dec 7, 2022
Fixes akkadotnet#6280

We no longer attempt to dispose IAsyncEnumerators inside Source stages due to the reasons outlined here: akkadotnet#6280 (comment)

This prevents the log from being filled with NotSupportedException warnings from failed DisposeAsync operations.

(cherry-picked from 3156272)
Aaronontheweb added a commit that referenced this pull request Dec 7, 2022
…` bug (#6296)

* Backports #6290

Fixes #6280

We no longer attempt to dispose IAsyncEnumerators inside Source stages due to the reasons outlined here: #6280 (comment)

This prevents the log from being filled with NotSupportedException warnings from failed DisposeAsync operations.

(cherry-picked from 3156272)

* post-merge fix

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
akka.net v1.4 Issues affecting Akka.NET v1.4 akka-streams logging
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants