From 694b719fad3e0de8aa4754c25d23b210f85424c8 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Mon, 2 Jul 2018 22:17:55 +0200 Subject: [PATCH 1/7] Use a light recurring scheduling, similar to the `ToObservable()` class --- .../Linq/Observable/AppendPrepend.cs | 78 +++++++++++-------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index 5e571398ee..1dec9b26f2 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -169,6 +169,7 @@ internal sealed class _ : IdentitySink private readonly TSource[] _appends; private readonly IScheduler _scheduler; private IDisposable _schedulerDisposable; + private volatile bool _disposed; public _(AppendPrependMultiple parent, IObserver observer) : base(observer) @@ -217,8 +218,10 @@ protected override void Dispose(bool disposing) { if (disposing) { + _disposed = true; Disposable.TryDispose(ref _schedulerDisposable); } + base.Dispose(disposing); } @@ -233,42 +236,41 @@ private IDisposable Schedule(TSource[] array, Action<_> continueWith) // to observe the cancellation and perform proper clean-up. In this case, // we're sure Loop will be entered, allowing us to dispose the enumerator. // - return longRunning.ScheduleLongRunning(new State(null, this, array, continueWith), Loop); + return longRunning.ScheduleLongRunning(new State(this, array, continueWith), Loop); + } + else + { + // + // We never allow the scheduled work to be cancelled. Instead, the _disposed flag + // is used to have LoopRec bail out and perform proper clean-up of the + // enumerator. + // + _scheduler.Schedule(new State(this, array, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state)); + return Disposable.Empty; } - - // - // We never allow the scheduled work to be cancelled. Instead, the flag - // is used to have LoopRec bail out and perform proper clean-up of the - // enumerator. - // - var flag = new BooleanDisposable(); - _scheduler.Schedule(new State(flag, this, array, continueWith), LoopRec); - return flag; } private struct State { public readonly _ _sink; - public readonly ICancelable _flag; public readonly TSource[] _array; public readonly Action<_> _continue; public int _current; - public State(ICancelable flag, _ sink, TSource[] array, Action<_> c) + public State(_ sink, TSource[] array, Action<_> c) { _sink = sink; - _flag = flag; _continue = c; _array = array; _current = 0; } } - private void LoopRec(State state, Action recurse) + private IDisposable LoopRec(IScheduler scheduler, State state) { - if (state._flag.IsDisposed) + if (_disposed) { - return; + return Disposable.Empty; } var current = state._array[state._current]; @@ -279,10 +281,18 @@ private void LoopRec(State state, Action recurse) if (state._current == state._array.Length) { state._continue(state._sink); - return; + } + else + { + // + // We never allow the scheduled work to be cancelled. Instead, the _disposed flag + // is used to have LoopRec bail out and perform proper clean-up of the + // enumerator. + // + scheduler.Schedule(state, (innerScheduler, s) => s._sink.LoopRec(innerScheduler, s)); } - recurse(state); + return Disposable.Empty; } private void Loop(State state, ICancelable cancel) @@ -307,9 +317,9 @@ private void Loop(State state, ICancelable cancel) private sealed class Node { - private readonly Node _parent; - private readonly T _value; - private readonly int _count; + public readonly Node Parent; + public readonly T Value; + public readonly int Count; public Node(T value) : this(null, value) @@ -318,44 +328,44 @@ public Node(T value) public Node(Node parent, T value) { - _parent = parent; - _value = value; + Parent = parent; + Value = value; if (parent == null) { - _count = 1; + Count = 1; } else { - if (parent._count == int.MaxValue) + if (parent.Count == int.MaxValue) { throw new NotSupportedException($"Consecutive appends or prepends with a count of more than int.MaxValue ({int.MaxValue}) are not supported."); } - _count = parent._count + 1; + Count = parent.Count + 1; } } public T[] ToArray() { - var array = new T[_count]; + var array = new T[Count]; var current = this; - for (var i = 0; i < _count; i++) + for (var i = 0; i < Count; i++) { - array[i] = current._value; - current = current._parent; + array[i] = current.Value; + current = current.Parent; } return array; } public T[] ToReverseArray() { - var array = new T[_count]; + var array = new T[Count]; var current = this; - for (var i = _count - 1; i >= 0; i--) + for (var i = Count - 1; i >= 0; i--) { - array[i] = current._value; - current = current._parent; + array[i] = current.Value; + current = current.Parent; } return array; } From 029bef360a8e3a52266f7a567e7dc18410aa8263 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Tue, 3 Jul 2018 21:20:51 +0200 Subject: [PATCH 2/7] Split AppendPrependMultiple into two classes depending on the scheduler --- .../Linq/Observable/AppendPrepend.cs | 223 ++++++++++++++---- .../Linq/QueryLanguage.Single.cs | 2 +- 2 files changed, 177 insertions(+), 48 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index 1dec9b26f2..7fb8982c96 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -16,7 +16,7 @@ internal interface IAppendPrepend : IObservable IScheduler Scheduler { get; } } - internal sealed class AppendPrependSingle : Producer._>, IAppendPrepend + internal sealed class SingleValue : Producer._>, IAppendPrepend { private readonly IObservable _source; private readonly TSource _value; @@ -24,7 +24,7 @@ internal sealed class AppendPrependSingle : Producer source, TSource value, IScheduler scheduler, bool append) + public SingleValue(IObservable source, TSource value, IScheduler scheduler, bool append) { _source = source; _value = value; @@ -35,29 +35,49 @@ public AppendPrependSingle(IObservable source, TSource value, ISchedule public IAppendPrepend Append(TSource value) { var prev = new Node(_value); + var appendNode = default(Node); + var prependNode = default(Node); if (_append) { - return new AppendPrependMultiple(_source, - null, new Node(prev, value), Scheduler); + appendNode = new Node(prev, value); + } + else + { + prependNode = prev; + appendNode = new Node(value); } - return new AppendPrependMultiple(_source, - prev, new Node(value), Scheduler); + return CreateAppendPrepend(prependNode, appendNode); } public IAppendPrepend Prepend(TSource value) { var prev = new Node(_value); + var appendNode = default(Node); + var prependNode = default(Node); if (_append) { - return new AppendPrependMultiple(_source, - new Node(value), prev, Scheduler); + prependNode = new Node(value); + appendNode = prev; + } + else + { + prependNode = new Node(prev, value); } - return new AppendPrependMultiple(_source, - new Node(prev, value), null, Scheduler); + return CreateAppendPrepend(prependNode, appendNode); + } + + private IAppendPrepend CreateAppendPrepend(Node prepend, Node append) + { + if (Scheduler is ISchedulerLongRunning longRunning) + { + return new LongRunning(_source, prepend, append, Scheduler, longRunning); + } + + return new Recursive(_source, prepend, append, Scheduler); } protected override _ CreateSink(IObserver observer) => new _(this, observer); @@ -72,7 +92,7 @@ internal sealed class _ : IdentitySink private readonly bool _append; private IDisposable _schedulerDisposable; - public _(AppendPrependSingle parent, IObserver observer) + public _(SingleValue parent, IObserver observer) : base(observer) { _source = parent._source; @@ -126,7 +146,7 @@ protected override void Dispose(bool disposing) } } - private sealed class AppendPrependMultiple : Producer._>, IAppendPrepend + private sealed class Recursive : Producer._>, IAppendPrepend { private readonly IObservable _source; private readonly Node _appends; @@ -134,7 +154,7 @@ private sealed class AppendPrependMultiple : Producer source, Node prepends, Node appends, IScheduler scheduler) + public Recursive(IObservable source, Node prepends, Node appends, IScheduler scheduler) { _source = source; _appends = appends; @@ -144,13 +164,13 @@ public AppendPrependMultiple(IObservable source, Node prepends public IAppendPrepend Append(TSource value) { - return new AppendPrependMultiple(_source, + return new Recursive(_source, _prepends, new Node(_appends, value), Scheduler); } public IAppendPrepend Prepend(TSource value) { - return new AppendPrependMultiple(_source, + return new Recursive(_source, new Node(_prepends, value), _appends, Scheduler); } @@ -168,10 +188,9 @@ internal sealed class _ : IdentitySink private readonly TSource[] _prepends; private readonly TSource[] _appends; private readonly IScheduler _scheduler; - private IDisposable _schedulerDisposable; private volatile bool _disposed; - public _(AppendPrependMultiple parent, IObserver observer) + public _(Recursive parent, IObserver observer) : base(observer) { _source = parent._source; @@ -192,8 +211,13 @@ public void Run() { if (_prepends != null) { - var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s))); - Disposable.TrySetSingle(ref _schedulerDisposable, disposable); + Action<_ > continueWith = s => s.SetUpstream(s._source.SubscribeSafe(s)); + // + // We never allow the scheduled work to be cancelled. Instead, the _disposed flag + // is used to have LoopRec bail out and perform proper clean-up of the + // enumerator. + // + _scheduler.Schedule(new State(this, _prepends, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state)); } else { @@ -205,8 +229,13 @@ public override void OnCompleted() { if (_appends != null) { - var disposable = Schedule(_appends, s => s.ForwardOnCompleted()); - Disposable.TrySetSerial(ref _schedulerDisposable, disposable); + Action<_> continueWith = s => s.ForwardOnCompleted(); + // + // We never allow the scheduled work to be cancelled. Instead, the _disposed flag + // is used to have LoopRec bail out and perform proper clean-up of the + // enumerator. + // + _scheduler.Schedule(new State(this, _appends, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state)); } else { @@ -219,37 +248,11 @@ protected override void Dispose(bool disposing) if (disposing) { _disposed = true; - Disposable.TryDispose(ref _schedulerDisposable); } base.Dispose(disposing); } - private IDisposable Schedule(TSource[] array, Action<_> continueWith) - { - var longRunning = _scheduler.AsLongRunning(); - if (longRunning != null) - { - // - // Long-running schedulers have the contract they should *never* prevent - // the work from starting, such that the scheduled work has the chance - // to observe the cancellation and perform proper clean-up. In this case, - // we're sure Loop will be entered, allowing us to dispose the enumerator. - // - return longRunning.ScheduleLongRunning(new State(this, array, continueWith), Loop); - } - else - { - // - // We never allow the scheduled work to be cancelled. Instead, the _disposed flag - // is used to have LoopRec bail out and perform proper clean-up of the - // enumerator. - // - _scheduler.Schedule(new State(this, array, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state)); - return Disposable.Empty; - } - } - private struct State { public readonly _ _sink; @@ -294,6 +297,132 @@ private IDisposable LoopRec(IScheduler scheduler, State state) return Disposable.Empty; } + } + } + + private sealed class LongRunning : Producer._>, IAppendPrepend + { + private readonly IObservable _source; + private readonly Node _appends; + private readonly Node _prepends; + private readonly ISchedulerLongRunning _longRunningScheduler; + + public IScheduler Scheduler { get; } + + public LongRunning(IObservable source, Node prepends, Node appends, IScheduler scheduler, ISchedulerLongRunning longRunningScheduler) + { + _source = source; + _appends = appends; + _prepends = prepends; + Scheduler = scheduler; + _longRunningScheduler = longRunningScheduler; + } + + public IAppendPrepend Append(TSource value) + { + return new LongRunning(_source, + _prepends, new Node(_appends, value), Scheduler, _longRunningScheduler); + } + + public IAppendPrepend Prepend(TSource value) + { + return new LongRunning(_source, + new Node(_prepends, value), _appends, Scheduler, _longRunningScheduler); + } + + protected override _ CreateSink(IObserver observer) => new _(this, observer); + + protected override void Run(_ sink) => sink.Run(); + + // The sink is based on the sink of the ToObervalbe class and does basically + // the same twice, once for the append list and once for the prepend list. + // Inbetween it forwards the values of the source class. + // + internal sealed class _ : IdentitySink + { + private readonly IObservable _source; + private readonly TSource[] _prepends; + private readonly TSource[] _appends; + private readonly ISchedulerLongRunning _scheduler; + private IDisposable _schedulerDisposable; + + public _(LongRunning parent, IObserver observer) + : base(observer) + { + _source = parent._source; + _scheduler = parent._longRunningScheduler; + + if (parent._prepends != null) + { + _prepends = parent._prepends.ToArray(); + } + + if (parent._appends != null) + { + _appends = parent._appends.ToReverseArray(); + } + } + + public void Run() + { + if (_prepends != null) + { + var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s))); + Disposable.TrySetSingle(ref _schedulerDisposable, disposable); + } + else + { + SetUpstream(_source.SubscribeSafe(this)); + } + } + + public override void OnCompleted() + { + if (_appends != null) + { + var disposable = Schedule(_appends, s => s.ForwardOnCompleted()); + Disposable.TrySetSerial(ref _schedulerDisposable, disposable); + } + else + { + ForwardOnCompleted(); + } + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Disposable.TryDispose(ref _schedulerDisposable); + } + + base.Dispose(disposing); + } + + private IDisposable Schedule(TSource[] array, Action<_> continueWith) + { + // + // Long-running schedulers have the contract they should *never* prevent + // the work from starting, such that the scheduled work has the chance + // to observe the cancellation and perform proper clean-up. In this case, + // we're sure Loop will be entered, allowing us to dispose the enumerator. + // + return _scheduler.ScheduleLongRunning(new State(this, array, continueWith), Loop); + } + + private struct State + { + public readonly _ _sink; + public readonly TSource[] _array; + public readonly Action<_> _continue; + + public State(_ sink, TSource[] array, Action<_> c) + { + _sink = sink; + _continue = c; + _array = array; + } + } private void Loop(State state, ICancelable cancel) { diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs index 5e9a678626..6439cf64da 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs @@ -216,7 +216,7 @@ private static IObservable Prepend_(IObservable sourc return new AppendPrepend.AppendPrependSingleImmediate(source, value, false); } - return new AppendPrepend.AppendPrependSingle(source, value, scheduler, append: false); + return new AppendPrepend.SingleValue(source, value, scheduler, append: false); } #endregion From 65d458dce6e265152ad2b7c63f9ea268435b5d80 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Tue, 3 Jul 2018 21:42:02 +0200 Subject: [PATCH 3/7] Move the generic type argument to the enclosing static class --- .../Linq/Observable/AppendPrepend.cs | 46 +++++++++---------- .../Linq/QueryLanguage.Single.cs | 10 ++-- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index 7fb8982c96..0b3c3628ae 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -7,16 +7,16 @@ namespace System.Reactive.Linq.ObservableImpl { - internal static class AppendPrepend + internal static class AppendPrepend { - internal interface IAppendPrepend : IObservable + internal interface IAppendPrepend : IObservable { - IAppendPrepend Append(TSource value); - IAppendPrepend Prepend(TSource value); + IAppendPrepend Append(TSource value); + IAppendPrepend Prepend(TSource value); IScheduler Scheduler { get; } } - internal sealed class SingleValue : Producer._>, IAppendPrepend + internal sealed class SingleValue : Producer, IAppendPrepend { private readonly IObservable _source; private readonly TSource _value; @@ -32,7 +32,7 @@ public SingleValue(IObservable source, TSource value, IScheduler schedu Scheduler = scheduler; } - public IAppendPrepend Append(TSource value) + public IAppendPrepend Append(TSource value) { var prev = new Node(_value); var appendNode = default(Node); @@ -51,7 +51,7 @@ public IAppendPrepend Append(TSource value) return CreateAppendPrepend(prependNode, appendNode); } - public IAppendPrepend Prepend(TSource value) + public IAppendPrepend Prepend(TSource value) { var prev = new Node(_value); var appendNode = default(Node); @@ -70,14 +70,14 @@ public IAppendPrepend Prepend(TSource value) return CreateAppendPrepend(prependNode, appendNode); } - private IAppendPrepend CreateAppendPrepend(Node prepend, Node append) + private IAppendPrepend CreateAppendPrepend(Node prepend, Node append) { if (Scheduler is ISchedulerLongRunning longRunning) { - return new LongRunning(_source, prepend, append, Scheduler, longRunning); + return new LongRunning(_source, prepend, append, Scheduler, longRunning); } - return new Recursive(_source, prepend, append, Scheduler); + return new Recursive(_source, prepend, append, Scheduler); } protected override _ CreateSink(IObserver observer) => new _(this, observer); @@ -92,7 +92,7 @@ internal sealed class _ : IdentitySink private readonly bool _append; private IDisposable _schedulerDisposable; - public _(SingleValue parent, IObserver observer) + public _(SingleValue parent, IObserver observer) : base(observer) { _source = parent._source; @@ -146,7 +146,7 @@ protected override void Dispose(bool disposing) } } - private sealed class Recursive : Producer._>, IAppendPrepend + private sealed class Recursive : Producer, IAppendPrepend { private readonly IObservable _source; private readonly Node _appends; @@ -162,15 +162,15 @@ public Recursive(IObservable source, Node prepends, Node Append(TSource value) + public IAppendPrepend Append(TSource value) { - return new Recursive(_source, + return new Recursive(_source, _prepends, new Node(_appends, value), Scheduler); } - public IAppendPrepend Prepend(TSource value) + public IAppendPrepend Prepend(TSource value) { - return new Recursive(_source, + return new Recursive(_source, new Node(_prepends, value), _appends, Scheduler); } @@ -190,7 +190,7 @@ internal sealed class _ : IdentitySink private readonly IScheduler _scheduler; private volatile bool _disposed; - public _(Recursive parent, IObserver observer) + public _(Recursive parent, IObserver observer) : base(observer) { _source = parent._source; @@ -300,7 +300,7 @@ private IDisposable LoopRec(IScheduler scheduler, State state) } } - private sealed class LongRunning : Producer._>, IAppendPrepend + private sealed class LongRunning : Producer, IAppendPrepend { private readonly IObservable _source; private readonly Node _appends; @@ -318,15 +318,15 @@ public LongRunning(IObservable source, Node prepends, Node Append(TSource value) + public IAppendPrepend Append(TSource value) { - return new LongRunning(_source, + return new LongRunning(_source, _prepends, new Node(_appends, value), Scheduler, _longRunningScheduler); } - public IAppendPrepend Prepend(TSource value) + public IAppendPrepend Prepend(TSource value) { - return new LongRunning(_source, + return new LongRunning(_source, new Node(_prepends, value), _appends, Scheduler, _longRunningScheduler); } @@ -346,7 +346,7 @@ internal sealed class _ : IdentitySink private readonly ISchedulerLongRunning _scheduler; private IDisposable _schedulerDisposable; - public _(LongRunning parent, IObserver observer) + public _(LongRunning parent, IObserver observer) : base(observer) { _source = parent._source; diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs index 6439cf64da..186fca779e 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs @@ -26,15 +26,15 @@ public virtual IObservable Append(IObservable source, private static IObservable Append_(IObservable source, TSource value, IScheduler scheduler) { - if (source is AppendPrepend.IAppendPrepend ap && ap.Scheduler == scheduler) + if (source is AppendPrepend.IAppendPrepend ap && ap.Scheduler == scheduler) { return ap.Append(value); } if (scheduler == ImmediateScheduler.Instance) { - return new AppendPrepend.AppendPrependSingleImmediate(source, value, true); + return new AppendPrepend.SingleImmediate(source, value, true); } - return new AppendPrepend.AppendPrependSingle(source, value, scheduler, append: true); + return new AppendPrepend.SingleValue(source, value, scheduler, append: true); } #endregion @@ -206,7 +206,7 @@ public virtual IObservable Prepend(IObservable source private static IObservable Prepend_(IObservable source, TSource value, IScheduler scheduler) { - if (source is AppendPrepend.IAppendPrepend ap && ap.Scheduler == scheduler) + if (source is AppendPrepend.IAppendPrepend ap && ap.Scheduler == scheduler) { return ap.Prepend(value); } @@ -216,7 +216,7 @@ private static IObservable Prepend_(IObservable sourc return new AppendPrepend.AppendPrependSingleImmediate(source, value, false); } - return new AppendPrepend.SingleValue(source, value, scheduler, append: false); + return new AppendPrepend.SingleValue(source, value, scheduler, append: false); } #endregion From 900f162b5d0ad0cc83045a31ab27763a44c2d914 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Wed, 4 Jul 2018 09:20:09 +0200 Subject: [PATCH 4/7] - Add extra loop for the prepend values, that saves the allocation and generation of the first array - Move the state directly to the sink, since we do not intend to loop in parallel. - Some reorders --- .../Linq/Observable/AppendPrepend.cs | 153 ++++++++---------- 1 file changed, 69 insertions(+), 84 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index 0b3c3628ae..58dc7979f9 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -185,9 +185,11 @@ public IAppendPrepend Prepend(TSource value) internal sealed class _ : IdentitySink { private readonly IObservable _source; - private readonly TSource[] _prepends; private readonly TSource[] _appends; private readonly IScheduler _scheduler; + + private Node _currentPrependNode; + private int _currentAppendIndex; private volatile bool _disposed; public _(Recursive parent, IObserver observer) @@ -195,11 +197,7 @@ public _(Recursive parent, IObserver observer) { _source = parent._source; _scheduler = parent.Scheduler; - - if (parent._prepends != null) - { - _prepends = parent._prepends.ToArray(); - } + _currentPrependNode = parent._prepends; if (parent._appends != null) { @@ -209,37 +207,35 @@ public _(Recursive parent, IObserver observer) public void Run() { - if (_prepends != null) + if (_currentPrependNode == null) + { + SetUpstream(_source.SubscribeSafe(this)); + } + else { - Action<_ > continueWith = s => s.SetUpstream(s._source.SubscribeSafe(s)); // // We never allow the scheduled work to be cancelled. Instead, the _disposed flag // is used to have LoopRec bail out and perform proper clean-up of the // enumerator. // - _scheduler.Schedule(new State(this, _prepends, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state)); - } - else - { - SetUpstream(_source.SubscribeSafe(this)); + _scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler)); } } public override void OnCompleted() { - if (_appends != null) + if (_appends == null) + { + ForwardOnCompleted(); + } + else { - Action<_> continueWith = s => s.ForwardOnCompleted(); // // We never allow the scheduled work to be cancelled. Instead, the _disposed flag // is used to have LoopRec bail out and perform proper clean-up of the // enumerator. // - _scheduler.Schedule(new State(this, _appends, continueWith), (innerScheduler, state) => state._sink.LoopRec(innerScheduler, state)); - } - else - { - ForwardOnCompleted(); + _scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler)); } } @@ -253,37 +249,49 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private struct State + private IDisposable PrependValues(IScheduler scheduler) { - public readonly _ _sink; - public readonly TSource[] _array; - public readonly Action<_> _continue; - public int _current; + if (_disposed) + { + return Disposable.Empty; + } + + var current = _currentPrependNode.Value; + ForwardOnNext(current); - public State(_ sink, TSource[] array, Action<_> c) + _currentPrependNode = _currentPrependNode.Parent; + if (_currentPrependNode == null) { - _sink = sink; - _continue = c; - _array = array; - _current = 0; + SetUpstream(_source.SubscribeSafe(this)); + } + else + { + // + // We never allow the scheduled work to be cancelled. Instead, the _disposed flag + // is used to have LoopRec bail out and perform proper clean-up of the + // enumerator. + // + scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler)); } + + return Disposable.Empty; } - private IDisposable LoopRec(IScheduler scheduler, State state) + private IDisposable AppendValues(IScheduler scheduler) { if (_disposed) { return Disposable.Empty; } - var current = state._array[state._current]; + var current = _appends[_currentAppendIndex]; ForwardOnNext(current); - state._current++; + _currentAppendIndex++; - if (state._current == state._array.Length) + if (_currentAppendIndex == _appends.Length) { - state._continue(state._sink); + ForwardOnCompleted(); } else { @@ -292,7 +300,7 @@ private IDisposable LoopRec(IScheduler scheduler, State state) // is used to have LoopRec bail out and perform proper clean-up of the // enumerator. // - scheduler.Schedule(state, (innerScheduler, s) => s._sink.LoopRec(innerScheduler, s)); + scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler)); } return Disposable.Empty; @@ -341,9 +349,10 @@ public IAppendPrepend Prepend(TSource value) internal sealed class _ : IdentitySink { private readonly IObservable _source; - private readonly TSource[] _prepends; + private readonly Node _prepends; private readonly TSource[] _appends; private readonly ISchedulerLongRunning _scheduler; + private IDisposable _schedulerDisposable; public _(LongRunning parent, IObserver observer) @@ -352,11 +361,7 @@ public _(LongRunning parent, IObserver observer) _source = parent._source; _scheduler = parent._longRunningScheduler; - if (parent._prepends != null) - { - _prepends = parent._prepends.ToArray(); - } - + _prepends = parent._prepends; if (parent._appends != null) { _appends = parent._appends.ToReverseArray(); @@ -365,27 +370,27 @@ public _(LongRunning parent, IObserver observer) public void Run() { - if (_prepends != null) + if (_prepends == null) { - var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s))); - Disposable.TrySetSingle(ref _schedulerDisposable, disposable); + SetUpstream(_source.SubscribeSafe(this)); } else { - SetUpstream(_source.SubscribeSafe(this)); + var disposable = _scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.PrependValues(cancel)); + Disposable.TrySetSingle(ref _schedulerDisposable, disposable); } } public override void OnCompleted() { - if (_appends != null) + if (_appends == null) { - var disposable = Schedule(_appends, s => s.ForwardOnCompleted()); - Disposable.TrySetSerial(ref _schedulerDisposable, disposable); + ForwardOnCompleted(); } else { - ForwardOnCompleted(); + var disposable = _scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.AppendValues(cancel)); + Disposable.TrySetSerial(ref _schedulerDisposable, disposable); } } @@ -399,34 +404,26 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private IDisposable Schedule(TSource[] array, Action<_> continueWith) - { - // - // Long-running schedulers have the contract they should *never* prevent - // the work from starting, such that the scheduled work has the chance - // to observe the cancellation and perform proper clean-up. In this case, - // we're sure Loop will be entered, allowing us to dispose the enumerator. - // - return _scheduler.ScheduleLongRunning(new State(this, array, continueWith), Loop); - } - - private struct State + private void PrependValues(ICancelable cancel) { - public readonly _ _sink; - public readonly TSource[] _array; - public readonly Action<_> _continue; + var current = _prepends; - public State(_ sink, TSource[] array, Action<_> c) + while (!cancel.IsDisposed) { - _sink = sink; - _continue = c; - _array = array; + ForwardOnNext(current.Value); + current = current.Parent; + + if (current == null) + { + SetUpstream(_source.SubscribeSafe(this)); + break; + } } } - private void Loop(State state, ICancelable cancel) + private void AppendValues(ICancelable cancel) { - var array = state._array; + var array = _appends; var i = 0; while (!cancel.IsDisposed) @@ -436,7 +433,7 @@ private void Loop(State state, ICancelable cancel) if (i == array.Length) { - state._continue(state._sink); + ForwardOnCompleted(); break; } } @@ -475,18 +472,6 @@ public Node(Node parent, T value) } } - public T[] ToArray() - { - var array = new T[Count]; - var current = this; - for (var i = 0; i < Count; i++) - { - array[i] = current.Value; - current = current.Parent; - } - return array; - } - public T[] ToReverseArray() { var array = new T[Count]; From 2ffc0754d95c9da9f4e418901435a12843617890 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Thu, 5 Jul 2018 10:25:51 +0200 Subject: [PATCH 5/7] Delay append array creation until the array is really needed --- .../Linq/Observable/AppendPrepend.cs | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index 58dc7979f9..2e78b8d66d 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -185,10 +185,11 @@ public IAppendPrepend Prepend(TSource value) internal sealed class _ : IdentitySink { private readonly IObservable _source; - private readonly TSource[] _appends; + private readonly Node _appends; private readonly IScheduler _scheduler; private Node _currentPrependNode; + private TSource[] _appendArray; private int _currentAppendIndex; private volatile bool _disposed; @@ -198,11 +199,7 @@ public _(Recursive parent, IObserver observer) _source = parent._source; _scheduler = parent.Scheduler; _currentPrependNode = parent._prepends; - - if (parent._appends != null) - { - _appends = parent._appends.ToReverseArray(); - } + _appends = parent._appends; } public void Run() @@ -230,6 +227,7 @@ public override void OnCompleted() } else { + _appendArray = _appends.ToReverseArray(); // // We never allow the scheduled work to be cancelled. Instead, the _disposed flag // is used to have LoopRec bail out and perform proper clean-up of the @@ -284,12 +282,12 @@ private IDisposable AppendValues(IScheduler scheduler) return Disposable.Empty; } - var current = _appends[_currentAppendIndex]; + var current = _appendArray[_currentAppendIndex]; ForwardOnNext(current); _currentAppendIndex++; - if (_currentAppendIndex == _appends.Length) + if (_currentAppendIndex == _appendArray.Length) { ForwardOnCompleted(); } @@ -350,7 +348,7 @@ internal sealed class _ : IdentitySink { private readonly IObservable _source; private readonly Node _prepends; - private readonly TSource[] _appends; + private readonly Node _appends; private readonly ISchedulerLongRunning _scheduler; private IDisposable _schedulerDisposable; @@ -360,12 +358,8 @@ public _(LongRunning parent, IObserver observer) { _source = parent._source; _scheduler = parent._longRunningScheduler; - _prepends = parent._prepends; - if (parent._appends != null) - { - _appends = parent._appends.ToReverseArray(); - } + _appends = parent._appends; } public void Run() @@ -423,7 +417,7 @@ private void PrependValues(ICancelable cancel) private void AppendValues(ICancelable cancel) { - var array = _appends; + var array = _appends.ToReverseArray(); var i = 0; while (!cancel.IsDisposed) From 4a49a9716a99a14e864862c25ddb1bf166ca7ed7 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Fri, 6 Jul 2018 10:45:41 +0200 Subject: [PATCH 6/7] Introduce a common base class for the single values, and resolve some rebase conflicts. --- .../Linq/Observable/AppendPrepend.cs | 70 +++++++------------ .../Linq/QueryLanguage.Single.cs | 4 +- 2 files changed, 26 insertions(+), 48 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index 2e78b8d66d..6346d33608 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -16,20 +16,20 @@ internal interface IAppendPrepend : IObservable IScheduler Scheduler { get; } } - internal sealed class SingleValue : Producer, IAppendPrepend + internal abstract class SingleBase : Producer, IAppendPrepend + where TSink : IDisposable { - private readonly IObservable _source; - private readonly TSource _value; - private readonly bool _append; + protected readonly IObservable _source; + protected readonly TSource _value; + protected readonly bool _append; - public IScheduler Scheduler { get; } + public abstract IScheduler Scheduler { get; } - public SingleValue(IObservable source, TSource value, IScheduler scheduler, bool append) + public SingleBase(IObservable source, TSource value, bool append) { _source = source; _value = value; _append = append; - Scheduler = scheduler; } public IAppendPrepend Append(TSource value) @@ -79,6 +79,18 @@ private IAppendPrepend CreateAppendPrepend(Node prepend, Node return new Recursive(_source, prepend, append, Scheduler); } + } + + + internal sealed class SingleValue : SingleBase + { + public override IScheduler Scheduler { get; } + + public SingleValue(IObservable source, TSource value, IScheduler scheduler, bool append) + : base (source, value, append) + { + Scheduler = scheduler; + } protected override _ CreateSink(IObserver observer) => new _(this, observer); @@ -479,47 +491,13 @@ public T[] ToReverseArray() } } - internal sealed class AppendPrependSingleImmediate : Producer._>, IAppendPrepend + internal sealed class SingleImmediate : SingleBase { - private readonly IObservable _source; - private readonly TSource _value; - private readonly bool _append; - - public IScheduler Scheduler { get { return ImmediateScheduler.Instance; } } - - public AppendPrependSingleImmediate(IObservable source, TSource value, bool append) - { - _source = source; - _value = value; - _append = append; - } - - public IAppendPrepend Append(TSource value) - { - var prev = new Node(_value); - - if (_append) - { - return new AppendPrependMultiple(_source, - null, new Node(prev, value), Scheduler); - } - - return new AppendPrependMultiple(_source, - prev, new Node(value), Scheduler); - } + public override IScheduler Scheduler => ImmediateScheduler.Instance; - public IAppendPrepend Prepend(TSource value) + public SingleImmediate(IObservable source, TSource value, bool append) + : base(source, value, append) { - var prev = new Node(_value); - - if (_append) - { - return new AppendPrependMultiple(_source, - new Node(value), prev, Scheduler); - } - - return new AppendPrependMultiple(_source, - new Node(prev, value), null, Scheduler); } protected override _ CreateSink(IObserver observer) => new _(this, observer); @@ -532,7 +510,7 @@ internal sealed class _ : IdentitySink private readonly TSource _value; private readonly bool _append; - public _(AppendPrependSingleImmediate parent, IObserver observer) + public _(SingleImmediate parent, IObserver observer) : base(observer) { _source = parent._source; diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs index 186fca779e..e80b5bf123 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs @@ -32,7 +32,7 @@ private static IObservable Append_(IObservable source } if (scheduler == ImmediateScheduler.Instance) { - return new AppendPrepend.SingleImmediate(source, value, true); + return new AppendPrepend.SingleImmediate(source, value, append: true); } return new AppendPrepend.SingleValue(source, value, scheduler, append: true); } @@ -213,7 +213,7 @@ private static IObservable Prepend_(IObservable sourc if (scheduler == ImmediateScheduler.Instance) { - return new AppendPrepend.AppendPrependSingleImmediate(source, value, false); + return new AppendPrepend.SingleImmediate(source, value, append: false); } return new AppendPrepend.SingleValue(source, value, scheduler, append: false); From f91f4492e1c14e76e2b22aa0003bc2cbefa62881 Mon Sep 17 00:00:00 2001 From: Peter Wehrfritz Date: Fri, 6 Jul 2018 15:01:59 +0200 Subject: [PATCH 7/7] Fix comments --- .../System.Reactive/Linq/Observable/AppendPrepend.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs index 6346d33608..bd87771fd5 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs @@ -224,8 +224,7 @@ public void Run() { // // We never allow the scheduled work to be cancelled. Instead, the _disposed flag - // is used to have LoopRec bail out and perform proper clean-up of the - // enumerator. + // is used to have PrependValues() bail out. // _scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler)); } @@ -242,8 +241,7 @@ public override void OnCompleted() _appendArray = _appends.ToReverseArray(); // // We never allow the scheduled work to be cancelled. Instead, the _disposed flag - // is used to have LoopRec bail out and perform proper clean-up of the - // enumerator. + // is used to have `AppendValues` bail out. // _scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler)); } @@ -278,8 +276,7 @@ private IDisposable PrependValues(IScheduler scheduler) { // // We never allow the scheduled work to be cancelled. Instead, the _disposed flag - // is used to have LoopRec bail out and perform proper clean-up of the - // enumerator. + // is used to have PrependValues() bail out. // scheduler.Schedule(this, (innerScheduler, @this) => @this.PrependValues(innerScheduler)); } @@ -307,8 +304,7 @@ private IDisposable AppendValues(IScheduler scheduler) { // // We never allow the scheduled work to be cancelled. Instead, the _disposed flag - // is used to have LoopRec bail out and perform proper clean-up of the - // enumerator. + // is used to have AppendValues() bail out. // scheduler.Schedule(this, (innerScheduler, @this) => @this.AppendValues(innerScheduler)); }