From 3156272a480db1ee0faa80555695757c702a2dcd Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 Dec 2022 07:37:07 -0600 Subject: [PATCH] Akka:Streams Resolve `IAsyncEnumerator.DisposeAsync` bug (#6290) * Upgraded Akka.Streams and Akka.Streams.Tests to C# 9 * added reproduction for #6280 * close #6280 * added compiler directive back to fix compilation issues on Linux * added comment * bump CI --- .../Akka.Streams.Tests.csproj | 1 + .../Dsl/AsyncEnumerableSpec.cs | 84 +++++++++++++------ src/core/Akka.Streams/Akka.Streams.csproj | 1 + .../Akka.Streams/Implementation/Fusing/Ops.cs | 15 +--- 4 files changed, 63 insertions(+), 38 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj index b28f8f02ec2..4dc130b0292 100644 --- a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj +++ b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj @@ -5,6 +5,7 @@ Akka.Streams.Tests $(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion) + 9 diff --git a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs index 10a1a0a83f7..30f44218417 100644 --- a/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/AsyncEnumerableSpec.cs @@ -17,22 +17,19 @@ using Xunit; using Xunit.Abstractions; using System.Collections.Generic; -using Akka.Actor; -using Akka.Streams.Actors; using Akka.Streams.TestKit.Tests; -using Akka.Streams.Tests.Actor; -using Reactive.Streams; using System.Runtime.CompilerServices; using Akka.Util; using FluentAssertions.Extensions; namespace Akka.Streams.Tests.Dsl { -#if NETCOREAPP +#if !NETFRAMEWORK // disabling this causes .NET Framework 4.7.2 builds to fail on Linux public class AsyncEnumerableSpec : AkkaSpec { private ActorMaterializer Materializer { get; } private ITestOutputHelper _helper; + public AsyncEnumerableSpec(ITestOutputHelper helper) : base( AkkaSpecConfig.WithFallback(StreamTestDefaultMailbox.DefaultConfig), helper) @@ -50,7 +47,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken() var cts = new CancellationTokenSource(); var token = cts.Token; - + var asyncEnumerable = Source.From(input).RunAsAsyncEnumerable(Materializer); var output = input.ToArray(); bool caught = false; @@ -65,7 +62,7 @@ public async Task RunAsAsyncEnumerable_Uses_CancellationToken() { caught = true; } - + caught.ShouldBeTrue(); } @@ -80,6 +77,7 @@ public async Task RunAsAsyncEnumerable_must_return_an_IAsyncEnumerableT_from_a_S (output[0] == a).ShouldBeTrue("Did not get elements in order!"); output = output.Skip(1).ToArray(); } + output.Length.ShouldBe(0, "Did not receive all elements!"); } @@ -94,14 +92,16 @@ public async Task RunAsAsyncEnumerable_must_allow_multiple_enumerations() (output[0] == a).ShouldBeTrue("Did not get elements in order!"); output = output.Skip(1).ToArray(); } + output.Length.ShouldBe(0, "Did not receive all elements!"); - + output = input.ToArray(); await foreach (var a in asyncEnumerable) { (output[0] == a).ShouldBeTrue("Did not get elements in order!"); output = output.Skip(1).ToArray(); } + output.Length.ShouldBe(0, "Did not receive all elements in second enumeration!!"); } @@ -112,7 +112,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination() var materializer = ActorMaterializer.Create(Sys); var probe = this.CreatePublisherProbe(); var task = Source.FromPublisher(probe).RunAsAsyncEnumerable(materializer); - + var a = Task.Run(async () => { await foreach (var notused in task) @@ -140,7 +140,7 @@ public async Task RunAsAsyncEnumerable_Throws_on_Abrupt_Stream_termination() thrown.ShouldBeTrue(); } - + [Fact] public async Task RunAsAsyncEnumerable_Throws_if_materializer_gone_before_Enumeration() { @@ -155,7 +155,7 @@ async Task ShouldThrow() { } } - + await Assert.ThrowsAsync(ShouldThrow); } @@ -187,7 +187,7 @@ public void AsyncEnumerableSource_Must_Process_All_Elements() subscription.Request(101); subscriber.ExpectNextN(Enumerable.Range(0, 100)); - + subscriber.ExpectComplete(); } @@ -206,7 +206,7 @@ public void AsyncEnumerableSource_Must_Process_Source_That_Immediately_Throws() subscriber.ExpectNextN(Enumerable.Range(0, 50)); var exception = subscriber.ExpectError(); - + // Exception should be automatically unrolled, this SHOULD NOT be AggregateException exception.Should().BeOfType(); exception.Message.Should().Be("BOOM!"); @@ -231,13 +231,54 @@ public async Task AsyncEnumerableSource_Must_Cancel_Running_Source_If_Downstream await WithinAsync(3.Seconds(), async () => latch.Value); } - private static async IAsyncEnumerable RangeAsync(int start, int count, + /// + /// Reproduction for https://github.com/akkadotnet/akka.net/issues/6280 + /// + [Fact] + public async Task AsyncEnumerableSource_BugFix6280() + { + async IAsyncEnumerable GenerateInts() + { + foreach (var i in Enumerable.Range(0, 100)) + { + if (i > 50) + await Task.Delay(1000); + yield return i; + } + } + + var source = Source.From(GenerateInts); + var subscriber = this.CreateManualSubscriberProbe(); + + await EventFilter.Warning().ExpectAsync(0, async () => + { + var mat = source + .WatchTermination(Keep.Right) + .ToMaterialized(Sink.FromSubscriber(subscriber), Keep.Left); + +#pragma warning disable CS4014 + var task = mat.Run(Materializer); +#pragma warning restore CS4014 + + var subscription = subscriber.ExpectSubscription(); + subscription.Request(50); + subscriber.ExpectNextN(Enumerable.Range(0, 50)); + subscription.Request(10); // the iterator is going to start delaying 1000ms per item here + subscription.Cancel(); + + + // The cancellation token inside the IAsyncEnumerable should be cancelled + await task; + }); + } + + private static async IAsyncEnumerable RangeAsync(int start, int count, [EnumeratorCancellation] CancellationToken token = default) { foreach (var i in Enumerable.Range(start, count)) { await Task.Delay(10, token); - if(token.IsCancellationRequested) + if (token.IsCancellationRequested) yield break; yield return i; } @@ -248,12 +289,12 @@ private static async IAsyncEnumerable ThrowingRangeAsync(int start, int cou { foreach (var i in Enumerable.Range(start, count)) { - if(token.IsCancellationRequested) + if (token.IsCancellationRequested) yield break; if (i == throwAt) throw new TestException("BOOM!"); - + yield return i; } } @@ -261,20 +302,15 @@ private static async IAsyncEnumerable ThrowingRangeAsync(int start, int cou private static async IAsyncEnumerable ProbeableRangeAsync(int start, int count, AtomicBoolean latch, [EnumeratorCancellation] CancellationToken token = default) { - token.Register(() => - { - latch.GetAndSet(true); - }); + token.Register(() => { latch.GetAndSet(true); }); foreach (var i in Enumerable.Range(start, count)) { - if(token.IsCancellationRequested) + if (token.IsCancellationRequested) yield break; yield return i; } } - } -#else #endif } \ No newline at end of file diff --git a/src/core/Akka.Streams/Akka.Streams.csproj b/src/core/Akka.Streams/Akka.Streams.csproj index c04783c315e..88bbd44ae8f 100644 --- a/src/core/Akka.Streams/Akka.Streams.csproj +++ b/src/core/Akka.Streams/Akka.Streams.csproj @@ -7,6 +7,7 @@ $(NetStandardLibVersion) $(AkkaPackageTags);reactive;stream true + 9 diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 65ad8e9af8f..73982da81f2 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -3816,20 +3816,7 @@ public override void OnDownstreamFinish() { _completionCts.Cancel(); _completionCts.Dispose(); - - try - { - _enumerator.DisposeAsync().GetAwaiter().GetResult(); - } - catch (Exception ex) - { - Log.Warning(ex, "Failed to dispose IAsyncEnumerator asynchronously"); - } - finally - { - CompleteStage(); - base.OnDownstreamFinish(); - } + CompleteStage(); base.OnDownstreamFinish(); }