-
Notifications
You must be signed in to change notification settings - Fork 756
/
Copy pathMerge.cs
357 lines (300 loc) · 18.9 KB
/
Merge.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq
{
public static partial class AsyncEnumerableEx
{
    /// <summary>
    /// Merges elements from all of the specified async-enumerable sequences into a single async-enumerable sequence.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
    /// <param name="sources">Async-enumerable sequences.</param>
    /// <returns>The async-enumerable sequence that merges the elements of the async-enumerable sequences.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
    /// <remarks>
    /// The argument check happens eagerly; the sources are only enumerated when the returned sequence is.
    /// Unlike the other <c>Merge</c> overloads, this overload enumerates all sources concurrently: a
    /// <c>MoveNextAsync</c> call is kept in flight for every source and elements are yielded as those calls
    /// complete. A source's enumerator is disposed as soon as that source reports completion. When the merged
    /// enumeration ends (normally, early, or because of an error), every outstanding <c>MoveNextAsync</c> call is
    /// awaited and the remaining enumerators are disposed in reverse order of acquisition. Any exceptions observed
    /// during that cleanup (which includes re-observing a faulted source) are thrown together as an
    /// <see cref="AggregateException"/>, replacing the exception that triggered the cleanup, if any.
    /// </remarks>
    public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
    {
        if (sources == null)
        {
            throw Error.ArgumentNull(nameof(sources));
        }

        return Core(sources);

        static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource>[] sources, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
#if USE_FAIR_AND_CHEAPER_MERGE
        //
        // This new implementation of Merge differs from the original one in a few ways:
        //
        // - It's cheaper because:
        //   - no conversion from ValueTask<bool> to Task<bool> takes place using AsTask,
        //   - we don't instantiate Task.WhenAny tasks for each iteration.
        // - It's fairer because:
        //   - the MoveNextAsync tasks are awaited concurrently, but completions are queued,
        //     instead of awaiting a new WhenAny task where "left" sources have preferential
        //     treatment over "right" sources.
        //
        {
            var count = sources.Length;

            // NB: Entries are set to null once an enumerator has been disposed eagerly, hence the nullable element type.
            var enumerators = new IAsyncEnumerator<TSource>?[count];
            var moveNextTasks = new ValueTask<bool>[count];

            try
            {
                for (var i = 0; i < count; i++)
                {
                    IAsyncEnumerator<TSource> enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
                    enumerators[i] = enumerator;

                    // REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
                    //         operations immediately. An alternative would be to do this in a separate stage, thus
                    //         preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
                    //         any MoveNextAsync calls before all enumerators are acquired (or an exception has
                    //         occurred doing so).

                    moveNextTasks[i] = enumerator.MoveNextAsync();
                }

                var whenAny = TaskExt.WhenAny(moveNextTasks);

                int active = count;

                while (active > 0)
                {
                    int index = await whenAny;

                    // NB: An entry is only set to null after its task has been swapped for a completed placeholder
                    //     without going through whenAny.Replace, so whenAny does not report that index again.
                    IAsyncEnumerator<TSource> enumerator = enumerators[index]!;
                    ValueTask<bool> moveNextTask = moveNextTasks[index];

                    if (!await moveNextTask.ConfigureAwait(false))
                    {
                        //
                        // Replace the task in our array by a completed task to make finally logic easier. Note that
                        // the WhenAnyValueTask object has a reference to our array (i.e. no copy is made), so this
                        // gets rid of any resources the original task may have held onto. However, we *don't* call
                        // whenAny.Replace to set this value, because it'd attach an awaiter to the already completed
                        // task, causing spurious wake-ups when awaiting whenAny.
                        //
                        moveNextTasks[index] = new ValueTask<bool>();

                        // REVIEW: The original implementation did not dispose eagerly, which could lead to resource
                        //         leaks when merged with other long-running sequences.

                        enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.
                        await enumerator.DisposeAsync().ConfigureAwait(false);

                        active--;
                    }
                    else
                    {
                        TSource item = enumerator.Current;

                        //
                        // Replace the task using whenAny.Replace, which will write it to the moveNextTasks array, and
                        // will start awaiting the task. Note we don't have to write to moveNextTasks ourselves because
                        // the whenAny object has a reference to it (i.e. no copy is made).
                        //
                        whenAny.Replace(index, enumerator.MoveNextAsync());

                        yield return item;
                    }
                }
            }
            finally
            {
                // REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
                //         additional uncontrollable source of concurrency and the sequential resource acquisition. In
                //         this modern implementation, we release resources in opposite order as we acquired them, thus
                //         guaranteeing determinism (and mimicking a series of nested `await using` statements).

                // REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
                //         the operator implementation, we should make this symmetric and first await all in flight
                //         MoveNextAsync operations, prior to disposing the enumerators.

                var errors = default(List<Exception>);

                for (var i = count - 1; i >= 0; i--)
                {
                    ValueTask<bool> moveNextTask = moveNextTasks[i];
                    IAsyncEnumerator<TSource>? enumerator = enumerators[i];

                    try
                    {
                        try
                        {
                            //
                            // Await the task to ensure outstanding work is completed prior to performing a dispose
                            // operation. Note that we don't have to do anything special for tasks belonging to
                            // enumerators that have finished; we swapped in a placeholder completed task.
                            //
                            // REVIEW: This adds an additional continuation to all of the pending tasks (note that
                            //         whenAny also has registered one). The whenAny object will be collectible
                            //         after all of these complete. Alternatively, we could drain via whenAny, by
                            //         awaiting it until the active count drops to 0. This saves on attaching the
                            //         additional continuations, but we need to decide on order of dispose. Right
                            //         now, we dispose in opposite order of acquiring the enumerators, with the
                            //         exception of enumerators that were disposed eagerly upon early completion.
                            //         Should we care about the dispose order at all?
                            //
                            // NOTE(review): A ValueTask<bool> may only be consumed once. A task whose await in the
                            //               main loop threw is awaited here a second time, and for pending tasks this
                            //               registers a second continuation next to the one held by whenAny. Both are
                            //               outside the ValueTask contract for IValueTaskSource-backed tasks (e.g.
                            //               compiler-generated async iterators); draining via whenAny would avoid it.
                            //
                            _ = await moveNextTask.ConfigureAwait(false);
                        }
                        finally
                        {
                            if (enumerator != null)
                            {
                                await enumerator.DisposeAsync().ConfigureAwait(false);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        (errors ??= []).Add(ex);
                    }
                }

                // NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
                //     instead of the original exception that may have led to running the finally block. This is similar
                //     to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
                //     concurrent sequences being merged).

                if (errors != null)
                {
#if NET6_0_OR_GREATER
#pragma warning disable CA2219 // Do not raise an exception from within a finally clause
#endif
                    throw new AggregateException(errors);
#if NET6_0_OR_GREATER
#pragma warning restore CA2219
#endif
                }
            }
        }
#else
        {
            var count = sources.Length;

            var enumerators = new IAsyncEnumerator<TSource>?[count];
            var moveNextTasks = new Task<bool>[count];

            try
            {
                for (var i = 0; i < count; i++)
                {
                    var enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
                    enumerators[i] = enumerator;

                    // REVIEW: This follows the lead of the original implementation where we kick off MoveNextAsync
                    //         operations immediately. An alternative would be to do this in a separate stage, thus
                    //         preventing concurrency across MoveNextAsync and GetAsyncEnumerator calls and avoiding
                    //         any MoveNextAsync calls before all enumerators are acquired (or an exception has
                    //         occurred doing so).

                    moveNextTasks[i] = enumerator.MoveNextAsync().AsTask();
                }

                var active = count;

                while (active > 0)
                {
                    // REVIEW: Performance of WhenAny may be an issue when called repeatedly like this. We should
                    //         measure and could consider operating directly on the ValueTask<bool> objects, thus
                    //         also preventing the Task<bool> allocations from AsTask.

                    var moveNextTask = await Task.WhenAny(moveNextTasks).ConfigureAwait(false);

                    // REVIEW: This seems wrong. AsTask can return the original Task<bool> (if the ValueTask<bool>
                    //         is wrapping one) or return a singleton instance for true and false, at which point
                    //         the use of IndexOf may pick an element closer to the start of the array because of
                    //         reference equality checks and aliasing effects. See GetTaskForResult in the BCL.
                    //         (Aliased entries are all completed with the same result, so this skews fairness
                    //         toward lower indices rather than yielding from a source that is not ready.)

                    var index = Array.IndexOf(moveNextTasks, moveNextTask);

                    var enumerator = enumerators[index]!; // NB: Only gets set to null after setting task to Never.

                    if (!await moveNextTask.ConfigureAwait(false))
                    {
                        // Park the slot on a never-completing task so WhenAny no longer selects it.
                        moveNextTasks[index] = TaskExt.Never;

                        // REVIEW: The original implementation did not dispose eagerly, which could lead to resource
                        //         leaks when merged with other long-running sequences.

                        enumerators[index] = null; // NB: Avoids attempt at double dispose in finally if disposing fails.
                        await enumerator.DisposeAsync().ConfigureAwait(false);

                        active--;
                    }
                    else
                    {
                        var item = enumerator.Current;

                        moveNextTasks[index] = enumerator.MoveNextAsync().AsTask();

                        yield return item;
                    }
                }
            }
            finally
            {
                // REVIEW: The original implementation performs a concurrent dispose, which seems undesirable given the
                //         additional uncontrollable source of concurrency and the sequential resource acquisition. In
                //         this modern implementation, we release resources in opposite order as we acquired them, thus
                //         guaranteeing determinism (and mimicking a series of nested `await using` statements).

                // REVIEW: If we decide to phase GetAsyncEnumerator and the initial MoveNextAsync calls at the start of
                //         the operator implementation, we should make this symmetric and first await all in flight
                //         MoveNextAsync operations, prior to disposing the enumerators.

                var errors = default(List<Exception>);

                for (var i = count - 1; i >= 0; i--)
                {
                    var moveNextTask = moveNextTasks[i];
                    var enumerator = enumerators[i];

                    try
                    {
                        try
                        {
                            // A null slot means acquisition failed before a MoveNextAsync was started; Never marks a
                            // source that already completed and was disposed eagerly. Neither needs to be awaited.
                            if (moveNextTask != null && moveNextTask != TaskExt.Never)
                            {
                                _ = await moveNextTask.ConfigureAwait(false);
                            }
                        }
                        finally
                        {
                            if (enumerator != null)
                            {
                                await enumerator.DisposeAsync().ConfigureAwait(false);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        (errors ??= []).Add(ex);
                    }
                }

                // NB: If we had any errors during cleaning (and awaiting pending operations), we throw these exceptions
                //     instead of the original exception that may have led to running the finally block. This is similar
                //     to throwing from any finally block (except that we catch all exceptions to ensure cleanup of all
                //     concurrent sequences being merged).

                if (errors != null)
                {
#if NET6_0_OR_GREATER
#pragma warning disable CA2219 // Do not raise an exception from within a finally clause
#endif
                    throw new AggregateException(errors);
#if NET6_0_OR_GREATER
#pragma warning restore CA2219
#endif
                }
            }
        }
#endif
    }

    /// <summary>
    /// Merges elements from all async-enumerable sequences in the given enumerable sequence into a single async-enumerable sequence.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
    /// <param name="sources">Enumerable sequence of async-enumerable sequences.</param>
    /// <returns>The async-enumerable sequence that merges the elements of the async-enumerable sequences.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
    /// <remarks>
    /// Unlike the overload taking an array, this overload does not enumerate the sources concurrently; it flattens
    /// them with <c>SelectMany</c>.
    /// </remarks>
    public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
    {
        if (sources == null)
        {
            throw Error.ArgumentNull(nameof(sources));
        }

        //
        // REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
        //         avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation. It is
        //         unfortunate though that the Merge overload accepting an array has always been concurrent, so we can't
        //         change that either (in order to have consistency where Merge is non-concurrent, and ConcurrentMerge
        //         is). We could consider a breaking change to Ix Async to streamline this, but we should do so when
        //         shipping with the BCL interfaces (which is already a breaking change to existing Ix Async users). If
        //         we go that route, we can either have:
        //
        //         - All overloads of Merge are concurrent
        //           - and continue to be named Merge, or,
        //           - are renamed to ConcurrentMerge for clarity (likely alongside a ConcurrentZip).
        //         - All overloads of Merge are non-concurrent
        //           - and are simply SelectMany operator macros (maybe more optimized)
        //         - Have ConcurrentMerge next to Merge overloads
        //           - where ConcurrentMerge may need a degree of concurrency parameter (and maybe other options), and,
        //           - where the overload set of both families may be asymmetric
        //

        return sources.ToAsyncEnumerable().SelectMany(source => source);
    }

    /// <summary>
    /// Merges elements from all inner async-enumerable sequences into a single async-enumerable sequence.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in the source sequences.</typeparam>
    /// <param name="sources">Async-enumerable sequence of inner async-enumerable sequences.</param>
    /// <returns>The async-enumerable sequence that merges the elements of the inner sequences.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
    /// <remarks>
    /// Unlike the overload taking an array, this overload does not enumerate the inner sequences concurrently; it
    /// flattens them with <c>SelectMany</c>.
    /// </remarks>
    public static IAsyncEnumerable<TSource> Merge<TSource>(this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources)
    {
        if (sources == null)
        {
            throw Error.ArgumentNull(nameof(sources));
        }

        //
        // REVIEW: This implementation does not exploit concurrency. We should not introduce such behavior in order to
        //         avoid breaking changes, but we could introduce a parallel ConcurrentMerge implementation.
        //

        return sources.SelectMany(source => source);
    }
}
}