diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs index 5a43be401c..3918949989 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs @@ -120,6 +120,8 @@ internal sealed class _ : IdentitySink { private readonly TimeSpan _period; + long _index; + public _(TimeSpan period, IObserver observer) : base(observer) { @@ -128,7 +130,7 @@ public _(TimeSpan period, IObserver observer) public void Run(Periodic parent, DateTimeOffset dueTime) { - SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart)); + SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler))); } public void Run(Periodic parent, TimeSpan dueTime) @@ -137,9 +139,9 @@ public void Run(Periodic parent, TimeSpan dueTime) // Optimize for the case of Observable.Interval. // if (dueTime == _period) - SetUpstream(parent._scheduler.SchedulePeriodic(0L, _period, (Func)Tick)); + SetUpstream(parent._scheduler.SchedulePeriodic(this, _period, @this => @this.Tick())); else - SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart)); + SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler))); } // @@ -156,16 +158,18 @@ public void Run(Periodic parent, TimeSpan dueTime) // selectors. When the system clock changes, intervals will not be the same as diffs between // consecutive absolute time values. The precision will be low (1s range by default). // - private long Tick(long count) + private void Tick() { + var count = _index; + _index = unchecked(count + 1); + ForwardOnNext(count); - return unchecked(count + 1); } private int _pendingTickCount; private IDisposable _periodic; - private IDisposable InvokeStart(IScheduler self, object state) + private IDisposable InvokeStart(IScheduler self) { // // Notice the first call to OnNext will introduce skew if it takes significantly long when @@ -218,7 +222,8 @@ private IDisposable InvokeStart(IScheduler self, object state) var d = new SingleAssignmentDisposable(); _periodic = d; - d.Disposable = self.SchedulePeriodic(1L, _period, (Func)Tock); + _index = 1; + d.Disposable = self.SchedulePeriodic(this, _period, @this => @this.Tock()); try { @@ -238,8 +243,7 @@ private IDisposable InvokeStart(IScheduler self, object state) // if (Interlocked.Decrement(ref _pendingTickCount) > 0) { - var c = new SingleAssignmentDisposable(); - c.Disposable = self.Schedule(1L, CatchUp); + var c = self.Schedule((@this: this, index: 1L), (tuple, action) => tuple.@this.CatchUp(tuple.index, action)); return StableCompositeDisposable.Create(d, c); } @@ -247,7 +251,7 @@ private IDisposable InvokeStart(IScheduler self, object state) return d; } - private long Tock(long count) + private void Tock() { // // Notice the handler for (emulated) periodic scheduling is non-reentrant. @@ -261,14 +265,15 @@ private long Tock(long count) // if (Interlocked.Increment(ref _pendingTickCount) == 1) { + var count = _index; + _index = unchecked(count + 1); + ForwardOnNext(count); Interlocked.Decrement(ref _pendingTickCount); } - - return unchecked(count + 1); } - private void CatchUp(long count, Action recurse) + private void CatchUp(long count, Action<(_, long)> recurse) { try { @@ -286,7 +291,7 @@ private void CatchUp(long count, Action recurse) // if (Interlocked.Decrement(ref _pendingTickCount) > 0) { - recurse(unchecked(count + 1)); + recurse((this, unchecked(count + 1))); } } }