From 74d568623419ef5d8f299ccaebaa647a8d655495 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 21 Jun 2018 20:57:10 +0200 Subject: [PATCH 1/5] Don't override Run, the base methode will handle it. --- .../src/System.Reactive/Linq/Observable/Throttle.cs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs index ae45413767..a84f057c0e 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs @@ -36,22 +36,12 @@ public _(Throttle parent, IObserver observer) _scheduler = parent._scheduler; } - private object _gate; + private readonly object _gate = new object(); private TSource _value; private bool _hasValue; private IDisposable _serialCancelable; private ulong _id; - public override void Run(IObservable source) - { - _gate = new object(); - _value = default(TSource); - _hasValue = false; - _id = 0UL; - - base.Run(source); - } - protected override void Dispose(bool disposing) { if (disposing) From 0fceacfe12657f62ed739d370eee6c4540dba736 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 21 Jun 2018 21:07:24 +0200 Subject: [PATCH 2/5] Save closure allocation and allow delegate caching. --- Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs index a84f057c0e..991ee16d58 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs @@ -63,10 +63,10 @@ public override void OnNext(TSource value) } var d = new SingleAssignmentDisposable(); Disposable.TrySetSerial(ref _serialCancelable, d); - d.Disposable = _scheduler.Schedule(currentid, _dueTime, Propagate); + d.Disposable = _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => tuple.@this.Propagate(tuple.currentid)); } - private IDisposable Propagate(IScheduler self, ulong currentid) + private IDisposable Propagate(ulong currentid) { lock (_gate) { From 2c8c0fa0eaa3bc0939dcddc8bc0faba629498d14 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 21 Jun 2018 21:14:25 +0200 Subject: [PATCH 3/5] Save the allocation of a SingleAssignmentDisposable in OnNext. It's purpose is only to dispose of the currently scheduled timer early. We can do that with another call to TrySetSerial, passing null. --- .../Source/src/System.Reactive/Linq/Observable/Throttle.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs index 991ee16d58..27b5bf64ae 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs @@ -48,6 +48,7 @@ protected override void Dispose(bool disposing) { Disposable.TryDispose(ref _serialCancelable); } + base.Dispose(disposing); } @@ -61,9 +62,9 @@ public override void OnNext(TSource value) _id = unchecked(_id + 1); currentid = _id; } - var d = new SingleAssignmentDisposable(); - Disposable.TrySetSerial(ref _serialCancelable, d); - d.Disposable = _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => tuple.@this.Propagate(tuple.currentid)); + + Disposable.TrySetSerial(ref _serialCancelable, null); + Disposable.TrySetSerial(ref _serialCancelable, _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => tuple.@this.Propagate(tuple.currentid))); } private IDisposable Propagate(ulong currentid) From 0c1dd0ce76aeaee4d657d9f30db0e1109097dca1 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 21 Jun 2018 21:20:08 +0200 Subject: [PATCH 4/5] Don't override Run, the base implementation has it covered. --- .../System.Reactive/Linq/Observable/Throttle.cs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs index 27b5bf64ae..9157e14b3e 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs @@ -123,7 +123,7 @@ public Throttle(IObservable source, Func observer) => new _(this, observer); - protected override void Run(_ sink) => sink.Run(this); + protected override void Run(_ sink) => sink.Run(_source); internal sealed class _ : IdentitySink { @@ -135,22 +135,12 @@ public _(Throttle parent, IObserver observer) _throttleSelector = parent._throttleSelector; } - private object _gate; + private readonly object _gate = new object(); private TSource _value; private bool _hasValue; private IDisposable _serialCancelable; private ulong _id; - public void Run(Throttle parent) - { - _gate = new object(); - _value = default(TSource); - _hasValue = false; - _id = 0UL; - - base.Run(parent._source); - } - protected override void Dispose(bool disposing) { if (disposing) From e17a553500c842baa06126e1915d51bb026af464 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Thu, 21 Jun 2018 21:25:22 +0200 Subject: [PATCH 5/5] Let ThrottleObserver inherit from SafeObserver so it can hold onto its own subscription, saving the allocation of a SingleAssignmentDisposable. --- .../Linq/Observable/Throttle.cs | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs index 9157e14b3e..dd3710a1dc 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs @@ -176,9 +176,12 @@ public override void OnNext(TSource value) currentid = _id; } - var d = new SingleAssignmentDisposable(); - Disposable.TrySetSerial(ref _serialCancelable, d); - d.Disposable = throttle.SubscribeSafe(new ThrottleObserver(this, value, currentid, d)); + Disposable.TrySetSerial(ref _serialCancelable, null); + + var newInnerObserver = new ThrottleObserver(this, value, currentid); + newInnerObserver.SetResource(throttle.SubscribeSafe(newInnerObserver)); + + Disposable.TrySetSerial(ref _serialCancelable, newInnerObserver); } public override void OnError(Exception error) @@ -210,22 +213,20 @@ public override void OnCompleted() } } - private sealed class ThrottleObserver : IObserver + private sealed class ThrottleObserver : SafeObserver { private readonly _ _parent; private readonly TSource _value; private readonly ulong _currentid; - private readonly IDisposable _self; - public ThrottleObserver(_ parent, TSource value, ulong currentid, IDisposable self) + public ThrottleObserver(_ parent, TSource value, ulong currentid) { _parent = parent; _value = value; _currentid = currentid; - _self = self; } - public void OnNext(TThrottle value) + public override void OnNext(TThrottle value) { lock (_parent._gate) { @@ -233,11 +234,11 @@ public void OnNext(TThrottle value) _parent.ForwardOnNext(_value); _parent._hasValue = false; - _self.Dispose(); + Dispose(); } } - public void OnError(Exception error) + public override void OnError(Exception error) { lock (_parent._gate) { @@ -245,7 +246,7 @@ public void OnError(Exception error) } } - public void OnCompleted() + public override void OnCompleted() { lock (_parent._gate) { @@ -253,7 +254,7 @@ public void OnCompleted() _parent.ForwardOnNext(_value); _parent._hasValue = false; - _self.Dispose(); + Dispose(); } } }