diff --git a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj index e6000a2ad98..9e6636a8d65 100644 --- a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj +++ b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj @@ -5,6 +5,7 @@ Akka.Streams.Tests $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) 8.0 + 9 diff --git a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs index 6f5f2b0552c..99adbcce088 100644 --- a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs @@ -25,7 +25,7 @@ namespace Akka.Streams.Tests.Dsl { -#if NETCOREAPP +#if !NETFRAMEWORK // disabling this causes .NET Framework 4.7.2 builds to fail on Linux public class AsyncEnumerableSpec : AkkaSpec { private ActorMaterializer Materializer { get; } @@ -227,6 +227,47 @@ await this.AssertAllStagesStoppedAsync(async () => }, Materializer); } + /// + /// Reproduction for https://github.com/akkadotnet/akka.net/issues/6280 + /// + [Fact] + public async Task AsyncEnumerableSource_BugFix6280() + { + async IAsyncEnumerable GenerateInts() + { + foreach (var i in Enumerable.Range(0, 100)) + { + if (i > 50) + await Task.Delay(1000); + yield return i; + } + } + + var source = Source.From(GenerateInts); + var subscriber = this.CreateManualSubscriberProbe(); + + await EventFilter.Warning().ExpectAsync(0, async () => + { + var mat = source + .WatchTermination(Keep.Right) + .ToMaterialized(Sink.FromSubscriber(subscriber), Keep.Left); + +#pragma warning disable CS4014 + var task = mat.Run(Materializer); +#pragma warning restore CS4014 + + var subscription = await subscriber.ExpectSubscriptionAsync(); + subscription.Request(50); + subscriber.ExpectNextN(Enumerable.Range(0, 50)); + subscription.Request(10); // the iterator is going to start delaying 1000ms per item here + subscription.Cancel(); + + + // The cancellation token inside the IAsyncEnumerable should be cancelled + await task; + }); + } + private static async IAsyncEnumerable RangeAsync(int start, int count, [EnumeratorCancellation] CancellationToken token = default) { @@ -257,10 +298,7 @@ private static async IAsyncEnumerable ThrowingRangeAsync(int start, int cou private static async IAsyncEnumerable ProbeableRangeAsync(int start, int count, AtomicBoolean latch, [EnumeratorCancellation] CancellationToken token = default) { - token.Register(() => - { - latch.GetAndSet(true); - }); + token.Register(() => { latch.GetAndSet(true); }); foreach (var i in Enumerable.Range(start, count)) { if(token.IsCancellationRequested) @@ -269,8 +307,6 @@ private static async IAsyncEnumerable ProbeableRangeAsync(int start, int co yield return i; } } - } -#else #endif } diff --git a/src/core/Akka.Streams/Akka.Streams.csproj b/src/core/Akka.Streams/Akka.Streams.csproj index 13901dacd4f..3123a7665c8 100644 --- a/src/core/Akka.Streams/Akka.Streams.csproj +++ b/src/core/Akka.Streams/Akka.Streams.csproj @@ -7,7 +7,7 @@ $(NetStandardLibVersion);$(NetLibVersion) $(AkkaPackageTags);reactive;stream true - 8.0 + 9 diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index c2e6ad0c3ab..1d69d485b8d 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -3871,20 +3871,8 @@ public override void OnDownstreamFinish(Exception cause) { _completionCts.Cancel(); _completionCts.Dispose(); - - try - { - _enumerator.DisposeAsync().GetAwaiter().GetResult(); - } - catch (Exception ex) - { - Log.Warning(ex, "Failed to dispose IAsyncEnumerator asynchronously"); - } - finally - { - CompleteStage(); - base.OnDownstreamFinish(cause); - } + CompleteStage(); + base.OnDownstreamFinish(cause); } }