From d09bc2c487ff71dc6be7454ade0d541968c66eff Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Tue, 3 Jul 2018 15:12:28 +0200 Subject: [PATCH 1/4] Replace a static nop-action that's unique per generic type parameter by Stubs.Nop. --- .../Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index f2d585318f..9e7bf70590 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -220,7 +220,6 @@ private sealed class TaskDisposeCompletionObserver : IObserver, IDisposa { private readonly IObserver _observer; private Action _disposable; - private static readonly Action DisposedAction = () => { }; public TaskDisposeCompletionObserver(IObserver observer) { @@ -229,7 +228,7 @@ public TaskDisposeCompletionObserver(IObserver observer) public void Dispose() { - Interlocked.Exchange(ref _disposable, DisposedAction)?.Invoke(); + Interlocked.Exchange(ref _disposable, Stubs.Nop)?.Invoke(); } public void OnCompleted() From 796a2c533fd537c93fa786ce32778ba7c2c12721 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Tue, 3 Jul 2018 15:43:01 +0200 Subject: [PATCH 2/4] Save an allocation in CreateWithTaskTokenObservable. --- .../Linq/QueryLanguage.Creation.cs | 69 +++++++++++-------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index 9e7bf70590..3f96c602c6 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -68,48 +68,61 @@ public virtual IObservable Create(Func, Can private sealed class CreateWithTaskTokenObservable : ObservableBase { - private readonly Func, CancellationToken, Task> _subscribeAsync; - - public CreateWithTaskTokenObservable(Func, CancellationToken, Task> subscribeAsync) + private sealed class Subscription : IDisposable { - _subscribeAsync = subscribeAsync; - } - - protected override IDisposable SubscribeCore(IObserver observer) - { - var cancellable = new CancellationDisposable(); + private sealed class TaskCompletionObserver : IObserver + { + private readonly IObserver _observer; - var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable(); - var taskCompletionObserver = new TaskCompletionObserver(observer); - var subscription = taskObservable.Subscribe(taskCompletionObserver); + public TaskCompletionObserver(IObserver observer) + { + _observer = observer; + } - return StableCompositeDisposable.Create(cancellable, subscription); - } + public void OnCompleted() + { + _observer.OnCompleted(); + } - private sealed class TaskCompletionObserver : IObserver - { - private readonly IObserver _observer; + public void OnError(Exception error) + { + _observer.OnError(error); + } - public TaskCompletionObserver(IObserver observer) - { - _observer = observer; + public void OnNext(Unit value) + { + // deliberately ignored + } } - public void OnCompleted() - { - _observer.OnCompleted(); - } + private readonly IDisposable _subscription; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); - public void OnError(Exception error) + public Subscription(Func, CancellationToken, Task> subscribeAsync, IObserver observer) { - _observer.OnError(error); + _subscription = subscribeAsync(observer, _cts.Token) + .ToObservable() + .Subscribe(new TaskCompletionObserver(observer)); } - public void OnNext(Unit value) + public void Dispose() { - // deliberately ignored + _cts.Cancel(); + _subscription.Dispose(); } } + + private readonly Func, CancellationToken, Task> _subscribeAsync; + + public CreateWithTaskTokenObservable(Func, CancellationToken, Task> subscribeAsync) + { + _subscribeAsync = subscribeAsync; + } + + protected override IDisposable SubscribeCore(IObserver observer) + { + return new Subscription(_subscribeAsync, observer); + } } public virtual IObservable Create(Func, Task> subscribeAsync) From 7323bdffbe2da25169c0c7f2510529debc3a1308 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Tue, 3 Jul 2018 15:54:29 +0200 Subject: [PATCH 3/4] Save an allocation in CreateWithTaskDisposable. --- .../Linq/QueryLanguage.Creation.cs | 83 +++++++++++-------- 1 file changed, 47 insertions(+), 36 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index 3f96c602c6..ce4d614381 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -137,59 +137,70 @@ public virtual IObservable Create(Func, Can private sealed class CreateWithTaskDisposable : ObservableBase { - private readonly Func, CancellationToken, Task> _subscribeAsync; - - public CreateWithTaskDisposable(Func, CancellationToken, Task> subscribeAsync) + private sealed class Subscription : IDisposable { - _subscribeAsync = subscribeAsync; - } + private sealed class TaskDisposeCompletionObserver : IObserver, IDisposable + { + private readonly IObserver _observer; + private IDisposable _disposable; - protected override IDisposable SubscribeCore(IObserver observer) - { - var cancellable = new CancellationDisposable(); + public TaskDisposeCompletionObserver(IObserver observer) + { + _observer = observer; + } - var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable(); + public void Dispose() + { + Disposable.TryDispose(ref _disposable); + } - var taskCompletionObserver = new TaskDisposeCompletionObserver(observer); + public void OnCompleted() + { + _observer.OnCompleted(); + } - // - // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. - // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. - // - taskObservable.Subscribe(taskCompletionObserver); + public void OnError(Exception error) + { + _observer.OnError(error); + } - return StableCompositeDisposable.Create(cancellable, taskCompletionObserver); - } + public void OnNext(IDisposable value) + { + Disposable.SetSingle(ref _disposable, value); + } + } - private sealed class TaskDisposeCompletionObserver : IObserver, IDisposable - { - private readonly IObserver _observer; - private IDisposable _disposable; + private readonly TaskDisposeCompletionObserver _observer; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); - public TaskDisposeCompletionObserver(IObserver observer) + public Subscription(Func, CancellationToken, Task> subscribeAsync, IObserver observer) { - _observer = observer; + // + // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. + // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. + // + subscribeAsync(observer, _cts.Token) + .ToObservable() + .Subscribe(_observer = new TaskDisposeCompletionObserver(observer)); } public void Dispose() { - Disposable.TryDispose(ref _disposable); + _cts.Cancel(); + _observer.Dispose(); } + } - public void OnCompleted() - { - _observer.OnCompleted(); - } + private readonly Func, CancellationToken, Task> _subscribeAsync; - public void OnError(Exception error) - { - _observer.OnError(error); - } + public CreateWithTaskDisposable(Func, CancellationToken, Task> subscribeAsync) + { + _subscribeAsync = subscribeAsync; + } - public void OnNext(IDisposable value) - { - Disposable.SetSingle(ref _disposable, value); - } + protected override IDisposable SubscribeCore(IObserver observer) + { + return new Subscription(_subscribeAsync, observer); } } From cabd7505c9b7ffa4a6c23fb47203d2d2a6feb051 Mon Sep 17 00:00:00 2001 From: Daniel Weber Date: Tue, 3 Jul 2018 16:03:49 +0200 Subject: [PATCH 4/4] Save an allocation in CreateWithTaskActionObservable. --- .../Linq/QueryLanguage.Creation.cs | 89 +++++++++++-------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index ce4d614381..c623841bc6 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -216,62 +216,73 @@ public virtual IObservable Create(Func, Can private sealed class CreateWithTaskActionObservable : ObservableBase { - private readonly Func, CancellationToken, Task> _subscribeAsync; - - public CreateWithTaskActionObservable(Func, CancellationToken, Task> subscribeAsync) + private sealed class Subscription : IDisposable { - _subscribeAsync = subscribeAsync; - } + private sealed class TaskDisposeCompletionObserver : IObserver, IDisposable + { + private readonly IObserver _observer; + private Action _disposable; - protected override IDisposable SubscribeCore(IObserver observer) - { - var cancellable = new CancellationDisposable(); + public TaskDisposeCompletionObserver(IObserver observer) + { + _observer = observer; + } - var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable(); + public void Dispose() + { + Interlocked.Exchange(ref _disposable, Stubs.Nop)?.Invoke(); + } - var taskCompletionObserver = new TaskDisposeCompletionObserver(observer); + public void OnCompleted() + { + _observer.OnCompleted(); + } - // - // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. - // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. - // - taskObservable.Subscribe(taskCompletionObserver); + public void OnError(Exception error) + { + _observer.OnError(error); + } - return StableCompositeDisposable.Create(cancellable, taskCompletionObserver); - } + public void OnNext(Action value) + { + if (Interlocked.CompareExchange(ref _disposable, value, null) != null) + { + value?.Invoke(); + } + } + } - private sealed class TaskDisposeCompletionObserver : IObserver, IDisposable - { - private readonly IObserver _observer; - private Action _disposable; + private readonly TaskDisposeCompletionObserver _observer; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); - public TaskDisposeCompletionObserver(IObserver observer) + public Subscription(Func, CancellationToken, Task> subscribeAsync, IObserver observer) { - _observer = observer; + // + // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually. + // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. + // + subscribeAsync(observer, _cts.Token) + .ToObservable() + .Subscribe(_observer = new TaskDisposeCompletionObserver(observer)); } public void Dispose() { - Interlocked.Exchange(ref _disposable, Stubs.Nop)?.Invoke(); + _cts.Cancel(); + _observer.Dispose(); } + } - public void OnCompleted() - { - _observer.OnCompleted(); - } + private readonly Func, CancellationToken, Task> _subscribeAsync; - public void OnError(Exception error) - { - _observer.OnError(error); - } + public CreateWithTaskActionObservable(Func, CancellationToken, Task> subscribeAsync) + { + _subscribeAsync = subscribeAsync; + } - public void OnNext(Action value) - { - if (Interlocked.CompareExchange(ref _disposable, value, null) != null) - { - value?.Invoke(); - } - } + protected override IDisposable SubscribeCore(IObserver observer) + { + return new Subscription(_subscribeAsync, observer); } }