Skip to content

Commit

Permalink
4.x: Rework TakeUntil with lock-free methods (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and danielcweber committed May 30, 2018
1 parent 86b3b94 commit caf6203
Showing 1 changed file with 60 additions and 74 deletions.
134 changes: 60 additions & 74 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
Expand All @@ -24,131 +25,116 @@ public TakeUntil(IObservable<TSource> source, IObservable<TOther> other)

internal sealed class _ : IdentitySink<TSource>
{
private IDisposable _mainDisposable;
private IDisposable _otherDisposable;
private int _halfSerializer;
private Exception _error;

public _(IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
}

public IDisposable Run(TakeUntil<TSource, TOther> parent)
{
var sourceObserver = new SourceObserver(this);
var otherObserver = new OtherObserver(this, sourceObserver);

// COMPAT - Order of Subscribe calls per v1.0.10621
var otherSubscription = parent._other.SubscribeSafe(otherObserver);
otherObserver.Disposable = otherSubscription;

var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);
Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));
Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));

return StableCompositeDisposable.Create(
otherSubscription,
sourceSubscription
);
return this;
}

/*
* We tried a more fine-grained synchronization scheme to make TakeUntil more efficient, but
* this requires several CAS instructions, which quickly add up to being non-beneficial.
*
* Notice an approach where the "other" channel performs an Interlocked.Exchange operation on
* the _parent._observer field to substitute it with a NopObserver<TSource> doesn't work,
* because the "other" channel still needs to send an OnCompleted message, which could happen
* concurrently with another message when the "source" channel has already read from the
* _parent._observer field between making the On* call.
*
* Fixing this issue requires an ownership transfer mechanism for channels to get exclusive
* access to the outgoing observer while dispatching a message. Doing this more fine-grained
* than using locks turns out to be tricky and doesn't reduce cost.
*/
private sealed class SourceObserver : IObserver<TSource>
protected override void Dispose(bool disposing)
{
private readonly _ _parent;
public volatile bool _open;

public SourceObserver(_ parent)
if (disposing)
{
_parent = parent;
_open = false;
if (!Disposable.GetIsDisposed(ref _mainDisposable))
{
Disposable.TryDispose(ref _mainDisposable);
Disposable.TryDispose(ref _otherDisposable);
}
}

public void OnNext(TSource value)
base.Dispose(disposing);
}

public override void OnNext(TSource value)
{
if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
{
if (_open)
{
_parent.ForwardOnNext(value);
}
else
ForwardOnNext(value);
if (Interlocked.Decrement(ref _halfSerializer) != 0)
{
lock (_parent)
var ex = _error;
if (ex != TakeUntilTerminalException.Instance)
{
_error = TakeUntilTerminalException.Instance;
ForwardOnError(ex);
}
else
{
_parent.ForwardOnNext(value);
ForwardOnCompleted();
}
}
}
}

public void OnError(Exception error)
public override void OnError(Exception ex)
{
if (Interlocked.CompareExchange(ref _error, ex, null) == null)
{
lock (_parent)
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
_parent.ForwardOnError(error);
_error = TakeUntilTerminalException.Instance;
ForwardOnError(ex);
}
}
}

public void OnCompleted()
public override void OnCompleted()
{
if (Interlocked.CompareExchange(ref _error, TakeUntilTerminalException.Instance, null) == null)
{
lock (_parent)
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
_parent.ForwardOnCompleted();
ForwardOnCompleted();
}
}
}

private sealed class OtherObserver : IObserver<TOther>
sealed class OtherObserver : IObserver<TOther>
{
private readonly _ _parent;
private readonly SourceObserver _sourceObserver;
private readonly SingleAssignmentDisposable _subscription;
readonly _ _parent;

public OtherObserver(_ parent, SourceObserver sourceObserver)
public OtherObserver(_ parent)
{
_parent = parent;
_sourceObserver = sourceObserver;
_subscription = new SingleAssignmentDisposable();
}

public IDisposable Disposable
{
set { _subscription.Disposable = value; }
}

public void OnNext(TOther value)
public void OnCompleted()
{
lock (_parent)
{
_parent.ForwardOnCompleted();
}
// Completion doesn't mean termination in Rx.NET for this operator
Disposable.TryDispose(ref _parent._otherDisposable);
}

public void OnError(Exception error)
{
lock (_parent)
{
_parent.ForwardOnError(error);
}
_parent.OnError(error);
}

public void OnCompleted()
public void OnNext(TOther value)
{
lock (_parent)
{
_sourceObserver._open = true;
_subscription.Dispose();
}
_parent.OnCompleted();
}
}

}
}

internal static class TakeUntilTerminalException
{
internal static readonly Exception Instance = new Exception("No further exceptions");
}

internal sealed class TakeUntil<TSource> : Producer<TSource, TakeUntil<TSource>._>
{
private readonly IObservable<TSource> _source;
Expand Down

0 comments on commit caf6203

Please sign in to comment.