Skip to content

Commit

Permalink
4.x: Rework & fix SkipUntil with lock-free methods (#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and danielcweber committed Jun 1, 2018
1 parent 389881c commit c6766f5
Showing 1 changed file with 88 additions and 48 deletions.
136 changes: 88 additions & 48 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
Expand All @@ -24,102 +25,141 @@ public SkipUntil(IObservable<TSource> source, IObservable<TOther> other)

internal sealed class _ : IdentitySink<TSource>
{
IDisposable _mainDisposable;
IDisposable _otherDisposable;
volatile bool _forward;
int _halfSerializer;
Exception _error;

public _(IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
}

public IDisposable Run(SkipUntil<TSource, TOther> parent)
{
var sourceObserver = new SourceObserver(this);
var otherObserver = new OtherObserver(this, sourceObserver);

var otherSubscription = parent._other.SubscribeSafe(otherObserver);
var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);

sourceObserver.Disposable = sourceSubscription;
otherObserver.Disposable = otherSubscription;

return StableCompositeDisposable.Create(
sourceSubscription,
otherSubscription
);
Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));

Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));

return this;
}

private sealed class SourceObserver : IObserver<TSource>
protected override void Dispose(bool disposing)
{
private readonly _ _parent;
public volatile bool _forward;
private readonly SingleAssignmentDisposable _subscription;

public SourceObserver(_ parent)
if (disposing)
{
_parent = parent;
_subscription = new SingleAssignmentDisposable();
DisposeMain();
if (!Disposable.GetIsDisposed(ref _otherDisposable))
{
Disposable.TryDispose(ref _otherDisposable);
}
}

public IDisposable Disposable
base.Dispose(disposing);
}

void DisposeMain()
{
if (!Disposable.GetIsDisposed(ref _mainDisposable))
{
set { _subscription.Disposable = value; }
Disposable.TryDispose(ref _mainDisposable);
}
}

public void OnNext(TSource value)
public override void OnNext(TSource value)
{
if (_forward)
{
if (_forward)
_parent.ForwardOnNext(value);
if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
{
ForwardOnNext(value);
if (Interlocked.Decrement(ref _halfSerializer) != 0)
{
var ex = _error;
_error = SkipUntilTerminalException.Instance;
ForwardOnError(ex);
}
}
}
}

public void OnError(Exception error)
public override void OnError(Exception ex)
{
if (Interlocked.CompareExchange(ref _error, ex, null) == null)
{
_parent.ForwardOnError(error);
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
_error = SkipUntilTerminalException.Instance;
ForwardOnError(ex);
}
}
}

public void OnCompleted()
public override void OnCompleted()
{
if (_forward)
{
if (_forward)
_parent.ForwardOnCompleted();

_subscription.Dispose(); // We can't cancel the other stream yet, it may be on its way to dispatch an OnError message and we don't want to have a race.
if (Interlocked.CompareExchange(ref _error, SkipUntilTerminalException.Instance, null) == null)
{
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
ForwardOnCompleted();
}
}
}
else
{
DisposeMain();
}
}

private sealed class OtherObserver : IObserver<TOther>
void OtherComplete()
{
_forward = true;
}

sealed class OtherObserver : IObserver<TOther>, IDisposable
{
private readonly _ _parent;
private readonly SourceObserver _sourceObserver;
private readonly SingleAssignmentDisposable _subscription;
readonly _ _parent;

public OtherObserver(_ parent, SourceObserver sourceObserver)
public OtherObserver(_ parent)
{
_parent = parent;
_sourceObserver = sourceObserver;
_subscription = new SingleAssignmentDisposable();
}

public IDisposable Disposable
public void Dispose()
{
set { _subscription.Disposable = value; }
if (!Disposable.GetIsDisposed(ref _parent._otherDisposable))
{
Disposable.TryDispose(ref _parent._otherDisposable);
}
}

public void OnNext(TOther value)
public void OnCompleted()
{
_sourceObserver._forward = true;
_subscription.Dispose();
Dispose();
}

public void OnError(Exception error)
{
_parent.ForwardOnError(error);
_parent.OnError(error);
}

public void OnCompleted()
public void OnNext(TOther value)
{
_subscription.Dispose();
_parent.OtherComplete();
Dispose();
}
}
}
}

internal static class SkipUntilTerminalException
{
internal static readonly Exception Instance = new Exception("No further exceptions");
}

internal sealed class SkipUntil<TSource> : Producer<TSource, SkipUntil<TSource>._>
{
private readonly IObservable<TSource> _source;
Expand Down

0 comments on commit c6766f5

Please sign in to comment.