From 2331f3da57d452479a2bea7d9b06dac4de9c445c Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 14 Jan 2021 23:59:16 +0100 Subject: [PATCH 01/25] Add initial implementation of Merge with some tests --- MoreLinq.Test/Async/MergeTest.cs | 162 ++++++++++++ MoreLinq.Test/Async/TestExtensions.cs | 60 +++++ MoreLinq.Test/Async/TestingAsyncSequence.cs | 88 +++++++ MoreLinq.Test/Async/WatchableEnumerator.cs | 56 ++++ .../Async/ExperimentalEnumerable.cs | 24 ++ MoreLinq/Experimental/Async/Merge.cs | 249 ++++++++++++++++++ MoreLinq/MoreLinq.csproj | 12 +- 7 files changed, 646 insertions(+), 5 deletions(-) create mode 100644 MoreLinq.Test/Async/MergeTest.cs create mode 100644 MoreLinq.Test/Async/TestExtensions.cs create mode 100644 MoreLinq.Test/Async/TestingAsyncSequence.cs create mode 100644 MoreLinq.Test/Async/WatchableEnumerator.cs create mode 100644 MoreLinq/Experimental/Async/ExperimentalEnumerable.cs create mode 100644 MoreLinq/Experimental/Async/Merge.cs diff --git a/MoreLinq.Test/Async/MergeTest.cs b/MoreLinq.Test/Async/MergeTest.cs new file mode 100644 index 000000000..f903ef8ee --- /dev/null +++ b/MoreLinq.Test/Async/MergeTest.cs @@ -0,0 +1,162 @@ +#region License and Terms +// MoreLINQ - Extensions to LINQ to Objects +// Copyright (c) 2020 Atif Aziz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +#nullable enable + +namespace MoreLinq.Test.Async +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using Experimental.Async; + using NUnit.Framework; + + [TestFixture] + public class MergeTest + { + [TestCase(0)] + [TestCase(-1)] + [TestCase(-2)] + [TestCase(-3)] + public void InvalidMaxConcurrent(int n) + { + var sources = new IAsyncEnumerable[0]; + void Act() => sources.Merge(n); + AssertThrowsArgument.OutOfRangeException("maxConcurrent", Act); + } + + [Test] + public async Task MergeSync() + { + using var ts1 = AsyncEnumerable.Range(1, 1).AsTestingSequence(); + using var ts2 = AsyncEnumerable.Range(1, 2).AsTestingSequence(); + using var ts3 = AsyncEnumerable.Range(1, 3).AsTestingSequence(); + using var ts4 = AsyncEnumerable.Range(1, 4).AsTestingSequence(); + using var ts5 = AsyncEnumerable.Range(1, 5).AsTestingSequence(); + + using var ts = TestingSequence.Of(ts1, ts2, ts3, ts4, ts5); + var result = await ts.Merge().ToListAsync(); + + Assert.That(result, Is.EqualTo(new[] + { + 1, + 1, 2, + 1, 2, 3, + 1, 2, 3, 4, + 1, 2, 3, 4, 5, + })); + } + + [Test] + public async Task MergeAsyncAll() + { + using var ts1 = AsyncEnumerable.Range(10, 1).Yield().AsTestingSequence(); + using var ts2 = AsyncEnumerable.Range(20, 2).Yield().AsTestingSequence(); + using var ts3 = AsyncEnumerable.Range(30, 3).Yield().AsTestingSequence(); + using var ts4 = AsyncEnumerable.Range(40, 4).Yield().AsTestingSequence(); + using var ts5 = AsyncEnumerable.Range(50, 5).Yield().AsTestingSequence(); + + using var ts = TestingSequence.Of(ts1, ts2, ts3, ts4, ts5); + var result = await ts.Merge().ToListAsync(); + + Assert.That(result, Is.EquivalentTo(new[] + { + 10, + 20, 21, + 30, 31, 32, + 40, 41, 42, 43, + 50, 51, 52, 53, 54, + })); + } + + sealed class AsyncControl + { + record State(TaskCompletionSource TaskCompletionSource, T Result); + + State? _state; + + public Task Result(T result) + { + if (!(_state is null)) + throw new InvalidOperationException(); + _state = new State(new TaskCompletionSource(), result); + return _state.TaskCompletionSource.Task; + } + + public void Complete() + { + if (!(_state is {} state)) + throw new InvalidOperationException(); + _state = null; + state.TaskCompletionSource.SetResult(state.Result); + } + } + + [Test] + public async Task MergeAsyncSome() + { + var ac1 = new AsyncControl(); + var ac2 = new AsyncControl(); + + async IAsyncEnumerable Source1() + { + yield return await ac1.Result(1); + yield return 2; + yield return 3; + await ac1.Result(0); + }; + + async IAsyncEnumerable Source2() + { + yield return await ac2.Result(4); + } + + using var ts1 = Source1().AsTestingSequence(); + using var ts2 = Source2().AsTestingSequence(); + using var sources = TestingSequence.Of(ts1, ts2); + var e = sources.Merge().GetAsyncEnumerator(); + + async Task ExpectAsync(AsyncControl control) + { + var t = e.MoveNextAsync(); + Assert.That(t.IsCompleted, Is.False); + control.Complete(); + Assert.That(await t, Is.True); + return e.Current; + } + + async Task ExpectSync() + { + var t = e.MoveNextAsync(); + Assert.That(t.IsCompleted, Is.True); + Assert.That(await t, Is.True); + return e.Current; + } + + Assert.That(await ExpectAsync(ac2), Is.EqualTo(4)); + Assert.That(await ExpectAsync(ac1), Is.EqualTo(1)); + Assert.That(ts2.IsDisposed, Is.True); + Assert.That(await ExpectSync(), Is.EqualTo(2)); + Assert.That(await ExpectSync(), Is.EqualTo(3)); + + var t = e.MoveNextAsync(); + Assert.That(t.IsCompleted, Is.False); + ac1.Complete(); + Assert.That(await t, Is.False); + } + } +} diff --git a/MoreLinq.Test/Async/TestExtensions.cs b/MoreLinq.Test/Async/TestExtensions.cs new file mode 100644 index 000000000..0636fabaf --- /dev/null +++ b/MoreLinq.Test/Async/TestExtensions.cs @@ -0,0 +1,60 @@ +#region License and Terms +// MoreLINQ - Extensions to LINQ to Objects +// Copyright (c) 2020 Atif Aziz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +namespace MoreLinq.Test.Async +{ + using System; + using System.Collections.Generic; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + + static partial class TestExtensions + { + public static IAsyncEnumerable Yield(this IAsyncEnumerable source) => + Yield(source, _ => true); + + public static IAsyncEnumerable + Yield(this IAsyncEnumerable source, + Func predicate) => + Yield(source, shouldEndSynchronously: false, predicate); + + public static IAsyncEnumerable + Yield(this IAsyncEnumerable source, + bool shouldEndSynchronously, + Func predicate) + { + if (source is null) throw new ArgumentNullException(nameof(source)); + if (predicate is null) throw new ArgumentNullException(nameof(predicate)); + + return Async(); + + async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancellationToken = default) + { + await foreach (var item in source.WithCancellation(cancellationToken)) + { + if (predicate(item)) + await Task.Yield(); + yield return item; + } + + if (!shouldEndSynchronously) + await Task.Yield(); + } + } + } +} diff --git a/MoreLinq.Test/Async/TestingAsyncSequence.cs b/MoreLinq.Test/Async/TestingAsyncSequence.cs new file mode 100644 index 000000000..a58f0d916 --- /dev/null +++ b/MoreLinq.Test/Async/TestingAsyncSequence.cs @@ -0,0 +1,88 @@ +#region License and Terms +// MoreLINQ - Extensions to LINQ to Objects +// Copyright (c) 2009 Atif Aziz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +namespace MoreLinq.Test.Async +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using NUnit.Framework; + + static class TestingAsyncSequence + { + public static TestingAsyncSequence Of(params T[] elements) => + new TestingAsyncSequence(elements.ToAsyncEnumerable()); + + public static TestingAsyncSequence AsTestingSequence(this IAsyncEnumerable source) => + source is not null + ? new TestingAsyncSequence(source) + : throw new ArgumentNullException(nameof(source)); + } + + /// + /// Sequence that asserts whether its iterator has been disposed + /// when it is disposed itself and also whether GetEnumerator() is + /// called exactly once or not. + /// + sealed class TestingAsyncSequence : IAsyncEnumerable, IDisposable + { + bool? _disposed; + IAsyncEnumerable _source; + + internal TestingAsyncSequence(IAsyncEnumerable sequence) => + _source = sequence; + + public bool IsDisposed => _disposed == true; + public int MoveNextCallCount { get; private set; } + + void IDisposable.Dispose() => + AssertDisposed(); + + /// + /// Checks that the iterator was disposed, and then resets. + /// + void AssertDisposed() + { + if (_disposed is null) + return; + Assert.IsTrue(_disposed, "Expected sequence to be disposed."); + _disposed = null; + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + Assert.That(_source, Is.Not.Null, "LINQ operators should not enumerate a sequence more than once."); + var enumerator = _source.GetAsyncEnumerator(cancellationToken).AsWatchable(); + _disposed = false; + enumerator.Disposed += delegate + { + Assert.That(_disposed, Is.False, "LINQ operators should not dispose a sequence more than once."); + _disposed = true; + }; + var ended = false; + enumerator.MoveNextCalled += (_, moved) => + { + Assert.That(ended, Is.False, "LINQ operators should not continue iterating a sequence that has terminated."); + ended = !moved; + MoveNextCallCount++; + }; + _source = null; + return enumerator; + } + } +} diff --git a/MoreLinq.Test/Async/WatchableEnumerator.cs b/MoreLinq.Test/Async/WatchableEnumerator.cs new file mode 100644 index 000000000..5b62bf825 --- /dev/null +++ b/MoreLinq.Test/Async/WatchableEnumerator.cs @@ -0,0 +1,56 @@ +#region License and Terms +// MoreLINQ - Extensions to LINQ to Objects +// Copyright (c) 2008 Jonathan Skeet. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +namespace MoreLinq.Test.Async +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + + partial class TestExtensions + { + public static WatchableEnumerator AsWatchable(this IAsyncEnumerator source) => + new WatchableEnumerator(source); + } + + sealed class WatchableEnumerator : IAsyncEnumerator + { + readonly IAsyncEnumerator _source; + + public event EventHandler Disposed; + public event EventHandler MoveNextCalled; + + public WatchableEnumerator(IAsyncEnumerator source) => + _source = source ?? throw new ArgumentNullException(nameof(source)); + + public T Current => _source.Current; + + public async ValueTask MoveNextAsync() + { + var moved = await _source.MoveNextAsync(); + MoveNextCalled?.Invoke(this, moved); + return moved; + } + + public ValueTask DisposeAsync() + { + _source.DisposeAsync(); + Disposed?.Invoke(this, EventArgs.Empty); + return new ValueTask(); + } + } +} diff --git a/MoreLinq/Experimental/Async/ExperimentalEnumerable.cs b/MoreLinq/Experimental/Async/ExperimentalEnumerable.cs new file mode 100644 index 000000000..542a35fb9 --- /dev/null +++ b/MoreLinq/Experimental/Async/ExperimentalEnumerable.cs @@ -0,0 +1,24 @@ +#if !NO_ASYNC_STREAMS + +namespace MoreLinq.Experimental.Async +{ + using System.Collections.Generic; + + /// + /// + /// Provides a set of static methods for querying objects that + /// implement . + /// + /// THE METHODS ARE EXPERIMENTAL. THEY MAY BE UNSTABLE AND + /// UNTESTED. THEY MAY BE REMOVED FROM A FUTURE MAJOR OR MINOR RELEASE AND + /// POSSIBLY WITHOUT NOTICE. USE THEM AT YOUR OWN RISK. THE METHODS ARE + /// PUBLISHED FOR FIELD EXPERIMENTATION TO SOLICIT FEEDBACK ON THEIR + /// UTILITY AND DESIGN/IMPLEMENTATION DEFECTS. + /// + + public static partial class ExperimentalEnumerable + { + } +} + +#endif // !NO_ASYNC_STREAMS diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs new file mode 100644 index 000000000..3bc8cf9f8 --- /dev/null +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -0,0 +1,249 @@ +#region License and Terms +// MoreLINQ - Extensions to LINQ to Objects +// Copyright (c) 2020 Atif Aziz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +#if !NO_ASYNC_STREAMS + +namespace MoreLinq.Experimental.Async +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + + partial class ExperimentalEnumerable + { + /// + /// Concurrently merges all the elements of multiple asynchronous + /// streams into a single asynchronous stream. + /// + /// + /// The type of the elements in . + /// The sequence of asynchronous streams. + /// + /// An asynchronous stream with all elements from all . + /// + /// + /// This operator uses deferred execution and streams its results. + /// + /// The elements in the resulting stream may appear in a different + /// order than their order in . + /// + /// When disposed part of the way, there is a best-effort attempt to + /// cancel all iterations that are in flight. This requires that all + /// asynchronous streams in properly + /// honour timely cancellation. + /// + + public static IAsyncEnumerable Merge(this IEnumerable> sources) => + Merge(sources, int.MaxValue); + + /// + /// Concurrently merges all the elements of multiple asynchronous + /// streams into a single asynchronous stream. + /// + /// + /// The type of the elements in . + /// The sequence of asynchronous streams. + /// + /// Maximum number of asynchronous operations to have in flight at any + /// given time. A value of 1 (or below) disables concurrency. + /// + /// An asynchronous stream with all elements from all . + /// + /// + /// + /// This operator uses deferred execution and streams its results. + /// + /// When is 2 or greater then the + /// elements in the resulting stream may appear in a different + /// order than their order in . + /// + /// When disposed part of the way, there is a best-effort attempt to + /// cancel all iterations that are in flight. This requires that all + /// asynchronous streams in properly + /// honour timely cancellation. + /// + + public static IAsyncEnumerable Merge(this IEnumerable> sources, + int maxConcurrent) + { + if (sources is null) throw new ArgumentNullException(nameof(sources)); + if (maxConcurrent <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrent)); + + return Async(); + + async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancellationToken = default) + { + var thisCancellationTokenSource = new CancellationTokenSource(); + var cancellationTokenSource = cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(thisCancellationTokenSource.Token, cancellationToken) + : thisCancellationTokenSource; + cancellationToken = cancellationTokenSource.Token; + + var enumeratorList = new List>(); + + List)>>? pendingTaskList = null; + + try + { + var enumerators = + from source in sources + select source.GetAsyncEnumerator(cancellationToken); + + enumeratorList.AddRange(enumerators); + + pendingTaskList = new List)>>(); + + const bool some = true; + + async ValueTask<(bool, T)> ReadAsync(IAsyncEnumerator enumerator) + { + var task = enumerator.MoveNextAsync(); + + if (task.IsCompleted) + { + if (await task.ConfigureAwait(false)) + return (some, enumerator.Current); + + await enumerator.DisposeAsync().ConfigureAwait(false); + enumeratorList.Remove(enumerator); + } + else + { + pendingTaskList.Add(task.And(enumerator).AsTask()); + } + + return default; + } + + while (enumeratorList.Count > 0) + { + while (pendingTaskList.Count is { } ptc + && ptc < enumeratorList.Count + && ptc < maxConcurrent) + { + var i = pendingTaskList.Count; + var enumerator = enumeratorList[i]; + + while (await ReadAsync(enumerator).ConfigureAwait(false) + is (some, var item)) + { + yield return item; + } + } + + while (pendingTaskList.Count > 0) + { + var completedTask = await Task.WhenAny(pendingTaskList).ConfigureAwait(false); + var (moved, enumerator) = await completedTask.ConfigureAwait(false); + pendingTaskList.Remove(completedTask); + + if (moved) + { + yield return enumerator.Current; + + while (await ReadAsync(enumerator).ConfigureAwait(false) + is (some, var item)) + { + yield return item; + } + } + else + { + await enumerator.DisposeAsync().ConfigureAwait(false); + enumeratorList.Remove(enumerator); + break; + } + } + } + } + finally + { + // Signal cancellation to those in flight. Unfortunately, + // this relies on all iterators to honour the cancellation. + + thisCancellationTokenSource.Cancel(); + + // > The caller of an async-iterator method should only call + // > `DisposeAsync()` when the method completed or was suspended + // > by a `yield return`. + // + // Source: https://github.com/dotnet/roslyn/blob/0e7b657bf6c019ec8019dcbd4f833f0dda50a97d/docs/features/async-streams.md#disposal + // + // > The result of invoking `DisposeAsync` from states -1 or N is + // > unspecified. This compiler generates `throw new + // > NotSupportException()` for those cases. + // + // Source: https://github.com/dotnet/roslyn/blob/0e7b657bf6c019ec8019dcbd4f833f0dda50a97d/docs/features/async-streams.md#state-values-and-transitions + // + // As result, wait for all pending tasks to complete, irrespective + // of how they complete (successfully, faulted or canceled). The + // goal is that the iterator is in some defined stat where it is + // before disposing it otherwise it could throw "NotSupportedException". + + if (pendingTaskList is not null) + { + while (await Task.WhenAny(pendingTaskList) + .ConfigureAwait(false) is { } completedTask) + { + pendingTaskList.Remove(completedTask); + } + } + + List? disposalTasks = null; + + foreach (var enumerator in enumeratorList) + { + ValueTask task; + + try + { + task = enumerator.DisposeAsync(); + } + catch (NotSupportedException) + { + // Ignore just in case we hit an unspecified case; + // see quoted notes from Roslyn spec above. + + continue; + } + + if (task.IsCompleted) + { + await task.ConfigureAwait(false); + } + else + { + disposalTasks ??= new List(); + disposalTasks.Add(task.AsTask()); + } + } + + if (disposalTasks is {} && disposalTasks.Count > 0) + await Task.WhenAll(disposalTasks).ConfigureAwait(false); + } + } + } + + static async ValueTask<(T1, T2)> And(this ValueTask task, T2 second) => + (await task.ConfigureAwait(false), second); + } +} + +#endif // !NO_ASYNC_STREAMS diff --git a/MoreLinq/MoreLinq.csproj b/MoreLinq/MoreLinq.csproj index f958f8280..e0d5bac18 100644 --- a/MoreLinq/MoreLinq.csproj +++ b/MoreLinq/MoreLinq.csproj @@ -54,6 +54,7 @@ - LeftJoin - MaxBy - Memoize (EXPERIMENTAL) + - Merge (EXPERIMENTAL) - MinBy - Move - OrderBy @@ -119,8 +120,9 @@ en-US 3.3.2 MoreLINQ Developers. - net451;netstandard1.0;netstandard2.0 + net451;netstandard1.0;netstandard2.0;netstandard2.1 enable + $(DefineConstants);MORELINQ false @@ -187,12 +189,12 @@ - - $(DefineConstants);MORELINQ + + $(DefineConstants);NO_ASYNC_STREAMS - $(DefineConstants);MORELINQ;NO_SERIALIZATION_ATTRIBUTES;NO_EXCEPTION_SERIALIZATION;NO_TRACING;NO_COM;NO_ASYNC + $(DefineConstants);NO_ASYNC_STREAMS;NO_SERIALIZATION_ATTRIBUTES;NO_EXCEPTION_SERIALIZATION;NO_TRACING;NO_COM;NO_ASYNC @@ -284,4 +286,4 @@ - + \ No newline at end of file From 90e932610e309145049a7e8d1df289a3212d2247 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 15 Jan 2021 00:14:37 +0100 Subject: [PATCH 02/25] Make summaries of overloads distinct --- MoreLinq/Experimental/Async/Merge.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index 3bc8cf9f8..bcd3215b9 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -55,7 +55,9 @@ public static IAsyncEnumerable Merge(this IEnumerable> /// /// Concurrently merges all the elements of multiple asynchronous - /// streams into a single asynchronous stream. + /// streams into a single asynchronous stream. An additional parameter + /// specifies the maximum concurrent operations that may be in flight + /// at any give time. /// /// /// The type of the elements in . From 308d42fae2b15c195dd5cd8e7c6829671f5930a4 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 15 Jan 2021 00:32:48 +0100 Subject: [PATCH 03/25] Fix broken tests --- MoreLinq/Experimental/Async/Merge.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index bcd3215b9..c83c1dd8a 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -199,7 +199,7 @@ from source in sources // goal is that the iterator is in some defined stat where it is // before disposing it otherwise it could throw "NotSupportedException". - if (pendingTaskList is not null) + if (pendingTaskList is { Count: > 0 }) { while (await Task.WhenAny(pendingTaskList) .ConfigureAwait(false) is { } completedTask) From 2565c3359beb72f4d3e3b498184ec3c06387cc02 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 18 Jan 2021 11:06:29 +0100 Subject: [PATCH 04/25] Fold conditions into a single pattern --- MoreLinq/Experimental/Async/Merge.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index c83c1dd8a..bbecb7f91 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -237,7 +237,7 @@ from source in sources } } - if (disposalTasks is {} && disposalTasks.Count > 0) + if (disposalTasks is { Count: > 0 }) await Task.WhenAll(disposalTasks).ConfigureAwait(false); } } From 406d09ada991c74c5d59ae804800c53cad768c03 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 18 Jan 2021 18:28:22 +0100 Subject: [PATCH 05/25] Do not await for async disposals until finally --- MoreLinq.Test/Async/TestingAsyncSequence.cs | 37 ++++++++++++++++-- MoreLinq/Experimental/Async/Merge.cs | 42 +++++++++++---------- 2 files changed, 55 insertions(+), 24 deletions(-) diff --git a/MoreLinq.Test/Async/TestingAsyncSequence.cs b/MoreLinq.Test/Async/TestingAsyncSequence.cs index a58f0d916..78eea824c 100644 --- a/MoreLinq.Test/Async/TestingAsyncSequence.cs +++ b/MoreLinq.Test/Async/TestingAsyncSequence.cs @@ -15,11 +15,14 @@ // limitations under the License. #endregion +#nullable enable + namespace MoreLinq.Test.Async { using System; using System.Collections.Generic; using System.Linq; + using System.Runtime.ExceptionServices; using System.Threading; using NUnit.Framework; @@ -39,10 +42,12 @@ source is not null /// when it is disposed itself and also whether GetEnumerator() is /// called exactly once or not. /// + sealed class TestingAsyncSequence : IAsyncEnumerable, IDisposable { bool? _disposed; - IAsyncEnumerable _source; + IAsyncEnumerable? _source; + ExceptionDispatchInfo? _disposeErrorInfo; internal TestingAsyncSequence(IAsyncEnumerable sequence) => _source = sequence; @@ -56,24 +61,47 @@ void IDisposable.Dispose() => /// /// Checks that the iterator was disposed, and then resets. /// + void AssertDisposed() { + _disposeErrorInfo?.Throw(); + if (_disposed is null) return; + Assert.IsTrue(_disposed, "Expected sequence to be disposed."); _disposed = null; } public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { - Assert.That(_source, Is.Not.Null, "LINQ operators should not enumerate a sequence more than once."); - var enumerator = _source.GetAsyncEnumerator(cancellationToken).AsWatchable(); + Assert.That(_source, Is.Not.Null, + "LINQ operators should not enumerate a sequence more than once."); + + // Dammit (!) below is okay since we assert above it's not null. + var enumerator = _source!.GetAsyncEnumerator(cancellationToken).AsWatchable(); + _disposed = false; enumerator.Disposed += delegate { - Assert.That(_disposed, Is.False, "LINQ operators should not dispose a sequence more than once."); + // If the following assertion fails the capture the error + // and re-throw later during the disposal of the test + // sequence. This is done so because "DisposeAsync" is never + // expected to throw and could interfere with how an operator + // builds on that assumption. + + try + { + Assert.That(_disposed, Is.False, "LINQ operators should not dispose a sequence more than once."); + } + catch (AssertionException e) + { + _disposeErrorInfo = ExceptionDispatchInfo.Capture(e); + } + _disposed = true; }; + var ended = false; enumerator.MoveNextCalled += (_, moved) => { @@ -81,6 +109,7 @@ public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToke ended = !moved; MoveNextCallCount++; }; + _source = null; return enumerator; } diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index bbecb7f91..f03540dab 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -99,6 +99,7 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel cancellationToken = cancellationTokenSource.Token; var enumeratorList = new List>(); + var disposalTaskList = (List?)null; List)>>? pendingTaskList = null; @@ -114,6 +115,17 @@ from source in sources const bool some = true; + ValueTask? DisposeAsync(IAsyncEnumerator enumerator) + { + enumeratorList.Remove(enumerator); + var disposalTask = enumerator.DisposeAsync(); + if (disposalTask.IsCompleted) + return disposalTask; + disposalTaskList ??= new List(); + disposalTaskList.Add(disposalTask.AsTask()); + return null; + } + async ValueTask<(bool, T)> ReadAsync(IAsyncEnumerator enumerator) { var task = enumerator.MoveNextAsync(); @@ -123,8 +135,8 @@ from source in sources if (await task.ConfigureAwait(false)) return (some, enumerator.Current); - await enumerator.DisposeAsync().ConfigureAwait(false); - enumeratorList.Remove(enumerator); + if (DisposeAsync(enumerator) is { IsCompleted: true } completedDisposalTask) + await completedDisposalTask.ConfigureAwait(false); } else { @@ -143,11 +155,8 @@ from source in sources var i = pendingTaskList.Count; var enumerator = enumeratorList[i]; - while (await ReadAsync(enumerator).ConfigureAwait(false) - is (some, var item)) - { + while (await ReadAsync(enumerator).ConfigureAwait(false) is (some, var item)) yield return item; - } } while (pendingTaskList.Count > 0) @@ -160,17 +169,12 @@ from source in sources { yield return enumerator.Current; - while (await ReadAsync(enumerator).ConfigureAwait(false) - is (some, var item)) - { + while (await ReadAsync(enumerator).ConfigureAwait(false) is (some, var item)) yield return item; - } } - else + else if (DisposeAsync(enumerator) is { IsCompleted: true } completedDisposalTask) { - await enumerator.DisposeAsync().ConfigureAwait(false); - enumeratorList.Remove(enumerator); - break; + await completedDisposalTask.ConfigureAwait(false); } } } @@ -208,8 +212,6 @@ from source in sources } } - List? disposalTasks = null; - foreach (var enumerator in enumeratorList) { ValueTask task; @@ -232,13 +234,13 @@ from source in sources } else { - disposalTasks ??= new List(); - disposalTasks.Add(task.AsTask()); + disposalTaskList ??= new List(); + disposalTaskList.Add(task.AsTask()); } } - if (disposalTasks is { Count: > 0 }) - await Task.WhenAll(disposalTasks).ConfigureAwait(false); + if (disposalTaskList is { Count: > 0 }) + await Task.WhenAll(disposalTaskList).ConfigureAwait(false); } } } From 72003a34c52fcdd0179b54cec59b97563b4642b4 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 18 Jan 2021 18:30:59 +0100 Subject: [PATCH 06/25] Fix copyright on WatchableEnumerator --- MoreLinq.Test/Async/WatchableEnumerator.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq.Test/Async/WatchableEnumerator.cs b/MoreLinq.Test/Async/WatchableEnumerator.cs index 5b62bf825..5b4e63c4c 100644 --- a/MoreLinq.Test/Async/WatchableEnumerator.cs +++ b/MoreLinq.Test/Async/WatchableEnumerator.cs @@ -1,6 +1,6 @@ #region License and Terms // MoreLINQ - Extensions to LINQ to Objects -// Copyright (c) 2008 Jonathan Skeet. All rights reserved. +// Copyright (c) 2021 Atif Aziz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 536bbb4c762e761f346fe39b7e326b847e7f9ea8 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 18 Jan 2021 18:32:07 +0100 Subject: [PATCH 07/25] Add nullability checks on new test support code --- MoreLinq.Test/Async/TestExtensions.cs | 2 ++ MoreLinq.Test/Async/WatchableEnumerator.cs | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/MoreLinq.Test/Async/TestExtensions.cs b/MoreLinq.Test/Async/TestExtensions.cs index 0636fabaf..60b290223 100644 --- a/MoreLinq.Test/Async/TestExtensions.cs +++ b/MoreLinq.Test/Async/TestExtensions.cs @@ -15,6 +15,8 @@ // limitations under the License. #endregion +#nullable enable + namespace MoreLinq.Test.Async { using System; diff --git a/MoreLinq.Test/Async/WatchableEnumerator.cs b/MoreLinq.Test/Async/WatchableEnumerator.cs index 5b4e63c4c..b83a2b74b 100644 --- a/MoreLinq.Test/Async/WatchableEnumerator.cs +++ b/MoreLinq.Test/Async/WatchableEnumerator.cs @@ -15,6 +15,8 @@ // limitations under the License. #endregion +#nullable enable + namespace MoreLinq.Test.Async { using System; @@ -31,8 +33,8 @@ sealed class WatchableEnumerator : IAsyncEnumerator { readonly IAsyncEnumerator _source; - public event EventHandler Disposed; - public event EventHandler MoveNextCalled; + public event EventHandler? Disposed; + public event EventHandler? MoveNextCalled; public WatchableEnumerator(IAsyncEnumerator source) => _source = source ?? throw new ArgumentNullException(nameof(source)); From ab74950c55377edf06b0bbdd3a4a4f65a8bd2c60 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 18 Jan 2021 21:04:15 +0100 Subject: [PATCH 08/25] Restore final new line project file --- MoreLinq/MoreLinq.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/MoreLinq.csproj b/MoreLinq/MoreLinq.csproj index e0d5bac18..4810d3934 100644 --- a/MoreLinq/MoreLinq.csproj +++ b/MoreLinq/MoreLinq.csproj @@ -286,4 +286,4 @@ - \ No newline at end of file + From f113adffffb120abc09382cc6ef5853f68de518f Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 18 Jan 2021 21:37:20 +0100 Subject: [PATCH 09/25] Expand LINQ query to for-loop (less costly) --- MoreLinq/Experimental/Async/Merge.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index f03540dab..41dcf5bcc 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -21,7 +21,6 @@ namespace MoreLinq.Experimental.Async { using System; using System.Collections.Generic; - using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -105,11 +104,8 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel try { - var enumerators = - from source in sources - select source.GetAsyncEnumerator(cancellationToken); - - enumeratorList.AddRange(enumerators); + foreach (var source in sources) + enumeratorList.Add(source.GetAsyncEnumerator(cancellationToken)); pendingTaskList = new List)>>(); From 1687fba0d5a18b1132b74f08ddef185e5c43d6be Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 18 Jan 2021 21:38:40 +0100 Subject: [PATCH 10/25] Consistently use "var" --- MoreLinq/Experimental/Async/Merge.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index 41dcf5bcc..3841e0ba3 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -99,8 +99,7 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel var enumeratorList = new List>(); var disposalTaskList = (List?)null; - - List)>>? pendingTaskList = null; + var pendingTaskList = (List)>>?)null; try { From af25dbd72f258ed73aa8d2d5eb4f52af04c2aa0c Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 13:29:46 +0100 Subject: [PATCH 11/25] Remove redundant nullable context directive --- MoreLinq.Test/Async/MergeTest.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/MoreLinq.Test/Async/MergeTest.cs b/MoreLinq.Test/Async/MergeTest.cs index 942354e3c..da06d4e47 100644 --- a/MoreLinq.Test/Async/MergeTest.cs +++ b/MoreLinq.Test/Async/MergeTest.cs @@ -15,8 +15,6 @@ // limitations under the License. #endregion -#nullable enable - namespace MoreLinq.Test.Async { using System; From f501609ee11f3c8ffae1a196c9a4ef2283ec645c Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 13:35:59 +0100 Subject: [PATCH 12/25] Use more readable not patterns --- MoreLinq.Test/Async/MergeTest.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/MoreLinq.Test/Async/MergeTest.cs b/MoreLinq.Test/Async/MergeTest.cs index da06d4e47..9c9925a59 100644 --- a/MoreLinq.Test/Async/MergeTest.cs +++ b/MoreLinq.Test/Async/MergeTest.cs @@ -90,7 +90,7 @@ sealed record State(TaskCompletionSource TaskCompletionSource, T Result); public Task Result(T result) { - if (!(_state is null)) + if (_state is not null) throw new InvalidOperationException(); _state = new State(new TaskCompletionSource(), result); return _state.TaskCompletionSource.Task; @@ -98,7 +98,7 @@ public Task Result(T result) public void Complete() { - if (!(_state is {} state)) + if (_state is not { } state) throw new InvalidOperationException(); _state = null; state.TaskCompletionSource.SetResult(state.Result); From aca18c21227a7aadcb69e2f432734a2ff8d6cc51 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 13:36:20 +0100 Subject: [PATCH 13/25] Replace for-loop with LINQ query syntax --- MoreLinq/Experimental/Async/Merge.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index bc414576a..59f6c7ea0 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -21,6 +21,7 @@ namespace MoreLinq.Experimental.Async { using System; using System.Collections.Generic; + using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -103,8 +104,8 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel try { - foreach (var source in sources) - enumeratorList.Add(source.GetAsyncEnumerator(cancellationToken)); + enumeratorList.AddRange(from s in sources + select s.GetAsyncEnumerator(cancellationToken)); pendingTaskList = new List)>>(); From 86b00a7f8e6f2c46c8876f426bff06cc2e0e072f Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 13:37:42 +0100 Subject: [PATCH 14/25] Use var pattern for pending task count --- MoreLinq/Experimental/Async/Merge.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index 59f6c7ea0..9921f17ee 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -144,7 +144,7 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel while (enumeratorList.Count > 0) { - while (pendingTaskList.Count is { } ptc + while (pendingTaskList.Count is var ptc && ptc < enumeratorList.Count && ptc < maxConcurrent) { From f12a5d619f6b6a0eec72e01b2df8f35d060945f3 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 13:41:39 +0100 Subject: [PATCH 15/25] Add to read-me doc --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index e3ba59bb4..ece3287fc 100644 --- a/README.md +++ b/README.md @@ -755,6 +755,14 @@ Creates a sequence that lazily caches the source as it is iterated for the first time, reusing the cache thereafter for future re-iterations. If the source is already cached or buffered then it is returned verbatim. +### Merge + +Concurrently merges all the elements of multiple asynchronous streams into a +single asynchronous stream. An overload with an additional parameter specifies +the maximum concurrent operations that may be in flight at any give time. + +This method has 2 overloads. + ### TrySingle Returns the only element of a sequence that has just one element. If the From df0693cc504d683e2f35a164181ecbcfaf98e2c0 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 14:26:36 +0100 Subject: [PATCH 16/25] Remove redunatant compilation symbol --- MoreLinq/MoreLinq.csproj | 1 - 1 file changed, 1 deletion(-) diff --git a/MoreLinq/MoreLinq.csproj b/MoreLinq/MoreLinq.csproj index 0e9a19b3b..cf497f656 100644 --- a/MoreLinq/MoreLinq.csproj +++ b/MoreLinq/MoreLinq.csproj @@ -121,7 +121,6 @@ 3.3.2 MoreLINQ Developers. net462;netstandard1.0;netstandard2.0;netstandard2.1;net6.0 - $(DefineConstants);MORELINQ false From 926957b9c317b8770e2e1c834087321c23adeeac Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 14:43:37 +0100 Subject: [PATCH 17/25] Remove more redundant nullable context directives --- MoreLinq.Test/Async/TestExtensions.cs | 2 -- MoreLinq.Test/Async/TestingAsyncSequence.cs | 2 -- MoreLinq.Test/Async/WatchableEnumerator.cs | 2 -- 3 files changed, 6 deletions(-) diff --git a/MoreLinq.Test/Async/TestExtensions.cs b/MoreLinq.Test/Async/TestExtensions.cs index 60b290223..0636fabaf 100644 --- a/MoreLinq.Test/Async/TestExtensions.cs +++ b/MoreLinq.Test/Async/TestExtensions.cs @@ -15,8 +15,6 @@ // limitations under the License. #endregion -#nullable enable - namespace MoreLinq.Test.Async { using System; diff --git a/MoreLinq.Test/Async/TestingAsyncSequence.cs b/MoreLinq.Test/Async/TestingAsyncSequence.cs index 78eea824c..18657ad68 100644 --- a/MoreLinq.Test/Async/TestingAsyncSequence.cs +++ b/MoreLinq.Test/Async/TestingAsyncSequence.cs @@ -15,8 +15,6 @@ // limitations under the License. #endregion -#nullable enable - namespace MoreLinq.Test.Async { using System; diff --git a/MoreLinq.Test/Async/WatchableEnumerator.cs b/MoreLinq.Test/Async/WatchableEnumerator.cs index a357e2bbf..c1f2df74f 100644 --- a/MoreLinq.Test/Async/WatchableEnumerator.cs +++ b/MoreLinq.Test/Async/WatchableEnumerator.cs @@ -15,8 +15,6 @@ // limitations under the License. #endregion -#nullable enable - namespace MoreLinq.Test.Async { using System; From 973d01f377141da8295c5fde2b133e0b222420e0 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 14:44:08 +0100 Subject: [PATCH 18/25] Remove redundant type spec in new expression --- MoreLinq.Test/Async/TestingAsyncSequence.cs | 2 +- MoreLinq.Test/Async/WatchableEnumerator.cs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/MoreLinq.Test/Async/TestingAsyncSequence.cs b/MoreLinq.Test/Async/TestingAsyncSequence.cs index 18657ad68..e63f2227e 100644 --- a/MoreLinq.Test/Async/TestingAsyncSequence.cs +++ b/MoreLinq.Test/Async/TestingAsyncSequence.cs @@ -27,7 +27,7 @@ namespace MoreLinq.Test.Async static class TestingAsyncSequence { public static TestingAsyncSequence Of(params T[] elements) => - new TestingAsyncSequence(elements.ToAsyncEnumerable()); + new(elements.ToAsyncEnumerable()); public static TestingAsyncSequence AsTestingSequence(this IAsyncEnumerable source) => source is not null diff --git a/MoreLinq.Test/Async/WatchableEnumerator.cs b/MoreLinq.Test/Async/WatchableEnumerator.cs index c1f2df74f..0920b36e2 100644 --- a/MoreLinq.Test/Async/WatchableEnumerator.cs +++ b/MoreLinq.Test/Async/WatchableEnumerator.cs @@ -23,8 +23,7 @@ namespace MoreLinq.Test.Async partial class TestExtensions { - public static WatchableEnumerator AsWatchable(this IAsyncEnumerator source) => - new WatchableEnumerator(source); + public static WatchableEnumerator AsWatchable(this IAsyncEnumerator source) => new(source); } sealed class WatchableEnumerator : IAsyncEnumerator From 5b8ffd88429eb05ddcf433916db4aeb79da248aa Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sun, 22 Jan 2023 14:45:26 +0100 Subject: [PATCH 19/25] Widen comments wrapping --- MoreLinq/Experimental/Async/Merge.cs | 39 +++++++++++++--------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index 9921f17ee..9f6ed6bda 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -29,8 +29,8 @@ namespace MoreLinq.Experimental.Async partial class ExperimentalEnumerable { /// - /// Concurrently merges all the elements of multiple asynchronous - /// streams into a single asynchronous stream. + /// Concurrently merges all the elements of multiple asynchronous streams into a single + /// asynchronous stream. /// /// /// The type of the elements in . @@ -41,30 +41,28 @@ partial class ExperimentalEnumerable /// /// This operator uses deferred execution and streams its results. /// - /// The elements in the resulting stream may appear in a different - /// order than their order in . + /// The elements in the resulting stream may appear in a different order than their order in + /// . /// - /// When disposed part of the way, there is a best-effort attempt to - /// cancel all iterations that are in flight. This requires that all - /// asynchronous streams in properly - /// honour timely cancellation. + /// When disposed part of the way, there is a best-effort attempt to cancel all iterations + /// that are in flight. This requires that all asynchronous streams in properly honour timely cancellation. /// public static IAsyncEnumerable Merge(this IEnumerable> sources) => Merge(sources, int.MaxValue); /// - /// Concurrently merges all the elements of multiple asynchronous - /// streams into a single asynchronous stream. An additional parameter - /// specifies the maximum concurrent operations that may be in flight - /// at any give time. + /// Concurrently merges all the elements of multiple asynchronous streams into a single + /// asynchronous stream. An additional parameter specifies the maximum concurrent operations + /// that may be in flight at any give time. /// /// /// The type of the elements in . /// The sequence of asynchronous streams. /// - /// Maximum number of asynchronous operations to have in flight at any - /// given time. A value of 1 (or below) disables concurrency. + /// Maximum number of asynchronous operations to have in flight at any given time. A value + /// of 1 (or below) disables concurrency. /// /// An asynchronous stream with all elements from all . /// @@ -72,14 +70,13 @@ public static IAsyncEnumerable Merge(this IEnumerable> /// /// This operator uses deferred execution and streams its results. /// - /// When is 2 or greater then the - /// elements in the resulting stream may appear in a different - /// order than their order in . + /// When is 2 or greater then the elements in the resulting + /// stream may appear in a different order than their order in . /// - /// When disposed part of the way, there is a best-effort attempt to - /// cancel all iterations that are in flight. This requires that all - /// asynchronous streams in properly - /// honour timely cancellation. + /// When disposed part of the way, there is a best-effort attempt to cancel all iterations + /// that are in flight. This requires that all asynchronous streams in properly honour timely cancellation. /// public static IAsyncEnumerable Merge(this IEnumerable> sources, From 43f46d1190baf68933022170583b318abdb47b94 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 17 Feb 2023 09:43:22 +0100 Subject: [PATCH 20/25] Add missing final EOL in EditorConfig --- MoreLinq.Test/.editorconfig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq.Test/.editorconfig b/MoreLinq.Test/.editorconfig index 0ebe826c2..d9548d45a 100644 --- a/MoreLinq.Test/.editorconfig +++ b/MoreLinq.Test/.editorconfig @@ -36,4 +36,4 @@ dotnet_diagnostic.IDE0047.severity = suggestion [*{Test,Tests}.cs] # IDE0022: Use expression/block body for methods -dotnet_diagnostic.IDE0022.severity = none \ No newline at end of file +dotnet_diagnostic.IDE0022.severity = none From 20dd00196f5b8bb5bac3e24055de8fd814018870 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 17 Feb 2023 09:46:56 +0100 Subject: [PATCH 21/25] Discard unused expression values (IDE10058) https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/style-rules/ide0058 --- MoreLinq.Test/Async/MergeTest.cs | 2 +- MoreLinq/Experimental/Async/Merge.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/MoreLinq.Test/Async/MergeTest.cs b/MoreLinq.Test/Async/MergeTest.cs index 9c9925a59..2c325549a 100644 --- a/MoreLinq.Test/Async/MergeTest.cs +++ b/MoreLinq.Test/Async/MergeTest.cs @@ -116,7 +116,7 @@ async IAsyncEnumerable Source1() yield return await ac1.Result(1); yield return 2; yield return 3; - await ac1.Result(0); + _ = await ac1.Result(0); }; async IAsyncEnumerable Source2() diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index 9f6ed6bda..396b9c4dc 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -110,7 +110,7 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel ValueTask? DisposeAsync(IAsyncEnumerator enumerator) { - enumeratorList.Remove(enumerator); + _ = enumeratorList.Remove(enumerator); var disposalTask = enumerator.DisposeAsync(); if (disposalTask.IsCompleted) return disposalTask; @@ -156,7 +156,7 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel { var completedTask = await Task.WhenAny(pendingTaskList).ConfigureAwait(false); var (moved, enumerator) = await completedTask.ConfigureAwait(false); - pendingTaskList.Remove(completedTask); + _ = pendingTaskList.Remove(completedTask); if (moved) { @@ -201,7 +201,7 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel while (await Task.WhenAny(pendingTaskList) .ConfigureAwait(false) is { } completedTask) { - pendingTaskList.Remove(completedTask); + _ = pendingTaskList.Remove(completedTask); } } From b0e6ae28b28e30ed3ab9aac93f3324741bcf3e79 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 17 Feb 2023 09:48:14 +0100 Subject: [PATCH 22/25] Remove empty statement --- MoreLinq.Test/Async/MergeTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq.Test/Async/MergeTest.cs b/MoreLinq.Test/Async/MergeTest.cs index 2c325549a..81bbc18b8 100644 --- a/MoreLinq.Test/Async/MergeTest.cs +++ b/MoreLinq.Test/Async/MergeTest.cs @@ -117,7 +117,7 @@ async IAsyncEnumerable Source1() yield return 2; yield return 3; _ = await ac1.Result(0); - }; + } async IAsyncEnumerable Source2() { From dde76848e8da0374270f5ad189a19bee2363d01d Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 17 Feb 2023 10:09:22 +0100 Subject: [PATCH 23/25] Widen comments wrapping (2) --- MoreLinq/Experimental/Async/Merge.cs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index 396b9c4dc..cc56c9b33 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -174,27 +174,25 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel } finally { - // Signal cancellation to those in flight. Unfortunately, - // this relies on all iterators to honour the cancellation. + // Signal cancellation to those in flight. Unfortunately, this relies on all + // iterators to honour the cancellation. thisCancellationTokenSource.Cancel(); - // > The caller of an async-iterator method should only call - // > `DisposeAsync()` when the method completed or was suspended - // > by a `yield return`. + // > The caller of an async-iterator method should only call `DisposeAsync()` + // > when the method completed or was suspended by a `yield return`. // // Source: https://github.com/dotnet/roslyn/blob/0e7b657bf6c019ec8019dcbd4f833f0dda50a97d/docs/features/async-streams.md#disposal // - // > The result of invoking `DisposeAsync` from states -1 or N is - // > unspecified. This compiler generates `throw new - // > NotSupportException()` for those cases. + // > The result of invoking `DisposeAsync` from states -1 or N is unspecified. + // > This compiler generates `throw new NotSupportException()` for those cases. // // Source: https://github.com/dotnet/roslyn/blob/0e7b657bf6c019ec8019dcbd4f833f0dda50a97d/docs/features/async-streams.md#state-values-and-transitions // - // As result, wait for all pending tasks to complete, irrespective - // of how they complete (successfully, faulted or canceled). The - // goal is that the iterator is in some defined stat where it is - // before disposing it otherwise it could throw "NotSupportedException". + // As result, wait for all pending tasks to complete, irrespective of how they + // complete (successfully, faulted or canceled). The goal is that the iterator + // is in some defined stat where it is before disposing it otherwise it could + // throw "NotSupportedException". if (pendingTaskList is { Count: > 0 }) { From 00473658e2445b2c4cbb4d641f8d69959a1ed3d6 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 17 Feb 2023 11:03:32 +0100 Subject: [PATCH 24/25] Fix comment --- MoreLinq/Experimental/Async/Merge.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index cc56c9b33..00cddce96 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -189,10 +189,10 @@ async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancel // // Source: https://github.com/dotnet/roslyn/blob/0e7b657bf6c019ec8019dcbd4f833f0dda50a97d/docs/features/async-streams.md#state-values-and-transitions // - // As result, wait for all pending tasks to complete, irrespective of how they + // As a result, wait for all pending tasks to complete, irrespective of how they // complete (successfully, faulted or canceled). The goal is that the iterator - // is in some defined stat where it is before disposing it otherwise it could - // throw "NotSupportedException". + // is in some defined state before disposing it otherwise it could throw + // "NotSupportedException". if (pendingTaskList is { Count: > 0 }) { From 0fb644dfa98cf683d3858d2386b817a41263e056 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 17 Feb 2023 12:40:36 +0100 Subject: [PATCH 25/25] Wrap long expression --- MoreLinq/Experimental/Async/Merge.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/Async/Merge.cs b/MoreLinq/Experimental/Async/Merge.cs index 00cddce96..34130abef 100644 --- a/MoreLinq/Experimental/Async/Merge.cs +++ b/MoreLinq/Experimental/Async/Merge.cs @@ -90,9 +90,12 @@ public static IAsyncEnumerable Merge(this IEnumerable> async IAsyncEnumerable Async([EnumeratorCancellation]CancellationToken cancellationToken = default) { using var thisCancellationTokenSource = new CancellationTokenSource(); - using var cancellationTokenSource = cancellationToken.CanBeCanceled - ? CancellationTokenSource.CreateLinkedTokenSource(thisCancellationTokenSource.Token, cancellationToken) - : thisCancellationTokenSource; + + using var cancellationTokenSource = + cancellationToken.CanBeCanceled + ? CancellationTokenSource.CreateLinkedTokenSource(thisCancellationTokenSource.Token, cancellationToken) + : thisCancellationTokenSource; + cancellationToken = cancellationTokenSource.Token; var enumeratorList = new List>();