Skip to content

Commit

Permalink
Use the stateful overload of AsyncLock in some places to save allocat…
Browse files Browse the repository at this point in the history
…ion of closures and allow delegate caching. (#583)
  • Loading branch information
danielcweber authored Jun 12, 2018
1 parent b6058c0 commit 7551d1a
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 120 deletions.
41 changes: 28 additions & 13 deletions Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,40 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var state1 = state;
var gate = new AsyncLock();
return new PeriodicallyScheduledWorkItem<TState>(state, period, action);
}

private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private TState _state;
private Func<TState, TState> _action;
private readonly IDisposable _cancel;
private readonly AsyncLock _gate = new AsyncLock();

var cancel = s_cal.StartPeriodicTimer(() =>
public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action)
{
gate.Wait(() =>
{
state1 = action(state1);
});
}, period);
_state = state;
_action = action;

_cancel = s_cal.StartPeriodicTimer(Tick, period);
}

return Disposable.Create(() =>
private void Tick()
{
cancel.Dispose();
gate.Dispose();
action = Stubs<TState>.I;
});
_gate.Wait(
this,
closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));
}

public void Dispose()
{
_cancel.Dispose();
_gate.Dispose();
_action = Stubs<TState>.I;
}
}


/// <summary>
/// Discovers scheduler services by interface type.
/// </summary>
Expand Down
62 changes: 29 additions & 33 deletions Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,52 +191,48 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var start = _stopwatch.Elapsed;
var next = start + period;

var state1 = state;
return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);
}

var td = new TernaryDisposable();
private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private readonly TimeSpan _period;
private readonly Func<TState, TState> _action;
private readonly EventLoopScheduler _scheduler;
private readonly AsyncLock _gate = new AsyncLock();

var gate = new AsyncLock();
td.Extra = gate;
private TState _state;
private TimeSpan _next;
private IDisposable _task;

var tick = default(Func<IScheduler, object, IDisposable>);
tick = (self_, _) =>
public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
next += period;
_state = state;
_period = period;
_action = action;
_scheduler = scheduler;
_next = scheduler._stopwatch.Elapsed + period;

td.Next = self_.Schedule(null, next - _stopwatch.Elapsed, tick);
gate.Wait(() =>
{
state1 = action(state1);
});
return Disposable.Empty;
};
Disposable.TrySetSingle(ref _task, scheduler.Schedule(this, _next - scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
}

td.First = Schedule(null, next - _stopwatch.Elapsed, tick);
private IDisposable Tick(IScheduler self)
{
_next += _period;

return td;
}
Disposable.TrySetMultiple(ref _task, self.Schedule(this, _next - _scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));

private sealed class TernaryDisposable : IDisposable
{
private IDisposable _task;
private IDisposable _extra;
_gate.Wait(
this,
closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));

// If Next was called before this assignment is executed, it won't overwrite
// a more fresh IDisposable task
public IDisposable First { set { Disposable.TrySetSingle(ref _task, value); } }
// It is fine to overwrite the first or previous IDisposable task
public IDisposable Next { set { Disposable.TrySetMultiple(ref _task, value); } }
public IDisposable Extra { set { Disposable.SetSingle(ref _extra, value); } }
return Disposable.Empty;
}

public void Dispose()
{
Disposable.TryDispose(ref _task);
Disposable.TryDispose(ref _extra);
_gate.Dispose();
}
}

Expand Down
42 changes: 23 additions & 19 deletions Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TSta
asyncLock = new AsyncLock();
}

asyncLock.Wait(() =>
{
if (!m.IsDisposed)
asyncLock.Wait(
(@this: this, m, action, state),
tuple =>
{
m.Disposable = action(this, state);
}
});
if (!m.IsDisposed)
{
tuple.m.Disposable = tuple.action(tuple.@this, tuple.state);
}
});

return m;
}
Expand Down Expand Up @@ -113,22 +115,24 @@ private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IS
asyncLock = new AsyncLock();
}

asyncLock.Wait(() =>
{
if (!m.IsDisposed)
asyncLock.Wait(
(@this: this, m, state, action, timer, dueTime),
tuple =>
{
var sleep = dueTime - timer.Elapsed;
if (sleep.Ticks > 0)
{
ConcurrencyAbstractionLayer.Current.Sleep(sleep);
}
if (!m.IsDisposed)
if (!tuple.m.IsDisposed)
{
m.Disposable = action(this, state);
var sleep = tuple.dueTime - tuple.timer.Elapsed;
if (sleep.Ticks > 0)
{
ConcurrencyAbstractionLayer.Current.Sleep(sleep);
}
if (!tuple.m.IsDisposed)
{
tuple.m.Disposable = tuple.action(tuple.@this, tuple.state);
}
}
}
});
});

return m;
}
Expand Down
59 changes: 41 additions & 18 deletions Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,54 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var cancel = new CancellationDisposable();
return new PeriodicallyScheduledWorkItem<TState>(state, period, action, taskFactory);
}

private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private TState _state;

private readonly TimeSpan _period;
private readonly TaskFactory _taskFactory;
private readonly Func<TState, TState> _action;
private readonly AsyncLock _gate = new AsyncLock();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action, TaskFactory taskFactory)
{
_state = state;
_period = period;
_action = action;
_taskFactory = taskFactory;

MoveNext();
}

var state1 = state;
var gate = new AsyncLock();
public void Dispose()
{
_cts.Cancel();
_gate.Dispose();
}

var moveNext = default(Action);
moveNext = () =>
private void MoveNext()
{
TaskHelpers.Delay(period, cancel.Token).ContinueWith(
_ =>
TaskHelpers.Delay(_period, _cts.Token).ContinueWith(
(_, thisObject) =>
{
moveNext();
var @this = (PeriodicallyScheduledWorkItem<TState>)thisObject;
@this.MoveNext();
gate.Wait(() =>
{
state1 = action(state1);
});
@this._gate.Wait(
@this,
closureThis => closureThis._state = closureThis._action(closureThis._state));
},
CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler
this,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
_taskFactory.Scheduler
);
};

moveNext();

return StableCompositeDisposable.Create(cancel, gate);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,26 +157,40 @@ public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<
if (action == null)
throw new ArgumentNullException(nameof(action));

var state1 = state;
var gate = new AsyncLock();
return new PeriodicallyScheduledWorkItem<TState>(state, period, action);
}

var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
tpt =>
{
gate.Wait(() =>
{
state1 = action(state1);
});
},
period
);
private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
{
private TState _state;
private Func<TState, TState> _action;

private readonly ThreadPoolTimer _timer;
private readonly AsyncLock _gate = new AsyncLock();

return Disposable.Create(() =>
public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action)
{
res.Cancel();
gate.Dispose();
action = Stubs<TState>.I;
});
_state = state;
_action = action;

_timer = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
Tick,
period);
}

private void Tick(ThreadPoolTimer timer)
{
_gate.Wait(
this,
@this => @this._state = @this._action(@this._state));
}

public void Dispose()
{
_timer.Cancel();
_gate.Dispose();
_action = Stubs<TState>.I;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,12 @@ public PeriodicTimer(TState state, TimeSpan period, Func<TState, TState> action)

private void Tick(object state)
{
_gate.Wait(() =>
{
_state = _action(_state);
});
_gate.Wait(
this,
@this =>
{
@this._state = @this._action(@this._state);
});
}

public void Dispose()
Expand Down
21 changes: 9 additions & 12 deletions Rx.NET/Source/src/System.Reactive/Internal/AsyncLockObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,23 @@ public AsyncLockObserver(IObserver<T> observer, AsyncLock gate)

protected override void OnNextCore(T value)
{
_gate.Wait(() =>
{
_observer.OnNext(value);
});
_gate.Wait(
(_observer, value),
tuple => tuple._observer.OnNext(tuple.value));
}

protected override void OnErrorCore(Exception exception)
{
_gate.Wait(() =>
{
_observer.OnError(exception);
});
_gate.Wait(
(_observer, exception),
tuple => tuple._observer.OnError(tuple.exception));
}

protected override void OnCompletedCore()
{
_gate.Wait(() =>
{
_observer.OnCompleted();
});
_gate.Wait(
_observer,
closureObserver => closureObserver.OnCompleted());
}
}
}
4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ public override void Run(IObservable<TSource> source)

base.Run(source);

_bufferGate.Wait(CreateBufferClose);
_bufferGate.Wait(this, @this => @this.CreateBufferClose());
}

protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -564,7 +564,7 @@ private void CloseBuffer(IDisposable closingSubscription)
ForwardOnNext(res);
}

_bufferGate.Wait(CreateBufferClose);
_bufferGate.Wait(this, @this => @this.CreateBufferClose());
}

private sealed class BufferClosingObserver : IObserver<TBufferClosing>
Expand Down
Loading

0 comments on commit 7551d1a

Please sign in to comment.