Skip to content

Commit

Permalink
Merge 0fb644d into 02627a5
Browse files Browse the repository at this point in the history
  • Loading branch information
atifaziz authored Feb 17, 2023
2 parents 02627a5 + 0fb644d commit 28996d4
Show file tree
Hide file tree
Showing 9 changed files with 675 additions and 2 deletions.
3 changes: 3 additions & 0 deletions MoreLinq.Test/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ dotnet_diagnostic.CA1707.severity = none
# CA1308: Normalize strings to uppercase
dotnet_diagnostic.CA1308.severity = none

# CA2007: Consider calling ConfigureAwait on the awaited task
dotnet_diagnostic.CA2007.severity = suggestion

# IDE0047: Remove unnecessary parentheses
dotnet_diagnostic.IDE0047.severity = suggestion

Expand Down
161 changes: 161 additions & 0 deletions MoreLinq.Test/Async/MergeTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#region License and Terms
// MoreLINQ - Extensions to LINQ to Objects
// Copyright (c) 2020 Atif Aziz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion

namespace MoreLinq.Test.Async
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Experimental.Async;
using NUnit.Framework;
using Throws = Throws;

[TestFixture]
public class MergeTest
{
[TestCase(0)]
[TestCase(-1)]
[TestCase(-2)]
[TestCase(-3)]
public void InvalidMaxConcurrent(int n)
{
var sources = new IAsyncEnumerable<object>[0];
void Act() => sources.Merge(n);
Assert.That(Act, Throws.ArgumentOutOfRangeException("maxConcurrent"));
}

[Test]
public async Task MergeSync()
{
using var ts1 = AsyncEnumerable.Range(1, 1).AsTestingSequence();
using var ts2 = AsyncEnumerable.Range(1, 2).AsTestingSequence();
using var ts3 = AsyncEnumerable.Range(1, 3).AsTestingSequence();
using var ts4 = AsyncEnumerable.Range(1, 4).AsTestingSequence();
using var ts5 = AsyncEnumerable.Range(1, 5).AsTestingSequence();

using var ts = TestingSequence.Of(ts1, ts2, ts3, ts4, ts5);
var result = await ts.Merge().ToListAsync();

Assert.That(result, Is.EqualTo(new[]
{
1,
1, 2,
1, 2, 3,
1, 2, 3, 4,
1, 2, 3, 4, 5,
}));
}

[Test]
public async Task MergeAsyncAll()
{
using var ts1 = AsyncEnumerable.Range(10, 1).Yield().AsTestingSequence();
using var ts2 = AsyncEnumerable.Range(20, 2).Yield().AsTestingSequence();
using var ts3 = AsyncEnumerable.Range(30, 3).Yield().AsTestingSequence();
using var ts4 = AsyncEnumerable.Range(40, 4).Yield().AsTestingSequence();
using var ts5 = AsyncEnumerable.Range(50, 5).Yield().AsTestingSequence();

using var ts = TestingSequence.Of(ts1, ts2, ts3, ts4, ts5);
var result = await ts.Merge().ToListAsync();

Assert.That(result, Is.EquivalentTo(new[]
{
10,
20, 21,
30, 31, 32,
40, 41, 42, 43,
50, 51, 52, 53, 54,
}));
}

sealed class AsyncControl<T>
{
sealed record State(TaskCompletionSource<T> TaskCompletionSource, T Result);

State? _state;

public Task<T> Result(T result)
{
if (_state is not null)
throw new InvalidOperationException();
_state = new State(new TaskCompletionSource<T>(), result);
return _state.TaskCompletionSource.Task;
}

public void Complete()
{
if (_state is not { } state)
throw new InvalidOperationException();
_state = null;
state.TaskCompletionSource.SetResult(state.Result);
}
}

[Test]
public async Task MergeAsyncSome()
{
var ac1 = new AsyncControl<int>();
var ac2 = new AsyncControl<int>();

async IAsyncEnumerable<int> Source1()
{
yield return await ac1.Result(1);
yield return 2;
yield return 3;
_ = await ac1.Result(0);
}

async IAsyncEnumerable<int> Source2()
{
yield return await ac2.Result(4);
}

using var ts1 = Source1().AsTestingSequence();
using var ts2 = Source2().AsTestingSequence();
using var sources = TestingSequence.Of(ts1, ts2);
var e = sources.Merge().GetAsyncEnumerator();

async Task<int> ExpectAsync(AsyncControl<int> control)
{
var t = e.MoveNextAsync();
Assert.That(t.IsCompleted, Is.False);
control.Complete();
Assert.That(await t, Is.True);
return e.Current;
}

async Task<int> ExpectSync()
{
var t = e.MoveNextAsync();
Assert.That(t.IsCompleted, Is.True);
Assert.That(await t, Is.True);
return e.Current;
}

Assert.That(await ExpectAsync(ac2), Is.EqualTo(4));
Assert.That(await ExpectAsync(ac1), Is.EqualTo(1));
Assert.That(ts2.IsDisposed, Is.True);
Assert.That(await ExpectSync(), Is.EqualTo(2));
Assert.That(await ExpectSync(), Is.EqualTo(3));

var t = e.MoveNextAsync();
Assert.That(t.IsCompleted, Is.False);
ac1.Complete();
Assert.That(await t, Is.False);
}
}
}
60 changes: 60 additions & 0 deletions MoreLinq.Test/Async/TestExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#region License and Terms
// MoreLINQ - Extensions to LINQ to Objects
// Copyright (c) 2020 Atif Aziz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion

namespace MoreLinq.Test.Async
{
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

static partial class TestExtensions
{
public static IAsyncEnumerable<T> Yield<T>(this IAsyncEnumerable<T> source) =>
Yield(source, _ => true);

public static IAsyncEnumerable<T>
Yield<T>(this IAsyncEnumerable<T> source,
Func<T, bool> predicate) =>
Yield(source, shouldEndSynchronously: false, predicate);

public static IAsyncEnumerable<T>
Yield<T>(this IAsyncEnumerable<T> source,
bool shouldEndSynchronously,
Func<T, bool> predicate)
{
if (source is null) throw new ArgumentNullException(nameof(source));
if (predicate is null) throw new ArgumentNullException(nameof(predicate));

return Async();

async IAsyncEnumerable<T> Async([EnumeratorCancellation]CancellationToken cancellationToken = default)
{
await foreach (var item in source.WithCancellation(cancellationToken))
{
if (predicate(item))
await Task.Yield();
yield return item;
}

if (!shouldEndSynchronously)
await Task.Yield();
}
}
}
}
115 changes: 115 additions & 0 deletions MoreLinq.Test/Async/TestingAsyncSequence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#region License and Terms
// MoreLINQ - Extensions to LINQ to Objects
// Copyright (c) 2009 Atif Aziz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion

namespace MoreLinq.Test.Async
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;
using NUnit.Framework;

static class TestingAsyncSequence
{
public static TestingAsyncSequence<T> Of<T>(params T[] elements) =>
new(elements.ToAsyncEnumerable());

public static TestingAsyncSequence<T> AsTestingSequence<T>(this IAsyncEnumerable<T> source) =>
source is not null
? new TestingAsyncSequence<T>(source)
: throw new ArgumentNullException(nameof(source));
}

/// <summary>
/// Sequence that asserts whether its iterator has been disposed
/// when it is disposed itself and also whether GetEnumerator() is
/// called exactly once or not.
/// </summary>

sealed class TestingAsyncSequence<T> : IAsyncEnumerable<T>, IDisposable
{
bool? _disposed;
IAsyncEnumerable<T>? _source;
ExceptionDispatchInfo? _disposeErrorInfo;

internal TestingAsyncSequence(IAsyncEnumerable<T> sequence) =>
_source = sequence;

public bool IsDisposed => _disposed == true;
public int MoveNextCallCount { get; private set; }

void IDisposable.Dispose() =>
AssertDisposed();

/// <summary>
/// Checks that the iterator was disposed, and then resets.
/// </summary>

void AssertDisposed()
{
_disposeErrorInfo?.Throw();

if (_disposed is null)
return;

Assert.IsTrue(_disposed, "Expected sequence to be disposed.");
_disposed = null;
}

public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
Assert.That(_source, Is.Not.Null,
"LINQ operators should not enumerate a sequence more than once.");

// Dammit (!) below is okay since we assert above it's not null.
var enumerator = _source!.GetAsyncEnumerator(cancellationToken).AsWatchable();

_disposed = false;
enumerator.Disposed += delegate
{
// If the following assertion fails the capture the error
// and re-throw later during the disposal of the test
// sequence. This is done so because "DisposeAsync" is never
// expected to throw and could interfere with how an operator
// builds on that assumption.

try
{
Assert.That(_disposed, Is.False, "LINQ operators should not dispose a sequence more than once.");
}
catch (AssertionException e)
{
_disposeErrorInfo = ExceptionDispatchInfo.Capture(e);
}

_disposed = true;
};

var ended = false;
enumerator.MoveNextCalled += (_, moved) =>
{
Assert.That(ended, Is.False, "LINQ operators should not continue iterating a sequence that has terminated.");
ended = !moved;
MoveNextCallCount++;
};

_source = null;
return enumerator;
}
}
}
Loading

0 comments on commit 28996d4

Please sign in to comment.