From 5449869cd7bed830e9b74eccef821fe5e8748408 Mon Sep 17 00:00:00 2001 From: Lee Campbell Date: Wed, 1 Jan 2014 01:12:11 +0000 Subject: [PATCH] Adding ReplaySubject perf improvements --- .../Reactive/Subjects/ReplaySubject.cs | 761 +++++++--- .../Tests/Linq/Subjects/ReplaySubjectTest.cs | 1262 ++++++++++++++++- 2 files changed, 1788 insertions(+), 235 deletions(-) diff --git a/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs b/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs index 8f08d51a1e..f3407f70da 100644 --- a/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs +++ b/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; +using System.Threading; namespace System.Reactive.Subjects { @@ -12,21 +13,7 @@ namespace System.Reactive.Subjects /// The type of the elements processed by the subject. public sealed class ReplaySubject : ISubject, IDisposable { - private const int InfiniteBufferSize = int.MaxValue; - - private readonly int _bufferSize; - private readonly TimeSpan _window; - private readonly IScheduler _scheduler; - private readonly IStopwatch _stopwatch; - - private readonly Queue> _queue; - private bool _isStopped; - private Exception _error; - - private ImmutableList> _observers; - private bool _isDisposed; - - private readonly object _gate = new object(); + private readonly IReplaySubjectImplementation _implementation; /// /// Initializes a new instance of the class with the specified buffer size, window and scheduler. @@ -38,23 +25,7 @@ public sealed class ReplaySubject : ISubject, IDisposable /// is null. public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler) { - if (bufferSize < 0) - throw new ArgumentOutOfRangeException("bufferSize"); - if (window < TimeSpan.Zero) - throw new ArgumentOutOfRangeException("window"); - if (scheduler == null) - throw new ArgumentNullException("scheduler"); - - _bufferSize = bufferSize; - _window = window; - _scheduler = scheduler; - - _stopwatch = _scheduler.StartStopwatch(); - _queue = new Queue>(); - _isStopped = false; - _error = null; - - _observers = new ImmutableList>(); + _implementation = new ReplayByTime(bufferSize, window, scheduler); } /// @@ -64,16 +35,16 @@ public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler) /// Maximum time length of the replay buffer. /// is less than zero. -or- is less than TimeSpan.Zero. public ReplaySubject(int bufferSize, TimeSpan window) - : this(bufferSize, window, SchedulerDefaults.Iteration) { + _implementation = new ReplayByTime(bufferSize, window); } /// /// Initializes a new instance of the class. /// public ReplaySubject() - : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration) { + _implementation = new ReplayAll(); } /// @@ -82,10 +53,12 @@ public ReplaySubject() /// Scheduler the observers are invoked on. /// is null. public ReplaySubject(IScheduler scheduler) - : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler) { + _implementation = new ReplayByTime(scheduler); } + //TODO: Does this overload make any sense with the optimisations? Surely this now is just new ReplaySubject(bufferSize).SubscribeOn(scheduler)? + //Potentially should be marked as obsolete /// /// Initializes a new instance of the class with the specified buffer size and scheduler. /// @@ -94,8 +67,8 @@ public ReplaySubject(IScheduler scheduler) /// is null. /// is less than zero. public ReplaySubject(int bufferSize, IScheduler scheduler) - : this(bufferSize, TimeSpan.MaxValue, scheduler) { + _implementation = new ReplayByTime(bufferSize, scheduler); } /// @@ -104,8 +77,19 @@ public ReplaySubject(int bufferSize, IScheduler scheduler) /// Maximum element count of the replay buffer. /// is less than zero. public ReplaySubject(int bufferSize) - : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration) { + switch (bufferSize) + { + case 1: + _implementation = new ReplayOne(); + break; + case int.MaxValue: + _implementation = new ReplayAll(); + break; + default: + _implementation = new ReplayMany(bufferSize); + break; + } } /// @@ -116,8 +100,8 @@ public ReplaySubject(int bufferSize) /// is null. /// is less than TimeSpan.Zero. public ReplaySubject(TimeSpan window, IScheduler scheduler) - : this(InfiniteBufferSize, window, scheduler) { + _implementation = new ReplayByTime(window, scheduler); } /// @@ -126,8 +110,8 @@ public ReplaySubject(TimeSpan window, IScheduler scheduler) /// Maximum time length of the replay buffer. /// is less than TimeSpan.Zero. public ReplaySubject(TimeSpan window) - : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration) { + _implementation = new ReplayByTime(window); } /// @@ -135,19 +119,7 @@ public ReplaySubject(TimeSpan window) /// public bool HasObservers { - get - { - var observers = _observers; - return observers != null && observers.Data.Length > 0; - } - } - - void Trim(TimeSpan now) - { - while (_queue.Count > _bufferSize) - _queue.Dequeue(); - while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0) - _queue.Dequeue(); + get { return _implementation.HasObservers; } } /// @@ -156,26 +128,7 @@ void Trim(TimeSpan now) /// The value to send to all observers. public void OnNext(T value) { - var o = default(ScheduledObserver[]); - lock (_gate) - { - CheckDisposed(); - - if (!_isStopped) - { - var now = _stopwatch.Elapsed; - _queue.Enqueue(new TimeInterval(value, now)); - Trim(now); - - o = _observers.Data; - foreach (var observer in o) - observer.OnNext(value); - } - } - - if (o != null) - foreach (var observer in o) - observer.EnsureActive(); + _implementation.OnNext(value); } /// @@ -185,169 +138,603 @@ public void OnNext(T value) /// is null. public void OnError(Exception error) { - if (error == null) - throw new ArgumentNullException("error"); + _implementation.OnError(error); + } - var o = default(ScheduledObserver[]); - lock (_gate) - { - CheckDisposed(); + /// + /// Notifies all subscribed and future observers about the end of the sequence. + /// + public void OnCompleted() + { + _implementation.OnCompleted(); + } - if (!_isStopped) - { - var now = _stopwatch.Elapsed; - _isStopped = true; - _error = error; - Trim(now); + /// + /// Subscribes an observer to the subject. + /// + /// Observer to subscribe to the subject. + /// Disposable object that can be used to unsubscribe the observer from the subject. + /// is null. + public IDisposable Subscribe(IObserver observer) + { + return _implementation.Subscribe(observer); + } - o = _observers.Data; - foreach (var observer in o) - observer.OnError(error); + /// + /// Releases all resources used by the current instance of the class and unsubscribe all observers. + /// + public void Dispose() + { + _implementation.Dispose(); + } - _observers = new ImmutableList>(); - } + private interface IReplaySubjectImplementation : ISubject, IDisposable + { + bool HasObservers { get; } + void Unsubscribe(IObserver observer); + } + + private class Subscription : IDisposable + { + private IReplaySubjectImplementation _subject; + private IObserver _observer; + + public Subscription(IReplaySubjectImplementation subject, IObserver observer) + { + _subject = subject; + _observer = observer; } - if (o != null) - foreach (var observer in o) - observer.EnsureActive(); + public void Dispose() + { + var observer = Interlocked.Exchange(ref _observer, null); + if (observer == null) + return; + + _subject.Unsubscribe(observer); + _subject = null; + } } - /// - /// Notifies all subscribed and future observers about the end of the sequence. - /// - public void OnCompleted() + //Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time). + private sealed class ReplayByTime : IReplaySubjectImplementation { - var o = default(ScheduledObserver[]); - lock (_gate) + private const int InfiniteBufferSize = int.MaxValue; + + private readonly int _bufferSize; + private readonly TimeSpan _window; + private readonly IScheduler _scheduler; + private readonly IStopwatch _stopwatch; + + private readonly Queue> _queue; + private bool _isStopped; + private Exception _error; + + private ImmutableList> _observers; + private bool _isDisposed; + + private readonly object _gate = new object(); + + public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler) + { + if (bufferSize < 0) + throw new ArgumentOutOfRangeException("bufferSize"); + if (window < TimeSpan.Zero) + throw new ArgumentOutOfRangeException("window"); + if (scheduler == null) + throw new ArgumentNullException("scheduler"); + + _bufferSize = bufferSize; + _window = window; + _scheduler = scheduler; + + _stopwatch = _scheduler.StartStopwatch(); + _queue = new Queue>(); + _isStopped = false; + _error = null; + + _observers = new ImmutableList>(); + } + + public ReplayByTime(int bufferSize, TimeSpan window) + : this(bufferSize, window, SchedulerDefaults.Iteration) + { + } + + public ReplayByTime(IScheduler scheduler) + : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler) + { + } + + public ReplayByTime(int bufferSize, IScheduler scheduler) + : this(bufferSize, TimeSpan.MaxValue, scheduler) + { + } + + public ReplayByTime(TimeSpan window, IScheduler scheduler) + : this(InfiniteBufferSize, window, scheduler) + { + } + + public ReplayByTime(TimeSpan window) + : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration) { - CheckDisposed(); + } + + public bool HasObservers + { + get + { + var observers = _observers; + return observers != null && observers.Data.Length > 0; + } + } + + private void Trim(TimeSpan now) + { + while (_queue.Count > _bufferSize) + _queue.Dequeue(); + while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0) + _queue.Dequeue(); + } - if (!_isStopped) + public void OnNext(T value) + { + var o = default(ScheduledObserver[]); + lock (_gate) { - var now = _stopwatch.Elapsed; - _isStopped = true; - Trim(now); + CheckDisposed(); + + if (!_isStopped) + { + var now = _stopwatch.Elapsed; + _queue.Enqueue(new TimeInterval(value, now)); + Trim(now); + + o = _observers.Data; + foreach (var observer in o) + observer.OnNext(value); + } + } - o = _observers.Data; + if (o != null) foreach (var observer in o) - observer.OnCompleted(); + observer.EnsureActive(); + } + + public void OnError(Exception error) + { + if (error == null) + throw new ArgumentNullException("error"); - _observers = new ImmutableList>(); + var o = default(ScheduledObserver[]); + lock (_gate) + { + CheckDisposed(); + + if (!_isStopped) + { + var now = _stopwatch.Elapsed; + _isStopped = true; + _error = error; + Trim(now); + + o = _observers.Data; + foreach (var observer in o) + observer.OnError(error); + + _observers = new ImmutableList>(); + } } + + if (o != null) + foreach (var observer in o) + observer.EnsureActive(); } - if (o != null) - foreach (var observer in o) - observer.EnsureActive(); - } + public void OnCompleted() + { + var o = default(ScheduledObserver[]); + lock (_gate) + { + CheckDisposed(); - /// - /// Subscribes an observer to the subject. - /// - /// Observer to subscribe to the subject. - /// Disposable object that can be used to unsubscribe the observer from the subject. - /// is null. - public IDisposable Subscribe(IObserver observer) - { - if (observer == null) - throw new ArgumentNullException("observer"); - - var so = new ScheduledObserver(_scheduler, observer); - - var n = 0; - - var subscription = new RemovableDisposable(this, so); - lock (_gate) - { - CheckDisposed(); - - // - // Notice the v1.x behavior of always calling Trim is preserved here. - // - // This may be subject (pun intended) of debate: should this policy - // only be applied while the sequence is active? With the current - // behavior, a sequence will "die out" after it has terminated by - // continuing to drop OnNext notifications from the queue. - // - // In v1.x, this behavior was due to trimming based on the clock value - // returned by scheduler.Now, applied to all but the terminal message - // in the queue. Using the IStopwatch has the same effect. Either way, - // we guarantee the final notification will be observed, but there's - // no way to retain the buffer directly. One approach is to use the - // time-based TakeLast operator and apply an unbounded ReplaySubject - // to it. - // - // To conclude, we're keeping the behavior as-is for compatibility - // reasons with v1.x. - // - Trim(_stopwatch.Elapsed); - _observers = _observers.Add(so); - - n = _queue.Count; - foreach (var item in _queue) - so.OnNext(item.Value); + if (!_isStopped) + { + var now = _stopwatch.Elapsed; + _isStopped = true; + Trim(now); + + o = _observers.Data; + foreach (var observer in o) + observer.OnCompleted(); + + _observers = new ImmutableList>(); + } + } - if (_error != null) + if (o != null) + foreach (var observer in o) + observer.EnsureActive(); + } + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + var so = new ScheduledObserver(_scheduler, observer); + + var n = 0; + + //var subscription = new Subscription(this, so); + var subscription = new RemovableDisposable(this, so); + lock (_gate) { - n++; - so.OnError(_error); + CheckDisposed(); + + // + // Notice the v1.x behavior of always calling Trim is preserved here. + // + // This may be subject (pun intended) of debate: should this policy + // only be applied while the sequence is active? With the current + // behavior, a sequence will "die out" after it has terminated by + // continuing to drop OnNext notifications from the queue. + // + // In v1.x, this behavior was due to trimming based on the clock value + // returned by scheduler.Now, applied to all but the terminal message + // in the queue. Using the IStopwatch has the same effect. Either way, + // we guarantee the final notification will be observed, but there's + // no way to retain the buffer directly. One approach is to use the + // time-based TakeLast operator and apply an unbounded ReplaySubject + // to it. + // + // To conclude, we're keeping the behavior as-is for compatibility + // reasons with v1.x. + // + Trim(_stopwatch.Elapsed); + _observers = _observers.Add(so); + + n = _queue.Count; + foreach (var item in _queue) + so.OnNext(item.Value); + + if (_error != null) + { + n++; + so.OnError(_error); + } + else if (_isStopped) + { + n++; + so.OnCompleted(); + } } - else if (_isStopped) + + so.EnsureActive(n); + + return subscription; + } + + //public void Unsubscribe(IObserver observer) + public void Unsubscribe(ScheduledObserver observer) + { + lock (_gate) + { + //var so = (ScheduledObserver)observer; + //so.Dispose(); + if (!_isDisposed) + _observers = _observers.Remove(observer); + } + } + //public void Unsubscribe(IObserver observer) + void IReplaySubjectImplementation.Unsubscribe(IObserver observer) + { + var so = (ScheduledObserver)observer; + Unsubscribe(so); + } + + sealed class RemovableDisposable : IDisposable + { + private readonly ReplayByTime _subject; + private readonly ScheduledObserver _observer; + + public RemovableDisposable(ReplayByTime subject, ScheduledObserver observer) + { + _subject = subject; + _observer = observer; + } + + public void Dispose() { - n++; - so.OnCompleted(); + _observer.Dispose(); + _subject.Unsubscribe(_observer); } } - so.EnsureActive(n); + private void CheckDisposed() + { + if (_isDisposed) + throw new ObjectDisposedException(string.Empty); + } - return subscription; + public void Dispose() + { + lock (_gate) + { + _isDisposed = true; + _observers = null; + //_queue.Clear(); + } + } } - void Unsubscribe(ScheduledObserver observer) + + + + + + + + + + + + + //Below are the non-time based implementations. + //These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action. + //The ReplayOne implementation also removes the need to even have a queue. + private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation { - lock (_gate) + private bool _hasValue; + private T _value; + + protected override void Trim() { - if (!_isDisposed) - _observers = _observers.Remove(observer); + //NoOp. No need to trim. + } + + protected override void AddValueToBuffer(T value) + { + _hasValue = true; + _value = value; + } + + protected override void ReplayBuffer(IObserver observer) + { + if (_hasValue) + observer.OnNext(_value); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + _value = default(T); } } - sealed class RemovableDisposable : IDisposable + private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation { - private readonly ReplaySubject _subject; - private readonly ScheduledObserver _observer; + private readonly int _bufferSize; - public RemovableDisposable(ReplaySubject subject, ScheduledObserver observer) + public ReplayMany(int bufferSize) + : base(bufferSize) { - _subject = subject; - _observer = observer; + _bufferSize = bufferSize; } - public void Dispose() + protected override void Trim() { - _observer.Dispose(); - _subject.Unsubscribe(_observer); + while (Queue.Count > _bufferSize) + Queue.Dequeue(); } } - void CheckDisposed() + private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation { - if (_isDisposed) - throw new ObjectDisposedException(string.Empty); + public ReplayAll() + : base(0) + { + } + + protected override void Trim() + { + //NoOp; i.e. Dont' trim, keep all values. + } } - /// - /// Releases all resources used by the current instance of the class and unsubscribe all observers. - /// - public void Dispose() + private abstract class ReplayBufferBase : IReplaySubjectImplementation + { + private readonly object _gate = new object(); + private bool _isDisposed; + private bool _isStopped; + private Exception _error; + private ImmutableList> _observers; + + protected ReplayBufferBase() + { + _observers = new ImmutableList>(); + } + + protected abstract void Trim(); + protected abstract void AddValueToBuffer(T value); + protected abstract void ReplayBuffer(IObserver observer); + + public bool HasObservers + { + get + { + var observers = _observers; + return observers != null && observers.Data.Length > 0; + } + } + + public void OnNext(T value) + { + lock (_gate) + { + CheckDisposed(); + + if (!_isStopped) + { + AddValueToBuffer(value); + Trim(); + + var o = _observers.Data; + foreach (var observer in o) + observer.OnNext(value); + } + } + } + + public void OnError(Exception error) + { + if (error == null) + throw new ArgumentNullException("error"); + + lock (_gate) + { + CheckDisposed(); + + if (!_isStopped) + { + _isStopped = true; + _error = error; + Trim(); + + var o = _observers.Data; + foreach (var observer in o) + observer.OnError(error); + + _observers = new ImmutableList>(); + } + } + } + + public void OnCompleted() + { + lock (_gate) + { + CheckDisposed(); + + if (!_isStopped) + { + _isStopped = true; + Trim(); + + var o = _observers.Data; + foreach (var observer in o) + observer.OnCompleted(); + + _observers = new ImmutableList>(); + } + } + } + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + throw new ArgumentNullException("observer"); + + + var subscription = new Subscription(this, observer); + lock (_gate) + { + CheckDisposed(); + + // + // Notice the v1.x behavior of always calling Trim is preserved here. + // + // This may be subject (pun intended) of debate: should this policy + // only be applied while the sequence is active? With the current + // behavior, a sequence will "die out" after it has terminated by + // continuing to drop OnNext notifications from the queue. + // + // In v1.x, this behavior was due to trimming based on the clock value + // returned by scheduler.Now, applied to all but the terminal message + // in the queue. Using the IStopwatch has the same effect. Either way, + // we guarantee the final notification will be observed, but there's + // no way to retain the buffer directly. One approach is to use the + // time-based TakeLast operator and apply an unbounded ReplaySubject + // to it. + // + // To conclude, we're keeping the behavior as-is for compatibility + // reasons with v1.x. + // + _observers = _observers.Add(observer); + + ReplayBuffer(observer); + + if (_error != null) + { + observer.OnError(_error); + } + else if (_isStopped) + { + observer.OnCompleted(); + } + } + + return subscription; + } + + public void Unsubscribe(IObserver observer) + { + lock (_gate) + { + if (!_isDisposed) + _observers = _observers.Remove(observer); + } + } + + private void CheckDisposed() + { + if (_isDisposed) + throw new ObjectDisposedException(string.Empty); + } + + public void Dispose() + { + Dispose(true); + } + protected virtual void Dispose(bool disposing) + { + lock (_gate) + { + _isDisposed = true; + _observers = null; + } + } + } + + private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation { - lock (_gate) + private readonly Queue _queue; + + protected ReplayManyBase(int queueSize) + : base() + { + _queue = new Queue(queueSize); + } + + protected Queue Queue { get { return _queue; } } + + protected override void AddValueToBuffer(T value) + { + _queue.Enqueue(value); + } + + protected override void ReplayBuffer(IObserver observer) + { + foreach (var item in _queue) + observer.OnNext(item); + } + + protected override void Dispose(bool disposing) { - _isDisposed = true; - _observers = null; + base.Dispose(disposing); + _queue.Clear(); } } } -} +} \ No newline at end of file diff --git a/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs b/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs index 21159271a8..7537e30499 100644 --- a/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs +++ b/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System; +using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Subjects; using Microsoft.Reactive.Testing; @@ -16,11 +17,17 @@ public partial class ReplaySubjectTest : ReactiveTest public void Subscribe_ArgumentChecking() { ReactiveAssert.Throws(() => new ReplaySubject().Subscribe(null)); + ReactiveAssert.Throws(() => new ReplaySubject(1).Subscribe(null)); + ReactiveAssert.Throws(() => new ReplaySubject(2).Subscribe(null)); + ReactiveAssert.Throws(() => new ReplaySubject(DummyScheduler.Instance).Subscribe(null)); } [TestMethod] public void OnError_ArgumentChecking() { + ReactiveAssert.Throws(() => new ReplaySubject().OnError(null)); + ReactiveAssert.Throws(() => new ReplaySubject(1).OnError(null)); + ReactiveAssert.Throws(() => new ReplaySubject(2).OnError(null)); ReactiveAssert.Throws(() => new ReplaySubject(DummyScheduler.Instance).OnError(null)); } @@ -52,7 +59,7 @@ public void Constructor_ArgumentChecking() } [TestMethod] - public void Infinite() + public void Infinite_ReplayByTime() { var scheduler = new TestScheduler(); @@ -93,7 +100,6 @@ public void Infinite() scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); - scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); @@ -118,6 +124,226 @@ public void Infinite() OnNext(941, 11) ); } + [TestMethod] + public void Infinite_ReplayOne() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnNext(630, 8), + OnNext(710, 9), + OnNext(870, 10), + OnNext(940, 11), + OnNext(1020, 12) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + var results4 = scheduler.CreateObserver(); + var subscription4 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1200, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + scheduler.ScheduleAbsolute(1100, () => subscription4 = subject.Subscribe(results4)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnNext(630, 8) + ); + + results3.Messages.AssertEqual( + OnNext(900, 10), + OnNext(940, 11) + ); + + results4.Messages.AssertEqual( + OnNext(1100, 12) + ); + } + [TestMethod] + public void Infinite_ReplayMany() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnNext(630, 8), + OnNext(710, 9), + OnNext(870, 10), + OnNext(940, 11), + OnNext(1020, 12) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 3), + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 3), + OnNext(400, 4), + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnNext(630, 8) + ); + + results3.Messages.AssertEqual( + OnNext(900, 8), + OnNext(900, 9), + OnNext(900, 10), + OnNext(940, 11) + ); + } + [TestMethod] + public void Infinite_ReplayAll() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnNext(630, 8), + OnNext(710, 9), + OnNext(870, 10), + OnNext(940, 11), + OnNext(1020, 12) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 3), + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 3), + OnNext(400, 4), + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnNext(630, 8) + ); + + results3.Messages.AssertEqual( + OnNext(900, 3), + OnNext(900, 4), + OnNext(900, 5), + OnNext(900, 6), + OnNext(900, 7), + OnNext(900, 8), + OnNext(900, 9), + OnNext(900, 10), + OnNext(940, 11) + ); + } + [TestMethod] public void Infinite2() @@ -191,7 +417,71 @@ public void Infinite2() } [TestMethod] - public void Finite() + public void Finite_ReplayByTime() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnCompleted(630), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(301, 3), + OnNext(302, 4), + OnNext(341, 5), + OnNext(411, 6), + OnNext(521, 7) + ); + + results2.Messages.AssertEqual( + OnNext(401, 5), + OnNext(411, 6), + OnNext(521, 7), + OnCompleted(631) + ); + + results3.Messages.AssertEqual( + OnCompleted(901) + ); + } + [TestMethod] + public void Finite_ReplayOne() { var scheduler = new TestScheduler(); @@ -221,7 +511,533 @@ public void Finite() var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); - scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnCompleted(630) + ); + + results3.Messages.AssertEqual( + OnNext(900, 7), + OnCompleted(900) + ); + } + [TestMethod] + public void Finite_ReplayMany() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnCompleted(630), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 3), + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 3), + OnNext(400, 4), + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnCompleted(630) + ); + + results3.Messages.AssertEqual( + OnNext(900, 5), + OnNext(900, 6), + OnNext(900, 7), + OnCompleted(900) + ); + } + [TestMethod] + public void Finite_ReplayAll() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnCompleted(630), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 3), + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 3), + OnNext(400, 4), + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnCompleted(630) + ); + + results3.Messages.AssertEqual( + OnNext(900, 3), + OnNext(900, 4), + OnNext(900, 5), + OnNext(900, 6), + OnNext(900, 7), + OnCompleted(900) + ); + } + + [TestMethod] + public void Error_ReplayByTime() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnError(630, ex), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(301, 3), + OnNext(302, 4), + OnNext(341, 5), + OnNext(411, 6), + OnNext(521, 7) + ); + + results2.Messages.AssertEqual( + OnNext(401, 5), + OnNext(411, 6), + OnNext(521, 7), + OnError(631, ex) + ); + + results3.Messages.AssertEqual( + OnError(901, ex) + ); + } + [TestMethod] + public void Error_ReplayOne() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnError(630, ex), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnError(630, ex) + ); + + results3.Messages.AssertEqual( + OnNext(900, 7), + OnError(900, ex) + ); + } + [TestMethod] + public void Error_ReplayMany() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnError(630, ex), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 3), + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 3), + OnNext(400, 4), + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnError(630, ex) + ); + + results3.Messages.AssertEqual( + OnNext(900, 5), + OnNext(900, 6), + OnNext(900, 7), + OnError(900, ex) + ); + } + [TestMethod] + public void Error_ReplayAll() + { + var scheduler = new TestScheduler(); + + var ex = new Exception(); + + var xs = scheduler.CreateHotObservable( + OnNext(70, 1), + OnNext(110, 2), + OnNext(220, 3), + OnNext(270, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7), + OnError(630, ex), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(300, 3), + OnNext(300, 4), + OnNext(340, 5), + OnNext(410, 6), + OnNext(520, 7) + ); + + results2.Messages.AssertEqual( + OnNext(400, 3), + OnNext(400, 4), + OnNext(400, 5), + OnNext(410, 6), + OnNext(520, 7), + OnError(630, ex) + ); + + results3.Messages.AssertEqual( + OnNext(900, 3), + OnNext(900, 4), + OnNext(900, 5), + OnNext(900, 6), + OnNext(900, 7), + OnError(900, ex) + ); + } + + [TestMethod] + public void Canceled_ReplayByTime() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnCompleted(630), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); + scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); + scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); + + scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); + + scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); + + scheduler.Start(); + + results1.Messages.AssertEqual( + ); + + results2.Messages.AssertEqual( + OnCompleted(631) + ); + + results3.Messages.AssertEqual( + OnCompleted(901) + ); + } + [TestMethod] + public void Canceled_ReplayOne() + { + var scheduler = new TestScheduler(); + + var xs = scheduler.CreateHotObservable( + OnCompleted(630), + OnNext(640, 9), + OnCompleted(650), + OnError(660, new Exception()) + ); + + var subject = default(ReplaySubject); + var subscription = default(IDisposable); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); @@ -237,41 +1053,23 @@ public void Finite() scheduler.Start(); results1.Messages.AssertEqual( - OnNext(301, 3), - OnNext(302, 4), - OnNext(341, 5), - OnNext(411, 6), - OnNext(521, 7) ); results2.Messages.AssertEqual( - OnNext(401, 5), - OnNext(411, 6), - OnNext(521, 7), - OnCompleted(631) + OnCompleted(630) ); results3.Messages.AssertEqual( - OnCompleted(901) + OnCompleted(900) ); } - [TestMethod] - public void Error() + public void Canceled_ReplayMany() { var scheduler = new TestScheduler(); - var ex = new Exception(); - var xs = scheduler.CreateHotObservable( - OnNext(70, 1), - OnNext(110, 2), - OnNext(220, 3), - OnNext(270, 4), - OnNext(340, 5), - OnNext(410, 6), - OnNext(520, 7), - OnError(630, ex), + OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) @@ -289,7 +1087,7 @@ public void Error() var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); - scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); @@ -305,27 +1103,18 @@ public void Error() scheduler.Start(); results1.Messages.AssertEqual( - OnNext(301, 3), - OnNext(302, 4), - OnNext(341, 5), - OnNext(411, 6), - OnNext(521, 7) ); results2.Messages.AssertEqual( - OnNext(401, 5), - OnNext(411, 6), - OnNext(521, 7), - OnError(631, ex) + OnCompleted(630) ); results3.Messages.AssertEqual( - OnError(901, ex) + OnCompleted(900) ); } - [TestMethod] - public void Canceled() + public void Canceled_ReplayAll() { var scheduler = new TestScheduler(); @@ -348,7 +1137,7 @@ public void Canceled() var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); - scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); @@ -367,11 +1156,11 @@ public void Canceled() ); results2.Messages.AssertEqual( - OnCompleted(631) + OnCompleted(630) ); results3.Messages.AssertEqual( - OnCompleted(901) + OnCompleted(900) ); } @@ -435,7 +1224,187 @@ public void SubjectDisposed() OnNext(551, 5) ); } + [TestMethod] + public void SubjectDisposed_ReplayOne() + { + var scheduler = new TestScheduler(); + + var subject = default(ReplaySubject); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); + scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); + scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(600, () => subject.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); + + scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); + scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); + scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); + scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); + scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); + scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); + scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); + scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); + scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(200, 1), + OnNext(250, 2), + OnNext(350, 3), + OnNext(450, 4) + ); + + results2.Messages.AssertEqual( + OnNext(300, 2), + OnNext(350, 3), + OnNext(450, 4), + OnNext(550, 5) + ); + + results3.Messages.AssertEqual( + OnNext(400, 3), + OnNext(450, 4), + OnNext(550, 5) + ); + } + [TestMethod] + public void SubjectDisposed_ReplayMany() + { + var scheduler = new TestScheduler(); + + var subject = default(ReplaySubject); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); + scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); + scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(600, () => subject.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); + + scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); + scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); + scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); + scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); + scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); + scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); + scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); + scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); + scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(200, 1), + OnNext(250, 2), + OnNext(350, 3), + OnNext(450, 4) + ); + + results2.Messages.AssertEqual( + OnNext(300, 1), + OnNext(300, 2), + OnNext(350, 3), + OnNext(450, 4), + OnNext(550, 5) + ); + + results3.Messages.AssertEqual( + OnNext(400, 1), + OnNext(400, 2), + OnNext(400, 3), + OnNext(450, 4), + OnNext(550, 5) + ); + } + [TestMethod] + public void SubjectDisposed_ReplayAll() + { + var scheduler = new TestScheduler(); + + var subject = default(ReplaySubject); + + var results1 = scheduler.CreateObserver(); + var subscription1 = default(IDisposable); + + var results2 = scheduler.CreateObserver(); + var subscription2 = default(IDisposable); + + var results3 = scheduler.CreateObserver(); + var subscription3 = default(IDisposable); + + scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); + scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); + scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); + scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); + scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); + scheduler.ScheduleAbsolute(600, () => subject.Dispose()); + scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); + scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); + + scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); + scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); + scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); + scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); + scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); + scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); + scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); + scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); + scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); + + scheduler.Start(); + + results1.Messages.AssertEqual( + OnNext(200, 1), + OnNext(250, 2), + OnNext(350, 3), + OnNext(450, 4) + ); + + results2.Messages.AssertEqual( + OnNext(300, 1), + OnNext(300, 2), + OnNext(350, 3), + OnNext(450, 4), + OnNext(550, 5) + ); + + results3.Messages.AssertEqual( + OnNext(400, 1), + OnNext(400, 2), + OnNext(400, 3), + OnNext(450, 4), + OnNext(550, 5) + ); + } + //TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All). + //I Don't understand the behavior. + //I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC [TestMethod] public void ReplaySubjectDiesOut() { @@ -502,7 +1471,13 @@ public void ReplaySubjectDiesOut() [TestMethod] public void HasObservers() { - var s = new ReplaySubject(); + HasObservers(new ReplaySubject()); + HasObservers(new ReplaySubject(1)); + HasObservers(new ReplaySubject(3)); + HasObservers(new ReplaySubject(TimeSpan.FromSeconds(1))); + } + private static void HasObservers(ReplaySubject s) + { Assert.IsFalse(s.HasObservers); var d1 = s.Subscribe(_ => { }); @@ -527,7 +1502,13 @@ public void HasObservers() [TestMethod] public void HasObservers_Dispose1() { - var s = new ReplaySubject(); + HasObservers_Dispose1(new ReplaySubject()); + HasObservers_Dispose1(new ReplaySubject(1)); + HasObservers_Dispose1(new ReplaySubject(3)); + HasObservers_Dispose1(new ReplaySubject(TimeSpan.FromSeconds(1))); + } + private static void HasObservers_Dispose1(ReplaySubject s) + { Assert.IsFalse(s.HasObservers); var d = s.Subscribe(_ => { }); @@ -543,7 +1524,13 @@ public void HasObservers_Dispose1() [TestMethod] public void HasObservers_Dispose2() { - var s = new ReplaySubject(); + HasObservers_Dispose2(new ReplaySubject()); + HasObservers_Dispose2(new ReplaySubject(1)); + HasObservers_Dispose2(new ReplaySubject(3)); + HasObservers_Dispose2(new ReplaySubject(TimeSpan.FromSeconds(1))); + } + private static void HasObservers_Dispose2(ReplaySubject s) + { Assert.IsFalse(s.HasObservers); var d = s.Subscribe(_ => { }); @@ -559,7 +1546,13 @@ public void HasObservers_Dispose2() [TestMethod] public void HasObservers_Dispose3() { - var s = new ReplaySubject(); + HasObservers_Dispose3(new ReplaySubject()); + HasObservers_Dispose3(new ReplaySubject(1)); + HasObservers_Dispose3(new ReplaySubject(3)); + HasObservers_Dispose3(new ReplaySubject(TimeSpan.FromSeconds(1))); + } + private static void HasObservers_Dispose3(ReplaySubject s) + { Assert.IsFalse(s.HasObservers); s.Dispose(); @@ -569,7 +1562,13 @@ public void HasObservers_Dispose3() [TestMethod] public void HasObservers_OnCompleted() { - var s = new ReplaySubject(); + HasObservers_OnCompleted(new ReplaySubject()); + HasObservers_OnCompleted(new ReplaySubject(1)); + HasObservers_OnCompleted(new ReplaySubject(3)); + HasObservers_OnCompleted(new ReplaySubject(TimeSpan.FromSeconds(1))); + } + private static void HasObservers_OnCompleted(ReplaySubject s) + { Assert.IsFalse(s.HasObservers); var d = s.Subscribe(_ => { }); @@ -585,7 +1584,13 @@ public void HasObservers_OnCompleted() [TestMethod] public void HasObservers_OnError() { - var s = new ReplaySubject(); + HasObservers_OnError(new ReplaySubject()); + HasObservers_OnError(new ReplaySubject(1)); + HasObservers_OnError(new ReplaySubject(3)); + HasObservers_OnError(new ReplaySubject(TimeSpan.FromSeconds(1))); + } + private static void HasObservers_OnError(ReplaySubject s) + { Assert.IsFalse(s.HasObservers); var d = s.Subscribe(_ => { }, ex => { }); @@ -597,5 +1602,166 @@ public void HasObservers_OnError() s.OnError(new Exception()); Assert.IsFalse(s.HasObservers); } + + + //Potentially already covered by Finite_* tests + [TestMethod] + public void Completed_to_late_subscriber_ReplayAll() + { + var s = new ReplaySubject(); + s.OnNext(1); + s.OnNext(2); + s.OnCompleted(); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(3, observer.Messages.Count); + + Assert.AreEqual(1, observer.Messages[0].Value.Value); + Assert.AreEqual(2, observer.Messages[1].Value.Value); + Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind); + } + [TestMethod] + public void Completed_to_late_subscriber_ReplayOne() + { + var s = new ReplaySubject(1); + s.OnNext(1); + s.OnNext(2); + s.OnCompleted(); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(2, observer.Messages.Count); + + Assert.AreEqual(2, observer.Messages[0].Value.Value); + Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[1].Value.Kind); + } + [TestMethod] + public void Completed_to_late_subscriber_ReplayMany() + { + var s = new ReplaySubject(2); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + s.OnCompleted(); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(3, observer.Messages.Count); + + Assert.AreEqual(2, observer.Messages[0].Value.Value); + Assert.AreEqual(3, observer.Messages[1].Value.Value); + Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind); + } + [TestMethod] + public void Completed_to_late_subscriber_ReplayByTime() + { + var s = new ReplaySubject(TimeSpan.FromMinutes(1)); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + s.OnCompleted(); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(4, observer.Messages.Count); + + Assert.AreEqual(1, observer.Messages[0].Value.Value); + Assert.AreEqual(2, observer.Messages[1].Value.Value); + Assert.AreEqual(3, observer.Messages[2].Value.Value); + Assert.AreEqual(NotificationKind.OnCompleted, observer.Messages[3].Value.Kind); + } + + //Potentially already covered by Error_* tests + [TestMethod] + public void Errored_to_late_subscriber_ReplayAll() + { + var expectedException = new Exception("Test"); + var s = new ReplaySubject(); + s.OnNext(1); + s.OnNext(2); + s.OnError(expectedException); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(3, observer.Messages.Count); + + Assert.AreEqual(1, observer.Messages[0].Value.Value); + Assert.AreEqual(2, observer.Messages[1].Value.Value); + Assert.AreEqual(NotificationKind.OnError, observer.Messages[2].Value.Kind); + Assert.AreEqual(expectedException, observer.Messages[2].Value.Exception); + } + [TestMethod] + public void Errored_to_late_subscriber_ReplayOne() + { + var expectedException = new Exception("Test"); + var s = new ReplaySubject(1); + s.OnNext(1); + s.OnNext(2); + s.OnError(expectedException); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(2, observer.Messages.Count); + + Assert.AreEqual(2, observer.Messages[0].Value.Value); + Assert.AreEqual(NotificationKind.OnError, observer.Messages[1].Value.Kind); + Assert.AreEqual(expectedException, observer.Messages[1].Value.Exception); + } + [TestMethod] + public void Errored_to_late_subscriber_ReplayMany() + { + var expectedException = new Exception("Test"); + var s = new ReplaySubject(2); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + s.OnError(expectedException); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(3, observer.Messages.Count); + + Assert.AreEqual(2, observer.Messages[0].Value.Value); + Assert.AreEqual(3, observer.Messages[1].Value.Value); + Assert.AreEqual(NotificationKind.OnError, observer.Messages[2].Value.Kind); + Assert.AreEqual(expectedException, observer.Messages[2].Value.Exception); + } + [TestMethod] + public void Errored_to_late_subscriber_ReplayByTime() + { + var expectedException = new Exception("Test"); + var s = new ReplaySubject(TimeSpan.FromMinutes(1)); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + s.OnError(expectedException); + + var scheduler = new TestScheduler(); + var observer = scheduler.CreateObserver(); + s.Subscribe(observer); + + Assert.AreEqual(4, observer.Messages.Count); + + Assert.AreEqual(1, observer.Messages[0].Value.Value); + Assert.AreEqual(2, observer.Messages[1].Value.Value); + Assert.AreEqual(3, observer.Messages[2].Value.Value); + Assert.AreEqual(NotificationKind.OnError, observer.Messages[3].Value.Kind); + Assert.AreEqual(expectedException, observer.Messages[3].Value.Exception); + } } }