Skip to content

Commit

Permalink
Improve Timer() scheduler handling (#700)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and danielcweber committed Jun 28, 2018
1 parent 037305e commit 837e65b
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ internal sealed class _ : IdentitySink<long>
{
private readonly TimeSpan _period;

long _index;

public _(TimeSpan period, IObserver<long> observer)
: base(observer)
{
Expand All @@ -128,7 +130,7 @@ public _(TimeSpan period, IObserver<long> observer)

public void Run(Periodic parent, DateTimeOffset dueTime)
{
SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart));
SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
}

public void Run(Periodic parent, TimeSpan dueTime)
Expand All @@ -137,9 +139,9 @@ public void Run(Periodic parent, TimeSpan dueTime)
// Optimize for the case of Observable.Interval.
//
if (dueTime == _period)
SetUpstream(parent._scheduler.SchedulePeriodic(0L, _period, (Func<long, long>)Tick));
SetUpstream(parent._scheduler.SchedulePeriodic(this, _period, @this => @this.Tick()));
else
SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart));
SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
}

//
Expand All @@ -156,16 +158,18 @@ public void Run(Periodic parent, TimeSpan dueTime)
// selectors. When the system clock changes, intervals will not be the same as diffs between
// consecutive absolute time values. The precision will be low (1s range by default).
//
private long Tick(long count)
private void Tick()
{
var count = _index;
_index = unchecked(count + 1);

ForwardOnNext(count);
return unchecked(count + 1);
}

private int _pendingTickCount;
private IDisposable _periodic;

private IDisposable InvokeStart(IScheduler self, object state)
private IDisposable InvokeStart(IScheduler self)
{
//
// Notice the first call to OnNext will introduce skew if it takes significantly long when
Expand Down Expand Up @@ -218,7 +222,8 @@ private IDisposable InvokeStart(IScheduler self, object state)

var d = new SingleAssignmentDisposable();
_periodic = d;
d.Disposable = self.SchedulePeriodic(1L, _period, (Func<long, long>)Tock);
_index = 1;
d.Disposable = self.SchedulePeriodic(this, _period, @this => @this.Tock());

try
{
Expand All @@ -238,16 +243,15 @@ private IDisposable InvokeStart(IScheduler self, object state)
//
if (Interlocked.Decrement(ref _pendingTickCount) > 0)
{
var c = new SingleAssignmentDisposable();
c.Disposable = self.Schedule(1L, CatchUp);
var c = self.Schedule((@this: this, index: 1L), (tuple, action) => tuple.@this.CatchUp(tuple.index, action));

return StableCompositeDisposable.Create(d, c);
}

return d;
}

private long Tock(long count)
private void Tock()
{
//
// Notice the handler for (emulated) periodic scheduling is non-reentrant.
Expand All @@ -261,14 +265,15 @@ private long Tock(long count)
//
if (Interlocked.Increment(ref _pendingTickCount) == 1)
{
var count = _index;
_index = unchecked(count + 1);

ForwardOnNext(count);
Interlocked.Decrement(ref _pendingTickCount);
}

return unchecked(count + 1);
}

private void CatchUp(long count, Action<long> recurse)
private void CatchUp(long count, Action<(_, long)> recurse)
{
try
{
Expand All @@ -286,7 +291,7 @@ private void CatchUp(long count, Action<long> recurse)
//
if (Interlocked.Decrement(ref _pendingTickCount) > 0)
{
recurse(unchecked(count + 1));
recurse((this, unchecked(count + 1)));
}
}
}
Expand Down

0 comments on commit 837e65b

Please sign in to comment.