Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Rework TakeUntil with lock-free methods #550

Merged
merged 7 commits into from
May 30, 2018
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 62 additions & 74 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
Expand All @@ -24,131 +25,118 @@ public TakeUntil(IObservable<TSource> source, IObservable<TOther> other)

internal sealed class _ : IdentitySink<TSource>
{
private readonly OtherObserver _other;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field can be omitted, good to be merged otherwise.

private IDisposable _mainDisposable;
private IDisposable _otherDisposable;
private int _halfSerializer;
private Exception _error;

public _(IObserver<TSource> observer, IDisposable cancel)
: base(observer, cancel)
{
_other = new OtherObserver(this);
}

public IDisposable Run(TakeUntil<TSource, TOther> parent)
{
var sourceObserver = new SourceObserver(this);
var otherObserver = new OtherObserver(this, sourceObserver);

// COMPAT - Order of Subscribe calls per v1.0.10621
var otherSubscription = parent._other.SubscribeSafe(otherObserver);
otherObserver.Disposable = otherSubscription;

var sourceSubscription = parent._source.SubscribeSafe(sourceObserver);
Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(_other));
Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));

return StableCompositeDisposable.Create(
otherSubscription,
sourceSubscription
);
return this;
}

/*
* We tried a more fine-grained synchronization scheme to make TakeUntil more efficient, but
* this requires several CAS instructions, which quickly add up to being non-beneficial.
*
* Notice an approach where the "other" channel performs an Interlocked.Exchange operation on
* the _parent._observer field to substitute it with a NopObserver<TSource> doesn't work,
* because the "other" channel still needs to send an OnCompleted message, which could happen
* concurrently with another message when the "source" channel has already read from the
* _parent._observer field between making the On* call.
*
* Fixing this issue requires an ownership transfer mechanism for channels to get exclusive
* access to the outgoing observer while dispatching a message. Doing this more fine-grained
* than using locks turns out to be tricky and doesn't reduce cost.
*/
private sealed class SourceObserver : IObserver<TSource>
protected override void Dispose(bool disposing)
{
private readonly _ _parent;
public volatile bool _open;

public SourceObserver(_ parent)
if (disposing)
{
_parent = parent;
_open = false;
if (!Disposable.GetIsDisposed(ref _mainDisposable))
{
Disposable.TryDispose(ref _mainDisposable);
Disposable.TryDispose(ref _otherDisposable);
}
}

public void OnNext(TSource value)
base.Dispose(disposing);
}

public override void OnNext(TSource value)
{
if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
{
if (_open)
{
_parent.ForwardOnNext(value);
}
else
ForwardOnNext(value);
if (Interlocked.Decrement(ref _halfSerializer) != 0)
{
lock (_parent)
var ex = _error;
if (ex != TakeUntilTerminalException.Instance)
{
_error = TakeUntilTerminalException.Instance;
ForwardOnError(ex);
}
else
{
_parent.ForwardOnNext(value);
ForwardOnCompleted();
}
}
}
}

public void OnError(Exception error)
public override void OnError(Exception ex)
{
if (Interlocked.CompareExchange(ref _error, ex, null) == null)
{
lock (_parent)
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
_parent.ForwardOnError(error);
_error = TakeUntilTerminalException.Instance;
ForwardOnError(ex);
}
}
}

public void OnCompleted()
public override void OnCompleted()
{
if (Interlocked.CompareExchange(ref _error, TakeUntilTerminalException.Instance, null) == null)
{
lock (_parent)
if (Interlocked.Increment(ref _halfSerializer) == 1)
{
_parent.ForwardOnCompleted();
ForwardOnCompleted();
}
}
}

private sealed class OtherObserver : IObserver<TOther>
sealed class OtherObserver : IObserver<TOther>
{
private readonly _ _parent;
private readonly SourceObserver _sourceObserver;
private readonly SingleAssignmentDisposable _subscription;
readonly _ _parent;

public OtherObserver(_ parent, SourceObserver sourceObserver)
public OtherObserver(_ parent)
{
_parent = parent;
_sourceObserver = sourceObserver;
_subscription = new SingleAssignmentDisposable();
}

public IDisposable Disposable
{
set { _subscription.Disposable = value; }
}

public void OnNext(TOther value)
public void OnCompleted()
{
lock (_parent)
{
_parent.ForwardOnCompleted();
}
// Completion doesn't mean termination in Rx.NET for this operator
Disposable.TryDispose(ref _parent._otherDisposable);
}

public void OnError(Exception error)
{
lock (_parent)
{
_parent.ForwardOnError(error);
}
_parent.OnError(error);
}

public void OnCompleted()
public void OnNext(TOther value)
{
lock (_parent)
{
_sourceObserver._open = true;
_subscription.Dispose();
}
_parent.OnCompleted();
}
}

}
}

internal static class TakeUntilTerminalException
{
internal static readonly Exception Instance = new Exception("No further exceptions");
}

internal sealed class TakeUntil<TSource> : Producer<TSource, TakeUntil<TSource>._>
{
private readonly IObservable<TSource> _source;
Expand Down