Skip to content

Commit

Permalink
Add the TakeUntil(Func<T, bool>) operator (#612)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and danielcweber committed Jun 19, 2018
1 parent 55e967e commit ae8c989
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 0 deletions.
1 change: 1 addition & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ internal interface IQueryLanguage
IObservable<TSource> SkipUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources);
IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate);
IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector);
IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);
IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries);
Expand Down
29 changes: 29 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Multiple.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,35 @@ public static IObservable<TSource> TakeUntil<TSource, TOther>(this IObservable<T
return s_impl.TakeUntil<TSource, TOther>(source, other);
}

/// <summary>
/// Relays elements from the source observable sequence and calls the predicate after an
/// emission to check if the sequence should stop after that specific item.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
/// <param name="source">The source sequence to relay elements of.</param>
/// <param name="stopPredicate">Called after each upstream item has been emitted with
/// that upstream item and should return <code>true</code> to indicate the sequence should
/// complete.</param>
/// <returns>The observable sequence with the source elements until the stop predicate returns true.</returns>
/// <example>
/// The following sequence will stop after the value 5 has been encountered:
/// <code>
/// Observable.Range(1, 10)
/// .TakeUntil(item =&gt; item == 5)
/// .Subscribe(Console.WriteLine);
/// </code>
/// </example>
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> stopPredicate)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (stopPredicate == null)
throw new ArgumentNullException(nameof(stopPredicate));

return s_impl.TakeUntil(source, stopPredicate);
}

#endregion

#region + Window +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;

namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Relays items to the downstream until the predicate returns <code>true</code>.
/// </summary>
/// <typeparam name="TSource">The element type of the sequence</typeparam>
internal sealed class TakeUntilPredicate<TSource> :
Producer<TSource, TakeUntilPredicate<TSource>.TakeUntilPredicateObserver>
{
readonly IObservable<TSource> _source;

readonly Func<TSource, bool> _stopPredicate;

public TakeUntilPredicate(IObservable<TSource> source, Func<TSource, bool> stopPredicate)
{
this._source = source;
this._stopPredicate = stopPredicate;
}

protected override TakeUntilPredicateObserver CreateSink(IObserver<TSource> observer) => new TakeUntilPredicateObserver(observer, _stopPredicate);

protected override void Run(TakeUntilPredicateObserver sink) => sink.Run(_source);

internal sealed class TakeUntilPredicateObserver : IdentitySink<TSource>
{
readonly Func<TSource, bool> _stopPredicate;

public TakeUntilPredicateObserver(IObserver<TSource> downstream,
Func<TSource, bool> predicate) : base (downstream)
{
this._stopPredicate = predicate;
}

public override void OnCompleted()
{
ForwardOnCompleted();
}

public override void OnError(Exception error)
{
ForwardOnError(error);
}

public override void OnNext(TSource value)
{
ForwardOnNext(value);

var shouldStop = false;
try
{
shouldStop = _stopPredicate(value);
}
catch (Exception ex)
{
ForwardOnError(ex);
return;
}
if (shouldStop)
{
ForwardOnCompleted();
}
}
}
}
}
40 changes: 40 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15015,6 +15015,46 @@ public static IQbservable<TSource> TakeUntil<TSource, TOther>(this IQbservable<T
);
}

/// <summary>
/// Relays elements from the source observable sequence and calls the predicate after an
/// emission to check if the sequence should stop after that specific item.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
/// <param name="source">The source sequence to relay elements of.</param>
/// <param name="stopPredicate">Called after each upstream item has been emitted with
/// that upstream item and should return <code>true</code> to indicate the sequence should
/// complete.</param>
/// <returns>The observable sequence with the source elements until the stop predicate returns true.</returns>
/// <example>
/// The following sequence will stop after the value 5 has been encountered:
/// <code>
/// Observable.Range(1, 10)
/// .TakeUntil(item =&gt; item == 5)
/// .Subscribe(Console.WriteLine);
/// </code>
/// </example>
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, bool>> stopPredicate)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (stopPredicate == null)
throw new ArgumentNullException(nameof(stopPredicate));

return source.Provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.TakeUntil<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, bool>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
source.Expression,
stopPredicate
)
);
}

/// <summary>
/// Returns elements from an observable sequence as long as a specified condition is true.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ public virtual IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSour
return new TakeUntil<TSource, TOther>(source, other);
}

public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate)
{
return new TakeUntilPredicate<TSource>(source, stopPredicate);
}

#endregion

#region + Window +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,7 @@ namespace System.Reactive.Linq
public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration) { }
public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> TakeUntil<TSource, TOther>(this System.IObservable<TSource> source, System.IObservable<TOther> other) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> stopPredicate) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> TakeWhile<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
Expand Down Expand Up @@ -2097,6 +2098,7 @@ namespace System.Reactive.Linq
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource, TOther>(this System.Reactive.Linq.IQbservable<TSource> source, System.IObservable<TOther> other) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> stopPredicate) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, int, bool>> predicate) { }
public static System.Reactive.Joins.QueryablePlan<TResult> Then<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, TResult>> selector) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,5 +685,182 @@ public void TakeUntil_Default()

#endregion

#region + Predicate +

[Fact]
public void TakeUntil_Predicate_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil<int>(null, v => true));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil<int>(DummyObservable<int>.Instance, null));
}

[Fact]
public void TakeUntil_Predicate_Basic()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnNext(60, 6),
OnNext(70, 7),
OnNext(80, 8),
OnNext(90, 9),
OnCompleted<int>(100)
);

var result = scheduler.Start(() => source.TakeUntil(v => v == 5));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnNext(240, 4),
OnNext(250, 5),
OnCompleted<int>(250)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 250)
);
}

[Fact]
public void TakeUntil_Predicate_True()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnNext(60, 6),
OnNext(70, 7),
OnNext(80, 8),
OnNext(90, 9),
OnCompleted<int>(100)
);

var result = scheduler.Start(() => source.TakeUntil(v => true));

result.Messages.AssertEqual(
OnNext(210, 1),
OnCompleted<int>(210)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 210)
);
}

[Fact]
public void TakeUntil_Predicate_False()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnNext(60, 6),
OnNext(70, 7),
OnNext(80, 8),
OnNext(90, 9),
OnCompleted<int>(100)
);

var result = scheduler.Start(() => source.TakeUntil(v => false));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnNext(240, 4),
OnNext(250, 5),
OnNext(260, 6),
OnNext(270, 7),
OnNext(280, 8),
OnNext(290, 9),
OnCompleted<int>(300)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 300)
);
}

[Fact]
public void TakeUntil_Predicate_Error()
{
var scheduler = new TestScheduler();

var ex = new InvalidOperationException();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnError<int>(40, ex)
);

var result = scheduler.Start(() => source.TakeUntil(v => false));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnError<int>(240, ex)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 240)
);
}

[Fact]
public void TakeUntil_Predicate_Crash()
{
var scheduler = new TestScheduler();

var ex = new InvalidOperationException();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(240, 4),
OnNext(250, 5),
OnCompleted<int>(260)
);

var result = scheduler.Start(() => source.TakeUntil(v => {
if (v == 3)
{
throw ex;
}
return false;
}));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnError<int>(230, ex)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 230)
);
}

#endregion

}
}

0 comments on commit ae8c989

Please sign in to comment.