-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
4.x: Rework & fix SkipUntil with lock-free methods #551
Changes from all commits
826a253
176ee2d
bbd81a1
23cb959
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
using System.Reactive.Concurrency; | ||
using System.Reactive.Disposables; | ||
using System.Threading; | ||
|
||
namespace System.Reactive.Linq.ObservableImpl | ||
{ | ||
|
@@ -24,102 +25,141 @@ public SkipUntil(IObservable<TSource> source, IObservable<TOther> other) | |
|
||
internal sealed class _ : IdentitySink<TSource> | ||
{ | ||
IDisposable _mainDisposable; | ||
IDisposable _otherDisposable; | ||
volatile bool _forward; | ||
int _halfSerializer; | ||
Exception _error; | ||
|
||
public _(IObserver<TSource> observer, IDisposable cancel) | ||
: base(observer, cancel) | ||
{ | ||
} | ||
|
||
public IDisposable Run(SkipUntil<TSource, TOther> parent) | ||
{ | ||
var sourceObserver = new SourceObserver(this); | ||
var otherObserver = new OtherObserver(this, sourceObserver); | ||
|
||
var otherSubscription = parent._other.SubscribeSafe(otherObserver); | ||
var sourceSubscription = parent._source.SubscribeSafe(sourceObserver); | ||
|
||
sourceObserver.Disposable = sourceSubscription; | ||
otherObserver.Disposable = otherSubscription; | ||
|
||
return StableCompositeDisposable.Create( | ||
sourceSubscription, | ||
otherSubscription | ||
); | ||
Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this))); | ||
|
||
Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this)); | ||
|
||
return this; | ||
} | ||
|
||
private sealed class SourceObserver : IObserver<TSource> | ||
protected override void Dispose(bool disposing) | ||
{ | ||
private readonly _ _parent; | ||
public volatile bool _forward; | ||
private readonly SingleAssignmentDisposable _subscription; | ||
|
||
public SourceObserver(_ parent) | ||
if (disposing) | ||
{ | ||
_parent = parent; | ||
_subscription = new SingleAssignmentDisposable(); | ||
DisposeMain(); | ||
if (!Disposable.GetIsDisposed(ref _otherDisposable)) | ||
{ | ||
Disposable.TryDispose(ref _otherDisposable); | ||
} | ||
} | ||
|
||
public IDisposable Disposable | ||
base.Dispose(disposing); | ||
} | ||
|
||
void DisposeMain() | ||
{ | ||
if (!Disposable.GetIsDisposed(ref _mainDisposable)) | ||
{ | ||
set { _subscription.Disposable = value; } | ||
Disposable.TryDispose(ref _mainDisposable); | ||
} | ||
} | ||
|
||
public void OnNext(TSource value) | ||
public override void OnNext(TSource value) | ||
{ | ||
if (_forward) | ||
{ | ||
if (_forward) | ||
_parent.ForwardOnNext(value); | ||
if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would Interlocked.Increment work as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interlocked has to force a new value so it is more expensive in contended cases whereas CAS is generally cheaper, especially on the weaker platforms where increment may be actually implemented as a CAS loop. Here, there is no need to loop. |
||
{ | ||
ForwardOnNext(value); | ||
if (Interlocked.Decrement(ref _halfSerializer) != 0) | ||
{ | ||
var ex = _error; | ||
_error = SkipUntilTerminalException.Instance; | ||
ForwardOnError(ex); | ||
} | ||
} | ||
} | ||
} | ||
|
||
public void OnError(Exception error) | ||
public override void OnError(Exception ex) | ||
{ | ||
if (Interlocked.CompareExchange(ref _error, ex, null) == null) | ||
{ | ||
_parent.ForwardOnError(error); | ||
if (Interlocked.Increment(ref _halfSerializer) == 1) | ||
{ | ||
_error = SkipUntilTerminalException.Instance; | ||
ForwardOnError(ex); | ||
} | ||
} | ||
} | ||
|
||
public void OnCompleted() | ||
public override void OnCompleted() | ||
{ | ||
if (_forward) | ||
{ | ||
if (_forward) | ||
_parent.ForwardOnCompleted(); | ||
|
||
_subscription.Dispose(); // We can't cancel the other stream yet, it may be on its way to dispatch an OnError message and we don't want to have a race. | ||
if (Interlocked.CompareExchange(ref _error, SkipUntilTerminalException.Instance, null) == null) | ||
{ | ||
if (Interlocked.Increment(ref _halfSerializer) == 1) | ||
{ | ||
ForwardOnCompleted(); | ||
} | ||
} | ||
} | ||
else | ||
{ | ||
DisposeMain(); | ||
} | ||
} | ||
|
||
private sealed class OtherObserver : IObserver<TOther> | ||
void OtherComplete() | ||
{ | ||
_forward = true; | ||
} | ||
|
||
sealed class OtherObserver : IObserver<TOther>, IDisposable | ||
{ | ||
private readonly _ _parent; | ||
private readonly SourceObserver _sourceObserver; | ||
private readonly SingleAssignmentDisposable _subscription; | ||
readonly _ _parent; | ||
|
||
public OtherObserver(_ parent, SourceObserver sourceObserver) | ||
public OtherObserver(_ parent) | ||
{ | ||
_parent = parent; | ||
_sourceObserver = sourceObserver; | ||
_subscription = new SingleAssignmentDisposable(); | ||
} | ||
|
||
public IDisposable Disposable | ||
public void Dispose() | ||
{ | ||
set { _subscription.Disposable = value; } | ||
if (!Disposable.GetIsDisposed(ref _parent._otherDisposable)) | ||
{ | ||
Disposable.TryDispose(ref _parent._otherDisposable); | ||
} | ||
} | ||
|
||
public void OnNext(TOther value) | ||
public void OnCompleted() | ||
{ | ||
_sourceObserver._forward = true; | ||
_subscription.Dispose(); | ||
Dispose(); | ||
} | ||
|
||
public void OnError(Exception error) | ||
{ | ||
_parent.ForwardOnError(error); | ||
_parent.OnError(error); | ||
} | ||
|
||
public void OnCompleted() | ||
public void OnNext(TOther value) | ||
{ | ||
_subscription.Dispose(); | ||
_parent.OtherComplete(); | ||
Dispose(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
internal static class SkipUntilTerminalException | ||
{ | ||
internal static readonly Exception Instance = new Exception("No further exceptions"); | ||
} | ||
|
||
internal sealed class SkipUntil<TSource> : Producer<TSource, SkipUntil<TSource>._> | ||
{ | ||
private readonly IObservable<TSource> _source; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I missed those before merging, this will for now but it should probably be SetSingle (i.e. throw when already set).