Skip to content

Commit

Permalink
Use HalfSerializer extensions (#572)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcweber authored Jun 5, 2018
1 parent bd7178f commit fa0d583
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 115 deletions.
26 changes: 13 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,26 @@ internal static class HalfSerializer
/// Use a full SerializedObserver wrapper for merging multiple sequences.
/// </summary>
/// <typeparam name="T">The element type of the observer.</typeparam>
/// <param name="observer">The observer to signal events in a serialized fashion.</param>
/// <param name="sink">The observer to signal events in a serialized fashion.</param>
/// <param name="item">The item to signal.</param>
/// <param name="wip">Indicates there is an emission going on currently.</param>
/// <param name="error">The field containing an error or terminal indicator.</param>
public static void OnNext<T>(IObserver<T> observer, T item, ref int wip, ref Exception error)
public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exception error)
{
if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
{
observer.OnNext(item);
sink.ForwardOnNext(item);
if (Interlocked.Decrement(ref wip) != 0)
{
var ex = error;
if (ex != ExceptionHelper.Terminated)
{
error = ExceptionHelper.Terminated;
observer.OnError(ex);
sink.ForwardOnError(ex);
}
else
{
observer.OnCompleted();
sink.ForwardOnCompleted();
}
}
}
Expand All @@ -55,46 +55,46 @@ public static void OnNext<T>(IObserver<T> observer, T item, ref int wip, ref Exc
/// <summary>
/// Signals the given exception to the observer. If there is a concurrent
/// OnNext emission is happening, saves the exception into the given field
/// otherwise to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/>.
/// otherwise to be picked up by <see cref="ForwardOnNext{T}"/>.
/// This method can be called concurrently with itself and the other methods of this
/// helper class but only one terminal signal may actually win.
/// </summary>
/// <typeparam name="T">The element type of the observer.</typeparam>
/// <param name="observer">The observer to signal events in a serialized fashion.</param>
/// <param name="sink">The observer to signal events in a serialized fashion.</param>
/// <param name="ex">The exception to signal sooner or later.</param>
/// <param name="wip">Indicates there is an emission going on currently.</param>
/// <param name="error">The field containing an error or terminal indicator.</param>
public static void OnError<T>(IObserver<T> observer, Exception ex, ref int wip, ref Exception error)
public static void ForwardOnError<T>(ISink<T> sink, Exception ex, ref int wip, ref Exception error)
{
if (ExceptionHelper.TrySetException(ref error, ex))
{
if (Interlocked.Increment(ref wip) == 1)
{
error = ExceptionHelper.Terminated;
observer.OnError(ex);
sink.ForwardOnError(ex);
}
}
}

/// <summary>
/// Signals OnCompleted on the observer. If there is a concurrent
/// OnNext emission happening, the error field will host a special
/// terminal exception signal to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/> once it finishes with OnNext and signal the
/// terminal exception signal to be picked up by <see cref="ForwardOnNext{T}"/> once it finishes with OnNext and signal the
/// OnCompleted as well.
/// This method can be called concurrently with itself and the other methods of this
/// helper class but only one terminal signal may actually win.
/// </summary>
/// <typeparam name="T">The element type of the observer.</typeparam>
/// <param name="observer">The observer to signal events in a serialized fashion.</param>
/// <param name="sink">The observer to signal events in a serialized fashion.</param>
/// <param name="wip">Indicates there is an emission going on currently.</param>
/// <param name="error">The field containing an error or terminal indicator.</param>
public static void OnCompleted<T>(IObserver<T> observer, ref int wip, ref Exception error)
public static void ForwardOnCompleted<T>(ISink<T> sink, ref int wip, ref Exception error)
{
if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
{
if (Interlocked.Increment(ref wip) == 1)
{
observer.OnCompleted();
sink.ForwardOnCompleted();
}
}
}
Expand Down
15 changes: 11 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Internal/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@

namespace System.Reactive
{
internal abstract class Sink<TTarget> : IDisposable
internal interface ISink<in TTarget>
{
void ForwardOnNext(TTarget value);
void ForwardOnCompleted();
void ForwardOnError(Exception error);
}

internal abstract class Sink<TTarget> : ISink<TTarget>, IDisposable
{
private IDisposable _cancel;
private volatile IObserver<TTarget> _observer;
Expand All @@ -29,18 +36,18 @@ protected virtual void Dispose(bool disposing)
Disposable.TryDispose(ref _cancel);
}

protected void ForwardOnNext(TTarget value)
public void ForwardOnNext(TTarget value)
{
_observer.OnNext(value);
}

protected void ForwardOnCompleted()
public void ForwardOnCompleted()
{
_observer.OnCompleted();
Dispose();
}

protected void ForwardOnError(Exception error)
public void ForwardOnError(Exception error)
{
_observer.OnError(error);
Dispose();
Expand Down
37 changes: 4 additions & 33 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public _(IObserver<TSource> observer, IDisposable cancel)
public IDisposable Run(SkipUntil<TSource, TOther> parent)
{
Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));

Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));

return this;
Expand Down Expand Up @@ -70,48 +69,20 @@ void DisposeMain()
public override void OnNext(TSource value)
{
if (_forward)
{
if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
{
ForwardOnNext(value);
if (Interlocked.Decrement(ref _halfSerializer) != 0)
{
var ex = _error;
_error = SkipUntilTerminalException.Instance;
ForwardOnError(ex);
}
}
}
HalfSerializer.ForwardOnNext(this, value, ref _halfSerializer, ref _error);
}

public override void OnError(Exception ex)
{
if (Interlocked.CompareExchange(ref _error, ex, null) == null)
{
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
_error = SkipUntilTerminalException.Instance;
ForwardOnError(ex);
}
}
HalfSerializer.ForwardOnError(this, ex, ref _halfSerializer, ref _error);
}

public override void OnCompleted()
{
if (_forward)
{
if (Interlocked.CompareExchange(ref _error, SkipUntilTerminalException.Instance, null) == null)
{
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
ForwardOnCompleted();
}
}
}
HalfSerializer.ForwardOnCompleted(this, ref _halfSerializer, ref _error);
else
{
DisposeMain();
}
}

void OtherComplete()
Expand Down Expand Up @@ -143,7 +114,7 @@ public void OnCompleted()

public void OnError(Exception error)
{
_parent.OnError(error);
HalfSerializer.ForwardOnError(_parent, error, ref _parent._halfSerializer, ref _parent._error);
}

public void OnNext(TOther value)
Expand Down
39 changes: 5 additions & 34 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,46 +59,17 @@ protected override void Dispose(bool disposing)

public override void OnNext(TSource value)
{
if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
{
ForwardOnNext(value);
if (Interlocked.Decrement(ref _halfSerializer) != 0)
{
var ex = _error;
if (ex != TakeUntilTerminalException.Instance)
{
_error = TakeUntilTerminalException.Instance;
ForwardOnError(ex);
}
else
{
ForwardOnCompleted();
}
}
}
HalfSerializer.ForwardOnNext(this, value, ref _halfSerializer, ref _error);
}

public override void OnError(Exception ex)
{
if (Interlocked.CompareExchange(ref _error, ex, null) == null)
{
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
_error = TakeUntilTerminalException.Instance;
ForwardOnError(ex);
}
}
HalfSerializer.ForwardOnError(this, ex, ref _halfSerializer, ref _error);
}

public override void OnCompleted()
{
if (Interlocked.CompareExchange(ref _error, TakeUntilTerminalException.Instance, null) == null)
{
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
ForwardOnCompleted();
}
}
HalfSerializer.ForwardOnCompleted(this, ref _halfSerializer, ref _error);
}

sealed class OtherObserver : IObserver<TOther>
Expand All @@ -118,12 +89,12 @@ public void OnCompleted()

public void OnError(Exception error)
{
_parent.OnError(error);
HalfSerializer.ForwardOnError(_parent, error, ref _parent._halfSerializer, ref _parent._error);
}

public void OnNext(TOther value)
{
_parent.OnCompleted();
HalfSerializer.ForwardOnCompleted(_parent, ref _parent._halfSerializer, ref _parent._error);
}
}

Expand Down
Loading

0 comments on commit fa0d583

Please sign in to comment.