Skip to content

Commit

Permalink
4.x: Fix PeriodicTimerSystemClockMonitor concurrency & failure behavi…
Browse files Browse the repository at this point in the history
…or (#528)
  • Loading branch information
akarnokd authored and danielcweber committed Jun 5, 2018
1 parent d1a7ee2 commit bd7178f
Showing 1 changed file with 36 additions and 12 deletions.
48 changes: 36 additions & 12 deletions Rx.NET/Source/src/System.Reactive/Internal/SystemClock.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.ComponentModel;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.PlatformServices
{
Expand Down Expand Up @@ -39,7 +41,12 @@ public class PeriodicTimerSystemClockMonitor : INotifySystemClockChanged
private readonly TimeSpan _period;
private readonly SerialDisposable _timer;

private DateTimeOffset _lastTime;
/// <summary>
/// Use the Unix milliseconds for the current time
/// so it can be atomically read/written without locking.
/// </summary>
private long _lastTimeUnixMillis;

private EventHandler<SystemClockChangedEventArgs> _systemClockChanged;

private const int SYNC_MAXRETRIES = 100;
Expand Down Expand Up @@ -80,30 +87,47 @@ private void NewTimer()
{
_timer.Disposable = Disposable.Empty;

var n = 0;
do
var n = 0L;
for (; ; )
{
_lastTime = SystemClock.UtcNow;
var now = SystemClock.UtcNow.ToUnixTimeMilliseconds();
Interlocked.Exchange(ref _lastTimeUnixMillis, now);

_timer.Disposable = ConcurrencyAbstractionLayer.Current.StartPeriodicTimer(TimeChanged, _period);
} while (Math.Abs((SystemClock.UtcNow - _lastTime).TotalMilliseconds) > SYNC_MAXDELTA && ++n < SYNC_MAXRETRIES);

if (n >= SYNC_MAXRETRIES)
throw new InvalidOperationException(Strings_Core.FAILED_CLOCK_MONITORING);
if (Math.Abs(SystemClock.UtcNow.ToUnixTimeMilliseconds() - now) <= SYNC_MAXDELTA)
{
break;
}
if (_timer.Disposable == Disposable.Empty)
{
break;
}
if (++n >= SYNC_MAXRETRIES)
{
Task.Delay((int)SYNC_MAXDELTA).Wait();
}
};
}

private void TimeChanged()
{
var now = SystemClock.UtcNow;
var diff = now - (_lastTime + _period);
if (Math.Abs(diff.TotalMilliseconds) >= MAXERROR)
var newTime = SystemClock.UtcNow;
var now = newTime.ToUnixTimeMilliseconds();
var last = Volatile.Read(ref _lastTimeUnixMillis);

var oldTime = (long)(last + _period.TotalMilliseconds);
var diff = now - oldTime;
if (Math.Abs(diff) >= MAXERROR)
{
_systemClockChanged?.Invoke(this, new SystemClockChangedEventArgs(_lastTime + _period, now));
_systemClockChanged?.Invoke(this, new SystemClockChangedEventArgs(
DateTimeOffset.FromUnixTimeMilliseconds(oldTime), newTime));

NewTimer();
}
else
{
_lastTime = SystemClock.UtcNow;
Interlocked.Exchange(ref _lastTimeUnixMillis, SystemClock.UtcNow.ToUnixTimeMilliseconds());
}
}
}
Expand Down

0 comments on commit bd7178f

Please sign in to comment.