diff --git a/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs b/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs
index 8f08d51a1e..f3407f70da 100644
--- a/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs
+++ b/Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Reactive.Concurrency;
+using System.Threading;
namespace System.Reactive.Subjects
{
@@ -12,21 +13,7 @@ namespace System.Reactive.Subjects
/// The type of the elements processed by the subject.
public sealed class ReplaySubject : ISubject, IDisposable
{
- private const int InfiniteBufferSize = int.MaxValue;
-
- private readonly int _bufferSize;
- private readonly TimeSpan _window;
- private readonly IScheduler _scheduler;
- private readonly IStopwatch _stopwatch;
-
- private readonly Queue> _queue;
- private bool _isStopped;
- private Exception _error;
-
- private ImmutableList> _observers;
- private bool _isDisposed;
-
- private readonly object _gate = new object();
+ private readonly IReplaySubjectImplementation _implementation;
///
/// Initializes a new instance of the class with the specified buffer size, window and scheduler.
@@ -38,23 +25,7 @@ public sealed class ReplaySubject : ISubject, IDisposable
/// is null.
public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
- if (bufferSize < 0)
- throw new ArgumentOutOfRangeException("bufferSize");
- if (window < TimeSpan.Zero)
- throw new ArgumentOutOfRangeException("window");
- if (scheduler == null)
- throw new ArgumentNullException("scheduler");
-
- _bufferSize = bufferSize;
- _window = window;
- _scheduler = scheduler;
-
- _stopwatch = _scheduler.StartStopwatch();
- _queue = new Queue>();
- _isStopped = false;
- _error = null;
-
- _observers = new ImmutableList>();
+ _implementation = new ReplayByTime(bufferSize, window, scheduler);
}
///
@@ -64,16 +35,16 @@ public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
/// Maximum time length of the replay buffer.
/// is less than zero. -or- is less than TimeSpan.Zero.
public ReplaySubject(int bufferSize, TimeSpan window)
- : this(bufferSize, window, SchedulerDefaults.Iteration)
{
+ _implementation = new ReplayByTime(bufferSize, window);
}
///
/// Initializes a new instance of the class.
///
public ReplaySubject()
- : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{
+ _implementation = new ReplayAll();
}
///
@@ -82,10 +53,12 @@ public ReplaySubject()
/// Scheduler the observers are invoked on.
/// is null.
public ReplaySubject(IScheduler scheduler)
- : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{
+ _implementation = new ReplayByTime(scheduler);
}
+ //TODO: Does this overload make any sense with the optimisations? Surely this now is just new ReplaySubject(bufferSize).SubscribeOn(scheduler)?
+ //Potentially should be marked as obsolete
///
/// Initializes a new instance of the class with the specified buffer size and scheduler.
///
@@ -94,8 +67,8 @@ public ReplaySubject(IScheduler scheduler)
/// is null.
/// is less than zero.
public ReplaySubject(int bufferSize, IScheduler scheduler)
- : this(bufferSize, TimeSpan.MaxValue, scheduler)
{
+ _implementation = new ReplayByTime(bufferSize, scheduler);
}
///
@@ -104,8 +77,19 @@ public ReplaySubject(int bufferSize, IScheduler scheduler)
/// Maximum element count of the replay buffer.
/// is less than zero.
public ReplaySubject(int bufferSize)
- : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{
+ switch (bufferSize)
+ {
+ case 1:
+ _implementation = new ReplayOne();
+ break;
+ case int.MaxValue:
+ _implementation = new ReplayAll();
+ break;
+ default:
+ _implementation = new ReplayMany(bufferSize);
+ break;
+ }
}
///
@@ -116,8 +100,8 @@ public ReplaySubject(int bufferSize)
/// is null.
/// is less than TimeSpan.Zero.
public ReplaySubject(TimeSpan window, IScheduler scheduler)
- : this(InfiniteBufferSize, window, scheduler)
{
+ _implementation = new ReplayByTime(window, scheduler);
}
///
@@ -126,8 +110,8 @@ public ReplaySubject(TimeSpan window, IScheduler scheduler)
/// Maximum time length of the replay buffer.
/// is less than TimeSpan.Zero.
public ReplaySubject(TimeSpan window)
- : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{
+ _implementation = new ReplayByTime(window);
}
///
@@ -135,19 +119,7 @@ public ReplaySubject(TimeSpan window)
///
public bool HasObservers
{
- get
- {
- var observers = _observers;
- return observers != null && observers.Data.Length > 0;
- }
- }
-
- void Trim(TimeSpan now)
- {
- while (_queue.Count > _bufferSize)
- _queue.Dequeue();
- while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
- _queue.Dequeue();
+ get { return _implementation.HasObservers; }
}
///
@@ -156,26 +128,7 @@ void Trim(TimeSpan now)
/// The value to send to all observers.
public void OnNext(T value)
{
- var o = default(ScheduledObserver[]);
- lock (_gate)
- {
- CheckDisposed();
-
- if (!_isStopped)
- {
- var now = _stopwatch.Elapsed;
- _queue.Enqueue(new TimeInterval(value, now));
- Trim(now);
-
- o = _observers.Data;
- foreach (var observer in o)
- observer.OnNext(value);
- }
- }
-
- if (o != null)
- foreach (var observer in o)
- observer.EnsureActive();
+ _implementation.OnNext(value);
}
///
@@ -185,169 +138,603 @@ public void OnNext(T value)
/// is null.
public void OnError(Exception error)
{
- if (error == null)
- throw new ArgumentNullException("error");
+ _implementation.OnError(error);
+ }
- var o = default(ScheduledObserver[]);
- lock (_gate)
- {
- CheckDisposed();
+ ///
+ /// Notifies all subscribed and future observers about the end of the sequence.
+ ///
+ public void OnCompleted()
+ {
+ _implementation.OnCompleted();
+ }
- if (!_isStopped)
- {
- var now = _stopwatch.Elapsed;
- _isStopped = true;
- _error = error;
- Trim(now);
+ ///
+ /// Subscribes an observer to the subject.
+ ///
+ /// Observer to subscribe to the subject.
+ /// Disposable object that can be used to unsubscribe the observer from the subject.
+ /// is null.
+ public IDisposable Subscribe(IObserver observer)
+ {
+ return _implementation.Subscribe(observer);
+ }
- o = _observers.Data;
- foreach (var observer in o)
- observer.OnError(error);
+ ///
+ /// Releases all resources used by the current instance of the class and unsubscribe all observers.
+ ///
+ public void Dispose()
+ {
+ _implementation.Dispose();
+ }
- _observers = new ImmutableList>();
- }
+ private interface IReplaySubjectImplementation : ISubject, IDisposable
+ {
+ bool HasObservers { get; }
+ void Unsubscribe(IObserver observer);
+ }
+
+ private class Subscription : IDisposable
+ {
+ private IReplaySubjectImplementation _subject;
+ private IObserver _observer;
+
+ public Subscription(IReplaySubjectImplementation subject, IObserver observer)
+ {
+ _subject = subject;
+ _observer = observer;
}
- if (o != null)
- foreach (var observer in o)
- observer.EnsureActive();
+ public void Dispose()
+ {
+ var observer = Interlocked.Exchange(ref _observer, null);
+ if (observer == null)
+ return;
+
+ _subject.Unsubscribe(observer);
+ _subject = null;
+ }
}
- ///
- /// Notifies all subscribed and future observers about the end of the sequence.
- ///
- public void OnCompleted()
+ //Original implementation of the ReplaySubject with time based operations (Scheduling, Stopwatch, buffer-by-time).
+ private sealed class ReplayByTime : IReplaySubjectImplementation
{
- var o = default(ScheduledObserver[]);
- lock (_gate)
+ private const int InfiniteBufferSize = int.MaxValue;
+
+ private readonly int _bufferSize;
+ private readonly TimeSpan _window;
+ private readonly IScheduler _scheduler;
+ private readonly IStopwatch _stopwatch;
+
+ private readonly Queue> _queue;
+ private bool _isStopped;
+ private Exception _error;
+
+ private ImmutableList> _observers;
+ private bool _isDisposed;
+
+ private readonly object _gate = new object();
+
+ public ReplayByTime(int bufferSize, TimeSpan window, IScheduler scheduler)
+ {
+ if (bufferSize < 0)
+ throw new ArgumentOutOfRangeException("bufferSize");
+ if (window < TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException("window");
+ if (scheduler == null)
+ throw new ArgumentNullException("scheduler");
+
+ _bufferSize = bufferSize;
+ _window = window;
+ _scheduler = scheduler;
+
+ _stopwatch = _scheduler.StartStopwatch();
+ _queue = new Queue>();
+ _isStopped = false;
+ _error = null;
+
+ _observers = new ImmutableList>();
+ }
+
+ public ReplayByTime(int bufferSize, TimeSpan window)
+ : this(bufferSize, window, SchedulerDefaults.Iteration)
+ {
+ }
+
+ public ReplayByTime(IScheduler scheduler)
+ : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
+ {
+ }
+
+ public ReplayByTime(int bufferSize, IScheduler scheduler)
+ : this(bufferSize, TimeSpan.MaxValue, scheduler)
+ {
+ }
+
+ public ReplayByTime(TimeSpan window, IScheduler scheduler)
+ : this(InfiniteBufferSize, window, scheduler)
+ {
+ }
+
+ public ReplayByTime(TimeSpan window)
+ : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{
- CheckDisposed();
+ }
+
+ public bool HasObservers
+ {
+ get
+ {
+ var observers = _observers;
+ return observers != null && observers.Data.Length > 0;
+ }
+ }
+
+ private void Trim(TimeSpan now)
+ {
+ while (_queue.Count > _bufferSize)
+ _queue.Dequeue();
+ while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
+ _queue.Dequeue();
+ }
- if (!_isStopped)
+ public void OnNext(T value)
+ {
+ var o = default(ScheduledObserver[]);
+ lock (_gate)
{
- var now = _stopwatch.Elapsed;
- _isStopped = true;
- Trim(now);
+ CheckDisposed();
+
+ if (!_isStopped)
+ {
+ var now = _stopwatch.Elapsed;
+ _queue.Enqueue(new TimeInterval(value, now));
+ Trim(now);
+
+ o = _observers.Data;
+ foreach (var observer in o)
+ observer.OnNext(value);
+ }
+ }
- o = _observers.Data;
+ if (o != null)
foreach (var observer in o)
- observer.OnCompleted();
+ observer.EnsureActive();
+ }
+
+ public void OnError(Exception error)
+ {
+ if (error == null)
+ throw new ArgumentNullException("error");
- _observers = new ImmutableList>();
+ var o = default(ScheduledObserver[]);
+ lock (_gate)
+ {
+ CheckDisposed();
+
+ if (!_isStopped)
+ {
+ var now = _stopwatch.Elapsed;
+ _isStopped = true;
+ _error = error;
+ Trim(now);
+
+ o = _observers.Data;
+ foreach (var observer in o)
+ observer.OnError(error);
+
+ _observers = new ImmutableList>();
+ }
}
+
+ if (o != null)
+ foreach (var observer in o)
+ observer.EnsureActive();
}
- if (o != null)
- foreach (var observer in o)
- observer.EnsureActive();
- }
+ public void OnCompleted()
+ {
+ var o = default(ScheduledObserver[]);
+ lock (_gate)
+ {
+ CheckDisposed();
- ///
- /// Subscribes an observer to the subject.
- ///
- /// Observer to subscribe to the subject.
- /// Disposable object that can be used to unsubscribe the observer from the subject.
- /// is null.
- public IDisposable Subscribe(IObserver observer)
- {
- if (observer == null)
- throw new ArgumentNullException("observer");
-
- var so = new ScheduledObserver(_scheduler, observer);
-
- var n = 0;
-
- var subscription = new RemovableDisposable(this, so);
- lock (_gate)
- {
- CheckDisposed();
-
- //
- // Notice the v1.x behavior of always calling Trim is preserved here.
- //
- // This may be subject (pun intended) of debate: should this policy
- // only be applied while the sequence is active? With the current
- // behavior, a sequence will "die out" after it has terminated by
- // continuing to drop OnNext notifications from the queue.
- //
- // In v1.x, this behavior was due to trimming based on the clock value
- // returned by scheduler.Now, applied to all but the terminal message
- // in the queue. Using the IStopwatch has the same effect. Either way,
- // we guarantee the final notification will be observed, but there's
- // no way to retain the buffer directly. One approach is to use the
- // time-based TakeLast operator and apply an unbounded ReplaySubject
- // to it.
- //
- // To conclude, we're keeping the behavior as-is for compatibility
- // reasons with v1.x.
- //
- Trim(_stopwatch.Elapsed);
- _observers = _observers.Add(so);
-
- n = _queue.Count;
- foreach (var item in _queue)
- so.OnNext(item.Value);
+ if (!_isStopped)
+ {
+ var now = _stopwatch.Elapsed;
+ _isStopped = true;
+ Trim(now);
+
+ o = _observers.Data;
+ foreach (var observer in o)
+ observer.OnCompleted();
+
+ _observers = new ImmutableList>();
+ }
+ }
- if (_error != null)
+ if (o != null)
+ foreach (var observer in o)
+ observer.EnsureActive();
+ }
+
+ public IDisposable Subscribe(IObserver observer)
+ {
+ if (observer == null)
+ throw new ArgumentNullException("observer");
+
+ var so = new ScheduledObserver(_scheduler, observer);
+
+ var n = 0;
+
+ //var subscription = new Subscription(this, so);
+ var subscription = new RemovableDisposable(this, so);
+ lock (_gate)
{
- n++;
- so.OnError(_error);
+ CheckDisposed();
+
+ //
+ // Notice the v1.x behavior of always calling Trim is preserved here.
+ //
+ // This may be subject (pun intended) of debate: should this policy
+ // only be applied while the sequence is active? With the current
+ // behavior, a sequence will "die out" after it has terminated by
+ // continuing to drop OnNext notifications from the queue.
+ //
+ // In v1.x, this behavior was due to trimming based on the clock value
+ // returned by scheduler.Now, applied to all but the terminal message
+ // in the queue. Using the IStopwatch has the same effect. Either way,
+ // we guarantee the final notification will be observed, but there's
+ // no way to retain the buffer directly. One approach is to use the
+ // time-based TakeLast operator and apply an unbounded ReplaySubject
+ // to it.
+ //
+ // To conclude, we're keeping the behavior as-is for compatibility
+ // reasons with v1.x.
+ //
+ Trim(_stopwatch.Elapsed);
+ _observers = _observers.Add(so);
+
+ n = _queue.Count;
+ foreach (var item in _queue)
+ so.OnNext(item.Value);
+
+ if (_error != null)
+ {
+ n++;
+ so.OnError(_error);
+ }
+ else if (_isStopped)
+ {
+ n++;
+ so.OnCompleted();
+ }
}
- else if (_isStopped)
+
+ so.EnsureActive(n);
+
+ return subscription;
+ }
+
+ //public void Unsubscribe(IObserver observer)
+ public void Unsubscribe(ScheduledObserver observer)
+ {
+ lock (_gate)
+ {
+ //var so = (ScheduledObserver)observer;
+ //so.Dispose();
+ if (!_isDisposed)
+ _observers = _observers.Remove(observer);
+ }
+ }
+ //public void Unsubscribe(IObserver observer)
+ void IReplaySubjectImplementation.Unsubscribe(IObserver observer)
+ {
+ var so = (ScheduledObserver)observer;
+ Unsubscribe(so);
+ }
+
+ sealed class RemovableDisposable : IDisposable
+ {
+ private readonly ReplayByTime _subject;
+ private readonly ScheduledObserver _observer;
+
+ public RemovableDisposable(ReplayByTime subject, ScheduledObserver observer)
+ {
+ _subject = subject;
+ _observer = observer;
+ }
+
+ public void Dispose()
{
- n++;
- so.OnCompleted();
+ _observer.Dispose();
+ _subject.Unsubscribe(_observer);
}
}
- so.EnsureActive(n);
+ private void CheckDisposed()
+ {
+ if (_isDisposed)
+ throw new ObjectDisposedException(string.Empty);
+ }
- return subscription;
+ public void Dispose()
+ {
+ lock (_gate)
+ {
+ _isDisposed = true;
+ _observers = null;
+ //_queue.Clear();
+ }
+ }
}
- void Unsubscribe(ScheduledObserver observer)
+
+
+
+
+
+
+
+
+
+
+
+
+ //Below are the non-time based implementations.
+ //These removed the need for the scheduler indirection, SchedulerObservers, stopwatch, TimeInterval and ensuring the scheduled observers are active after each action.
+ //The ReplayOne implementation also removes the need to even have a queue.
+ private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
{
- lock (_gate)
+ private bool _hasValue;
+ private T _value;
+
+ protected override void Trim()
{
- if (!_isDisposed)
- _observers = _observers.Remove(observer);
+ //NoOp. No need to trim.
+ }
+
+ protected override void AddValueToBuffer(T value)
+ {
+ _hasValue = true;
+ _value = value;
+ }
+
+ protected override void ReplayBuffer(IObserver observer)
+ {
+ if (_hasValue)
+ observer.OnNext(_value);
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ base.Dispose(disposing);
+ _value = default(T);
}
}
- sealed class RemovableDisposable : IDisposable
+ private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
{
- private readonly ReplaySubject _subject;
- private readonly ScheduledObserver _observer;
+ private readonly int _bufferSize;
- public RemovableDisposable(ReplaySubject subject, ScheduledObserver observer)
+ public ReplayMany(int bufferSize)
+ : base(bufferSize)
{
- _subject = subject;
- _observer = observer;
+ _bufferSize = bufferSize;
}
- public void Dispose()
+ protected override void Trim()
{
- _observer.Dispose();
- _subject.Unsubscribe(_observer);
+ while (Queue.Count > _bufferSize)
+ Queue.Dequeue();
}
}
- void CheckDisposed()
+ private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
{
- if (_isDisposed)
- throw new ObjectDisposedException(string.Empty);
+ public ReplayAll()
+ : base(0)
+ {
+ }
+
+ protected override void Trim()
+ {
+ //NoOp; i.e. Dont' trim, keep all values.
+ }
}
- ///
- /// Releases all resources used by the current instance of the class and unsubscribe all observers.
- ///
- public void Dispose()
+ private abstract class ReplayBufferBase : IReplaySubjectImplementation
+ {
+ private readonly object _gate = new object();
+ private bool _isDisposed;
+ private bool _isStopped;
+ private Exception _error;
+ private ImmutableList> _observers;
+
+ protected ReplayBufferBase()
+ {
+ _observers = new ImmutableList>();
+ }
+
+ protected abstract void Trim();
+ protected abstract void AddValueToBuffer(T value);
+ protected abstract void ReplayBuffer(IObserver observer);
+
+ public bool HasObservers
+ {
+ get
+ {
+ var observers = _observers;
+ return observers != null && observers.Data.Length > 0;
+ }
+ }
+
+ public void OnNext(T value)
+ {
+ lock (_gate)
+ {
+ CheckDisposed();
+
+ if (!_isStopped)
+ {
+ AddValueToBuffer(value);
+ Trim();
+
+ var o = _observers.Data;
+ foreach (var observer in o)
+ observer.OnNext(value);
+ }
+ }
+ }
+
+ public void OnError(Exception error)
+ {
+ if (error == null)
+ throw new ArgumentNullException("error");
+
+ lock (_gate)
+ {
+ CheckDisposed();
+
+ if (!_isStopped)
+ {
+ _isStopped = true;
+ _error = error;
+ Trim();
+
+ var o = _observers.Data;
+ foreach (var observer in o)
+ observer.OnError(error);
+
+ _observers = new ImmutableList>();
+ }
+ }
+ }
+
+ public void OnCompleted()
+ {
+ lock (_gate)
+ {
+ CheckDisposed();
+
+ if (!_isStopped)
+ {
+ _isStopped = true;
+ Trim();
+
+ var o = _observers.Data;
+ foreach (var observer in o)
+ observer.OnCompleted();
+
+ _observers = new ImmutableList>();
+ }
+ }
+ }
+
+ public IDisposable Subscribe(IObserver observer)
+ {
+ if (observer == null)
+ throw new ArgumentNullException("observer");
+
+
+ var subscription = new Subscription(this, observer);
+ lock (_gate)
+ {
+ CheckDisposed();
+
+ //
+ // Notice the v1.x behavior of always calling Trim is preserved here.
+ //
+ // This may be subject (pun intended) of debate: should this policy
+ // only be applied while the sequence is active? With the current
+ // behavior, a sequence will "die out" after it has terminated by
+ // continuing to drop OnNext notifications from the queue.
+ //
+ // In v1.x, this behavior was due to trimming based on the clock value
+ // returned by scheduler.Now, applied to all but the terminal message
+ // in the queue. Using the IStopwatch has the same effect. Either way,
+ // we guarantee the final notification will be observed, but there's
+ // no way to retain the buffer directly. One approach is to use the
+ // time-based TakeLast operator and apply an unbounded ReplaySubject
+ // to it.
+ //
+ // To conclude, we're keeping the behavior as-is for compatibility
+ // reasons with v1.x.
+ //
+ _observers = _observers.Add(observer);
+
+ ReplayBuffer(observer);
+
+ if (_error != null)
+ {
+ observer.OnError(_error);
+ }
+ else if (_isStopped)
+ {
+ observer.OnCompleted();
+ }
+ }
+
+ return subscription;
+ }
+
+ public void Unsubscribe(IObserver observer)
+ {
+ lock (_gate)
+ {
+ if (!_isDisposed)
+ _observers = _observers.Remove(observer);
+ }
+ }
+
+ private void CheckDisposed()
+ {
+ if (_isDisposed)
+ throw new ObjectDisposedException(string.Empty);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ }
+ protected virtual void Dispose(bool disposing)
+ {
+ lock (_gate)
+ {
+ _isDisposed = true;
+ _observers = null;
+ }
+ }
+ }
+
+ private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
{
- lock (_gate)
+ private readonly Queue _queue;
+
+ protected ReplayManyBase(int queueSize)
+ : base()
+ {
+ _queue = new Queue(queueSize);
+ }
+
+ protected Queue Queue { get { return _queue; } }
+
+ protected override void AddValueToBuffer(T value)
+ {
+ _queue.Enqueue(value);
+ }
+
+ protected override void ReplayBuffer(IObserver observer)
+ {
+ foreach (var item in _queue)
+ observer.OnNext(item);
+ }
+
+ protected override void Dispose(bool disposing)
{
- _isDisposed = true;
- _observers = null;
+ base.Dispose(disposing);
+ _queue.Clear();
}
}
}
-}
+}
\ No newline at end of file
diff --git a/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs b/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs
index 21159271a8..7537e30499 100644
--- a/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs
+++ b/Rx.NET/Source/Tests.System.Reactive/Tests/Linq/Subjects/ReplaySubjectTest.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System;
+using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
@@ -16,11 +17,17 @@ public partial class ReplaySubjectTest : ReactiveTest
public void Subscribe_ArgumentChecking()
{
ReactiveAssert.Throws(() => new ReplaySubject().Subscribe(null));
+ ReactiveAssert.Throws(() => new ReplaySubject(1).Subscribe(null));
+ ReactiveAssert.Throws(() => new ReplaySubject(2).Subscribe(null));
+ ReactiveAssert.Throws(() => new ReplaySubject(DummyScheduler.Instance).Subscribe(null));
}
[TestMethod]
public void OnError_ArgumentChecking()
{
+ ReactiveAssert.Throws(() => new ReplaySubject().OnError(null));
+ ReactiveAssert.Throws(() => new ReplaySubject(1).OnError(null));
+ ReactiveAssert.Throws(() => new ReplaySubject(2).OnError(null));
ReactiveAssert.Throws(() => new ReplaySubject(DummyScheduler.Instance).OnError(null));
}
@@ -52,7 +59,7 @@ public void Constructor_ArgumentChecking()
}
[TestMethod]
- public void Infinite()
+ public void Infinite_ReplayByTime()
{
var scheduler = new TestScheduler();
@@ -93,7 +100,6 @@ public void Infinite()
scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
- scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
scheduler.Start();
@@ -118,6 +124,226 @@ public void Infinite()
OnNext(941, 11)
);
}
+ [TestMethod]
+ public void Infinite_ReplayOne()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnNext(630, 8),
+ OnNext(710, 9),
+ OnNext(870, 10),
+ OnNext(940, 11),
+ OnNext(1020, 12)
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ var results4 = scheduler.CreateObserver();
+ var subscription4 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1200, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+ scheduler.ScheduleAbsolute(1100, () => subscription4 = subject.Subscribe(results4));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnNext(630, 8)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 10),
+ OnNext(940, 11)
+ );
+
+ results4.Messages.AssertEqual(
+ OnNext(1100, 12)
+ );
+ }
+ [TestMethod]
+ public void Infinite_ReplayMany()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnNext(630, 8),
+ OnNext(710, 9),
+ OnNext(870, 10),
+ OnNext(940, 11),
+ OnNext(1020, 12)
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 3),
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 3),
+ OnNext(400, 4),
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnNext(630, 8)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 8),
+ OnNext(900, 9),
+ OnNext(900, 10),
+ OnNext(940, 11)
+ );
+ }
+ [TestMethod]
+ public void Infinite_ReplayAll()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnNext(630, 8),
+ OnNext(710, 9),
+ OnNext(870, 10),
+ OnNext(940, 11),
+ OnNext(1020, 12)
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject());
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 3),
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 3),
+ OnNext(400, 4),
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnNext(630, 8)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 3),
+ OnNext(900, 4),
+ OnNext(900, 5),
+ OnNext(900, 6),
+ OnNext(900, 7),
+ OnNext(900, 8),
+ OnNext(900, 9),
+ OnNext(900, 10),
+ OnNext(940, 11)
+ );
+ }
+
[TestMethod]
public void Infinite2()
@@ -191,7 +417,71 @@ public void Infinite2()
}
[TestMethod]
- public void Finite()
+ public void Finite_ReplayByTime()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnCompleted(630),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(301, 3),
+ OnNext(302, 4),
+ OnNext(341, 5),
+ OnNext(411, 6),
+ OnNext(521, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(401, 5),
+ OnNext(411, 6),
+ OnNext(521, 7),
+ OnCompleted(631)
+ );
+
+ results3.Messages.AssertEqual(
+ OnCompleted(901)
+ );
+ }
+ [TestMethod]
+ public void Finite_ReplayOne()
{
var scheduler = new TestScheduler();
@@ -221,7 +511,533 @@ public void Finite()
var results3 = scheduler.CreateObserver();
var subscription3 = default(IDisposable);
- scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler));
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnCompleted(630)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 7),
+ OnCompleted(900)
+ );
+ }
+ [TestMethod]
+ public void Finite_ReplayMany()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnCompleted(630),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 3),
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 3),
+ OnNext(400, 4),
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnCompleted(630)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 5),
+ OnNext(900, 6),
+ OnNext(900, 7),
+ OnCompleted(900)
+ );
+ }
+ [TestMethod]
+ public void Finite_ReplayAll()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnCompleted(630),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject());
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 3),
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 3),
+ OnNext(400, 4),
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnCompleted(630)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 3),
+ OnNext(900, 4),
+ OnNext(900, 5),
+ OnNext(900, 6),
+ OnNext(900, 7),
+ OnCompleted(900)
+ );
+ }
+
+ [TestMethod]
+ public void Error_ReplayByTime()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnError(630, ex),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(301, 3),
+ OnNext(302, 4),
+ OnNext(341, 5),
+ OnNext(411, 6),
+ OnNext(521, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(401, 5),
+ OnNext(411, 6),
+ OnNext(521, 7),
+ OnError(631, ex)
+ );
+
+ results3.Messages.AssertEqual(
+ OnError(901, ex)
+ );
+ }
+ [TestMethod]
+ public void Error_ReplayOne()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnError(630, ex),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnError(630, ex)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 7),
+ OnError(900, ex)
+ );
+ }
+ [TestMethod]
+ public void Error_ReplayMany()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnError(630, ex),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 3),
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 3),
+ OnNext(400, 4),
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnError(630, ex)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 5),
+ OnNext(900, 6),
+ OnNext(900, 7),
+ OnError(900, ex)
+ );
+ }
+ [TestMethod]
+ public void Error_ReplayAll()
+ {
+ var scheduler = new TestScheduler();
+
+ var ex = new Exception();
+
+ var xs = scheduler.CreateHotObservable(
+ OnNext(70, 1),
+ OnNext(110, 2),
+ OnNext(220, 3),
+ OnNext(270, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnError(630, ex),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject());
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(300, 3),
+ OnNext(300, 4),
+ OnNext(340, 5),
+ OnNext(410, 6),
+ OnNext(520, 7)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(400, 3),
+ OnNext(400, 4),
+ OnNext(400, 5),
+ OnNext(410, 6),
+ OnNext(520, 7),
+ OnError(630, ex)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(900, 3),
+ OnNext(900, 4),
+ OnNext(900, 5),
+ OnNext(900, 6),
+ OnNext(900, 7),
+ OnError(900, ex)
+ );
+ }
+
+ [TestMethod]
+ public void Canceled_ReplayByTime()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnCompleted(630),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler));
+ scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
+ scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
+
+ scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3));
+
+ scheduler.ScheduleAbsolute(600, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(800, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(950, () => subscription3.Dispose());
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ );
+
+ results2.Messages.AssertEqual(
+ OnCompleted(631)
+ );
+
+ results3.Messages.AssertEqual(
+ OnCompleted(901)
+ );
+ }
+ [TestMethod]
+ public void Canceled_ReplayOne()
+ {
+ var scheduler = new TestScheduler();
+
+ var xs = scheduler.CreateHotObservable(
+ OnCompleted(630),
+ OnNext(640, 9),
+ OnCompleted(650),
+ OnError(660, new Exception())
+ );
+
+ var subject = default(ReplaySubject);
+ var subscription = default(IDisposable);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1));
scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
@@ -237,41 +1053,23 @@ public void Finite()
scheduler.Start();
results1.Messages.AssertEqual(
- OnNext(301, 3),
- OnNext(302, 4),
- OnNext(341, 5),
- OnNext(411, 6),
- OnNext(521, 7)
);
results2.Messages.AssertEqual(
- OnNext(401, 5),
- OnNext(411, 6),
- OnNext(521, 7),
- OnCompleted(631)
+ OnCompleted(630)
);
results3.Messages.AssertEqual(
- OnCompleted(901)
+ OnCompleted(900)
);
}
-
[TestMethod]
- public void Error()
+ public void Canceled_ReplayMany()
{
var scheduler = new TestScheduler();
- var ex = new Exception();
-
var xs = scheduler.CreateHotObservable(
- OnNext(70, 1),
- OnNext(110, 2),
- OnNext(220, 3),
- OnNext(270, 4),
- OnNext(340, 5),
- OnNext(410, 6),
- OnNext(520, 7),
- OnError(630, ex),
+ OnCompleted(630),
OnNext(640, 9),
OnCompleted(650),
OnError(660, new Exception())
@@ -289,7 +1087,7 @@ public void Error()
var results3 = scheduler.CreateObserver();
var subscription3 = default(IDisposable);
- scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler));
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3));
scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
@@ -305,27 +1103,18 @@ public void Error()
scheduler.Start();
results1.Messages.AssertEqual(
- OnNext(301, 3),
- OnNext(302, 4),
- OnNext(341, 5),
- OnNext(411, 6),
- OnNext(521, 7)
);
results2.Messages.AssertEqual(
- OnNext(401, 5),
- OnNext(411, 6),
- OnNext(521, 7),
- OnError(631, ex)
+ OnCompleted(630)
);
results3.Messages.AssertEqual(
- OnError(901, ex)
+ OnCompleted(900)
);
}
-
[TestMethod]
- public void Canceled()
+ public void Canceled_ReplayAll()
{
var scheduler = new TestScheduler();
@@ -348,7 +1137,7 @@ public void Canceled()
var results3 = scheduler.CreateObserver();
var subscription3 = default(IDisposable);
- scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler));
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject());
scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject));
scheduler.ScheduleAbsolute(1000, () => subscription.Dispose());
@@ -367,11 +1156,11 @@ public void Canceled()
);
results2.Messages.AssertEqual(
- OnCompleted(631)
+ OnCompleted(630)
);
results3.Messages.AssertEqual(
- OnCompleted(901)
+ OnCompleted(900)
);
}
@@ -435,7 +1224,187 @@ public void SubjectDisposed()
OnNext(551, 5)
);
}
+ [TestMethod]
+ public void SubjectDisposed_ReplayOne()
+ {
+ var scheduler = new TestScheduler();
+
+ var subject = default(ReplaySubject);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1));
+ scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
+ scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(600, () => subject.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
+
+ scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
+ scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
+ scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
+ scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
+ scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
+ scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6)));
+ scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted()));
+ scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception())));
+ scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe()));
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(200, 1),
+ OnNext(250, 2),
+ OnNext(350, 3),
+ OnNext(450, 4)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(300, 2),
+ OnNext(350, 3),
+ OnNext(450, 4),
+ OnNext(550, 5)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(400, 3),
+ OnNext(450, 4),
+ OnNext(550, 5)
+ );
+ }
+ [TestMethod]
+ public void SubjectDisposed_ReplayMany()
+ {
+ var scheduler = new TestScheduler();
+
+ var subject = default(ReplaySubject);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3));
+ scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
+ scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(600, () => subject.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
+
+ scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
+ scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
+ scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
+ scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
+ scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
+ scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6)));
+ scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted()));
+ scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception())));
+ scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe()));
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(200, 1),
+ OnNext(250, 2),
+ OnNext(350, 3),
+ OnNext(450, 4)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(300, 1),
+ OnNext(300, 2),
+ OnNext(350, 3),
+ OnNext(450, 4),
+ OnNext(550, 5)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(400, 1),
+ OnNext(400, 2),
+ OnNext(400, 3),
+ OnNext(450, 4),
+ OnNext(550, 5)
+ );
+ }
+ [TestMethod]
+ public void SubjectDisposed_ReplayAll()
+ {
+ var scheduler = new TestScheduler();
+
+ var subject = default(ReplaySubject);
+
+ var results1 = scheduler.CreateObserver();
+ var subscription1 = default(IDisposable);
+
+ var results2 = scheduler.CreateObserver();
+ var subscription2 = default(IDisposable);
+
+ var results3 = scheduler.CreateObserver();
+ var subscription3 = default(IDisposable);
+
+ scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject());
+ scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1));
+ scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2));
+ scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3));
+ scheduler.ScheduleAbsolute(500, () => subscription1.Dispose());
+ scheduler.ScheduleAbsolute(600, () => subject.Dispose());
+ scheduler.ScheduleAbsolute(700, () => subscription2.Dispose());
+ scheduler.ScheduleAbsolute(800, () => subscription3.Dispose());
+
+ scheduler.ScheduleAbsolute(150, () => subject.OnNext(1));
+ scheduler.ScheduleAbsolute(250, () => subject.OnNext(2));
+ scheduler.ScheduleAbsolute(350, () => subject.OnNext(3));
+ scheduler.ScheduleAbsolute(450, () => subject.OnNext(4));
+ scheduler.ScheduleAbsolute(550, () => subject.OnNext(5));
+ scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6)));
+ scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted()));
+ scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception())));
+ scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe()));
+
+ scheduler.Start();
+
+ results1.Messages.AssertEqual(
+ OnNext(200, 1),
+ OnNext(250, 2),
+ OnNext(350, 3),
+ OnNext(450, 4)
+ );
+
+ results2.Messages.AssertEqual(
+ OnNext(300, 1),
+ OnNext(300, 2),
+ OnNext(350, 3),
+ OnNext(450, 4),
+ OnNext(550, 5)
+ );
+
+ results3.Messages.AssertEqual(
+ OnNext(400, 1),
+ OnNext(400, 2),
+ OnNext(400, 3),
+ OnNext(450, 4),
+ OnNext(550, 5)
+ );
+ }
+ //TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All).
+ //I Don't understand the behavior.
+ //I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC
[TestMethod]
public void ReplaySubjectDiesOut()
{
@@ -502,7 +1471,13 @@ public void ReplaySubjectDiesOut()
[TestMethod]
public void HasObservers()
{
- var s = new ReplaySubject();
+ HasObservers(new ReplaySubject());
+ HasObservers(new ReplaySubject(1));
+ HasObservers(new ReplaySubject(3));
+ HasObservers(new ReplaySubject(TimeSpan.FromSeconds(1)));
+ }
+ private static void HasObservers(ReplaySubject s)
+ {
Assert.IsFalse(s.HasObservers);
var d1 = s.Subscribe(_ => { });
@@ -527,7 +1502,13 @@ public void HasObservers()
[TestMethod]
public void HasObservers_Dispose1()
{
- var s = new ReplaySubject();
+ HasObservers_Dispose1(new ReplaySubject());
+ HasObservers_Dispose1(new ReplaySubject(1));
+ HasObservers_Dispose1(new ReplaySubject(3));
+ HasObservers_Dispose1(new ReplaySubject(TimeSpan.FromSeconds(1)));
+ }
+ private static void HasObservers_Dispose1(ReplaySubject s)
+ {
Assert.IsFalse(s.HasObservers);
var d = s.Subscribe(_ => { });
@@ -543,7 +1524,13 @@ public void HasObservers_Dispose1()
[TestMethod]
public void HasObservers_Dispose2()
{
- var s = new ReplaySubject();
+ HasObservers_Dispose2(new ReplaySubject());
+ HasObservers_Dispose2(new ReplaySubject