From 09446642755a6d481db46fb0120cb1e6b7e5c411 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 27 Oct 2016 12:46:14 +0200 Subject: [PATCH 01/56] SelectAsync to asynchronously pair sequence elements with their projections --- MoreLinq/SelectAsync.cs | 54 +++++++++++++++++++++++++++++++++++++++++ MoreLinq/project.json | 5 ++-- 2 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 MoreLinq/SelectAsync.cs diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs new file mode 100644 index 000000000..e633e095b --- /dev/null +++ b/MoreLinq/SelectAsync.cs @@ -0,0 +1,54 @@ +#region License and Terms +// MoreLINQ - Extensions to LINQ to Objects +// Copyright (c) 2016 Atif Aziz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +#if !NO_ASYNC + +namespace MoreLinq +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + + static partial class MoreEnumerable + { + /// + /// Asynchronously pairs each element of a sequence with its projection. + /// + + public static Task>> SelectAsync( + this IEnumerable sources, Func> taskSelector) + { + return SelectAsync(sources, taskSelector, (k, v) => new KeyValuePair(k, v)); + } + + static async Task> SelectAsync( + this IEnumerable sources, Func> taskSelector, + Func resultSelector) + { + if (sources == null) throw new ArgumentNullException("sources"); + if (taskSelector == null) throw new ArgumentNullException("taskSelector"); + if (resultSelector == null) throw new ArgumentNullException("resultSelector"); + + var results = await + Task.WhenAll(sources.Select(async e => new KeyValuePair(e, await taskSelector(e).ConfigureAwait(continueOnCapturedContext: false)))); + return results.Select(e => resultSelector(e.Key, e.Value)).ToList(); + } + } +} + +#endif // !NO_ASYNC \ No newline at end of file diff --git a/MoreLinq/project.json b/MoreLinq/project.json index f78776ba7..a7df713e6 100644 --- a/MoreLinq/project.json +++ b/MoreLinq/project.json @@ -57,7 +57,7 @@ } }, "frameworks": { - "net35": { + "net45": { "buildOptions": { "define": [ "MORELINQ" @@ -104,7 +104,8 @@ "NO_SERIALIZATION_ATTRIBUTES", "NO_EXCEPTION_SERIALIZATION", "NO_TRACING", - "NO_COM" + "NO_COM", + "NO_ASYNC" ] }, "frameworkAssemblies": { From d36166cc371f540ba4a38649373ca8dec8a76c08 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 27 Oct 2016 13:06:10 +0200 Subject: [PATCH 02/56] Make second SelectAsync overload public --- MoreLinq/SelectAsync.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index e633e095b..a4327683d 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -36,7 +36,12 @@ public static Task>> SelectAsync( return SelectAsync(sources, taskSelector, (k, v) => new KeyValuePair(k, v)); } - static async Task> SelectAsync( + /// + /// Asynchronously projects each element of a sequence and then uses + /// a function to create the resulting value from the two. + /// + + public static async Task> SelectAsync( this IEnumerable sources, Func> taskSelector, Func resultSelector) { From 1afd841dcbbdfa6bf29fa56e2ee0825ae792a4ce Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 27 Oct 2016 17:43:01 +0200 Subject: [PATCH 03/56] Fix test build --- MoreLinq.Test/project.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq.Test/project.json b/MoreLinq.Test/project.json index 4128125cf..fa395614f 100644 --- a/MoreLinq.Test/project.json +++ b/MoreLinq.Test/project.json @@ -43,7 +43,7 @@ } }, "frameworks": { - "net4": { + "net45": { "frameworkAssemblies": { "System.Data.DataSetExtensions": "4.0.0.0", "System.Data": "4.0.0.0", From 122a905993cba4a8600572d79f718d5a398869f2 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 27 Oct 2016 18:20:01 +0200 Subject: [PATCH 04/56] Make SelectAsync like Select and streaming --- MoreLinq/SelectAsync.cs | 63 ++++++++++++++++++++++++++++++++--------- MoreLinq/project.json | 1 + 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index a4327683d..5da707489 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -20,38 +20,73 @@ namespace MoreLinq { using System; + using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; + using System.Runtime.ExceptionServices; + using System.Threading; using System.Threading.Tasks; static partial class MoreEnumerable { /// - /// Asynchronously pairs each element of a sequence with its projection. + /// Asynchronously projects each element of a sequence to its new form. /// - public static Task>> SelectAsync( - this IEnumerable sources, Func> taskSelector) + public static IEnumerable SelectAsync( + this IEnumerable sources, Func> selector) { - return SelectAsync(sources, taskSelector, (k, v) => new KeyValuePair(k, v)); + return SelectAsync(sources, null, selector); } /// - /// Asynchronously projects each element of a sequence and then uses - /// a function to create the resulting value from the two. + /// Asynchronously projects each element of a sequence to its new form. + /// An additional parameter specifies the + /// to use to await for tasks to complete. /// - public static async Task> SelectAsync( - this IEnumerable sources, Func> taskSelector, - Func resultSelector) + public static IEnumerable SelectAsync( + this IEnumerable sources, + TaskScheduler scheduler, + Func> selector) { if (sources == null) throw new ArgumentNullException("sources"); - if (taskSelector == null) throw new ArgumentNullException("taskSelector"); - if (resultSelector == null) throw new ArgumentNullException("resultSelector"); + if (selector == null) throw new ArgumentNullException("selector"); - var results = await - Task.WhenAll(sources.Select(async e => new KeyValuePair(e, await taskSelector(e).ConfigureAwait(continueOnCapturedContext: false)))); - return results.Select(e => resultSelector(e.Key, e.Value)).ToList(); + var queue = new BlockingCollection(); + var tasks = sources.Select(selector).ToList(); // TODO max concurrency + + Task.Factory.StartNew(async () => + { + try + { + while (tasks.Count > 0) + { + var task = await Task.WhenAny(tasks); + tasks.Remove(task); + queue.Add(task); + } + queue.Add(null); + } + catch (Exception e) + { + queue.Add(ExceptionDispatchInfo.Capture(e)); + } + queue.CompleteAdding(); + }, + CancellationToken.None, + TaskCreationOptions.DenyChildAttach, + scheduler ?? TaskScheduler.Default); + + // TODO Consider the impact of throwing partway while other tasks are in flight! + + foreach (var e in queue.GetConsumingEnumerable()) + { + (e as ExceptionDispatchInfo)?.Throw(); + if (e == null) + yield break; + yield return ((Task) e).Result; + } } } } diff --git a/MoreLinq/project.json b/MoreLinq/project.json index a7df713e6..8d1693c60 100644 --- a/MoreLinq/project.json +++ b/MoreLinq/project.json @@ -86,6 +86,7 @@ }, "dependencies": { "System.Collections": "4.0.11", + "System.Collections.Concurrent": "4.0.12", "System.Diagnostics.Debug": "4.0.11", "System.Linq": "4.1.0", "System.Resources.ResourceManager": "4.0.1", From 689c2d38bf08df55386cf09e6e5f9f61ddbdcd02 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 27 Oct 2016 18:26:28 +0200 Subject: [PATCH 05/56] Source arg name --- MoreLinq/SelectAsync.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index 5da707489..13ee910b3 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -34,9 +34,9 @@ static partial class MoreEnumerable /// public static IEnumerable SelectAsync( - this IEnumerable sources, Func> selector) + this IEnumerable source, Func> selector) { - return SelectAsync(sources, null, selector); + return SelectAsync(source, null, selector); } /// @@ -46,15 +46,15 @@ public static IEnumerable SelectAsync( /// public static IEnumerable SelectAsync( - this IEnumerable sources, + this IEnumerable source, TaskScheduler scheduler, Func> selector) { - if (sources == null) throw new ArgumentNullException("sources"); + if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); var queue = new BlockingCollection(); - var tasks = sources.Select(selector).ToList(); // TODO max concurrency + var tasks = source.Select(selector).ToList(); // TODO max concurrency Task.Factory.StartNew(async () => { From e39107420cde3a93754bf18205aae586743ef494 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 27 Oct 2016 18:46:30 +0200 Subject: [PATCH 06/56] Concurrency control --- MoreLinq/SelectAsync.cs | 96 ++++++++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 31 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index 13ee910b3..4c712caa8 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -22,7 +22,6 @@ namespace MoreLinq using System; using System.Collections.Concurrent; using System.Collections.Generic; - using System.Linq; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; @@ -36,56 +35,91 @@ static partial class MoreEnumerable public static IEnumerable SelectAsync( this IEnumerable source, Func> selector) { - return SelectAsync(source, null, selector); + return source.SelectAsync(int.MaxValue, null, selector); } /// - /// Asynchronously projects each element of a sequence to its new form. - /// An additional parameter specifies the - /// to use to await for tasks to complete. + /// Asynchronously projects each element of a sequence to its new form + /// with a given concurrency. + /// + /// + /// The function should be designed to be + /// thread-agnostic. + /// + + public static IEnumerable SelectAsync( + this IEnumerable source, + int maxConcurrency, + Func> selector) + { + return source.SelectAsync(maxConcurrency, null, selector); + } + + /// + /// Asynchronously projects each element of a sequence to its new form + /// with a given concurrency. An additional parameter specifies the + /// to use to await for tasks to complete. /// + /// + /// The function should be designed to be + /// thread-agnostic. + /// public static IEnumerable SelectAsync( this IEnumerable source, + int maxConcurrency, TaskScheduler scheduler, Func> selector) { if (source == null) throw new ArgumentNullException("source"); + if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); if (selector == null) throw new ArgumentNullException("selector"); var queue = new BlockingCollection(); - var tasks = source.Select(selector).ToList(); // TODO max concurrency + using (var _ = source.GetEnumerator()) + { + var item = _; - Task.Factory.StartNew(async () => - { - try + Task.Factory.StartNew(async () => { - while (tasks.Count > 0) + var tasks = new List>(); + + var more = false; + for (var i = 0; i < maxConcurrency && (more = item.MoveNext()); i++) + tasks.Add(selector(item.Current)); + + try + { + while (tasks.Count > 0) + { + var task = await Task.WhenAny(tasks); + tasks.Remove(task); + queue.Add(task); + + if (more && (more = item.MoveNext())) + tasks.Add(selector(item.Current)); + } + queue.Add(null); + } + catch (Exception e) { - var task = await Task.WhenAny(tasks); - tasks.Remove(task); - queue.Add(task); + queue.Add(ExceptionDispatchInfo.Capture(e)); } - queue.Add(null); - } - catch (Exception e) - { - queue.Add(ExceptionDispatchInfo.Capture(e)); - } - queue.CompleteAdding(); - }, - CancellationToken.None, - TaskCreationOptions.DenyChildAttach, - scheduler ?? TaskScheduler.Default); + queue.CompleteAdding(); + }, + CancellationToken.None, + TaskCreationOptions.DenyChildAttach, + scheduler ?? TaskScheduler.Default); - // TODO Consider the impact of throwing partway while other tasks are in flight! + // TODO Consider the impact of throwing partway while other tasks are in flight! - foreach (var e in queue.GetConsumingEnumerable()) - { - (e as ExceptionDispatchInfo)?.Throw(); - if (e == null) - yield break; - yield return ((Task) e).Result; + foreach (var e in queue.GetConsumingEnumerable()) + { + (e as ExceptionDispatchInfo)?.Throw(); + if (e == null) + yield break; + yield return ((Task) e).Result; + } } } } From fa2148e58b9919bbdac715c36a09ac72925014e5 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 27 Oct 2016 23:00:21 +0200 Subject: [PATCH 07/56] Eager disposal of source --- MoreLinq/SelectAsync.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index 4c712caa8..d5ff6a906 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -88,6 +88,9 @@ public static IEnumerable SelectAsync( for (var i = 0; i < maxConcurrency && (more = item.MoveNext()); i++) tasks.Add(selector(item.Current)); + if (!more) + item.Dispose(); + try { while (tasks.Count > 0) From 1144ae356512d141fcb8cd240bea81d7da667625 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 28 Oct 2016 11:08:08 +0200 Subject: [PATCH 08/56] SelectAsync is lazy so validate args eagerly --- MoreLinq/SelectAsync.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index d5ff6a906..ad7690c42 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -70,9 +70,18 @@ public static IEnumerable SelectAsync( int maxConcurrency, TaskScheduler scheduler, Func> selector) + { + return SelectAsyncImpl(source, maxConcurrency, scheduler, selector); + } + + static IEnumerable SelectAsyncImpl( + IEnumerable source, + int maxConcurrency, + TaskScheduler scheduler, + Func> selector) { if (source == null) throw new ArgumentNullException("source"); - if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); + if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException("maxConcurrency"); if (selector == null) throw new ArgumentNullException("selector"); var queue = new BlockingCollection(); From b5b6f18b6d50b84ee6cb90845511e12b1d9d63f0 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 28 Oct 2016 12:04:52 +0200 Subject: [PATCH 09/56] Cancellation support in case of error or early termination --- MoreLinq/SelectAsync.cs | 113 +++++++++++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 12 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index ad7690c42..fac002852 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -38,6 +38,19 @@ public static IEnumerable SelectAsync( return source.SelectAsync(int.MaxValue, null, selector); } + /// + /// Asynchronously projects each element of a sequence to its new form. + /// The projection function receives a + /// as an additional argument that can be used to abort any + /// asynchronous operations in flight. + /// + + public static IEnumerable SelectAsync( + this IEnumerable source, Func> selector) + { + return source.SelectAsync(int.MaxValue, null, selector); + } + /// /// Asynchronously projects each element of a sequence to its new form /// with a given concurrency. @@ -55,6 +68,25 @@ public static IEnumerable SelectAsync( return source.SelectAsync(maxConcurrency, null, selector); } + /// + /// Asynchronously projects each element of a sequence to its new form + /// with a given concurrency. The projection function receives a + /// as an additional argument that can + /// be used to abort any asynchronous operations in flight. + /// + /// + /// The function should be designed to be + /// thread-agnostic. + /// + + public static IEnumerable SelectAsync( + this IEnumerable source, + int maxConcurrency, + Func> selector) + { + return source.SelectAsync(maxConcurrency, null, selector); + } + /// /// Asynchronously projects each element of a sequence to its new form /// with a given concurrency. An additional parameter specifies the @@ -70,6 +102,26 @@ public static IEnumerable SelectAsync( int maxConcurrency, TaskScheduler scheduler, Func> selector) + { + if (selector == null) throw new ArgumentNullException("selector"); + return source.SelectAsync(maxConcurrency, scheduler, (e, _) => selector(e)); + } + + /// + /// Asynchronously projects each element of a sequence to its new form + /// with a given concurrency. An additional parameter specifies the + /// to use to await for tasks to complete. + /// + /// + /// The function should be designed to be + /// thread-agnostic. + /// + + public static IEnumerable SelectAsync( + this IEnumerable source, + int maxConcurrency, + TaskScheduler scheduler, + Func> selector) { return SelectAsyncImpl(source, maxConcurrency, scheduler, selector); } @@ -78,24 +130,28 @@ static IEnumerable SelectAsyncImpl( IEnumerable source, int maxConcurrency, TaskScheduler scheduler, - Func> selector) + Func> selector) { if (source == null) throw new ArgumentNullException("source"); if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException("maxConcurrency"); if (selector == null) throw new ArgumentNullException("selector"); var queue = new BlockingCollection(); - using (var _ = source.GetEnumerator()) - { - var item = _; + var cancellationTokenSource = new CancellationTokenSource(); + var completed = false; - Task.Factory.StartNew(async () => + var item = source.GetEnumerator(); + IDisposable disposable = item; // disables AccessToDisposedClosure warnings + try + { + Task.Factory.StartNew(() => { + var cancellationToken = cancellationTokenSource.Token; var tasks = new List>(); var more = false; for (var i = 0; i < maxConcurrency && (more = item.MoveNext()); i++) - tasks.Add(selector(item.Current)); + tasks.Add(selector(item.Current, cancellationToken)); if (!more) item.Dispose(); @@ -104,17 +160,38 @@ static IEnumerable SelectAsyncImpl( { while (tasks.Count > 0) { - var task = await Task.WhenAny(tasks); - tasks.Remove(task); + int i; + try + { + // ReSharper disable once CoVariantArrayConversion + i = Task.WaitAny(tasks.ToArray(), cancellationToken); + } + catch (OperationCanceledException) + { + // Cancellation during the wait means the + // enumeration has been stopped by the user + // so the results of the remaining tasks + // are no longer needed. Those tasks should + // cancel as a result of sharing the same + // cancellation token and provided that + // they passed it on to any downstream + // asynchronous operation. Either way, this + // loop is done so exit hard here. + + return; + } + var task = tasks[i]; + tasks.RemoveAt(i); queue.Add(task); if (more && (more = item.MoveNext())) - tasks.Add(selector(item.Current)); + tasks.Add(selector(item.Current, cancellationToken)); } queue.Add(null); } catch (Exception e) { + cancellationTokenSource.Cancel(); queue.Add(ExceptionDispatchInfo.Capture(e)); } queue.CompleteAdding(); @@ -123,15 +200,27 @@ static IEnumerable SelectAsyncImpl( TaskCreationOptions.DenyChildAttach, scheduler ?? TaskScheduler.Default); - // TODO Consider the impact of throwing partway while other tasks are in flight! - foreach (var e in queue.GetConsumingEnumerable()) { (e as ExceptionDispatchInfo)?.Throw(); if (e == null) - yield break; + break; yield return ((Task) e).Result; } + + completed = true; + } + finally + { + // The cancellation token is signaled here for the case where + // tasks may be in flight but the user stopped the enumeration + // partway (e.g. SelectAsync was combined with a Take or + // TakeWhile). The in-flight tasks need to be aborted as well + // as the awaiter loop. + + if (!completed) + cancellationTokenSource.Cancel(); + disposable.Dispose(); } } } From 07578f8b6a145398b66f67baada7b65bebf35fca Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 28 Oct 2016 13:32:27 +0200 Subject: [PATCH 10/56] Back to an async awaiter loop so thread can return to pool --- MoreLinq/SelectAsync.cs | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index fac002852..de7a01d01 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -22,6 +22,7 @@ namespace MoreLinq using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Linq; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; @@ -144,9 +145,12 @@ static IEnumerable SelectAsyncImpl( IDisposable disposable = item; // disables AccessToDisposedClosure warnings try { - Task.Factory.StartNew(() => + Task.Factory.StartNew(async () => { var cancellationToken = cancellationTokenSource.Token; + var cancellationTaskSource = new TaskCompletionSource(); + cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); + var tasks = new List>(); var more = false; @@ -160,13 +164,19 @@ static IEnumerable SelectAsyncImpl( { while (tasks.Count > 0) { - int i; - try - { - // ReSharper disable once CoVariantArrayConversion - i = Task.WaitAny(tasks.ToArray(), cancellationToken); - } - catch (OperationCanceledException) + // Task.WaitAny is synchronous and blocking but + // allows the waiting to be cancelled via a + // CancellationToken. Task.WhenAny can be + // awaited so it is better since the tread + // won't be blocked and can return to the pool. + // However, it doesn't support cancellation so + // instead a task is built on top of the + // CancellationToken that completes when the + // CancellationToken trips. + + var task = await Task.WhenAny(tasks.Cast().Concat(cancellationTaskSource.Task)); + + if (task == cancellationTaskSource.Task) { // Cancellation during the wait means the // enumeration has been stopped by the user @@ -180,8 +190,8 @@ static IEnumerable SelectAsyncImpl( return; } - var task = tasks[i]; - tasks.RemoveAt(i); + + tasks.Remove((Task)task); queue.Add(task); if (more && (more = item.MoveNext())) From 2b86ebaced1e258be726800c2fd3c7420f6597e5 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 10:00:12 +0100 Subject: [PATCH 11/56] Exclude SelectAsync from .NET Standard 1.0 build System.Collections.Concurrent is available in .NET Standard 1.1 and above, and we don't want to add a whole new target for that just now since it is covered by the .NET Standard 2.0 target. --- MoreLinq/MoreLinq.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/MoreLinq.csproj b/MoreLinq/MoreLinq.csproj index 615ac162c..c6d60eb2d 100644 --- a/MoreLinq/MoreLinq.csproj +++ b/MoreLinq/MoreLinq.csproj @@ -56,7 +56,7 @@ - $(DefineConstants);MORELINQ;NO_SERIALIZATION_ATTRIBUTES;NO_EXCEPTION_SERIALIZATION;NO_TRACING;NO_COM + $(DefineConstants);MORELINQ;NO_SERIALIZATION_ATTRIBUTES;NO_EXCEPTION_SERIALIZATION;NO_TRACING;NO_COM;NO_ASYNC From bbe2dafee54a2d366284c696bf7278b21c1c038a Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 10:17:31 +0100 Subject: [PATCH 12/56] Fix arg validation to pass tests --- MoreLinq.Test/NullArgumentTest.cs | 5 +++++ MoreLinq/SelectAsync.cs | 2 ++ 2 files changed, 7 insertions(+) diff --git a/MoreLinq.Test/NullArgumentTest.cs b/MoreLinq.Test/NullArgumentTest.cs index 911bdb317..9dfb4d04b 100644 --- a/MoreLinq.Test/NullArgumentTest.cs +++ b/MoreLinq.Test/NullArgumentTest.cs @@ -23,6 +23,7 @@ namespace MoreLinq.Test using System.Diagnostics; using System.Linq.Expressions; using System.Reflection; + using System.Threading.Tasks; using NUnit.Framework; using NUnit.Framework.Interfaces; @@ -122,6 +123,9 @@ static bool CanBeNull(ParameterInfo parameter) #if NET451 || NETCOREAPP2_0 nameof(MoreEnumerable.ToDataTable) + ".expressions", #endif + #if NET451 || NETCOREAPP2_0 + nameof(MoreEnumerable.SelectAsync) + ".scheduler", + #endif nameof(MoreEnumerable.Trace) + ".format" }; @@ -136,6 +140,7 @@ static object CreateInstance(Type type) { if (type == typeof (int)) return 7; // int is used as size/length/range etc. avoid ArgumentOutOfRange for '0'. if (type == typeof (string)) return ""; + if (type == typeof(TaskScheduler)) return TaskScheduler.Default; if (type == typeof(IEnumerable)) return new[] { 1, 2, 3 }; // Provide non-empty sequence for MinBy/MaxBy. if (type.IsArray) return Array.CreateInstance(type.GetElementType(), 0); if (type.GetTypeInfo().IsValueType || HasDefaultConstructor(type)) return Activator.CreateInstance(type); diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index de7a01d01..46cc84fc6 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -124,6 +124,8 @@ public static IEnumerable SelectAsync( TaskScheduler scheduler, Func> selector) { + if (source == null) throw new ArgumentNullException("source"); + if (selector == null) throw new ArgumentNullException("selector"); return SelectAsyncImpl(source, maxConcurrency, scheduler, selector); } From 2854b077644a67f92011e8b32959e9e0fc9bf180 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 10:18:35 +0100 Subject: [PATCH 13/56] Use nameof to get arg names --- MoreLinq/SelectAsync.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index 46cc84fc6..a0ce61ef9 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -104,7 +104,7 @@ public static IEnumerable SelectAsync( TaskScheduler scheduler, Func> selector) { - if (selector == null) throw new ArgumentNullException("selector"); + if (selector == null) throw new ArgumentNullException(nameof(selector)); return source.SelectAsync(maxConcurrency, scheduler, (e, _) => selector(e)); } @@ -124,8 +124,8 @@ public static IEnumerable SelectAsync( TaskScheduler scheduler, Func> selector) { - if (source == null) throw new ArgumentNullException("source"); - if (selector == null) throw new ArgumentNullException("selector"); + if (source == null) throw new ArgumentNullException(nameof(source)); + if (selector == null) throw new ArgumentNullException(nameof(selector)); return SelectAsyncImpl(source, maxConcurrency, scheduler, selector); } @@ -135,9 +135,9 @@ static IEnumerable SelectAsyncImpl( TaskScheduler scheduler, Func> selector) { - if (source == null) throw new ArgumentNullException("source"); - if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException("maxConcurrency"); - if (selector == null) throw new ArgumentNullException("selector"); + if (source == null) throw new ArgumentNullException(nameof(source)); + if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); + if (selector == null) throw new ArgumentNullException(nameof(selector)); var queue = new BlockingCollection(); var cancellationTokenSource = new CancellationTokenSource(); From 403df2d0b91466be145ec361f1f436ee780d340d Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 10:22:07 +0100 Subject: [PATCH 14/56] Use local function pattern for implementation --- MoreLinq/SelectAsync.cs | 180 +++++++++++++++++++--------------------- 1 file changed, 87 insertions(+), 93 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index a0ce61ef9..e0b22f8b5 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -123,116 +123,110 @@ public static IEnumerable SelectAsync( int maxConcurrency, TaskScheduler scheduler, Func> selector) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (selector == null) throw new ArgumentNullException(nameof(selector)); - return SelectAsyncImpl(source, maxConcurrency, scheduler, selector); - } - - static IEnumerable SelectAsyncImpl( - IEnumerable source, - int maxConcurrency, - TaskScheduler scheduler, - Func> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); if (selector == null) throw new ArgumentNullException(nameof(selector)); - var queue = new BlockingCollection(); - var cancellationTokenSource = new CancellationTokenSource(); - var completed = false; - - var item = source.GetEnumerator(); - IDisposable disposable = item; // disables AccessToDisposedClosure warnings - try + return _(); IEnumerable _() { - Task.Factory.StartNew(async () => - { - var cancellationToken = cancellationTokenSource.Token; - var cancellationTaskSource = new TaskCompletionSource(); - cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); + var queue = new BlockingCollection(); + var cancellationTokenSource = new CancellationTokenSource(); + var completed = false; + + var item = source.GetEnumerator(); + IDisposable disposable = item; // disables AccessToDisposedClosure warnings + try + { + Task.Factory.StartNew(async () => + { + var cancellationToken = cancellationTokenSource.Token; + var cancellationTaskSource = new TaskCompletionSource(); + cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); - var tasks = new List>(); + var tasks = new List>(); - var more = false; - for (var i = 0; i < maxConcurrency && (more = item.MoveNext()); i++) - tasks.Add(selector(item.Current, cancellationToken)); + var more = false; + for (var i = 0; i < maxConcurrency && (more = item.MoveNext()); i++) + tasks.Add(selector(item.Current, cancellationToken)); - if (!more) - item.Dispose(); + if (!more) + item.Dispose(); - try - { - while (tasks.Count > 0) + try { - // Task.WaitAny is synchronous and blocking but - // allows the waiting to be cancelled via a - // CancellationToken. Task.WhenAny can be - // awaited so it is better since the tread - // won't be blocked and can return to the pool. - // However, it doesn't support cancellation so - // instead a task is built on top of the - // CancellationToken that completes when the - // CancellationToken trips. - - var task = await Task.WhenAny(tasks.Cast().Concat(cancellationTaskSource.Task)); - - if (task == cancellationTaskSource.Task) + while (tasks.Count > 0) { - // Cancellation during the wait means the - // enumeration has been stopped by the user - // so the results of the remaining tasks - // are no longer needed. Those tasks should - // cancel as a result of sharing the same - // cancellation token and provided that - // they passed it on to any downstream - // asynchronous operation. Either way, this - // loop is done so exit hard here. - - return; + // Task.WaitAny is synchronous and blocking + // but allows the waiting to be cancelled + // via a CancellationToken. Task.WhenAny can + // be awaited so it is better since the + // tread won't be blocked and can return to + // the pool. However, it doesn't support + // cancellation so instead a task is built + // on top of the CancellationToken that + // completes when the CancellationToken + // trips. + + var task = await Task.WhenAny(tasks.Cast().Concat(cancellationTaskSource.Task)); + + if (task == cancellationTaskSource.Task) + { + // Cancellation during the wait means + // the enumeration has been stopped by + // the user so the results of the + // remaining tasks are no longer needed. + // Those tasks should cancel as a result + // of sharing the same cancellation + // token and provided that they passed + // it on to any downstream asynchronous + // operations. Either way, this loop + // is done so exit hard here. + + return; + } + + tasks.Remove((Task)task); + queue.Add(task); + + if (more && (more = item.MoveNext())) + tasks.Add(selector(item.Current, cancellationToken)); } + queue.Add(null); + } + catch (Exception e) + { + cancellationTokenSource.Cancel(); + queue.Add(ExceptionDispatchInfo.Capture(e)); + } + queue.CompleteAdding(); + }, + CancellationToken.None, + TaskCreationOptions.DenyChildAttach, + scheduler ?? TaskScheduler.Default); - tasks.Remove((Task)task); - queue.Add(task); + foreach (var e in queue.GetConsumingEnumerable()) + { + (e as ExceptionDispatchInfo)?.Throw(); + if (e == null) + break; + yield return ((Task) e).Result; + } - if (more && (more = item.MoveNext())) - tasks.Add(selector(item.Current, cancellationToken)); - } - queue.Add(null); - } - catch (Exception e) - { - cancellationTokenSource.Cancel(); - queue.Add(ExceptionDispatchInfo.Capture(e)); - } - queue.CompleteAdding(); - }, - CancellationToken.None, - TaskCreationOptions.DenyChildAttach, - scheduler ?? TaskScheduler.Default); - - foreach (var e in queue.GetConsumingEnumerable()) + completed = true; + } + finally { - (e as ExceptionDispatchInfo)?.Throw(); - if (e == null) - break; - yield return ((Task) e).Result; + // The cancellation token is signaled here for the case where + // tasks may be in flight but the user stopped the enumeration + // partway (e.g. SelectAsync was combined with a Take or + // TakeWhile). The in-flight tasks need to be aborted as well + // as the awaiter loop. + + if (!completed) + cancellationTokenSource.Cancel(); + disposable.Dispose(); } - - completed = true; - } - finally - { - // The cancellation token is signaled here for the case where - // tasks may be in flight but the user stopped the enumeration - // partway (e.g. SelectAsync was combined with a Take or - // TakeWhile). The in-flight tasks need to be aborted as well - // as the awaiter loop. - - if (!completed) - cancellationTokenSource.Cancel(); - disposable.Dispose(); } } } From 34af1fd3bde739c3699e33f6028a23ee32ebbd85 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 19:23:13 +0100 Subject: [PATCH 15/56] Add remarks that order is not preserved --- MoreLinq/SelectAsync.cs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index e0b22f8b5..7ee182313 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -32,6 +32,12 @@ static partial class MoreEnumerable /// /// Asynchronously projects each element of a sequence to its new form. /// + /// + /// This method uses deferred execution semantics. The results are + /// yielded as each asynchronous projection completes and therefore not + /// guaranteed to be based on the source sequence order. If order is + /// important, sort the results. + /// public static IEnumerable SelectAsync( this IEnumerable source, Func> selector) @@ -45,6 +51,12 @@ public static IEnumerable SelectAsync( /// as an additional argument that can be used to abort any /// asynchronous operations in flight. /// + /// + /// This method uses deferred execution semantics. The results are + /// yielded as each asynchronous projection completes and therefore not + /// guaranteed to be based on the source sequence order. If order is + /// important, sort the results. + /// public static IEnumerable SelectAsync( this IEnumerable source, Func> selector) @@ -57,6 +69,11 @@ public static IEnumerable SelectAsync( /// with a given concurrency. /// /// + /// This method uses deferred execution semantics. The results are + /// yielded as each asynchronous projection completes and therefore not + /// guaranteed to be based on the source sequence order. If order is + /// important, sort the results. + /// /// The function should be designed to be /// thread-agnostic. /// @@ -76,6 +93,11 @@ public static IEnumerable SelectAsync( /// be used to abort any asynchronous operations in flight. /// /// + /// This method uses deferred execution semantics. The results are + /// yielded as each asynchronous projection completes and therefore not + /// guaranteed to be based on the source sequence order. If order is + /// important, sort the results. + /// /// The function should be designed to be /// thread-agnostic. /// @@ -94,6 +116,11 @@ public static IEnumerable SelectAsync( /// to use to await for tasks to complete. /// /// + /// This method uses deferred execution semantics. The results are + /// yielded as each asynchronous projection completes and therefore not + /// guaranteed to be based on the source sequence order. If order is + /// important, sort the results. + /// /// The function should be designed to be /// thread-agnostic. /// @@ -113,6 +140,11 @@ public static IEnumerable SelectAsync( /// with a given concurrency. An additional parameter specifies the /// to use to await for tasks to complete. /// + /// This method uses deferred execution semantics. The results are + /// yielded as each asynchronous projection completes and therefore not + /// guaranteed to be based on the source sequence order. If order is + /// important, sort the results. + /// /// /// The function should be designed to be /// thread-agnostic. From 6142ca1770e948d83843d375b28f1e604ab744b0 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 22:29:14 +0100 Subject: [PATCH 16/56] Add remarks that a new task is used as the workhorse --- MoreLinq/SelectAsync.cs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index 7ee182313..38ad1355c 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -37,6 +37,9 @@ static partial class MoreEnumerable /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is /// important, sort the results. + /// + /// This method starts a new task on the default scheduler where the + /// asynchronous projections are started and awaited. /// public static IEnumerable SelectAsync( @@ -56,6 +59,9 @@ public static IEnumerable SelectAsync( /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is /// important, sort the results. + /// + /// This method starts a new task on the default scheduler where the + /// asynchronous projections are started and awaited. /// public static IEnumerable SelectAsync( @@ -74,6 +80,9 @@ public static IEnumerable SelectAsync( /// guaranteed to be based on the source sequence order. If order is /// important, sort the results. /// + /// This method starts a new task on the default scheduler where the + /// asynchronous projections are started and awaited. + /// /// The function should be designed to be /// thread-agnostic. /// @@ -98,6 +107,9 @@ public static IEnumerable SelectAsync( /// guaranteed to be based on the source sequence order. If order is /// important, sort the results. /// + /// This method starts a new task on the default scheduler where the + /// asynchronous projections are started and awaited. + /// /// The function should be designed to be /// thread-agnostic. /// @@ -121,6 +133,9 @@ public static IEnumerable SelectAsync( /// guaranteed to be based on the source sequence order. If order is /// important, sort the results. /// + /// This method starts a new task on the given scheduler where the + /// asynchronous projections are started and awaited. + /// /// The function should be designed to be /// thread-agnostic. /// @@ -145,6 +160,9 @@ public static IEnumerable SelectAsync( /// guaranteed to be based on the source sequence order. If order is /// important, sort the results. /// + /// This method starts a new task on the given scheduler where the + /// asynchronous projections are started and awaited. + /// /// /// The function should be designed to be /// thread-agnostic. From 2b58508cd313d3de0176cdad597bb9ee826190fd Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 22:31:23 +0100 Subject: [PATCH 17/56] Fix remarks section start --- MoreLinq/SelectAsync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index 38ad1355c..b9f105cca 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -155,6 +155,7 @@ public static IEnumerable SelectAsync( /// with a given concurrency. An additional parameter specifies the /// to use to await for tasks to complete. /// + /// /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is @@ -163,7 +164,6 @@ public static IEnumerable SelectAsync( /// This method starts a new task on the given scheduler where the /// asynchronous projections are started and awaited. /// - /// /// The function should be designed to be /// thread-agnostic. /// From ce92ff32f30743ef5958c2589aee9d4a6e5f0fd4 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 22:36:15 +0100 Subject: [PATCH 18/56] eclint fix -nw eclint fix -n "**/*.{cs,tt,cmd,sh,md,txt,yml}" eclint fix -w "**/*.{cs,tt,cmd,sh,md,txt,yml,json,sln,csproj,shfbproj}" --- MoreLinq/SelectAsync.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index b9f105cca..ea9d34378 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -1,13 +1,13 @@ #region License and Terms // MoreLINQ - Extensions to LINQ to Objects // Copyright (c) 2016 Atif Aziz. All rights reserved. -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -282,4 +282,4 @@ public static IEnumerable SelectAsync( } } -#endif // !NO_ASYNC \ No newline at end of file +#endif // !NO_ASYNC From 88927b6665e5bf0097a2db3fe2398955b330fecd Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 27 Feb 2018 22:40:20 +0100 Subject: [PATCH 19/56] Proper paragraph formatting of remarks --- MoreLinq/SelectAsync.cs | 58 +++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index ea9d34378..c76fc7250 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -33,13 +33,14 @@ static partial class MoreEnumerable /// Asynchronously projects each element of a sequence to its new form. /// /// + /// /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// + /// important, sort the results. + /// /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. + /// asynchronous projections are started and awaited. /// public static IEnumerable SelectAsync( @@ -55,13 +56,14 @@ public static IEnumerable SelectAsync( /// asynchronous operations in flight. /// /// + /// /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// + /// important, sort the results. + /// /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. + /// asynchronous projections are started and awaited. /// public static IEnumerable SelectAsync( @@ -75,16 +77,17 @@ public static IEnumerable SelectAsync( /// with a given concurrency. /// /// + /// /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// + /// important, sort the results. + /// /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. - /// + /// asynchronous projections are started and awaited. + /// /// The function should be designed to be - /// thread-agnostic. + /// thread-agnostic. /// public static IEnumerable SelectAsync( @@ -102,16 +105,17 @@ public static IEnumerable SelectAsync( /// be used to abort any asynchronous operations in flight. /// /// + /// /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// + /// important, sort the results. + /// /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. - /// + /// asynchronous projections are started and awaited. + /// /// The function should be designed to be - /// thread-agnostic. + /// thread-agnostic. /// public static IEnumerable SelectAsync( @@ -128,16 +132,17 @@ public static IEnumerable SelectAsync( /// to use to await for tasks to complete. /// /// + /// /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// + /// important, sort the results. + /// /// This method starts a new task on the given scheduler where the - /// asynchronous projections are started and awaited. - /// + /// asynchronous projections are started and awaited. + /// /// The function should be designed to be - /// thread-agnostic. + /// thread-agnostic. /// public static IEnumerable SelectAsync( @@ -156,16 +161,17 @@ public static IEnumerable SelectAsync( /// to use to await for tasks to complete. /// /// + /// /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and therefore not /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// + /// important, sort the results. + /// /// This method starts a new task on the given scheduler where the - /// asynchronous projections are started and awaited. - /// + /// asynchronous projections are started and awaited. + /// /// The function should be designed to be - /// thread-agnostic. + /// thread-agnostic. /// public static IEnumerable SelectAsync( From f3c1cf5bd0e0ca3c2d5072a46e5b12eae1bc7e09 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 28 Feb 2018 17:53:30 +0100 Subject: [PATCH 20/56] Refactor options and add one to preserve order Instead of lots of overloads due to the increasing number of options, the arguments have been refactored into an options objects. With this, there are now only two simple overloads of SelectAny. This reduces overload overload (pun, pun), documentation duplication and provides a more scalable and discoverable API. --- MoreLinq/SelectAsync.cs | 360 +++++++++++++++++++++++++++------------- 1 file changed, 246 insertions(+), 114 deletions(-) diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index c76fc7250..8f6354a9a 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -20,177 +20,214 @@ namespace MoreLinq { using System; + using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Diagnostics; using System.Linq; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; - static partial class MoreEnumerable + /// + /// Represents options for an asynchronous projection operation. + /// + + public sealed class SelectAsyncOptions { /// - /// Asynchronously projects each element of a sequence to its new form. + /// The default options an asynchronous projection operation. /// - /// - /// - /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and therefore not - /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// - /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. - /// - public static IEnumerable SelectAsync( - this IEnumerable source, Func> selector) - { - return source.SelectAsync(int.MaxValue, null, selector); - } + public static readonly SelectAsyncOptions Default = + new SelectAsyncOptions(null /* = unbounded concurrency */, + TaskScheduler.Default, + preserveOrder: false); /// - /// Asynchronously projects each element of a sequence to its new form. - /// The projection function receives a - /// as an additional argument that can be used to abort any - /// asynchronous operations in flight. + /// Gets a positive (non-zero) integer that specifies the maximum + /// projections to run concurrenctly or null to mean unlimited + /// concurrency. /// - /// - /// - /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and therefore not - /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// - /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. - /// - public static IEnumerable SelectAsync( - this IEnumerable source, Func> selector) - { - return source.SelectAsync(int.MaxValue, null, selector); - } + public int? MaxConcurrency { get; } /// - /// Asynchronously projects each element of a sequence to its new form - /// with a given concurrency. + /// Get the scheduler to be used for any workhorse task. /// - /// - /// - /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and therefore not - /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// - /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. - /// - /// The function should be designed to be - /// thread-agnostic. - /// - public static IEnumerable SelectAsync( - this IEnumerable source, - int maxConcurrency, - Func> selector) + public TaskScheduler Scheduler { get; } + + /// + /// Get a Boolean that determines whether results should be ordered + /// the same as the projection source. + /// + + public bool PreserveOrder { get; } + + SelectAsyncOptions(int? maxConcurrency, TaskScheduler scheduler, bool preserveOrder) { - return source.SelectAsync(maxConcurrency, null, selector); + MaxConcurrency = maxConcurrency == null || maxConcurrency > 0 + ? maxConcurrency + : throw new ArgumentOutOfRangeException( + nameof(maxConcurrency), maxConcurrency, + "Maximum concurrency must be 1 or greater."); + Scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); + PreserveOrder = preserveOrder; } /// - /// Asynchronously projects each element of a sequence to its new form - /// with a given concurrency. The projection function receives a - /// as an additional argument that can - /// be used to abort any asynchronous operations in flight. + /// Returns new options with the given concurrency limit. + /// + + public SelectAsyncOptions WithMaxConcurrency(int? value) => + value == MaxConcurrency ? this : new SelectAsyncOptions(value, Scheduler, PreserveOrder); + + /// + /// Returns new options with the given scheduler. + /// + + public SelectAsyncOptions WithScheduler(TaskScheduler value) => + value == Scheduler ? this : new SelectAsyncOptions(MaxConcurrency, value, PreserveOrder); + + /// + /// Returns new options with the given Boolean indicating whether or + /// not the projections should be returned in the order of the + /// projection source. + /// + + public SelectAsyncOptions WithPreserveOrder(bool value) => + value == PreserveOrder ? this : new SelectAsyncOptions(MaxConcurrency, Scheduler, value); + } + + /// + /// An representing an asynchronous projection. + /// + /// + + public interface ISelectAsyncEnumerable : IEnumerable + { + /// + /// The options to apply to this asynchronous projection operation. + /// + + SelectAsyncOptions Options { get; } + + /// + /// Returns a new asynchronous projection operation that will use the + /// given options. + /// + + ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options); + } + + static partial class MoreEnumerable + { + /// + /// Returns a new asynchronous projection operation with the given + /// concurrency limit. + /// + + public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int? value) => + source.WithOptions(source.Options.WithMaxConcurrency(value)); + + /// + /// Returns a new asynchronous projection operation with the given + /// scheduler. + /// + + public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable source, TaskScheduler value) => + source.WithOptions(source.Options.WithScheduler(value)); + + /// + /// Returns a new asynchronous projection operation for which the + /// results will be returned in the order of the source sequence. /// /// - /// - /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and therefore not - /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. - /// - /// This method starts a new task on the default scheduler where the - /// asynchronous projections are started and awaited. - /// - /// The function should be designed to be - /// thread-agnostic. + /// Internally, the projections will be done concurrently but the + /// results will be yielded in order. /// - public static IEnumerable SelectAsync( - this IEnumerable source, - int maxConcurrency, - Func> selector) - { - return source.SelectAsync(maxConcurrency, null, selector); - } + public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source) => + PreserveOrder(source, true); /// - /// Asynchronously projects each element of a sequence to its new form - /// with a given concurrency. An additional parameter specifies the - /// to use to await for tasks to complete. + /// Returns a new asynchronous projection operation with the given + /// Boolean indicating whether or not the projections should be + /// returned in the order of the projection source. + /// + + public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) => + source.WithOptions(source.Options.WithPreserveOrder(value)); + + /// + /// Asynchronously projects each element of a sequence to its new form. /// /// /// /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and therefore not - /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. + /// yielded as each asynchronous projection completes and, by default, + /// not guaranteed to be based on the source sequence order. If order + /// is important, compose further with + /// . /// - /// This method starts a new task on the given scheduler where the - /// asynchronous projections are started and awaited. + /// This method starts a new task where the asynchronous projections + /// are started and awaited. /// /// The function should be designed to be /// thread-agnostic. /// - public static IEnumerable SelectAsync( - this IEnumerable source, - int maxConcurrency, - TaskScheduler scheduler, - Func> selector) + public static ISelectAsyncEnumerable SelectAsync( + this IEnumerable source, Func> selector) { if (selector == null) throw new ArgumentNullException(nameof(selector)); - return source.SelectAsync(maxConcurrency, scheduler, (e, _) => selector(e)); + return source.SelectAsync((e, _) => selector(e)); } /// - /// Asynchronously projects each element of a sequence to its new form - /// with a given concurrency. An additional parameter specifies the - /// to use to await for tasks to complete. + /// Asynchronously projects each element of a sequence to its new form. + /// The projection function receives a + /// as an additional argument that can be used to abort any asynchronous + /// operations in flight. /// /// /// /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and therefore not - /// guaranteed to be based on the source sequence order. If order is - /// important, sort the results. + /// yielded as each asynchronous projection completes and, by default, + /// not guaranteed to be based on the source sequence order. If order + /// is important, compose further using + /// + /// and a Boolean value of true. /// - /// This method starts a new task on the given scheduler where the - /// asynchronous projections are started and awaited. + /// This method starts a new task where the asynchronous projections + /// are started and awaited. /// /// The function should be designed to be /// thread-agnostic. /// - public static IEnumerable SelectAsync( - this IEnumerable source, - int maxConcurrency, - TaskScheduler scheduler, - Func> selector) + public static ISelectAsyncEnumerable SelectAsync( + this IEnumerable source, Func> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); - if (maxConcurrency <= 0) throw new ArgumentOutOfRangeException(nameof(maxConcurrency)); if (selector == null) throw new ArgumentNullException(nameof(selector)); - return _(); IEnumerable _() + return SelectAsyncEnumerable.Create( + options => _(options.MaxConcurrency ?? int.MaxValue, + options.Scheduler ?? TaskScheduler.Default, + options.PreserveOrder)); + + IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered) { var queue = new BlockingCollection(); var cancellationTokenSource = new CancellationTokenSource(); var completed = false; - var item = source.GetEnumerator(); + async Task> Select(KeyValuePair input, CancellationToken cancellationToken) => + new KeyValuePair(input.Key, await selector(input.Value, cancellationToken).ConfigureAwait(false)); + + var item = source.Index().GetEnumerator(); IDisposable disposable = item; // disables AccessToDisposedClosure warnings try { @@ -200,11 +237,11 @@ public static IEnumerable SelectAsync( var cancellationTaskSource = new TaskCompletionSource(); cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); - var tasks = new List>(); + var tasks = new List>>(); var more = false; for (var i = 0; i < maxConcurrency && (more = item.MoveNext()); i++) - tasks.Add(selector(item.Current, cancellationToken)); + tasks.Add(Select(item.Current, cancellationToken)); if (!more) item.Dispose(); @@ -242,11 +279,11 @@ public static IEnumerable SelectAsync( return; } - tasks.Remove((Task)task); + tasks.Remove((Task>)task); queue.Add(task); if (more && (more = item.MoveNext())) - tasks.Add(selector(item.Current, cancellationToken)); + tasks.Add(Select(item.Current, cancellationToken)); } queue.Add(null); } @@ -259,14 +296,66 @@ public static IEnumerable SelectAsync( }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, - scheduler ?? TaskScheduler.Default); + scheduler); + + var nextKey = 0; + var holds = ordered ? new List>() : null; foreach (var e in queue.GetConsumingEnumerable()) { (e as ExceptionDispatchInfo)?.Throw(); if (e == null) break; - yield return ((Task) e).Result; + + var r = ((Task>) e).Result; + + if (holds == null || r.Key == nextKey) + { + // If order does not need to be preserved or the key + // is the next that should be yielded then yield + // the result. + + yield return r.Value; + + if (holds != null) // preserve order? + { + // Release withheld results consecutive in key + // order to the one just yielded... + + var releaseCount = 0; + + for (nextKey++; + holds.Count > 0 && holds[0] is KeyValuePair n + && n.Key == nextKey; + nextKey++) + { + releaseCount++; + yield return n.Value; + } + + holds.RemoveRange(0, releaseCount); + } + } + else + { + // Received a result out of order when order must be + // preserved, so withhold the result by finding out + // where it belongs in the order of results withheld + // so far and insert it in the list. + + var i = holds.BinarySearch(r, KeyValueComparer.Default); + Debug.Assert(i < 0); + holds.Insert(~i, r); + } + } + + if (holds?.Count > 0) // yield any withheld, which should be in order... + { + foreach (var hold in holds) + { + Debug.Assert(nextKey++ == hold.Key); //...assert so! + yield return hold.Value; + } } completed = true; @@ -285,6 +374,49 @@ public static IEnumerable SelectAsync( } } } + + static class SelectAsyncEnumerable + { + public static ISelectAsyncEnumerable + Create( + Func> impl, + SelectAsyncOptions options = null) => + new SelectAsyncEnumerable(impl, options); + + } + + sealed class SelectAsyncEnumerable : ISelectAsyncEnumerable + { + readonly Func> _impl; + + public SelectAsyncEnumerable(Func> impl, + SelectAsyncOptions options = null) + { + _impl = impl; + Options = options ?? SelectAsyncOptions.Default; + } + + public SelectAsyncOptions Options { get; } + + public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) => + Options == options ? this : new SelectAsyncEnumerable(_impl, options); + + public IEnumerator GetEnumerator() => _impl(Options).GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + } + + sealed class DelegatingComparer : IComparer + { + readonly Func _comparer; + public DelegatingComparer(Func comparer) => _comparer = comparer; + public int Compare(T x, T y) => _comparer(x, y); + } + + static class KeyValueComparer + { + public static readonly IComparer> Default = + new DelegatingComparer>((x, y) => Comparer.Default.Compare(x.Key, y.Key)); + } } } From e4681fe24ab3237b1d9f3080f0ea80a903af2f03 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 28 Feb 2018 19:19:12 +0100 Subject: [PATCH 21/56] Scheduler parameter no longer exists on SelectAny --- MoreLinq.Test/NullArgumentTest.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/MoreLinq.Test/NullArgumentTest.cs b/MoreLinq.Test/NullArgumentTest.cs index 1f9059c38..19780d235 100644 --- a/MoreLinq.Test/NullArgumentTest.cs +++ b/MoreLinq.Test/NullArgumentTest.cs @@ -123,9 +123,6 @@ static bool CanBeNull(ParameterInfo parameter) #if NET451 || NETCOREAPP2_0 nameof(MoreEnumerable.ToDataTable) + ".expressions", #endif - #if NET451 || NETCOREAPP2_0 - nameof(MoreEnumerable.SelectAsync) + ".scheduler", - #endif nameof(MoreEnumerable.Trace) + ".format" }; From ffa4ef2a40e32ec448efd973b8d305cd4bb11d23 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 28 Feb 2018 20:30:23 +0100 Subject: [PATCH 22/56] Null arg checking to pass broken tests --- MoreLinq.Test/MoreLinq.Test.csproj | 4 ++++ MoreLinq.Test/NullArgumentTest.cs | 11 +++++++++ MoreLinq/SelectAsync.cs | 36 +++++++++++++++++++++--------- 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/MoreLinq.Test/MoreLinq.Test.csproj b/MoreLinq.Test/MoreLinq.Test.csproj index e44cc4acb..6b14e9b4d 100644 --- a/MoreLinq.Test/MoreLinq.Test.csproj +++ b/MoreLinq.Test/MoreLinq.Test.csproj @@ -19,6 +19,10 @@ 618 + + $(DefineConstants);NO_ASYNC + + diff --git a/MoreLinq.Test/NullArgumentTest.cs b/MoreLinq.Test/NullArgumentTest.cs index 19780d235..55b1b5310 100644 --- a/MoreLinq.Test/NullArgumentTest.cs +++ b/MoreLinq.Test/NullArgumentTest.cs @@ -215,6 +215,17 @@ public System.Linq.IOrderedEnumerable CreateOrderedEnumerable(Func : Enumerable, + ISelectAsyncEnumerable + { + public SelectAsyncOptions Options => SelectAsyncOptions.Default; + public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) => this; + } + + #endif + public class Comparer : IComparer { public int Compare(T x, T y) => -1; diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/SelectAsync.cs index 8f6354a9a..02e836ddf 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/SelectAsync.cs @@ -128,16 +128,23 @@ static partial class MoreEnumerable /// concurrency limit. /// - public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int? value) => - source.WithOptions(source.Options.WithMaxConcurrency(value)); + public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int? value) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.WithOptions(source.Options.WithMaxConcurrency(value)); + } /// /// Returns a new asynchronous projection operation with the given /// scheduler. /// - public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable source, TaskScheduler value) => - source.WithOptions(source.Options.WithScheduler(value)); + public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable source, TaskScheduler value) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (value == null) throw new ArgumentNullException(nameof(value)); + return source.WithOptions(source.Options.WithScheduler(value)); + } /// /// Returns a new asynchronous projection operation for which the @@ -148,8 +155,11 @@ public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable /// results will be yielded in order. /// - public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source) => - PreserveOrder(source, true); + public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return PreserveOrder(source, true); + } /// /// Returns a new asynchronous projection operation with the given @@ -157,8 +167,11 @@ public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumer /// returned in the order of the projection source. /// - public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) => - source.WithOptions(source.Options.WithPreserveOrder(value)); + public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.WithOptions(source.Options.WithPreserveOrder(value)); + } /// /// Asynchronously projects each element of a sequence to its new form. @@ -398,8 +411,11 @@ public SelectAsyncEnumerable(Func> impl, public SelectAsyncOptions Options { get; } - public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) => - Options == options ? this : new SelectAsyncEnumerable(_impl, options); + public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + return Options == options ? this : new SelectAsyncEnumerable(_impl, options); + } public IEnumerator GetEnumerator() => _impl(Options).GetEnumerator(); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); From 69ea1dc372b9099d5b7be5ec815a0015f3150993 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 10:03:04 +0100 Subject: [PATCH 23/56] Move under experimental namespace --- MoreLinq.Test/NullArgumentTest.cs | 6 +++--- MoreLinq/{ => Experimental}/SelectAsync.cs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) rename MoreLinq/{ => Experimental}/SelectAsync.cs (99%) diff --git a/MoreLinq.Test/NullArgumentTest.cs b/MoreLinq.Test/NullArgumentTest.cs index 55b1b5310..3cdb67bea 100644 --- a/MoreLinq.Test/NullArgumentTest.cs +++ b/MoreLinq.Test/NullArgumentTest.cs @@ -218,10 +218,10 @@ public System.Linq.IOrderedEnumerable CreateOrderedEnumerable(Func : Enumerable, - ISelectAsyncEnumerable + Experimental.ISelectAsyncEnumerable { - public SelectAsyncOptions Options => SelectAsyncOptions.Default; - public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) => this; + public Experimental.SelectAsyncOptions Options => Experimental.SelectAsyncOptions.Default; + public Experimental.ISelectAsyncEnumerable WithOptions(Experimental.SelectAsyncOptions options) => this; } #endif diff --git a/MoreLinq/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs similarity index 99% rename from MoreLinq/SelectAsync.cs rename to MoreLinq/Experimental/SelectAsync.cs index 02e836ddf..6dd462ba1 100644 --- a/MoreLinq/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -17,7 +17,7 @@ #if !NO_ASYNC -namespace MoreLinq +namespace MoreLinq.Experimental { using System; using System.Collections; @@ -121,7 +121,7 @@ public interface ISelectAsyncEnumerable : IEnumerable ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options); } - static partial class MoreEnumerable + static partial class ExperimentalEnumerable { /// /// Returns a new asynchronous projection operation with the given @@ -210,7 +210,7 @@ public static ISelectAsyncEnumerable SelectAsync( /// yielded as each asynchronous projection completes and, by default, /// not guaranteed to be based on the source sequence order. If order /// is important, compose further using - /// + /// /// and a Boolean value of true. /// /// This method starts a new task where the asynchronous projections From 67ef192cfae25822699e053c0166350767e88c53 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 17:48:55 +0100 Subject: [PATCH 24/56] Rename `PreserveOrder()` to `AsOrdered()` This aligns with PLINQ's vocabulary: https://docs.microsoft.com/en-us/dotnet/api/system.linq.parallelenumerable.asordered --- MoreLinq/Experimental/SelectAsync.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 6dd462ba1..546477cec 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -155,7 +155,7 @@ public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable /// results will be yielded in order. /// - public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source) + public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable source) { if (source == null) throw new ArgumentNullException(nameof(source)); return PreserveOrder(source, true); @@ -182,7 +182,7 @@ public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumer /// yielded as each asynchronous projection completes and, by default, /// not guaranteed to be based on the source sequence order. If order /// is important, compose further with - /// . + /// . /// /// This method starts a new task where the asynchronous projections /// are started and awaited. @@ -210,7 +210,7 @@ public static ISelectAsyncEnumerable SelectAsync( /// yielded as each asynchronous projection completes and, by default, /// not guaranteed to be based on the source sequence order. If order /// is important, compose further using - /// + /// /// and a Boolean value of true. /// /// This method starts a new task where the asynchronous projections From bdc4a8121489aaa1cc9093dd6e78b67f7dbd07e8 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 17:50:46 +0100 Subject: [PATCH 25/56] Add AsUnordered to counter AsOrdered --- MoreLinq/Experimental/SelectAsync.cs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 546477cec..24352d44f 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -161,6 +161,18 @@ public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable return PreserveOrder(source, true); } + /// + /// Returns a new asynchronous projection operation for which the + /// results are no longer guaranteed to be in the order of the source + /// sequence. + /// + + public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerable source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return PreserveOrder(source, false); + } + /// /// Returns a new asynchronous projection operation with the given /// Boolean indicating whether or not the projections should be From 70d50731ee8f7ae51c3a3eff20002fa12236c927 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 17:59:16 +0100 Subject: [PATCH 26/56] Add AsSequential as MaxConcurrency(1) --- MoreLinq/Experimental/SelectAsync.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 24352d44f..e1fd9645e 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -123,6 +123,17 @@ public interface ISelectAsyncEnumerable : IEnumerable static partial class ExperimentalEnumerable { + /// + /// Converts an asynchronous projection operation to use sequential + /// evaluation. + /// + + public static IEnumerable AsSequential(this ISelectAsyncEnumerable source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.MaxConcurrency(1); + } + /// /// Returns a new asynchronous projection operation with the given /// concurrency limit. From b8104c4dc72f415c9835db319504d73a3e4f6e73 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 18:01:40 +0100 Subject: [PATCH 27/56] Remove redundant arg checks --- MoreLinq/Experimental/SelectAsync.cs | 35 ++++++++-------------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index e1fd9645e..2e0887e7e 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -128,22 +128,16 @@ static partial class ExperimentalEnumerable /// evaluation. /// - public static IEnumerable AsSequential(this ISelectAsyncEnumerable source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.MaxConcurrency(1); - } + public static IEnumerable AsSequential(this ISelectAsyncEnumerable source) => + source.MaxConcurrency(1); /// /// Returns a new asynchronous projection operation with the given /// concurrency limit. /// - public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int? value) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.WithOptions(source.Options.WithMaxConcurrency(value)); - } + public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int? value) => + source.WithOptions(source.Options.WithMaxConcurrency(value)); /// /// Returns a new asynchronous projection operation with the given @@ -166,11 +160,8 @@ public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable /// results will be yielded in order. /// - public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return PreserveOrder(source, true); - } + public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable source) => + PreserveOrder(source, true); /// /// Returns a new asynchronous projection operation for which the @@ -178,11 +169,8 @@ public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable /// sequence. /// - public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerable source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return PreserveOrder(source, false); - } + public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerable source) => + PreserveOrder(source, false); /// /// Returns a new asynchronous projection operation with the given @@ -190,11 +178,8 @@ public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerab /// returned in the order of the projection source. /// - public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.WithOptions(source.Options.WithPreserveOrder(value)); - } + public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) => + source.WithOptions(source.Options.WithPreserveOrder(value)); /// /// Asynchronously projects each element of a sequence to its new form. From 655e45acba84eea7f873a11f43e2e128af3af24a Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 18:34:58 +0100 Subject: [PATCH 28/56] Complete the documentation --- MoreLinq/Experimental/SelectAsync.cs | 70 ++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 2e0887e7e..f792dd671 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -79,6 +79,10 @@ public sealed class SelectAsyncOptions /// /// Returns new options with the given concurrency limit. /// + /// + /// The maximum concurrent asynchronous operation to keep in flight. + /// Use null to mean unbounded concurrency. + /// Options with the new setting. public SelectAsyncOptions WithMaxConcurrency(int? value) => value == MaxConcurrency ? this : new SelectAsyncOptions(value, Scheduler, PreserveOrder); @@ -86,6 +90,9 @@ public SelectAsyncOptions WithMaxConcurrency(int? value) => /// /// Returns new options with the given scheduler. /// + /// + /// The scheduler to use to for the workhorse task. + /// Options with the new setting. public SelectAsyncOptions WithScheduler(TaskScheduler value) => value == Scheduler ? this : new SelectAsyncOptions(MaxConcurrency, value, PreserveOrder); @@ -95,6 +102,11 @@ public SelectAsyncOptions WithScheduler(TaskScheduler value) => /// not the projections should be returned in the order of the /// projection source. /// + /// + /// A Boolean where true means results are in source order and + /// false means that results can be delivered in order of + /// efficiency. + /// Options with the new setting. public SelectAsyncOptions WithPreserveOrder(bool value) => value == PreserveOrder ? this : new SelectAsyncOptions(MaxConcurrency, Scheduler, value); @@ -104,6 +116,7 @@ public SelectAsyncOptions WithPreserveOrder(bool value) => /// An representing an asynchronous projection. /// /// + /// The type of the source elements. public interface ISelectAsyncEnumerable : IEnumerable { @@ -117,6 +130,10 @@ public interface ISelectAsyncEnumerable : IEnumerable /// Returns a new asynchronous projection operation that will use the /// given options. /// + /// The new options to use. + /// + /// Returns a new sequence that projects asynchronously using the + /// supplied options. ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options); } @@ -127,6 +144,9 @@ static partial class ExperimentalEnumerable /// Converts an asynchronous projection operation to use sequential /// evaluation. /// + /// The type of the source elements. + /// The source sequence. + /// The converted sequence. public static IEnumerable AsSequential(this ISelectAsyncEnumerable source) => source.MaxConcurrency(1); @@ -135,6 +155,12 @@ public static IEnumerable AsSequential(this ISelectAsyncEnumerable sour /// Returns a new asynchronous projection operation with the given /// concurrency limit. /// + /// The type of the source elements. + /// The source sequence. + /// + /// + /// A sequence that projects results asynchronously using the given + /// concurrency limit. public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int? value) => source.WithOptions(source.Options.WithMaxConcurrency(value)); @@ -143,6 +169,12 @@ public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnume /// Returns a new asynchronous projection operation with the given /// scheduler. /// + /// The type of the source elements. + /// The source sequence. + /// The scheduler to use. + /// + /// A sequence that projects results asynchronously using the given + /// scheduler. public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable source, TaskScheduler value) { @@ -155,6 +187,11 @@ public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable /// Returns a new asynchronous projection operation for which the /// results will be returned in the order of the source sequence. /// + /// The type of the source elements. + /// The source sequence. + /// + /// A sequence that projects results asynchronously but returns + /// results in the order of the source sequence. /// /// Internally, the projections will be done concurrently but the /// results will be yielded in order. @@ -168,6 +205,12 @@ public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable /// results are no longer guaranteed to be in the order of the source /// sequence. /// + /// The type of the source elements. + /// The source sequence. + /// + /// A sequence that projects results asynchronously but without any + /// guarantee of returning results in the order of the source + /// sequence. public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerable source) => PreserveOrder(source, false); @@ -177,6 +220,16 @@ public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerab /// Boolean indicating whether or not the projections should be /// returned in the order of the projection source. /// + /// The type of the source elements. + /// The source sequence. + /// + /// A Boolean where true means results are in source order and + /// false means that results can be delivered in order of + /// efficiency. + /// + /// A sequence that projects results asynchronously and returns the + /// results order or unordered based on + /// . public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) => source.WithOptions(source.Options.WithPreserveOrder(value)); @@ -184,6 +237,13 @@ public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumer /// /// Asynchronously projects each element of a sequence to its new form. /// + /// The type of the source elements. + /// The type of the result elements. + /// The source sequence. + /// A transform function to apply to each element. + /// + /// A sequence that projects results asynchronously. + /// /// /// /// This method uses deferred execution semantics. The results are @@ -212,6 +272,16 @@ public static ISelectAsyncEnumerable SelectAsync( /// as an additional argument that can be used to abort any asynchronous /// operations in flight. /// + /// The type of the source elements. + /// The type of the result elements. + /// The source sequence. + /// A transform function to apply to each + /// element, the second parameter of which is a + /// that can be used to abort + /// asynchronous operations. + /// + /// A sequence that projects results asynchronously. + /// /// /// /// This method uses deferred execution semantics. The results are From bb8e544f440164125c62acda18cbadec32cb77a1 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 20:05:52 +0100 Subject: [PATCH 29/56] Make exception throw case clearer --- MoreLinq/Experimental/SelectAsync.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index f792dd671..83f275703 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -394,7 +394,9 @@ async Task> Select(KeyValuePair input, Cancel foreach (var e in queue.GetConsumingEnumerable()) { - (e as ExceptionDispatchInfo)?.Throw(); + if (e is ExceptionDispatchInfo edi) + edi.Throw(); + if (e == null) break; From e3c4259b0be9307a64673a8a54715023bdf8cd76 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 1 Mar 2018 21:23:17 +0100 Subject: [PATCH 30/56] DelegatingComparer is redundant with Comparer.Create --- MoreLinq/Experimental/SelectAsync.cs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 83f275703..b269f9f39 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -501,17 +501,10 @@ public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } - sealed class DelegatingComparer : IComparer - { - readonly Func _comparer; - public DelegatingComparer(Func comparer) => _comparer = comparer; - public int Compare(T x, T y) => _comparer(x, y); - } - static class KeyValueComparer { public static readonly IComparer> Default = - new DelegatingComparer>((x, y) => Comparer.Default.Compare(x.Key, y.Key)); + Comparer>.Create((x, y) => Comparer.Default.Compare(x.Key, y.Key)); } } } From d6269f5c2f46197d83162e064641322317670d74 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 00:18:43 +0100 Subject: [PATCH 31/56] Fix typo in comment --- MoreLinq/Experimental/SelectAsync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index b269f9f39..aaf106c41 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -345,7 +345,7 @@ async Task> Select(KeyValuePair input, Cancel // but allows the waiting to be cancelled // via a CancellationToken. Task.WhenAny can // be awaited so it is better since the - // tread won't be blocked and can return to + // thread won't be blocked and can return to // the pool. However, it doesn't support // cancellation so instead a task is built // on top of the CancellationToken that From 33e437aa52582c11c497d041f1217332ed43401d Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 00:27:58 +0100 Subject: [PATCH 32/56] Extract async result collection into own method --- MoreLinq/Experimental/SelectAsync.cs | 166 +++++++++++++++------------ 1 file changed, 94 insertions(+), 72 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index aaf106c41..f31932cd6 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -311,80 +311,30 @@ public static ISelectAsyncEnumerable SelectAsync( IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered) { - var queue = new BlockingCollection(); + var notices = new BlockingCollection(); var cancellationTokenSource = new CancellationTokenSource(); + var cancellationToken = cancellationTokenSource.Token; var completed = false; - async Task> Select(KeyValuePair input, CancellationToken cancellationToken) => - new KeyValuePair(input.Key, await selector(input.Value, cancellationToken).ConfigureAwait(false)); + var enumerator = + source.Index() + .Select(e => (Key: e.Key, Value: selector(e.Value, cancellationToken))) + .GetEnumerator(); - var item = source.Index().GetEnumerator(); - IDisposable disposable = item; // disables AccessToDisposedClosure warnings - try - { - Task.Factory.StartNew(async () => - { - var cancellationToken = cancellationTokenSource.Token; - var cancellationTaskSource = new TaskCompletionSource(); - cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); - - var tasks = new List>>(); - - var more = false; - for (var i = 0; i < maxConcurrency && (more = item.MoveNext()); i++) - tasks.Add(Select(item.Current, cancellationToken)); + IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings - if (!more) - item.Dispose(); + object endNotice = null; - try - { - while (tasks.Count > 0) - { - // Task.WaitAny is synchronous and blocking - // but allows the waiting to be cancelled - // via a CancellationToken. Task.WhenAny can - // be awaited so it is better since the - // thread won't be blocked and can return to - // the pool. However, it doesn't support - // cancellation so instead a task is built - // on top of the CancellationToken that - // completes when the CancellationToken - // trips. - - var task = await Task.WhenAny(tasks.Cast().Concat(cancellationTaskSource.Task)); - - if (task == cancellationTaskSource.Task) - { - // Cancellation during the wait means - // the enumeration has been stopped by - // the user so the results of the - // remaining tasks are no longer needed. - // Those tasks should cancel as a result - // of sharing the same cancellation - // token and provided that they passed - // it on to any downstream asynchronous - // operations. Either way, this loop - // is done so exit hard here. - - return; - } - - tasks.Remove((Task>)task); - queue.Add(task); - - if (more && (more = item.MoveNext())) - tasks.Add(Select(item.Current, cancellationToken)); - } - queue.Add(null); - } - catch (Exception e) - { - cancellationTokenSource.Cancel(); - queue.Add(ExceptionDispatchInfo.Capture(e)); - } - queue.CompleteAdding(); - }, + try + { + Task.Factory.StartNew( + () => CollectToAsync(enumerator, + e => e.Value, + notices, + (e, r) => new KeyValuePair(e.Key, r), // boxing :-( + ExceptionDispatchInfo.Capture, + endNotice, + maxConcurrency, cancellationTokenSource), CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler); @@ -392,15 +342,15 @@ async Task> Select(KeyValuePair input, Cancel var nextKey = 0; var holds = ordered ? new List>() : null; - foreach (var e in queue.GetConsumingEnumerable()) + foreach (var notice in notices.GetConsumingEnumerable()) { - if (e is ExceptionDispatchInfo edi) + if (notice is ExceptionDispatchInfo edi) edi.Throw(); - if (e == null) + if (notice == endNotice) break; - var r = ((Task>) e).Result; + var r = ((Task>) notice).Result; if (holds == null || r.Key == nextKey) { @@ -468,6 +418,78 @@ async Task> Select(KeyValuePair input, Cancel } } + static async Task Select(this Task task, Func selector) => + selector(await task); + + static async Task CollectToAsync( + this IEnumerator e, + Func> taskSelector, + BlockingCollection collection, + Func resultNoticeSelector, + Func errorNoticeSelector, + TNotice endNotice, + int maxConcurrency, + CancellationTokenSource cancellationTokenSource) + { + var cancellationToken = cancellationTokenSource.Token; + var cancellationTaskSource = new TaskCompletionSource(); + cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); + + var tasks = new List>(); + + var more = false; + for (var i = 0; i < maxConcurrency && (more = e.MoveNext()); i++) + tasks.Add(taskSelector(e.Current).Select(r => (e.Current, r))); + + if (!more) + e.Dispose(); + + try + { + while (tasks.Count > 0) + { + // Task.WaitAny is synchronous and blocking but allows the + // waiting to be cancelled via a CancellationToken. + // Task.WhenAny can be awaited so it is better since the + // thread won't be blocked and can return to the pool. + // However, it doesn't support cancellation so instead a + // task is built on top of the CancellationToken that + // completes when the CancellationToken trips. + + var task = await Task.WhenAny(tasks.Cast().Concat(cancellationTaskSource.Task)); + + if (task == cancellationTaskSource.Task) + { + // Cancellation during the wait means the enumeration + // has been stopped by the user so the results of the + // remaining tasks are no longer needed. Those tasks + // should cancel as a result of sharing the same + // cancellation token and provided that they passed it + // on to any downstream asynchronous operations. Either + // way, this loop is done so exit hard here. + + return; + } + + var rt = (Task<(T Input, TResult Result)>) task; + tasks.Remove(rt); + collection.Add(resultNoticeSelector(rt.Result.Input, rt.Result.Result)); + + if (more && (more = e.MoveNext())) + tasks.Add(taskSelector(e.Current).Select(r => (e.Current, r))); + } + + collection.Add(endNotice); + } + catch (Exception ex) + { + cancellationTokenSource.Cancel(); + collection.Add(errorNoticeSelector(ex)); + } + + collection.CompleteAdding(); + } + static class SelectAsyncEnumerable { public static ISelectAsyncEnumerable From 90acfbeff8f42729894da7c84a85447a96a76a88 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 13:05:49 +0100 Subject: [PATCH 33/56] Strong-typed notices This avoids casting bugs & removes boxing. --- MoreLinq/Experimental/SelectAsync.cs | 30 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index f31932cd6..666a407b8 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -311,7 +311,7 @@ public static ISelectAsyncEnumerable SelectAsync( IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered) { - var notices = new BlockingCollection(); + var notices = new BlockingCollection<(Notice, KeyValuePair, ExceptionDispatchInfo)>(); var cancellationTokenSource = new CancellationTokenSource(); var cancellationToken = cancellationTokenSource.Token; var completed = false; @@ -323,17 +323,15 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings - object endNotice = null; - try { Task.Factory.StartNew( () => CollectToAsync(enumerator, e => e.Value, notices, - (e, r) => new KeyValuePair(e.Key, r), // boxing :-( - ExceptionDispatchInfo.Capture, - endNotice, + (e, r) => (Notice.Result, new KeyValuePair(e.Key, r), default(ExceptionDispatchInfo)), + ex => (Notice.Error, default(KeyValuePair), ExceptionDispatchInfo.Capture(ex)), + (Notice.End, default(KeyValuePair), default(ExceptionDispatchInfo)), maxConcurrency, cancellationTokenSource), CancellationToken.None, TaskCreationOptions.DenyChildAttach, @@ -342,23 +340,23 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered var nextKey = 0; var holds = ordered ? new List>() : null; - foreach (var notice in notices.GetConsumingEnumerable()) + foreach (var (kind, result, error) in notices.GetConsumingEnumerable()) { - if (notice is ExceptionDispatchInfo edi) - edi.Throw(); + if (kind == Notice.Error) + error.Throw(); - if (notice == endNotice) + if (kind == Notice.End) break; - var r = ((Task>) notice).Result; + Debug.Assert(kind == Notice.Result); - if (holds == null || r.Key == nextKey) + if (holds == null || result.Key == nextKey) { // If order does not need to be preserved or the key // is the next that should be yielded then yield // the result. - yield return r.Value; + yield return result.Value; if (holds != null) // preserve order? { @@ -386,9 +384,9 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered // where it belongs in the order of results withheld // so far and insert it in the list. - var i = holds.BinarySearch(r, KeyValueComparer.Default); + var i = holds.BinarySearch(result, KeyValueComparer.Default); Debug.Assert(i < 0); - holds.Insert(~i, r); + holds.Insert(~i, result); } } @@ -418,6 +416,8 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered } } + enum Notice { Result, Error, End } + static async Task Select(this Task task, Func selector) => selector(await task); From 0da4625537ba5df328e55fc24e6d82684d7eb413 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 13:41:57 +0100 Subject: [PATCH 34/56] Remove syntactic redundancies --- MoreLinq/Experimental/SelectAsync.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 666a407b8..e5c42cda0 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -318,7 +318,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered var enumerator = source.Index() - .Select(e => (Key: e.Key, Value: selector(e.Value, cancellationToken))) + .Select(e => (e.Key, Value: selector(e.Value, cancellationToken))) .GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings @@ -329,9 +329,9 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered () => CollectToAsync(enumerator, e => e.Value, notices, - (e, r) => (Notice.Result, new KeyValuePair(e.Key, r), default(ExceptionDispatchInfo)), - ex => (Notice.Error, default(KeyValuePair), ExceptionDispatchInfo.Capture(ex)), - (Notice.End, default(KeyValuePair), default(ExceptionDispatchInfo)), + (e, r) => (Notice.Result, new KeyValuePair(e.Key, r), default), + ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), + (Notice.End, default, default), maxConcurrency, cancellationTokenSource), CancellationToken.None, TaskCreationOptions.DenyChildAttach, From d7207d2fca4ac7cc2ceb99f20349d4c8b33672c7 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 19:36:21 +0100 Subject: [PATCH 35/56] Better names --- MoreLinq/Experimental/SelectAsync.cs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index e5c42cda0..048ea0776 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -456,9 +456,11 @@ static async Task CollectToAsync( // task is built on top of the CancellationToken that // completes when the CancellationToken trips. - var task = await Task.WhenAny(tasks.Cast().Concat(cancellationTaskSource.Task)); + var completedTask = await + Task.WhenAny(tasks.Cast() + .Concat(cancellationTaskSource.Task)); - if (task == cancellationTaskSource.Task) + if (completedTask == cancellationTaskSource.Task) { // Cancellation during the wait means the enumeration // has been stopped by the user so the results of the @@ -471,9 +473,9 @@ static async Task CollectToAsync( return; } - var rt = (Task<(T Input, TResult Result)>) task; - tasks.Remove(rt); - collection.Add(resultNoticeSelector(rt.Result.Input, rt.Result.Result)); + var task = (Task<(T Input, TResult Result)>) completedTask; + tasks.Remove(task); + collection.Add(resultNoticeSelector(task.Result.Input, task.Result.Result)); if (more && (more = e.MoveNext())) tasks.Add(taskSelector(e.Current).Select(r => (e.Current, r))); From 466a74b04a5584ff38000ed4b3203c153d8afd2b Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 19:37:05 +0100 Subject: [PATCH 36/56] Remove empty line --- MoreLinq/Experimental/SelectAsync.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 048ea0776..cbb7317e0 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -499,7 +499,6 @@ public static ISelectAsyncEnumerable Func> impl, SelectAsyncOptions options = null) => new SelectAsyncEnumerable(impl, options); - } sealed class SelectAsyncEnumerable : ISelectAsyncEnumerable From c241fb0b443f6a968ea93ff8160d26190d1a5575 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 19:37:46 +0100 Subject: [PATCH 37/56] Reduce indentation --- MoreLinq/Experimental/SelectAsync.cs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index cbb7317e0..d3bbf600f 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -326,13 +326,15 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered try { Task.Factory.StartNew( - () => CollectToAsync(enumerator, - e => e.Value, - notices, - (e, r) => (Notice.Result, new KeyValuePair(e.Key, r), default), - ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), - (Notice.End, default, default), - maxConcurrency, cancellationTokenSource), + () => + CollectToAsync( + enumerator, + e => e.Value, + notices, + (e, r) => (Notice.Result, new KeyValuePair(e.Key, r), default), + ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), + (Notice.End, default, default), + maxConcurrency, cancellationTokenSource), CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler); From 7560944f406365a4230bac3bb07ec6115ce3133f Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 19:49:37 +0100 Subject: [PATCH 38/56] Replace KeyValuePair<,> with a couple --- MoreLinq/Experimental/SelectAsync.cs | 41 ++++++++++++++++------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index d3bbf600f..1653664d6 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -311,7 +311,7 @@ public static ISelectAsyncEnumerable SelectAsync( IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered) { - var notices = new BlockingCollection<(Notice, KeyValuePair, ExceptionDispatchInfo)>(); + var notices = new BlockingCollection<(Notice, (int, TResult), ExceptionDispatchInfo)>(); var cancellationTokenSource = new CancellationTokenSource(); var cancellationToken = cancellationTokenSource.Token; var completed = false; @@ -331,7 +331,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered enumerator, e => e.Value, notices, - (e, r) => (Notice.Result, new KeyValuePair(e.Key, r), default), + (e, r) => (Notice.Result, (e.Key, r), default), ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), (Notice.End, default, default), maxConcurrency, cancellationTokenSource), @@ -340,7 +340,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered scheduler); var nextKey = 0; - var holds = ordered ? new List>() : null; + var holds = ordered ? new List<(int, TResult)>() : null; foreach (var (kind, result, error) in notices.GetConsumingEnumerable()) { @@ -352,13 +352,14 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered Debug.Assert(kind == Notice.Result); - if (holds == null || result.Key == nextKey) + var (key, value) = result; + if (holds == null || key == nextKey) { // If order does not need to be preserved or the key // is the next that should be yielded then yield // the result. - yield return result.Value; + yield return value; if (holds != null) // preserve order? { @@ -367,13 +368,14 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered var releaseCount = 0; - for (nextKey++; - holds.Count > 0 && holds[0] is KeyValuePair n - && n.Key == nextKey; - nextKey++) + for (nextKey++; holds.Count > 0; nextKey++) { - releaseCount++; - yield return n.Value; + var (candidateKey, candidate) = holds[0]; + if (candidateKey == nextKey) + { + releaseCount++; + yield return candidate; + } } holds.RemoveRange(0, releaseCount); @@ -386,7 +388,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered // where it belongs in the order of results withheld // so far and insert it in the list. - var i = holds.BinarySearch(result, KeyValueComparer.Default); + var i = holds.BinarySearch(result, TupleComparer.Key1); Debug.Assert(i < 0); holds.Insert(~i, result); } @@ -394,10 +396,10 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered if (holds?.Count > 0) // yield any withheld, which should be in order... { - foreach (var hold in holds) + foreach (var (key, value) in holds) { - Debug.Assert(nextKey++ == hold.Key); //...assert so! - yield return hold.Value; + Debug.Assert(nextKey++ == key); //...assert so! + yield return value; } } @@ -526,10 +528,13 @@ public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } - static class KeyValueComparer + static class TupleComparer { - public static readonly IComparer> Default = - Comparer>.Create((x, y) => Comparer.Default.Compare(x.Key, y.Key)); + public static readonly IComparer<(T1, T2)> Key1 = + Comparer<(T1, T2)>.Create((x, y) => Comparer.Default.Compare(x.Item1, y.Item1)); + + public static readonly IComparer<(T1, T2)> Key2 = + Comparer<(T1, T2)>.Create((x, y) => Comparer.Default.Compare(x.Item2, y.Item2)); } } } From e389cabfdefcf4ad3d3386d9981a3e1529cbd9f1 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Fri, 2 Mar 2018 19:50:27 +0100 Subject: [PATCH 39/56] Fix indentation --- MoreLinq/Experimental/SelectAsync.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 1653664d6..96b6020e5 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -304,10 +304,11 @@ public static ISelectAsyncEnumerable SelectAsync( if (source == null) throw new ArgumentNullException(nameof(source)); if (selector == null) throw new ArgumentNullException(nameof(selector)); - return SelectAsyncEnumerable.Create( - options => _(options.MaxConcurrency ?? int.MaxValue, - options.Scheduler ?? TaskScheduler.Default, - options.PreserveOrder)); + return + SelectAsyncEnumerable.Create( + options => _(options.MaxConcurrency ?? int.MaxValue, + options.Scheduler ?? TaskScheduler.Default, + options.PreserveOrder)); IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered) { From 9d2d55d0b6f19f95f2175fcd85be13a193669c62 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sat, 3 Mar 2018 10:45:22 +0100 Subject: [PATCH 40/56] Fix remark about AsOrdered in 2nd overload --- MoreLinq/Experimental/SelectAsync.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 96b6020e5..29316dd15 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -287,9 +287,8 @@ public static ISelectAsyncEnumerable SelectAsync( /// This method uses deferred execution semantics. The results are /// yielded as each asynchronous projection completes and, by default, /// not guaranteed to be based on the source sequence order. If order - /// is important, compose further using - /// - /// and a Boolean value of true. + /// is important, compose further with + /// . /// /// This method starts a new task where the asynchronous projections /// are started and awaited. From 6dd995831a40885b5f11052c7e6133bf6a3b52a6 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Sat, 3 Mar 2018 11:08:06 +0100 Subject: [PATCH 41/56] Remarks about partial consumption & cancellation --- MoreLinq/Experimental/SelectAsync.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 29316dd15..12a7ef919 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -253,7 +253,9 @@ public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumer /// . /// /// This method starts a new task where the asynchronous projections - /// are started and awaited. + /// are started and awaited. If the resulting sequence is partially + /// consumed then there's a good chance that some projection work will + /// be wasted, those that are in flight. /// /// The function should be designed to be /// thread-agnostic. @@ -291,7 +293,11 @@ public static ISelectAsyncEnumerable SelectAsync( /// . /// /// This method starts a new task where the asynchronous projections - /// are started and awaited. + /// are started and awaited. If the resulting sequence is partially + /// consumed then there's a good chance that some projection work will + /// be wasted and a cooperative effort is done that depends on the + /// projection function (via a as its + /// second argument) to cancel those in flight. /// /// The function should be designed to be /// thread-agnostic. From 60e5474ce7f5d294b1c69eb60484d526b9fe9a87 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 5 Mar 2018 10:52:19 +0100 Subject: [PATCH 42/56] Clearer name for task element of tuple --- MoreLinq/Experimental/SelectAsync.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 12a7ef919..acba1b731 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -324,7 +324,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered var enumerator = source.Index() - .Select(e => (e.Key, Value: selector(e.Value, cancellationToken))) + .Select(e => (e.Key, Task: selector(e.Value, cancellationToken))) .GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings @@ -335,7 +335,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered () => CollectToAsync( enumerator, - e => e.Value, + e => e.Task, notices, (e, r) => (Notice.Result, (e.Key, r), default), ex => (Notice.Error, default, ExceptionDispatchInfo.Capture(ex)), From 21314f4fd56feb7bbd003aa932ef76a83f237e4a Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 6 Mar 2018 18:17:56 +0100 Subject: [PATCH 43/56] MaxConcurrency(int) + UnboundedConcurrency() --- MoreLinq/Experimental/SelectAsync.cs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index acba1b731..8144ed195 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -162,9 +162,22 @@ public static IEnumerable AsSequential(this ISelectAsyncEnumerable sour /// A sequence that projects results asynchronously using the given /// concurrency limit. - public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int? value) => + public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int value) => source.WithOptions(source.Options.WithMaxConcurrency(value)); + /// + /// Returns a new asynchronous projection operation with no defined + /// limitation on concurrency. + /// + /// The type of the source elements. + /// The source sequence. + /// + /// A sequence that projects results asynchronously using no defined + /// limitation on concurrency. + + public static ISelectAsyncEnumerable UnboundedConcurrency(this ISelectAsyncEnumerable source) => + source.WithOptions(source.Options.WithMaxConcurrency(null)); + /// /// Returns a new asynchronous projection operation with the given /// scheduler. From 6e22d292beed98e6dbdf61bcf04f852042643735 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 6 Mar 2018 18:23:44 +0100 Subject: [PATCH 44/56] Eager disposals of enumerator in CollectToAsync --- MoreLinq/Experimental/SelectAsync.cs | 104 ++++++++++++++------------- 1 file changed, 56 insertions(+), 48 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 8144ed195..16d5bbd38 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -454,65 +454,73 @@ static async Task CollectToAsync( int maxConcurrency, CancellationTokenSource cancellationTokenSource) { - var cancellationToken = cancellationTokenSource.Token; - var cancellationTaskSource = new TaskCompletionSource(); - cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); + using (e) + { + var cancellationToken = cancellationTokenSource.Token; + var cancellationTaskSource = new TaskCompletionSource(); + cancellationToken.Register(() => cancellationTaskSource.TrySetResult(true)); - var tasks = new List>(); + var tasks = new List>(); - var more = false; - for (var i = 0; i < maxConcurrency && (more = e.MoveNext()); i++) - tasks.Add(taskSelector(e.Current).Select(r => (e.Current, r))); + var more = false; + for (var i = 0; i < maxConcurrency && (more = e.MoveNext()); i++) + tasks.Add(taskSelector(e.Current).Select(r => (e.Current, r))); - if (!more) - e.Dispose(); + if (!more) + e.Dispose(); - try - { - while (tasks.Count > 0) + try { - // Task.WaitAny is synchronous and blocking but allows the - // waiting to be cancelled via a CancellationToken. - // Task.WhenAny can be awaited so it is better since the - // thread won't be blocked and can return to the pool. - // However, it doesn't support cancellation so instead a - // task is built on top of the CancellationToken that - // completes when the CancellationToken trips. - - var completedTask = await - Task.WhenAny(tasks.Cast() - .Concat(cancellationTaskSource.Task)); - - if (completedTask == cancellationTaskSource.Task) + while (tasks.Count > 0) { - // Cancellation during the wait means the enumeration - // has been stopped by the user so the results of the - // remaining tasks are no longer needed. Those tasks - // should cancel as a result of sharing the same - // cancellation token and provided that they passed it - // on to any downstream asynchronous operations. Either - // way, this loop is done so exit hard here. - - return; - } + // Task.WaitAny is synchronous and blocking but allows the + // waiting to be cancelled via a CancellationToken. + // Task.WhenAny can be awaited so it is better since the + // thread won't be blocked and can return to the pool. + // However, it doesn't support cancellation so instead a + // task is built on top of the CancellationToken that + // completes when the CancellationToken trips. + + var completedTask = await + Task.WhenAny(tasks.Cast() + .Concat(cancellationTaskSource.Task)); + + if (completedTask == cancellationTaskSource.Task) + { + // Cancellation during the wait means the enumeration + // has been stopped by the user so the results of the + // remaining tasks are no longer needed. Those tasks + // should cancel as a result of sharing the same + // cancellation token and provided that they passed it + // on to any downstream asynchronous operations. Either + // way, this loop is done so exit hard here. + + return; + } + + var task = (Task<(T Input, TResult Result)>) completedTask; + tasks.Remove(task); + collection.Add(resultNoticeSelector(task.Result.Input, task.Result.Result)); - var task = (Task<(T Input, TResult Result)>) completedTask; - tasks.Remove(task); - collection.Add(resultNoticeSelector(task.Result.Input, task.Result.Result)); + if (more) + { + if (more = e.MoveNext()) + tasks.Add(taskSelector(e.Current).Select(r => (e.Current, r))); + else + e.Dispose(); + } + } - if (more && (more = e.MoveNext())) - tasks.Add(taskSelector(e.Current).Select(r => (e.Current, r))); + collection.Add(endNotice); + } + catch (Exception ex) + { + cancellationTokenSource.Cancel(); + collection.Add(errorNoticeSelector(ex)); } - collection.Add(endNotice); - } - catch (Exception ex) - { - cancellationTokenSource.Cancel(); - collection.Add(errorNoticeSelector(ex)); + collection.CompleteAdding(); } - - collection.CompleteAdding(); } static class SelectAsyncEnumerable From 811b95660edb1722f8e554ff4edff2841323e047 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 6 Mar 2018 18:26:28 +0100 Subject: [PATCH 45/56] Keep TupleComparer member parity with ValueTuple --- MoreLinq/Experimental/SelectAsync.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 16d5bbd38..6c3be1bfc 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -407,7 +407,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered // where it belongs in the order of results withheld // so far and insert it in the list. - var i = holds.BinarySearch(result, TupleComparer.Key1); + var i = holds.BinarySearch(result, TupleComparer.Item1); Debug.Assert(i < 0); holds.Insert(~i, result); } @@ -557,10 +557,10 @@ public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) static class TupleComparer { - public static readonly IComparer<(T1, T2)> Key1 = + public static readonly IComparer<(T1, T2)> Item1 = Comparer<(T1, T2)>.Create((x, y) => Comparer.Default.Compare(x.Item1, y.Item1)); - public static readonly IComparer<(T1, T2)> Key2 = + public static readonly IComparer<(T1, T2)> Item2 = Comparer<(T1, T2)>.Create((x, y) => Comparer.Default.Compare(x.Item2, y.Item2)); } } From e377fdb387cbfdf7d988a143fb078d9a273de914 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 6 Mar 2018 18:34:07 +0100 Subject: [PATCH 46/56] Don't continue our awaits on captured context --- MoreLinq/Experimental/SelectAsync.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 6c3be1bfc..0ff5da62d 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -442,7 +442,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered enum Notice { Result, Error, End } static async Task Select(this Task task, Func selector) => - selector(await task); + selector(await task.ConfigureAwait(continueOnCapturedContext: false)); static async Task CollectToAsync( this IEnumerator e, @@ -482,8 +482,8 @@ static async Task CollectToAsync( // completes when the CancellationToken trips. var completedTask = await - Task.WhenAny(tasks.Cast() - .Concat(cancellationTaskSource.Task)); + Task.WhenAny(tasks.Cast().Concat(cancellationTaskSource.Task)) + .ConfigureAwait(continueOnCapturedContext: false); if (completedTask == cancellationTaskSource.Task) { From 3819d3d44546eb8f217c3044f03ce8fa3629483f Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Tue, 6 Mar 2018 22:34:04 +0100 Subject: [PATCH 47/56] Simplify first overload to an awaiter --- MoreLinq/Experimental/SelectAsync.cs | 33 +++++++++++++--------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 0ff5da62d..7549c5f6e 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -248,37 +248,34 @@ public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumer source.WithOptions(source.Options.WithPreserveOrder(value)); /// - /// Asynchronously projects each element of a sequence to its new form. + /// Creates a sequence that streams the result of each task in the + /// source sequence as it completes. /// - /// The type of the source elements. - /// The type of the result elements. - /// The source sequence. - /// A transform function to apply to each element. + /// + /// The type of each task's result as well as the type of the elements + /// of the resulting sequence. + /// The source sequence of tasks. /// - /// A sequence that projects results asynchronously. + /// A sequence that streams the result of each task in + /// as it completes. /// /// /// /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and, by default, + /// yielded as each asynchronous task completes and, by default, /// not guaranteed to be based on the source sequence order. If order /// is important, compose further with /// . /// - /// This method starts a new task where the asynchronous projections - /// are started and awaited. If the resulting sequence is partially - /// consumed then there's a good chance that some projection work will - /// be wasted, those that are in flight. - /// - /// The function should be designed to be - /// thread-agnostic. + /// This method starts a new task where the tasks are awaited. If the + /// resulting sequence is partially consumed then there's a good chance + /// that some tasks will be wasted, those that are in flight. /// - public static ISelectAsyncEnumerable SelectAsync( - this IEnumerable source, Func> selector) + public static ISelectAsyncEnumerable Await( + this IEnumerable> source) { - if (selector == null) throw new ArgumentNullException(nameof(selector)); - return source.SelectAsync((e, _) => selector(e)); + return source.SelectAsync((e, _) => e); } /// From d2857e8f78268f41e3a4babdff2c64c02930ccb1 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 7 Mar 2018 07:25:11 +0100 Subject: [PATCH 48/56] Rename to Await --- MoreLinq.Test/NullArgumentTest.cs | 8 +-- MoreLinq/Experimental/SelectAsync.cs | 74 ++++++++++++++-------------- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/MoreLinq.Test/NullArgumentTest.cs b/MoreLinq.Test/NullArgumentTest.cs index 3cdb67bea..4cf3fb3cf 100644 --- a/MoreLinq.Test/NullArgumentTest.cs +++ b/MoreLinq.Test/NullArgumentTest.cs @@ -217,11 +217,11 @@ public System.Linq.IOrderedEnumerable CreateOrderedEnumerable(Func : Enumerable, - Experimental.ISelectAsyncEnumerable + public class AwaitQuery : Enumerable, + Experimental.IAwaitQuery { - public Experimental.SelectAsyncOptions Options => Experimental.SelectAsyncOptions.Default; - public Experimental.ISelectAsyncEnumerable WithOptions(Experimental.SelectAsyncOptions options) => this; + public Experimental.AwaitQueryOptions Options => Experimental.AwaitQueryOptions.Default; + public Experimental.IAwaitQuery WithOptions(Experimental.AwaitQueryOptions options) => this; } #endif diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index 7549c5f6e..b4d8a2669 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -33,16 +33,16 @@ namespace MoreLinq.Experimental /// Represents options for an asynchronous projection operation. /// - public sealed class SelectAsyncOptions + public sealed class AwaitQueryOptions { /// /// The default options an asynchronous projection operation. /// - public static readonly SelectAsyncOptions Default = - new SelectAsyncOptions(null /* = unbounded concurrency */, - TaskScheduler.Default, - preserveOrder: false); + public static readonly AwaitQueryOptions Default = + new AwaitQueryOptions(null /* = unbounded concurrency */, + TaskScheduler.Default, + preserveOrder: false); /// /// Gets a positive (non-zero) integer that specifies the maximum @@ -65,7 +65,7 @@ public sealed class SelectAsyncOptions public bool PreserveOrder { get; } - SelectAsyncOptions(int? maxConcurrency, TaskScheduler scheduler, bool preserveOrder) + AwaitQueryOptions(int? maxConcurrency, TaskScheduler scheduler, bool preserveOrder) { MaxConcurrency = maxConcurrency == null || maxConcurrency > 0 ? maxConcurrency @@ -84,8 +84,8 @@ public sealed class SelectAsyncOptions /// Use null to mean unbounded concurrency. /// Options with the new setting. - public SelectAsyncOptions WithMaxConcurrency(int? value) => - value == MaxConcurrency ? this : new SelectAsyncOptions(value, Scheduler, PreserveOrder); + public AwaitQueryOptions WithMaxConcurrency(int? value) => + value == MaxConcurrency ? this : new AwaitQueryOptions(value, Scheduler, PreserveOrder); /// /// Returns new options with the given scheduler. @@ -94,8 +94,8 @@ public SelectAsyncOptions WithMaxConcurrency(int? value) => /// The scheduler to use to for the workhorse task. /// Options with the new setting. - public SelectAsyncOptions WithScheduler(TaskScheduler value) => - value == Scheduler ? this : new SelectAsyncOptions(MaxConcurrency, value, PreserveOrder); + public AwaitQueryOptions WithScheduler(TaskScheduler value) => + value == Scheduler ? this : new AwaitQueryOptions(MaxConcurrency, value, PreserveOrder); /// /// Returns new options with the given Boolean indicating whether or @@ -108,8 +108,8 @@ public SelectAsyncOptions WithScheduler(TaskScheduler value) => /// efficiency. /// Options with the new setting. - public SelectAsyncOptions WithPreserveOrder(bool value) => - value == PreserveOrder ? this : new SelectAsyncOptions(MaxConcurrency, Scheduler, value); + public AwaitQueryOptions WithPreserveOrder(bool value) => + value == PreserveOrder ? this : new AwaitQueryOptions(MaxConcurrency, Scheduler, value); } /// @@ -118,13 +118,13 @@ public SelectAsyncOptions WithPreserveOrder(bool value) => /// /// The type of the source elements. - public interface ISelectAsyncEnumerable : IEnumerable + public interface IAwaitQuery : IEnumerable { /// /// The options to apply to this asynchronous projection operation. /// - SelectAsyncOptions Options { get; } + AwaitQueryOptions Options { get; } /// /// Returns a new asynchronous projection operation that will use the @@ -135,7 +135,7 @@ public interface ISelectAsyncEnumerable : IEnumerable /// Returns a new sequence that projects asynchronously using the /// supplied options. - ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options); + IAwaitQuery WithOptions(AwaitQueryOptions options); } static partial class ExperimentalEnumerable @@ -148,7 +148,7 @@ static partial class ExperimentalEnumerable /// The source sequence. /// The converted sequence. - public static IEnumerable AsSequential(this ISelectAsyncEnumerable source) => + public static IEnumerable AsSequential(this IAwaitQuery source) => source.MaxConcurrency(1); /// @@ -162,7 +162,7 @@ public static IEnumerable AsSequential(this ISelectAsyncEnumerable sour /// A sequence that projects results asynchronously using the given /// concurrency limit. - public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int value) => + public static IAwaitQuery MaxConcurrency(this IAwaitQuery source, int value) => source.WithOptions(source.Options.WithMaxConcurrency(value)); /// @@ -175,7 +175,7 @@ public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnume /// A sequence that projects results asynchronously using no defined /// limitation on concurrency. - public static ISelectAsyncEnumerable UnboundedConcurrency(this ISelectAsyncEnumerable source) => + public static IAwaitQuery UnboundedConcurrency(this IAwaitQuery source) => source.WithOptions(source.Options.WithMaxConcurrency(null)); /// @@ -189,7 +189,7 @@ public static ISelectAsyncEnumerable UnboundedConcurrency(this ISelectAsyn /// A sequence that projects results asynchronously using the given /// scheduler. - public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable source, TaskScheduler value) + public static IAwaitQuery Scheduler(this IAwaitQuery source, TaskScheduler value) { if (source == null) throw new ArgumentNullException(nameof(source)); if (value == null) throw new ArgumentNullException(nameof(value)); @@ -210,7 +210,7 @@ public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable /// results will be yielded in order. /// - public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable source) => + public static IAwaitQuery AsOrdered(this IAwaitQuery source) => PreserveOrder(source, true); /// @@ -225,7 +225,7 @@ public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable /// guarantee of returning results in the order of the source /// sequence. - public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerable source) => + public static IAwaitQuery AsUnordered(this IAwaitQuery source) => PreserveOrder(source, false); /// @@ -244,7 +244,7 @@ public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerab /// results order or unordered based on /// . - public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) => + public static IAwaitQuery PreserveOrder(this IAwaitQuery source, bool value) => source.WithOptions(source.Options.WithPreserveOrder(value)); /// @@ -272,10 +272,10 @@ public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumer /// that some tasks will be wasted, those that are in flight. /// - public static ISelectAsyncEnumerable Await( + public static IAwaitQuery Await( this IEnumerable> source) { - return source.SelectAsync((e, _) => e); + return source.Await((e, _) => e); } /// @@ -313,7 +313,7 @@ public static ISelectAsyncEnumerable Await( /// thread-agnostic. /// - public static ISelectAsyncEnumerable SelectAsync( + public static IAwaitQuery Await( this IEnumerable source, Func> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); @@ -522,30 +522,30 @@ static async Task CollectToAsync( static class SelectAsyncEnumerable { - public static ISelectAsyncEnumerable + public static IAwaitQuery Create( - Func> impl, - SelectAsyncOptions options = null) => - new SelectAsyncEnumerable(impl, options); + Func> impl, + AwaitQueryOptions options = null) => + new AwaitQuery(impl, options); } - sealed class SelectAsyncEnumerable : ISelectAsyncEnumerable + sealed class AwaitQuery : IAwaitQuery { - readonly Func> _impl; + readonly Func> _impl; - public SelectAsyncEnumerable(Func> impl, - SelectAsyncOptions options = null) + public AwaitQuery(Func> impl, + AwaitQueryOptions options = null) { _impl = impl; - Options = options ?? SelectAsyncOptions.Default; + Options = options ?? AwaitQueryOptions.Default; } - public SelectAsyncOptions Options { get; } + public AwaitQueryOptions Options { get; } - public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) + public IAwaitQuery WithOptions(AwaitQueryOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); - return Options == options ? this : new SelectAsyncEnumerable(_impl, options); + return Options == options ? this : new AwaitQuery(_impl, options); } public IEnumerator GetEnumerator() => _impl(Options).GetEnumerator(); From 3efb94dde01c57618f548138c9ad28144fc8e26e Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 7 Mar 2018 07:55:55 +0100 Subject: [PATCH 49/56] Revert "Rename to Await" This reverts commit d2857e8f78268f41e3a4babdff2c64c02930ccb1 that was partially complete. --- MoreLinq.Test/NullArgumentTest.cs | 8 +-- MoreLinq/Experimental/SelectAsync.cs | 74 ++++++++++++++-------------- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/MoreLinq.Test/NullArgumentTest.cs b/MoreLinq.Test/NullArgumentTest.cs index 4cf3fb3cf..3cdb67bea 100644 --- a/MoreLinq.Test/NullArgumentTest.cs +++ b/MoreLinq.Test/NullArgumentTest.cs @@ -217,11 +217,11 @@ public System.Linq.IOrderedEnumerable CreateOrderedEnumerable(Func : Enumerable, - Experimental.IAwaitQuery + public class SelectAsyncEnumerable : Enumerable, + Experimental.ISelectAsyncEnumerable { - public Experimental.AwaitQueryOptions Options => Experimental.AwaitQueryOptions.Default; - public Experimental.IAwaitQuery WithOptions(Experimental.AwaitQueryOptions options) => this; + public Experimental.SelectAsyncOptions Options => Experimental.SelectAsyncOptions.Default; + public Experimental.ISelectAsyncEnumerable WithOptions(Experimental.SelectAsyncOptions options) => this; } #endif diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/SelectAsync.cs index b4d8a2669..7549c5f6e 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/SelectAsync.cs @@ -33,16 +33,16 @@ namespace MoreLinq.Experimental /// Represents options for an asynchronous projection operation. /// - public sealed class AwaitQueryOptions + public sealed class SelectAsyncOptions { /// /// The default options an asynchronous projection operation. /// - public static readonly AwaitQueryOptions Default = - new AwaitQueryOptions(null /* = unbounded concurrency */, - TaskScheduler.Default, - preserveOrder: false); + public static readonly SelectAsyncOptions Default = + new SelectAsyncOptions(null /* = unbounded concurrency */, + TaskScheduler.Default, + preserveOrder: false); /// /// Gets a positive (non-zero) integer that specifies the maximum @@ -65,7 +65,7 @@ public sealed class AwaitQueryOptions public bool PreserveOrder { get; } - AwaitQueryOptions(int? maxConcurrency, TaskScheduler scheduler, bool preserveOrder) + SelectAsyncOptions(int? maxConcurrency, TaskScheduler scheduler, bool preserveOrder) { MaxConcurrency = maxConcurrency == null || maxConcurrency > 0 ? maxConcurrency @@ -84,8 +84,8 @@ public sealed class AwaitQueryOptions /// Use null to mean unbounded concurrency. /// Options with the new setting. - public AwaitQueryOptions WithMaxConcurrency(int? value) => - value == MaxConcurrency ? this : new AwaitQueryOptions(value, Scheduler, PreserveOrder); + public SelectAsyncOptions WithMaxConcurrency(int? value) => + value == MaxConcurrency ? this : new SelectAsyncOptions(value, Scheduler, PreserveOrder); /// /// Returns new options with the given scheduler. @@ -94,8 +94,8 @@ public AwaitQueryOptions WithMaxConcurrency(int? value) => /// The scheduler to use to for the workhorse task. /// Options with the new setting. - public AwaitQueryOptions WithScheduler(TaskScheduler value) => - value == Scheduler ? this : new AwaitQueryOptions(MaxConcurrency, value, PreserveOrder); + public SelectAsyncOptions WithScheduler(TaskScheduler value) => + value == Scheduler ? this : new SelectAsyncOptions(MaxConcurrency, value, PreserveOrder); /// /// Returns new options with the given Boolean indicating whether or @@ -108,8 +108,8 @@ public AwaitQueryOptions WithScheduler(TaskScheduler value) => /// efficiency. /// Options with the new setting. - public AwaitQueryOptions WithPreserveOrder(bool value) => - value == PreserveOrder ? this : new AwaitQueryOptions(MaxConcurrency, Scheduler, value); + public SelectAsyncOptions WithPreserveOrder(bool value) => + value == PreserveOrder ? this : new SelectAsyncOptions(MaxConcurrency, Scheduler, value); } /// @@ -118,13 +118,13 @@ public AwaitQueryOptions WithPreserveOrder(bool value) => /// /// The type of the source elements. - public interface IAwaitQuery : IEnumerable + public interface ISelectAsyncEnumerable : IEnumerable { /// /// The options to apply to this asynchronous projection operation. /// - AwaitQueryOptions Options { get; } + SelectAsyncOptions Options { get; } /// /// Returns a new asynchronous projection operation that will use the @@ -135,7 +135,7 @@ public interface IAwaitQuery : IEnumerable /// Returns a new sequence that projects asynchronously using the /// supplied options. - IAwaitQuery WithOptions(AwaitQueryOptions options); + ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options); } static partial class ExperimentalEnumerable @@ -148,7 +148,7 @@ static partial class ExperimentalEnumerable /// The source sequence. /// The converted sequence. - public static IEnumerable AsSequential(this IAwaitQuery source) => + public static IEnumerable AsSequential(this ISelectAsyncEnumerable source) => source.MaxConcurrency(1); /// @@ -162,7 +162,7 @@ public static IEnumerable AsSequential(this IAwaitQuery source) => /// A sequence that projects results asynchronously using the given /// concurrency limit. - public static IAwaitQuery MaxConcurrency(this IAwaitQuery source, int value) => + public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int value) => source.WithOptions(source.Options.WithMaxConcurrency(value)); /// @@ -175,7 +175,7 @@ public static IAwaitQuery MaxConcurrency(this IAwaitQuery source, int v /// A sequence that projects results asynchronously using no defined /// limitation on concurrency. - public static IAwaitQuery UnboundedConcurrency(this IAwaitQuery source) => + public static ISelectAsyncEnumerable UnboundedConcurrency(this ISelectAsyncEnumerable source) => source.WithOptions(source.Options.WithMaxConcurrency(null)); /// @@ -189,7 +189,7 @@ public static IAwaitQuery UnboundedConcurrency(this IAwaitQuery source) /// A sequence that projects results asynchronously using the given /// scheduler. - public static IAwaitQuery Scheduler(this IAwaitQuery source, TaskScheduler value) + public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable source, TaskScheduler value) { if (source == null) throw new ArgumentNullException(nameof(source)); if (value == null) throw new ArgumentNullException(nameof(value)); @@ -210,7 +210,7 @@ public static IAwaitQuery Scheduler(this IAwaitQuery source, TaskSchedu /// results will be yielded in order. /// - public static IAwaitQuery AsOrdered(this IAwaitQuery source) => + public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable source) => PreserveOrder(source, true); /// @@ -225,7 +225,7 @@ public static IAwaitQuery AsOrdered(this IAwaitQuery source) => /// guarantee of returning results in the order of the source /// sequence. - public static IAwaitQuery AsUnordered(this IAwaitQuery source) => + public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerable source) => PreserveOrder(source, false); /// @@ -244,7 +244,7 @@ public static IAwaitQuery AsUnordered(this IAwaitQuery source) => /// results order or unordered based on /// . - public static IAwaitQuery PreserveOrder(this IAwaitQuery source, bool value) => + public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) => source.WithOptions(source.Options.WithPreserveOrder(value)); /// @@ -272,10 +272,10 @@ public static IAwaitQuery PreserveOrder(this IAwaitQuery source, bool v /// that some tasks will be wasted, those that are in flight. /// - public static IAwaitQuery Await( + public static ISelectAsyncEnumerable Await( this IEnumerable> source) { - return source.Await((e, _) => e); + return source.SelectAsync((e, _) => e); } /// @@ -313,7 +313,7 @@ public static IAwaitQuery Await( /// thread-agnostic. /// - public static IAwaitQuery Await( + public static ISelectAsyncEnumerable SelectAsync( this IEnumerable source, Func> selector) { if (source == null) throw new ArgumentNullException(nameof(source)); @@ -522,30 +522,30 @@ static async Task CollectToAsync( static class SelectAsyncEnumerable { - public static IAwaitQuery + public static ISelectAsyncEnumerable Create( - Func> impl, - AwaitQueryOptions options = null) => - new AwaitQuery(impl, options); + Func> impl, + SelectAsyncOptions options = null) => + new SelectAsyncEnumerable(impl, options); } - sealed class AwaitQuery : IAwaitQuery + sealed class SelectAsyncEnumerable : ISelectAsyncEnumerable { - readonly Func> _impl; + readonly Func> _impl; - public AwaitQuery(Func> impl, - AwaitQueryOptions options = null) + public SelectAsyncEnumerable(Func> impl, + SelectAsyncOptions options = null) { _impl = impl; - Options = options ?? AwaitQueryOptions.Default; + Options = options ?? SelectAsyncOptions.Default; } - public AwaitQueryOptions Options { get; } + public SelectAsyncOptions Options { get; } - public IAwaitQuery WithOptions(AwaitQueryOptions options) + public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); - return Options == options ? this : new AwaitQuery(_impl, options); + return Options == options ? this : new SelectAsyncEnumerable(_impl, options); } public IEnumerator GetEnumerator() => _impl(Options).GetEnumerator(); From 09497dc85854569ad9f510ec3bdda5761741ace2 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Wed, 7 Mar 2018 08:39:44 +0100 Subject: [PATCH 50/56] Rename SelectAsync to Await & update doc wording This is what commit d2857e8f78268f41e3a4babdff2c64c02930ccb1 should have been. --- MoreLinq.Test/NullArgumentTest.cs | 8 +- .../Experimental/{SelectAsync.cs => Await.cs} | 199 +++++++++--------- 2 files changed, 104 insertions(+), 103 deletions(-) rename MoreLinq/Experimental/{SelectAsync.cs => Await.cs} (72%) diff --git a/MoreLinq.Test/NullArgumentTest.cs b/MoreLinq.Test/NullArgumentTest.cs index 3cdb67bea..4cf3fb3cf 100644 --- a/MoreLinq.Test/NullArgumentTest.cs +++ b/MoreLinq.Test/NullArgumentTest.cs @@ -217,11 +217,11 @@ public System.Linq.IOrderedEnumerable CreateOrderedEnumerable(Func : Enumerable, - Experimental.ISelectAsyncEnumerable + public class AwaitQuery : Enumerable, + Experimental.IAwaitQuery { - public Experimental.SelectAsyncOptions Options => Experimental.SelectAsyncOptions.Default; - public Experimental.ISelectAsyncEnumerable WithOptions(Experimental.SelectAsyncOptions options) => this; + public Experimental.AwaitQueryOptions Options => Experimental.AwaitQueryOptions.Default; + public Experimental.IAwaitQuery WithOptions(Experimental.AwaitQueryOptions options) => this; } #endif diff --git a/MoreLinq/Experimental/SelectAsync.cs b/MoreLinq/Experimental/Await.cs similarity index 72% rename from MoreLinq/Experimental/SelectAsync.cs rename to MoreLinq/Experimental/Await.cs index 7549c5f6e..6a98d0bbc 100644 --- a/MoreLinq/Experimental/SelectAsync.cs +++ b/MoreLinq/Experimental/Await.cs @@ -30,24 +30,25 @@ namespace MoreLinq.Experimental using System.Threading.Tasks; /// - /// Represents options for an asynchronous projection operation. + /// Represents options for a query whose results evaluate asynchronously. /// - public sealed class SelectAsyncOptions + public sealed class AwaitQueryOptions { /// - /// The default options an asynchronous projection operation. + /// The default options used for a query whose results evaluate + /// asynchronously. /// - public static readonly SelectAsyncOptions Default = - new SelectAsyncOptions(null /* = unbounded concurrency */, - TaskScheduler.Default, - preserveOrder: false); + public static readonly AwaitQueryOptions Default = + new AwaitQueryOptions(null /* = unbounded concurrency */, + TaskScheduler.Default, + preserveOrder: false); /// /// Gets a positive (non-zero) integer that specifies the maximum - /// projections to run concurrenctly or null to mean unlimited - /// concurrency. + /// number of asynchronous operations to have in-flight concurrently + /// or null to mean unlimited concurrency. /// public int? MaxConcurrency { get; } @@ -60,12 +61,12 @@ public sealed class SelectAsyncOptions /// /// Get a Boolean that determines whether results should be ordered - /// the same as the projection source. + /// the same as the source. /// public bool PreserveOrder { get; } - SelectAsyncOptions(int? maxConcurrency, TaskScheduler scheduler, bool preserveOrder) + AwaitQueryOptions(int? maxConcurrency, TaskScheduler scheduler, bool preserveOrder) { MaxConcurrency = maxConcurrency == null || maxConcurrency > 0 ? maxConcurrency @@ -84,8 +85,8 @@ public sealed class SelectAsyncOptions /// Use null to mean unbounded concurrency. /// Options with the new setting. - public SelectAsyncOptions WithMaxConcurrency(int? value) => - value == MaxConcurrency ? this : new SelectAsyncOptions(value, Scheduler, PreserveOrder); + public AwaitQueryOptions WithMaxConcurrency(int? value) => + value == MaxConcurrency ? this : new AwaitQueryOptions(value, Scheduler, PreserveOrder); /// /// Returns new options with the given scheduler. @@ -94,13 +95,12 @@ public SelectAsyncOptions WithMaxConcurrency(int? value) => /// The scheduler to use to for the workhorse task. /// Options with the new setting. - public SelectAsyncOptions WithScheduler(TaskScheduler value) => - value == Scheduler ? this : new SelectAsyncOptions(MaxConcurrency, value, PreserveOrder); + public AwaitQueryOptions WithScheduler(TaskScheduler value) => + value == Scheduler ? this : new AwaitQueryOptions(MaxConcurrency, value, PreserveOrder); /// /// Returns new options with the given Boolean indicating whether or - /// not the projections should be returned in the order of the - /// projection source. + /// not the results should be returned in the order of the source. /// /// /// A Boolean where true means results are in source order and @@ -108,88 +108,88 @@ public SelectAsyncOptions WithScheduler(TaskScheduler value) => /// efficiency. /// Options with the new setting. - public SelectAsyncOptions WithPreserveOrder(bool value) => - value == PreserveOrder ? this : new SelectAsyncOptions(MaxConcurrency, Scheduler, value); + public AwaitQueryOptions WithPreserveOrder(bool value) => + value == PreserveOrder ? this : new AwaitQueryOptions(MaxConcurrency, Scheduler, value); } /// - /// An representing an asynchronous projection. + /// Represents a sequence whose elements or results evaluate asynchronously. /// /// /// The type of the source elements. - public interface ISelectAsyncEnumerable : IEnumerable + public interface IAwaitQuery : IEnumerable { /// - /// The options to apply to this asynchronous projection operation. + /// The options that determine how the sequence evaluation behaves when + /// it is iterated. /// - SelectAsyncOptions Options { get; } + AwaitQueryOptions Options { get; } /// - /// Returns a new asynchronous projection operation that will use the - /// given options. + /// Returns a new query that will use the given options. /// /// The new options to use. /// - /// Returns a new sequence that projects asynchronously using the - /// supplied options. + /// Returns a new query using the supplied options. + /// - ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options); + IAwaitQuery WithOptions(AwaitQueryOptions options); } static partial class ExperimentalEnumerable { /// - /// Converts an asynchronous projection operation to use sequential - /// evaluation. + /// Converts a query whose results evaluate asynchronously to use + /// sequential instead of concurrentl evaluation. /// /// The type of the source elements. /// The source sequence. /// The converted sequence. - public static IEnumerable AsSequential(this ISelectAsyncEnumerable source) => + public static IEnumerable AsSequential(this IAwaitQuery source) => source.MaxConcurrency(1); /// - /// Returns a new asynchronous projection operation with the given + /// Returns a query whose results evaluate asynchronously to use a /// concurrency limit. /// /// The type of the source elements. /// The source sequence. /// /// - /// A sequence that projects results asynchronously using the given + /// A query whose results evaluate asynchronously using the given /// concurrency limit. - public static ISelectAsyncEnumerable MaxConcurrency(this ISelectAsyncEnumerable source, int value) => + public static IAwaitQuery MaxConcurrency(this IAwaitQuery source, int value) => source.WithOptions(source.Options.WithMaxConcurrency(value)); /// - /// Returns a new asynchronous projection operation with no defined - /// limitation on concurrency. + /// Returns a query whose results evaluate asynchronously and + /// concurrently with no defined limitation on concurrency. /// /// The type of the source elements. /// The source sequence. /// - /// A sequence that projects results asynchronously using no defined + /// A query whose results evaluate asynchronously using no defined /// limitation on concurrency. - public static ISelectAsyncEnumerable UnboundedConcurrency(this ISelectAsyncEnumerable source) => + public static IAwaitQuery UnboundedConcurrency(this IAwaitQuery source) => source.WithOptions(source.Options.WithMaxConcurrency(null)); /// - /// Returns a new asynchronous projection operation with the given - /// scheduler. + /// Returns a query whose results evaluate asynchronously and uses the + /// given scheduler for the workhorse task. /// /// The type of the source elements. /// The source sequence. /// The scheduler to use. /// - /// A sequence that projects results asynchronously using the given - /// scheduler. + /// A query whose results evaluate asynchronously and uses the + /// given scheduler for the workhorse task. - public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable source, TaskScheduler value) + public static IAwaitQuery Scheduler(this IAwaitQuery source, TaskScheduler value) { if (source == null) throw new ArgumentNullException(nameof(source)); if (value == null) throw new ArgumentNullException(nameof(value)); @@ -197,41 +197,39 @@ public static ISelectAsyncEnumerable Scheduler(this ISelectAsyncEnumerable } /// - /// Returns a new asynchronous projection operation for which the - /// results will be returned in the order of the source sequence. + /// Returns a query whose results evaluate asynchronously but which + /// are returned in the order of the source. /// /// The type of the source elements. /// The source sequence. /// - /// A sequence that projects results asynchronously but returns - /// results in the order of the source sequence. + /// A query whose results evaluate asynchronously but which + /// are returned in the order of the source. /// - /// Internally, the projections will be done concurrently but the - /// results will be yielded in order. + /// Internally, the asynchronous operations will be done concurrently + /// but the results will be yielded in order. /// - public static ISelectAsyncEnumerable AsOrdered(this ISelectAsyncEnumerable source) => + public static IAwaitQuery AsOrdered(this IAwaitQuery source) => PreserveOrder(source, true); /// - /// Returns a new asynchronous projection operation for which the - /// results are no longer guaranteed to be in the order of the source - /// sequence. + /// Returns a query whose results evaluate asynchronously but which + /// are returned without guarantee of the source order. /// /// The type of the source elements. /// The source sequence. /// - /// A sequence that projects results asynchronously but without any - /// guarantee of returning results in the order of the source - /// sequence. + /// A query whose results evaluate asynchronously but which + /// are returned without guarantee of the source order. - public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerable source) => + public static IAwaitQuery AsUnordered(this IAwaitQuery source) => PreserveOrder(source, false); /// - /// Returns a new asynchronous projection operation with the given - /// Boolean indicating whether or not the projections should be - /// returned in the order of the projection source. + /// Returns a query whose results evaluate asynchronously and a Boolean + /// argument indicating whether the source order of the results is + /// preserved. /// /// The type of the source elements. /// The source sequence. @@ -240,24 +238,24 @@ public static ISelectAsyncEnumerable AsUnordered(this ISelectAsyncEnumerab /// false means that results can be delivered in order of /// efficiency. /// - /// A sequence that projects results asynchronously and returns the - /// results order or unordered based on - /// . + /// A query whose results evaluate asynchronously and returns the + /// results ordered or unordered based on . + /// - public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumerable source, bool value) => + public static IAwaitQuery PreserveOrder(this IAwaitQuery source, bool value) => source.WithOptions(source.Options.WithPreserveOrder(value)); /// - /// Creates a sequence that streams the result of each task in the - /// source sequence as it completes. + /// Creates a sequence query that streams the result of each task in + /// the source sequence as it completes asynchronously. /// /// /// The type of each task's result as well as the type of the elements /// of the resulting sequence. /// The source sequence of tasks. /// - /// A sequence that streams the result of each task in - /// as it completes. + /// A sequence query that streams the result of each task in + /// as it completes asynchronously. /// /// /// @@ -272,52 +270,55 @@ public static ISelectAsyncEnumerable PreserveOrder(this ISelectAsyncEnumer /// that some tasks will be wasted, those that are in flight. /// - public static ISelectAsyncEnumerable Await( + public static IAwaitQuery Await( this IEnumerable> source) { - return source.SelectAsync((e, _) => e); + return source.Await((e, _) => e); } /// - /// Asynchronously projects each element of a sequence to its new form. - /// The projection function receives a - /// as an additional argument that can be used to abort any asynchronous - /// operations in flight. + /// Creates a sequence query that streams the result of each task in + /// the source sequence as it completes asynchronously. A + /// is passed for each asynchronous + /// evaluation to abort any asynchronous operations in flight if the + /// sequence is not full iterated. /// /// The type of the source elements. /// The type of the result elements. /// The source sequence. - /// A transform function to apply to each - /// element, the second parameter of which is a + /// A function to begin the asynchronous + /// evaluation of each element, the second parameter of which is a /// that can be used to abort /// asynchronous operations. /// - /// A sequence that projects results asynchronously. + /// A sequence query that stream its results as they are + /// evaluated asynchronously. /// /// /// /// This method uses deferred execution semantics. The results are - /// yielded as each asynchronous projection completes and, by default, + /// yielded as each asynchronous evaluation completes and, by default, /// not guaranteed to be based on the source sequence order. If order /// is important, compose further with /// . /// - /// This method starts a new task where the asynchronous projections - /// are started and awaited. If the resulting sequence is partially + /// This method starts a new task where the asynchronous evaluations + /// take place and awaited. If the resulting sequence is partially /// consumed then there's a good chance that some projection work will /// be wasted and a cooperative effort is done that depends on the - /// projection function (via a as its - /// second argument) to cancel those in flight. + /// function (via a + /// as its second argument) to cancel + /// those in flight. /// - /// The function should be designed to be + /// The function should be designed to be /// thread-agnostic. /// - public static ISelectAsyncEnumerable SelectAsync( - this IEnumerable source, Func> selector) + public static IAwaitQuery Await( + this IEnumerable source, Func> evaluator) { if (source == null) throw new ArgumentNullException(nameof(source)); - if (selector == null) throw new ArgumentNullException(nameof(selector)); + if (evaluator == null) throw new ArgumentNullException(nameof(evaluator)); return SelectAsyncEnumerable.Create( @@ -334,7 +335,7 @@ IEnumerable _(int maxConcurrency, TaskScheduler scheduler, bool ordered var enumerator = source.Index() - .Select(e => (e.Key, Task: selector(e.Value, cancellationToken))) + .Select(e => (e.Key, Task: evaluator(e.Value, cancellationToken))) .GetEnumerator(); IDisposable disposable = enumerator; // disables AccessToDisposedClosure warnings @@ -522,30 +523,30 @@ static async Task CollectToAsync( static class SelectAsyncEnumerable { - public static ISelectAsyncEnumerable + public static IAwaitQuery Create( - Func> impl, - SelectAsyncOptions options = null) => - new SelectAsyncEnumerable(impl, options); + Func> impl, + AwaitQueryOptions options = null) => + new AwaitQuery(impl, options); } - sealed class SelectAsyncEnumerable : ISelectAsyncEnumerable + sealed class AwaitQuery : IAwaitQuery { - readonly Func> _impl; + readonly Func> _impl; - public SelectAsyncEnumerable(Func> impl, - SelectAsyncOptions options = null) + public AwaitQuery(Func> impl, + AwaitQueryOptions options = null) { _impl = impl; - Options = options ?? SelectAsyncOptions.Default; + Options = options ?? AwaitQueryOptions.Default; } - public SelectAsyncOptions Options { get; } + public AwaitQueryOptions Options { get; } - public ISelectAsyncEnumerable WithOptions(SelectAsyncOptions options) + public IAwaitQuery WithOptions(AwaitQueryOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); - return Options == options ? this : new SelectAsyncEnumerable(_impl, options); + return Options == options ? this : new AwaitQuery(_impl, options); } public IEnumerator GetEnumerator() => _impl(Options).GetEnumerator(); From 7c9273d79f6f37a04de241eadc0b2cafe6a4c2fa Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 5 Apr 2018 09:07:30 +0200 Subject: [PATCH 51/56] Fix doc typo --- MoreLinq/Experimental/Await.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index 6a98d0bbc..e6028e886 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -142,7 +142,7 @@ static partial class ExperimentalEnumerable { /// /// Converts a query whose results evaluate asynchronously to use - /// sequential instead of concurrentl evaluation. + /// sequential instead of concurrent evaluation. /// /// The type of the source elements. /// The source sequence. From 3bcaa35fabc62641cc85da0f3746292a85effdc8 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 5 Apr 2018 10:13:18 +0200 Subject: [PATCH 52/56] Doc remarks about effects of hot/cold tasks --- MoreLinq/Experimental/Await.cs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/MoreLinq/Experimental/Await.cs b/MoreLinq/Experimental/Await.cs index e6028e886..2907ef398 100644 --- a/MoreLinq/Experimental/Await.cs +++ b/MoreLinq/Experimental/Await.cs @@ -268,6 +268,14 @@ public static IAwaitQuery PreserveOrder(this IAwaitQuery source, bool v /// This method starts a new task where the tasks are awaited. If the /// resulting sequence is partially consumed then there's a good chance /// that some tasks will be wasted, those that are in flight. + /// + /// The tasks in are already assumed to be in + /// flight therefore changing concurrency options via + /// , or + /// will only change how many + /// tasks are awaited at any given moment, not how many will be + /// kept in flight. For the latter effect, use the other overload. + /// /// public static IAwaitQuery Await( @@ -312,6 +320,15 @@ public static IAwaitQuery Await( /// /// The function should be designed to be /// thread-agnostic. + /// + /// The task returned by should be started + /// when the function is called (and not just a mere projection) + /// otherwise changing concurrency options via + /// , or + /// will only change how many + /// tasks are awaited at any given moment, not how many will be + /// kept in flight. + /// /// public static IAwaitQuery Await( From 551ffbb43bfbad5de78188bbcf15840e986a694a Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 5 Apr 2018 10:40:30 +0200 Subject: [PATCH 53/56] Add Await to list of operators in package description --- MoreLinq/MoreLinq.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/MoreLinq/MoreLinq.csproj b/MoreLinq/MoreLinq.csproj index d4a22ad6c..131eda40d 100644 --- a/MoreLinq/MoreLinq.csproj +++ b/MoreLinq/MoreLinq.csproj @@ -14,6 +14,7 @@ - AssertCount - AtLeast - AtMost + - Await - Batch - Cartesian - CountBetween From aa9dc5dc9eacfd1da9a3d4d6376068d5987917fa Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Thu, 5 Apr 2018 10:40:48 +0200 Subject: [PATCH 54/56] Add Await to readme --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index 25bfc9cc9..8dca32043 100644 --- a/README.md +++ b/README.md @@ -569,6 +569,24 @@ from each of the argument sequences. This method has 3 overloads. +## Experimental Operators + +THESE METHODS ARE EXPERIMENTAL. THEY MAY BE UNSTABLE AND UNTESTED. THEY MAY BE +REMOVED FROM A FUTURE MAJOR OR MINOR RELEASE AND POSSIBLY WITHOUT NOTICE. USE +THEM AT YOUR OWN RISK. THE METHODS ARE PUBLISHED FOR FIELD EXPERIMENTATION TO +SOLICIT FEEDBACK ON THEIR UTILITY AND DESIGN/IMPLEMENTATION DEFECTS. + +Use of experimental methods requires importing the `MoreLinq.Experimental` +namespace. + +### Await + +Creates a sequence query that streams the result of each task in the source +sequence as it completes asynchronously. + +This method has 2 overloads. + + [#122]: https://github.com/morelinq/MoreLINQ/issues/122 [dict]: https://docs.microsoft.com/en-us/dotnet/api/System.Collections.Generic.Dictionary-2 [kvp]: https://docs.microsoft.com/en-us/dotnet/api/System.Collections.Generic.KeyValuePair-2 From de2d232f58bd2668cc230f4a59be21fc883f4cc4 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 9 Apr 2018 09:43:50 +0200 Subject: [PATCH 55/56] Add await to new ops list in release notes --- MoreLinq/MoreLinq.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MoreLinq/MoreLinq.csproj b/MoreLinq/MoreLinq.csproj index 006ef1c52..02539451b 100644 --- a/MoreLinq/MoreLinq.csproj +++ b/MoreLinq/MoreLinq.csproj @@ -119,7 +119,7 @@ true morelinq linq;extensions - Adds new operators: CompareCount, Transpose. See also https://github.com/morelinq/MoreLINQ/wiki/API-Changes. + Adds new operators: Await, CompareCount, Transpose. See also https://github.com/morelinq/MoreLINQ/wiki/API-Changes. https://morelinq.github.io/ http://www.apache.org/licenses/LICENSE-2.0 false From 70edcbeb1fea4741f448e8a636892b8a38127c15 Mon Sep 17 00:00:00 2001 From: Atif Aziz Date: Mon, 9 Apr 2018 09:56:28 +0200 Subject: [PATCH 56/56] Mark Await as experimental in package metadata --- MoreLinq/MoreLinq.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/MoreLinq/MoreLinq.csproj b/MoreLinq/MoreLinq.csproj index 02539451b..67c6f4862 100644 --- a/MoreLinq/MoreLinq.csproj +++ b/MoreLinq/MoreLinq.csproj @@ -14,7 +14,7 @@ - AssertCount - AtLeast - AtMost - - Await + - Await (EXPERIMENTAL) - Batch - Cartesian - CountBetween @@ -119,7 +119,7 @@ true morelinq linq;extensions - Adds new operators: Await, CompareCount, Transpose. See also https://github.com/morelinq/MoreLINQ/wiki/API-Changes. + Adds new operators: Await (EXPERIMENTAL), CompareCount, Transpose. See also https://github.com/morelinq/MoreLINQ/wiki/API-Changes. https://morelinq.github.io/ http://www.apache.org/licenses/LICENSE-2.0 false