Skip to content

Commit

Permalink
Add the RepeatWhen operator (#536)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and danielcweber committed Jun 12, 2018
1 parent 7551d1a commit 24fb6db
Show file tree
Hide file tree
Showing 8 changed files with 867 additions and 133 deletions.
1 change: 1 addition & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ internal interface IQueryLanguage
IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source);
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source);
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount);
IObservable<TSource> RepeatWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler);
IObservable<TSource> Retry<TSource>(IObservable<TSource> source);
IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount);
IObservable<TSource> RetryWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<Exception>, IObservable<TSignal>> handler);
Expand Down
25 changes: 25 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,31 @@ public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> sou
return s_impl.Repeat<TSource>(source, repeatCount);
}

/// <summary>
/// Repeatedly resubscribes to the source observable after a normal completion and when the observable
/// returned by a handler produces an arbitrary item.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <typeparam name="TSignal">The arbitrary element type signaled by the handler observable.</typeparam>
/// <param name="source">Observable sequence to keep repeating when it successfully terminates.</param>
/// <param name="handler">The function that is called for each observer and takes an observable sequence objects.
/// It should return an observable of arbitrary items that should signal that arbitrary item in
/// response to receiving the completion signal from the source observable. If this observable signals
/// a terminal event, the sequence is terminated with that signal instead.</param>
/// <returns>An observable sequence producing the elements of the given sequence repeatedly while each repetition terminates successfully.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(this IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (handler == null)
throw new ArgumentNullException(nameof(handler));

return s_impl.RepeatWhen(source, handler);
}


#endregion

#region + Retry +
Expand Down
170 changes: 170 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/RepeatWhen.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class RepeatWhen<T, U> : IObservable<T>
{
readonly IObservable<T> source;

readonly Func<IObservable<object>, IObservable<U>> handler;

internal RepeatWhen(IObservable<T> source, Func<IObservable<object>, IObservable<U>> handler)
{
this.source = source;
this.handler = handler;
}

public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

var completeSignals = new Subject<object>();
var redo = default(IObservable<U>);

try
{
redo = handler(completeSignals);
if (redo == null)
{
throw new NullReferenceException("The handler returned a null IObservable");
}
}
catch (Exception ex)
{
observer.OnError(ex);
return Disposable.Empty;
}

var parent = new MainObserver(observer, source, new RedoSerializedObserver<object>(completeSignals));

var d = redo.SubscribeSafe(parent.handlerObserver);
Disposable.SetSingle(ref parent.handlerUpstream, d);

parent.HandlerNext();

return parent;
}

sealed class MainObserver : Sink<T>, IObserver<T>
{
readonly IObserver<Exception> errorSignal;

internal readonly HandlerObserver handlerObserver;

readonly IObservable<T> source;

IDisposable upstream;

internal IDisposable handlerUpstream;

int trampoline;

int halfSerializer;

Exception error;

internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream)
{
this.source = source;
this.errorSignal = errorSignal;
this.handlerObserver = new HandlerObserver(this);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref upstream);
Disposable.TryDispose(ref handlerUpstream);
}
base.Dispose(disposing);
}

public void OnCompleted()
{
if (Disposable.TrySetSerial(ref upstream, null))
{
errorSignal.OnNext(null);
}

}

public void OnError(Exception error)
{
HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error);
}

public void OnNext(T value)
{
HalfSerializer.ForwardOnNext(this, value, ref halfSerializer, ref this.error);
}

internal void HandlerError(Exception error)
{
HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error);
}

internal void HandlerComplete()
{
HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error);
}

internal void HandlerNext()
{
if (Interlocked.Increment(ref trampoline) == 1)
{
do
{
var sad = new SingleAssignmentDisposable();
if (Interlocked.CompareExchange(ref upstream, sad, null) != null)
{
return;
}

sad.Disposable = source.SubscribeSafe(this);
}
while (Interlocked.Decrement(ref trampoline) != 0);
}
}

internal sealed class HandlerObserver : IObserver<U>
{
readonly MainObserver main;

internal HandlerObserver(MainObserver main)
{
this.main = main;
}

public void OnCompleted()
{
main.HandlerComplete();
}

public void OnError(Exception error)
{
main.HandlerError(error);
}

public void OnNext(U value)
{
main.HandlerNext();
}
}
}

}
}
Loading

0 comments on commit 24fb6db

Please sign in to comment.