Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review Throttle, save some allocations. #638

Merged
merged 6 commits into from
Jun 26, 2018
60 changes: 21 additions & 39 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,19 @@ public _(Throttle<TSource> parent, IObserver<TSource> observer)
_scheduler = parent._scheduler;
}

private object _gate;
private readonly object _gate = new object();
private TSource _value;
private bool _hasValue;
private IDisposable _serialCancelable;
private ulong _id;

public override void Run(IObservable<TSource> source)
{
_gate = new object();
_value = default(TSource);
_hasValue = false;
_id = 0UL;

base.Run(source);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _serialCancelable);
}

base.Dispose(disposing);
}

Expand All @@ -71,12 +62,12 @@ public override void OnNext(TSource value)
_id = unchecked(_id + 1);
currentid = _id;
}
var d = new SingleAssignmentDisposable();
Disposable.TrySetSerial(ref _serialCancelable, d);
d.Disposable = _scheduler.Schedule(currentid, _dueTime, Propagate);

Disposable.TrySetSerial(ref _serialCancelable, null);
Disposable.TrySetSerial(ref _serialCancelable, _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => tuple.@this.Propagate(tuple.currentid)));
}

private IDisposable Propagate(IScheduler self, ulong currentid)
private IDisposable Propagate(ulong currentid)
{
lock (_gate)
{
Expand Down Expand Up @@ -132,7 +123,7 @@ public Throttle(IObservable<TSource> source, Func<TSource, IObservable<TThrottle

protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_source);

internal sealed class _ : IdentitySink<TSource>
{
Expand All @@ -144,22 +135,12 @@ public _(Throttle<TSource, TThrottle> parent, IObserver<TSource> observer)
_throttleSelector = parent._throttleSelector;
}

private object _gate;
private readonly object _gate = new object();
private TSource _value;
private bool _hasValue;
private IDisposable _serialCancelable;
private ulong _id;

public void Run(Throttle<TSource, TThrottle> parent)
{
_gate = new object();
_value = default(TSource);
_hasValue = false;
_id = 0UL;

base.Run(parent._source);
}

protected override void Dispose(bool disposing)
{
if (disposing)
Expand Down Expand Up @@ -195,9 +176,12 @@ public override void OnNext(TSource value)
currentid = _id;
}

var d = new SingleAssignmentDisposable();
Disposable.TrySetSerial(ref _serialCancelable, d);
d.Disposable = throttle.SubscribeSafe(new ThrottleObserver(this, value, currentid, d));
Disposable.TrySetSerial(ref _serialCancelable, null);

var newInnerObserver = new ThrottleObserver(this, value, currentid);
newInnerObserver.SetResource(throttle.SubscribeSafe(newInnerObserver));

Disposable.TrySetSerial(ref _serialCancelable, newInnerObserver);
}

public override void OnError(Exception error)
Expand Down Expand Up @@ -229,50 +213,48 @@ public override void OnCompleted()
}
}

private sealed class ThrottleObserver : IObserver<TThrottle>
private sealed class ThrottleObserver : SafeObserver<TThrottle>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly ulong _currentid;
private readonly IDisposable _self;

public ThrottleObserver(_ parent, TSource value, ulong currentid, IDisposable self)
public ThrottleObserver(_ parent, TSource value, ulong currentid)
{
_parent = parent;
_value = value;
_currentid = currentid;
_self = self;
}

public void OnNext(TThrottle value)
public override void OnNext(TThrottle value)
{
lock (_parent._gate)
{
if (_parent._hasValue && _parent._id == _currentid)
_parent.ForwardOnNext(_value);

_parent._hasValue = false;
_self.Dispose();
Dispose();
}
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
lock (_parent._gate)
{
_parent.ForwardOnError(error);
}
}

public void OnCompleted()
public override void OnCompleted()
{
lock (_parent._gate)
{
if (_parent._hasValue && _parent._id == _currentid)
_parent.ForwardOnNext(_value);

_parent._hasValue = false;
_self.Dispose();
Dispose();
}
}
}
Expand Down