diff --git a/Rx.NET/Source/src/System.Reactive/Disposables/ContextDisposable.cs b/Rx.NET/Source/src/System.Reactive/Disposables/ContextDisposable.cs index ae3f210284..7a63c1f55c 100644 --- a/Rx.NET/Source/src/System.Reactive/Disposables/ContextDisposable.cs +++ b/Rx.NET/Source/src/System.Reactive/Disposables/ContextDisposable.cs @@ -12,7 +12,7 @@ namespace System.Reactive.Disposables /// public sealed class ContextDisposable : ICancelable { - private volatile IDisposable _disposable; + private IDisposable _disposable; /// /// Initializes a new instance of the class that uses the specified on which to dispose the specified disposable resource. @@ -28,7 +28,7 @@ public ContextDisposable(SynchronizationContext context, IDisposable disposable) throw new ArgumentNullException(nameof(disposable)); Context = context; - _disposable = disposable; + Disposable.SetSingle(ref _disposable, disposable); } /// @@ -39,19 +39,14 @@ public ContextDisposable(SynchronizationContext context, IDisposable disposable) /// /// Gets a value that indicates whether the object is disposed. /// - public bool IsDisposed => _disposable == BooleanDisposable.True; + public bool IsDisposed => Disposable.GetIsDisposed(ref _disposable); /// /// Disposes the underlying disposable on the provided . /// public void Dispose() { - var disposable = Interlocked.Exchange(ref _disposable, BooleanDisposable.True); - - if (disposable != BooleanDisposable.True) - { - Context.PostWithStartComplete(d => d.Dispose(), disposable); - } + Disposable.TryRelease(ref _disposable, this.Context, (disposable, context) => context.PostWithStartComplete(d => d.Dispose(), disposable)); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Disposables/Disposable.cs b/Rx.NET/Source/src/System.Reactive/Disposables/Disposable.cs index 0dfdf5a79c..ae75070f59 100644 --- a/Rx.NET/Source/src/System.Reactive/Disposables/Disposable.cs +++ b/Rx.NET/Source/src/System.Reactive/Disposables/Disposable.cs @@ -210,5 +210,16 @@ internal static bool TryDispose(ref IDisposable fieldRef) old?.Dispose(); return true; } + + internal static bool TryRelease(ref IDisposable fieldRef, TState state, Action disposeAction) + { + var old = Interlocked.Exchange(ref fieldRef, BooleanDisposable.True); + + if (old == BooleanDisposable.True) + return false; + + disposeAction(old, state); + return true; + } } } diff --git a/Rx.NET/Source/src/System.Reactive/Disposables/ScheduledDisposable.cs b/Rx.NET/Source/src/System.Reactive/Disposables/ScheduledDisposable.cs index c873337c1b..37a2dad740 100644 --- a/Rx.NET/Source/src/System.Reactive/Disposables/ScheduledDisposable.cs +++ b/Rx.NET/Source/src/System.Reactive/Disposables/ScheduledDisposable.cs @@ -12,7 +12,7 @@ namespace System.Reactive.Disposables /// public sealed class ScheduledDisposable : ICancelable { - private volatile IDisposable _disposable; + private IDisposable _disposable; /// /// Initializes a new instance of the class that uses an on which to dispose the disposable. @@ -28,7 +28,7 @@ public ScheduledDisposable(IScheduler scheduler, IDisposable disposable) throw new ArgumentNullException(nameof(disposable)); Scheduler = scheduler; - _disposable = disposable; + Disposables.Disposable.SetSingle(ref _disposable, disposable); } /// @@ -39,39 +39,16 @@ public ScheduledDisposable(IScheduler scheduler, IDisposable disposable) /// /// Gets the underlying disposable. After disposal, the result is undefined. /// - public IDisposable Disposable - { - get - { - var current = _disposable; - - if (current == BooleanDisposable.True) - { - return Disposables.Disposable.Empty; // Don't leak the sentinel value. - } - - return current; - } - } + public IDisposable Disposable => Disposables.Disposable.GetValueOrDefault(ref _disposable); /// /// Gets a value that indicates whether the object is disposed. /// - public bool IsDisposed => _disposable == BooleanDisposable.True; + public bool IsDisposed => Disposables.Disposable.GetIsDisposed(ref _disposable); /// /// Disposes the wrapped disposable on the provided scheduler. /// - public void Dispose() => Scheduler.Schedule(DisposeInner); - - private void DisposeInner() - { - var disposable = Interlocked.Exchange(ref _disposable, BooleanDisposable.True); - - if (disposable != BooleanDisposable.True) - { - disposable.Dispose(); - } - } + public void Dispose() => Scheduler.Schedule(scheduler => Disposables.Disposable.TryDispose(ref scheduler._disposable), this); } } diff --git a/Rx.NET/Source/src/System.Reactive/Internal/AutoDetachObserver.cs b/Rx.NET/Source/src/System.Reactive/Internal/AutoDetachObserver.cs index 0a4959aaa9..6c2243896d 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/AutoDetachObserver.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/AutoDetachObserver.cs @@ -11,7 +11,7 @@ internal sealed class AutoDetachObserver : ObserverBase { private readonly IObserver _observer; - private IDisposable disposable; + private IDisposable _disposable; public AutoDetachObserver(IObserver observer) { @@ -20,17 +20,7 @@ public AutoDetachObserver(IObserver observer) public IDisposable Disposable { - set - { - if (Interlocked.CompareExchange(ref disposable, value, null) != null) - { - value?.Dispose(); - if (Volatile.Read(ref disposable) != BooleanDisposable.True) - { - throw new InvalidOperationException(Strings_Core.DISPOSABLE_ALREADY_ASSIGNED); - } - } - } + set => Disposables.Disposable.SetSingle(ref _disposable, value); } protected override void OnNextCore(T value) @@ -111,7 +101,7 @@ protected override void Dispose(bool disposing) if (disposing) { - Interlocked.Exchange(ref disposable, BooleanDisposable.True)?.Dispose(); + Disposables.Disposable.TryDispose(ref _disposable); } } } diff --git a/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs b/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs index 6e0deeba3b..d0fec93b58 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs @@ -401,8 +401,8 @@ public ObserveOnObserverNew(IScheduler scheduler, IObserver downstream, IDisp public void Dispose() { Volatile.Write(ref disposed, true); - Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose(); - Interlocked.Exchange(ref task, BooleanDisposable.True)?.Dispose(); + Disposable.TryDispose(ref upstream); + Disposable.TryDispose(ref task); Clear(); } @@ -443,14 +443,10 @@ void Schedule() { if (Interlocked.Increment(ref wip) == 1) { - var oldTask = Volatile.Read(ref task); - var newTask = new SingleAssignmentDisposable(); - if (oldTask != BooleanDisposable.True - && Interlocked.CompareExchange(ref task, newTask, oldTask) == oldTask) + if (Disposable.TrySetMultiple(ref task, newTask)) { - var longRunning = this.longRunning; if (longRunning != null) { diff --git a/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs b/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs index bcaa0d2e9a..07b212566d 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs @@ -68,10 +68,8 @@ void Drain() var enumerator = stack.Pop(); enumerator.Dispose(); } - if (Volatile.Read(ref currentSubscription) != BooleanDisposable.True) - { - Interlocked.Exchange(ref currentSubscription, BooleanDisposable.True)?.Dispose(); - } + + Disposable.TryDispose(ref currentSubscription); } else { @@ -131,7 +129,8 @@ void Drain() else { var sad = new SingleAssignmentDisposable(); - if (Interlocked.CompareExchange(ref currentSubscription, sad, null) == null) + + if (Disposable.TrySetSingle(ref currentSubscription, sad) == TrySetSingleResult.Success) { sad.Disposable = next.SubscribeSafe(this); } @@ -172,15 +171,8 @@ void DisposeAll() protected void Recurse() { - var d = Volatile.Read(ref currentSubscription); - if (d != BooleanDisposable.True) - { - d?.Dispose(); - if (Interlocked.CompareExchange(ref currentSubscription, null, d) == d) - { - Drain(); - } - } + if (Disposable.TrySetSerial(ref currentSubscription, null)) + Drain(); } protected abstract IEnumerable> Extract(IObservable source); diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs index edf6e93905..c55d41a851 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs @@ -103,7 +103,7 @@ internal void OnSubscribe(IDisposable d) public void Dispose() { - Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose(); + Disposable.TryDispose(ref upstream); } public void OnCompleted() diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AmbMany.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AmbMany.cs index 12e788794f..9588951d50 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AmbMany.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AmbMany.cs @@ -153,7 +153,7 @@ public InnerObserver(AmbCoordinator parent, int index) public void Dispose() { - Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose(); + Disposable.TryDispose(ref upstream); } public void OnCompleted() diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs index f0d1cb8de0..c7d25aa69f 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs @@ -66,12 +66,12 @@ public void Dispose() void DisposeMain() { - Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose(); + Disposable.TryDispose(ref upstream); } bool IsDisposed() { - return Volatile.Read(ref upstream) == BooleanDisposable.True; + return Disposable.GetIsDisposed(ref upstream); } public void OnCompleted() diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs index 2afe11bc72..81eb8d8765 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs @@ -200,7 +200,7 @@ internal void OnSubscribe(IDisposable d) public void Dispose() { - Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose(); + Disposable.TryDispose(ref upstream); } public void OnCompleted() diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index 8f305d8b91..db2cd19ccd 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -163,7 +163,7 @@ public TaskDisposeCompletionObserver(IObserver observer) public void Dispose() { - Interlocked.Exchange(ref disposable, BooleanDisposable.True)?.Dispose(); + Disposable.TryDispose(ref disposable); } public void OnCompleted()