Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Improve Timer() scheduler handling #700

Merged
merged 2 commits into from
Jun 28, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ internal sealed class _ : IdentitySink<long>
{
private readonly TimeSpan _period;

long _index;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good without volatile ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the schedulers won't execute Tick and Tock concurrently.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a ForwardOnNext in both Tick and Tock, which has always required those methods to run non-concurrently with themselves and the schedulers ensure that by default. The fact that they now take the value of a field is irrelevant because reading/writing that field followed by the ForwardOnNext call still has to be non-concurrent.


public _(TimeSpan period, IObserver<long> observer)
: base(observer)
{
Expand All @@ -128,7 +130,7 @@ public _(TimeSpan period, IObserver<long> observer)

public void Run(Periodic parent, DateTimeOffset dueTime)
{
SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart));
SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
}

public void Run(Periodic parent, TimeSpan dueTime)
Expand All @@ -137,9 +139,9 @@ public void Run(Periodic parent, TimeSpan dueTime)
// Optimize for the case of Observable.Interval.
//
if (dueTime == _period)
SetUpstream(parent._scheduler.SchedulePeriodic(0L, _period, (Func<long, long>)Tick));
SetUpstream(parent._scheduler.SchedulePeriodic(this, _period, @this => @this.Tick()));
else
SetUpstream(parent._scheduler.Schedule(default(object), dueTime, InvokeStart));
SetUpstream(parent._scheduler.Schedule(this, dueTime, (innerScheduler, @this) => @this.InvokeStart(innerScheduler)));
}

//
Expand All @@ -156,16 +158,18 @@ public void Run(Periodic parent, TimeSpan dueTime)
// selectors. When the system clock changes, intervals will not be the same as diffs between
// consecutive absolute time values. The precision will be low (1s range by default).
//
private long Tick(long count)
private void Tick()
{
var count = _index;
_index = unchecked(count + 1);

ForwardOnNext(count);
return unchecked(count + 1);
}

private int _pendingTickCount;
private IDisposable _periodic;

private IDisposable InvokeStart(IScheduler self, object state)
private IDisposable InvokeStart(IScheduler self)
{
//
// Notice the first call to OnNext will introduce skew if it takes significantly long when
Expand Down Expand Up @@ -218,7 +222,8 @@ private IDisposable InvokeStart(IScheduler self, object state)

var d = new SingleAssignmentDisposable();
_periodic = d;
d.Disposable = self.SchedulePeriodic(1L, _period, (Func<long, long>)Tock);
_index = 1;
d.Disposable = self.SchedulePeriodic(this, _period, @this => @this.Tock());

try
{
Expand All @@ -238,16 +243,15 @@ private IDisposable InvokeStart(IScheduler self, object state)
//
if (Interlocked.Decrement(ref _pendingTickCount) > 0)
{
var c = new SingleAssignmentDisposable();
c.Disposable = self.Schedule(1L, CatchUp);
var c = self.Schedule((@this: this, index: 1L), (tuple, action) => [email protected](tuple.index, action));

return StableCompositeDisposable.Create(d, c);
}

return d;
}

private long Tock(long count)
private void Tock()
{
//
// Notice the handler for (emulated) periodic scheduling is non-reentrant.
Expand All @@ -261,14 +265,15 @@ private long Tock(long count)
//
if (Interlocked.Increment(ref _pendingTickCount) == 1)
{
var count = _index;
_index = unchecked(count + 1);

ForwardOnNext(count);
Interlocked.Decrement(ref _pendingTickCount);
}

return unchecked(count + 1);
}

private void CatchUp(long count, Action<long> recurse)
private void CatchUp(long count, Action<(_, long)> recurse)
{
try
{
Expand All @@ -286,7 +291,7 @@ private void CatchUp(long count, Action<long> recurse)
//
if (Interlocked.Decrement(ref _pendingTickCount) > 0)
{
recurse(unchecked(count + 1));
recurse((this, unchecked(count + 1)));
}
}
}
Expand Down