Skip to content

Commit

Permalink
Review Throttle, save some allocations. (#638)
Browse files Browse the repository at this point in the history
* Don't override Run, the base methode will handle it.

* Save closure allocation and allow delegate caching.

* Save the allocation of a SingleAssignmentDisposable in OnNext. It's purpose is only to dispose of the currently scheduled timer early. We can do that with another call to TrySetSerial, passing null.

* Don't override Run, the base implementation has it covered.

* Let ThrottleObserver inherit from SafeObserver so it can hold onto its own subscription, saving the allocation of a SingleAssignmentDisposable.
  • Loading branch information
danielcweber authored Jun 26, 2018
1 parent 0231e8d commit c9bf3f1
Showing 1 changed file with 21 additions and 39 deletions.
60 changes: 21 additions & 39 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,19 @@ public _(Throttle<TSource> parent, IObserver<TSource> observer)
_scheduler = parent._scheduler;
}

private object _gate;
private readonly object _gate = new object();
private TSource _value;
private bool _hasValue;
private IDisposable _serialCancelable;
private ulong _id;

public override void Run(IObservable<TSource> source)
{
_gate = new object();
_value = default(TSource);
_hasValue = false;
_id = 0UL;

base.Run(source);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _serialCancelable);
}

base.Dispose(disposing);
}

Expand All @@ -71,12 +62,12 @@ public override void OnNext(TSource value)
_id = unchecked(_id + 1);
currentid = _id;
}
var d = new SingleAssignmentDisposable();
Disposable.TrySetSerial(ref _serialCancelable, d);
d.Disposable = _scheduler.Schedule(currentid, _dueTime, Propagate);

Disposable.TrySetSerial(ref _serialCancelable, null);
Disposable.TrySetSerial(ref _serialCancelable, _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => tuple.@this.Propagate(tuple.currentid)));
}

private IDisposable Propagate(IScheduler self, ulong currentid)
private IDisposable Propagate(ulong currentid)
{
lock (_gate)
{
Expand Down Expand Up @@ -132,7 +123,7 @@ public Throttle(IObservable<TSource> source, Func<TSource, IObservable<TThrottle

protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_source);

internal sealed class _ : IdentitySink<TSource>
{
Expand All @@ -144,22 +135,12 @@ public _(Throttle<TSource, TThrottle> parent, IObserver<TSource> observer)
_throttleSelector = parent._throttleSelector;
}

private object _gate;
private readonly object _gate = new object();
private TSource _value;
private bool _hasValue;
private IDisposable _serialCancelable;
private ulong _id;

public void Run(Throttle<TSource, TThrottle> parent)
{
_gate = new object();
_value = default(TSource);
_hasValue = false;
_id = 0UL;

base.Run(parent._source);
}

protected override void Dispose(bool disposing)
{
if (disposing)
Expand Down Expand Up @@ -195,9 +176,12 @@ public override void OnNext(TSource value)
currentid = _id;
}

var d = new SingleAssignmentDisposable();
Disposable.TrySetSerial(ref _serialCancelable, d);
d.Disposable = throttle.SubscribeSafe(new ThrottleObserver(this, value, currentid, d));
Disposable.TrySetSerial(ref _serialCancelable, null);

var newInnerObserver = new ThrottleObserver(this, value, currentid);
newInnerObserver.SetResource(throttle.SubscribeSafe(newInnerObserver));

Disposable.TrySetSerial(ref _serialCancelable, newInnerObserver);
}

public override void OnError(Exception error)
Expand Down Expand Up @@ -229,50 +213,48 @@ public override void OnCompleted()
}
}

private sealed class ThrottleObserver : IObserver<TThrottle>
private sealed class ThrottleObserver : SafeObserver<TThrottle>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly ulong _currentid;
private readonly IDisposable _self;

public ThrottleObserver(_ parent, TSource value, ulong currentid, IDisposable self)
public ThrottleObserver(_ parent, TSource value, ulong currentid)
{
_parent = parent;
_value = value;
_currentid = currentid;
_self = self;
}

public void OnNext(TThrottle value)
public override void OnNext(TThrottle value)
{
lock (_parent._gate)
{
if (_parent._hasValue && _parent._id == _currentid)
_parent.ForwardOnNext(_value);

_parent._hasValue = false;
_self.Dispose();
Dispose();
}
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
lock (_parent._gate)
{
_parent.ForwardOnError(error);
}
}

public void OnCompleted()
public override void OnCompleted()
{
lock (_parent._gate)
{
if (_parent._hasValue && _parent._id == _currentid)
_parent.ForwardOnNext(_value);

_parent._hasValue = false;
_self.Dispose();
Dispose();
}
}
}
Expand Down

0 comments on commit c9bf3f1

Please sign in to comment.