Skip to content

Commit

Permalink
4.x: Improve Generate() internals (#674)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and danielcweber committed Jun 28, 2018
1 parent 90aae53 commit 262544b
Showing 1 changed file with 54 additions and 47 deletions.
101 changes: 54 additions & 47 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,38 @@ public NoTime(TState initialState, Func<TState, bool> condition, Func<TState, TS

protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

protected override void Run(_ sink) => sink.Run();
protected override void Run(_ sink) => sink.Run(_scheduler);

internal sealed class _ : IdentitySink<TResult>
{
// CONSIDER: This sink has a parent reference that can be considered for removal.

private readonly NoTime _parent;
readonly Func<TState, bool> _condition;
readonly Func<TState, TState> _iterate;
readonly Func<TState, TResult> _resultSelector;

public _(NoTime parent, IObserver<TResult> observer)
: base(observer)
{
_parent = parent;
_condition = parent._condition;
_iterate = parent._iterate;
_resultSelector = parent._resultSelector;

_state = parent._initialState;
_first = true;
}

private TState _state;
private bool _first;

public void Run()
public void Run(IScheduler _scheduler)
{
_state = _parent._initialState;
_first = true;

var longRunning = _parent._scheduler.AsLongRunning();
var longRunning = _scheduler.AsLongRunning();
if (longRunning != null)
{
SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
}
else
{
SetUpstream(_parent._scheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
SetUpstream(_scheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
}
}

Expand All @@ -75,14 +77,14 @@ private void Loop(ICancelable cancel)
}
else
{
_state = _parent._iterate(_state);
_state = _iterate(_state);
}

hasResult = _parent._condition(_state);
hasResult = _condition(_state);

if (hasResult)
{
result = _parent._resultSelector(_state);
result = _resultSelector(_state);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -119,14 +121,14 @@ private void LoopRec(Action<_> recurse)
}
else
{
_state = _parent._iterate(_state);
_state = _iterate(_state);
}

hasResult = _parent._condition(_state);
hasResult = _condition(_state);

if (hasResult)
{
result = _parent._resultSelector(_state);
result = _resultSelector(_state);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -169,31 +171,34 @@ public Absolute(TState initialState, Func<TState, bool> condition, Func<TState,

protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

protected override void Run(_ sink) => sink.Run();
protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);

internal sealed class _ : IdentitySink<TResult>
{
// CONSIDER: This sink has a parent reference that can be considered for removal.

private readonly Absolute _parent;
readonly Func<TState, bool> _condition;
readonly Func<TState, TState> _iterate;
readonly Func<TState, TResult> _resultSelector;
readonly Func<TState, DateTimeOffset> _timeSelector;

public _(Absolute parent, IObserver<TResult> observer)
: base(observer)
{
_parent = parent;
_condition = parent._condition;
_iterate = parent._iterate;
_resultSelector = parent._resultSelector;
_timeSelector = parent._timeSelector;

_first = true;
}

private bool _first;
private bool _hasResult;

private TResult _result;

public void Run()
public void Run(IScheduler outerScheduler, TState initialState)
{
_first = true;
_hasResult = false;
_result = default(TResult);

SetUpstream(_parent._scheduler.Schedule((@this: this, _parent._initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple._initialState)));
SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState)));
}

private IDisposable InvokeRec(IScheduler self, TState state)
Expand All @@ -213,15 +218,15 @@ private IDisposable InvokeRec(IScheduler self, TState state)
}
else
{
state = _parent._iterate(state);
state = _iterate(state);
}

_hasResult = _parent._condition(state);
_hasResult = _condition(state);

if (_hasResult)
{
_result = _parent._resultSelector(state);
time = _parent._timeSelector(state);
_result = _resultSelector(state);
time = _timeSelector(state);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -262,31 +267,33 @@ public Relative(TState initialState, Func<TState, bool> condition, Func<TState,

protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);

protected override void Run(_ sink) => sink.Run();
protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);

internal sealed class _ : IdentitySink<TResult>
{
// CONSIDER: This sink has a parent reference that can be considered for removal.

private readonly Relative _parent;
private readonly Func<TState, bool> _condition;
private readonly Func<TState, TState> _iterate;
private readonly Func<TState, TResult> _resultSelector;
private readonly Func<TState, TimeSpan> _timeSelector;

public _(Relative parent, IObserver<TResult> observer)
: base(observer)
{
_parent = parent;
_condition = parent._condition;
_iterate = parent._iterate;
_resultSelector = parent._resultSelector;
_timeSelector = parent._timeSelector;

_first = true;
}

private bool _first;
private bool _hasResult;
private TResult _result;

public void Run()
public void Run(IScheduler outerScheduler, TState initialState)
{
_first = true;
_hasResult = false;
_result = default(TResult);

SetUpstream(_parent._scheduler.Schedule((@this: this, _parent._initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple._initialState)));
SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => tuple.@this.InvokeRec(scheduler, tuple.initialState)));
}

private IDisposable InvokeRec(IScheduler self, TState state)
Expand All @@ -306,15 +313,15 @@ private IDisposable InvokeRec(IScheduler self, TState state)
}
else
{
state = _parent._iterate(state);
state = _iterate(state);
}

_hasResult = _parent._condition(state);
_hasResult = _condition(state);

if (_hasResult)
{
_result = _parent._resultSelector(state);
time = _parent._timeSelector(state);
_result = _resultSelector(state);
time = _timeSelector(state);
}
}
catch (Exception exception)
Expand Down

0 comments on commit 262544b

Please sign in to comment.