diff --git a/Rx.NET/Source/src/System.Reactive/Internal/ConcatSink.cs b/Rx.NET/Source/src/System.Reactive/Internal/ConcatSink.cs index fa79c1f697..1872a640d6 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/ConcatSink.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/ConcatSink.cs @@ -15,6 +15,6 @@ public ConcatSink(IObserver observer, IDisposable cancel) protected override IEnumerable> Extract(IObservable source) => (source as IConcatenatable)?.GetSources(); - public override void OnCompleted() => _recurse(); + public override void OnCompleted() => Recurse(); } } diff --git a/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs b/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs index 99167c64cf..66eb46e712 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Threading; namespace System.Reactive { @@ -15,164 +16,176 @@ public TailRecursiveSink(IObserver observer, IDisposable cancel) { } - private bool _isDisposed; - private SerialDisposable _subscription; - private AsyncLock _gate; - private Stack>> _stack; - private Stack _length; - protected Action _recurse; + bool _isDisposed; + + int trampoline; + + IDisposable currentSubscription; + + Stack>> stack; public IDisposable Run(IEnumerable> sources) { - _isDisposed = false; - _subscription = new SerialDisposable(); - _gate = new AsyncLock(); - _stack = new Stack>>(); - _length = new Stack(); - - if (!TryGetEnumerator(sources, out var e)) + if (!TryGetEnumerator(sources, out var current)) return Disposable.Empty; - _stack.Push(e); - _length.Push(Helpers.GetLength(sources)); + stack = new Stack>>(); + stack.Push(current); - var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => - { - _recurse = self; - _gate.Wait(MoveNext); - }); + Drain(); - return StableCompositeDisposable.Create(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose))); + return new RecursiveSinkDisposable(this); } - protected abstract IEnumerable> Extract(IObservable source); - - private void MoveNext() + sealed class RecursiveSinkDisposable : IDisposable { - var hasNext = false; - var next = default(IObservable); + readonly TailRecursiveSink parent; - do + public RecursiveSinkDisposable(TailRecursiveSink parent) { - if (_stack.Count == 0) - break; + this.parent = parent; + } - if (_isDisposed) - return; + public void Dispose() + { + parent.DisposeAll(); + } + } - var e = _stack.Peek(); - var l = _length.Peek(); + void Drain() + { + if (Interlocked.Increment(ref trampoline) != 1) + { + return; + } - var current = default(IObservable); - try + for (; ; ) + { + if (Volatile.Read(ref _isDisposed)) { - hasNext = e.MoveNext(); - if (hasNext) + while (stack.Count != 0) { - current = e.Current; + var enumerator = stack.Pop(); + enumerator.Dispose(); + } + if (Volatile.Read(ref currentSubscription) != BooleanDisposable.True) + { + Interlocked.Exchange(ref currentSubscription, BooleanDisposable.True)?.Dispose(); } - } - catch (Exception ex) - { - e.Dispose(); - - // - // Failure to enumerate the sequence cannot be handled, even by - // operators like Catch, because it'd lead to another attempt at - // enumerating to find the next observable sequence. Therefore, - // we feed those errors directly to the observer. - // - _observer.OnError(ex); - base.Dispose(); - return; - } - - if (!hasNext) - { - e.Dispose(); - - _stack.Pop(); - _length.Pop(); } else { - var r = l - 1; - _length.Pop(); - _length.Push(r); - - try + if (stack.Count != 0) { - next = Helpers.Unpack(current); - } - catch (Exception exception) - { - // - // Errors from unpacking may produce side-effects that normally - // would occur during a SubscribeSafe operation. Those would feed - // back into the observer and be subject to the operator's error - // handling behavior. For example, Catch would allow to handle - // the error using a handler function. - // - if (!Fail(exception)) + var currentEnumerator = stack.Peek(); + + var currentObservable = default(IObservable); + var next = default(IObservable); + + try + { + if (currentEnumerator.MoveNext()) + { + currentObservable = currentEnumerator.Current; + } + } + catch (Exception ex) { - e.Dispose(); + currentEnumerator.Dispose(); + _observer.OnError(ex); + base.Dispose(); + Volatile.Write(ref _isDisposed, true); + continue; } - return; - } + try + { + next = Helpers.Unpack(currentObservable); - // - // Tail recursive case; drop the current frame. - // - if (r == 0) - { - e.Dispose(); + } + catch (Exception ex) + { + next = null; + if (!Fail(ex)) + { + Volatile.Write(ref _isDisposed, true); + } + continue; + } - _stack.Pop(); - _length.Pop(); + if (next != null) + { + var nextSeq = Extract(next); + if (nextSeq != null) + { + if (TryGetEnumerator(nextSeq, out var nextEnumerator)) + { + stack.Push(nextEnumerator); + continue; + } + else + { + Volatile.Write(ref _isDisposed, true); + continue; + } + } + else + { + var sad = new SingleAssignmentDisposable(); + if (Interlocked.CompareExchange(ref currentSubscription, sad, null) == null) + { + sad.Disposable = next.SubscribeSafe(this); + } + else + { + continue; + } + } + } + else + { + stack.Pop(); + currentEnumerator.Dispose(); + continue; + } } - - // - // Flattening of nested sequences. Prevents stack overflow in observers. - // - var nextSeq = Extract(next); - if (nextSeq != null) + else { - if (!TryGetEnumerator(nextSeq, out var nextEnumerator)) - return; - - _stack.Push(nextEnumerator); - _length.Push(Helpers.GetLength(nextSeq)); - - hasNext = false; + Volatile.Write(ref _isDisposed, true); + Done(); } } - } while (!hasNext); - if (!hasNext) - { - Done(); - return; + if (Interlocked.Decrement(ref trampoline) == 0) + { + break; + } } + } - var d = new SingleAssignmentDisposable(); - _subscription.Disposable = d; - d.Disposable = next.SubscribeSafe(this); + void DisposeAll() + { + Volatile.Write(ref _isDisposed, true); + // the disposing of currentSubscription is deferred to drain due to some ObservableExTest.Iterate_Complete() + // Interlocked.Exchange(ref currentSubscription, BooleanDisposable.True)?.Dispose(); + Drain(); } - private new void Dispose() + protected void Recurse() { - while (_stack.Count > 0) + var d = Volatile.Read(ref currentSubscription); + if (d != BooleanDisposable.True) { - var e = _stack.Pop(); - _length.Pop(); - - e.Dispose(); + d?.Dispose(); + if (Interlocked.CompareExchange(ref currentSubscription, null, d) == d) + { + Drain(); + } } - - _isDisposed = true; } + protected abstract IEnumerable> Extract(IObservable source); + private bool TryGetEnumerator(IEnumerable> sources, out IEnumerator> result) { try @@ -182,12 +195,6 @@ private bool TryGetEnumerator(IEnumerable> sources, out IEn } catch (Exception exception) { - // - // Failure to enumerate the sequence cannot be handled, even by - // operators like Catch, because it'd lead to another attempt at - // enumerating to find the next observable sequence. Therefore, - // we feed those errors directly to the observer. - // _observer.OnError(exception); base.Dispose(); diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs index 458868772d..6fc3147fcd 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs @@ -45,7 +45,7 @@ public override void OnNext(TSource value) public override void OnError(Exception error) { _lastException = error; - _recurse(); + Recurse(); } public override void OnCompleted() diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/OnErrorResumeNext.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/OnErrorResumeNext.cs index 7bf40099ca..05f01378e5 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/OnErrorResumeNext.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/OnErrorResumeNext.cs @@ -41,12 +41,12 @@ public override void OnNext(TSource value) public override void OnError(Exception error) { - _recurse(); + Recurse(); } public override void OnCompleted() { - _recurse(); + Recurse(); } protected override bool Fail(Exception error)