From e7976a8e55cf221796ca50a93ee11ce6d539942f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 4 May 2018 15:38:42 +0200 Subject: [PATCH 1/3] 4.x: Optimize Concat(IObservable>) --- .../Linq/Observable/ConcatMany.cs | 236 ++++++++++++++++++ .../Linq/QueryLanguage.Multiple.cs | 2 +- 2 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs new file mode 100644 index 0000000000..c92bb52125 --- /dev/null +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs @@ -0,0 +1,236 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Reactive.Disposables; +using System.Text; +using System.Threading; + +namespace System.Reactive.Linq.ObservableImpl +{ + internal sealed class ConcatMany : IObservable + { + readonly IObservable> sources; + + internal ConcatMany(IObservable> sources) + { + this.sources = sources; + } + + public IDisposable Subscribe(IObserver observer) + { + var parent = new ConcatManyOuterObserver(observer); + var d = sources.SubscribeSafe(parent); + parent.OnSubscribe(d); + return parent; + } + + internal sealed class ConcatManyOuterObserver : IObserver>, IDisposable + { + readonly IObserver downstream; + + readonly ConcurrentQueue> queue; + + readonly InnerObserver innerObserver; + + IDisposable upstream; + + int trampoline; + + Exception error; + + bool done; + + int active; + + internal ConcatManyOuterObserver(IObserver downstream) + { + this.downstream = downstream; + this.queue = new ConcurrentQueue>(); + this.innerObserver = new InnerObserver(this); + } + + internal void OnSubscribe(IDisposable d) + { + if (Interlocked.CompareExchange(ref upstream, d, null) != null) + { + d?.Dispose(); + } + } + + public void Dispose() + { + innerObserver.Dispose(); + DisposeMain(); + } + + void DisposeMain() + { + Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose(); + } + + bool IsDisposed() + { + return Volatile.Read(ref upstream) == BooleanDisposable.True; + } + + public void OnCompleted() + { + Volatile.Write(ref done, true); + Drain(); + } + + public void OnError(Exception error) + { + if (Interlocked.CompareExchange(ref error, error, null) == null) + { + Volatile.Write(ref done, true); + Drain(); + } + } + + public void OnNext(IObservable value) + { + queue.Enqueue(value); + Drain(); + } + + void InnerNext(T item) + { + downstream.OnNext(item); + } + + void InnerError(Exception error) + { + if (innerObserver.Finish()) + { + if (Interlocked.CompareExchange(ref error, error, null) == null) + { + Volatile.Write(ref done, true); + Volatile.Write(ref active, 0); + Drain(); + } + } + } + + void InnerComplete() + { + if (innerObserver.Finish()) + { + Volatile.Write(ref active, 0); + Drain(); + } + } + + void Drain() + { + if (Interlocked.Increment(ref trampoline) != 1) + { + return; + } + + do + { + if (IsDisposed()) + { + while (queue.TryDequeue(out var _)) ; + } + else + { + if (Volatile.Read(ref active) == 0) + { + var isDone = Volatile.Read(ref done); + + if (isDone) + { + var ex = Volatile.Read(ref error); + if (ex != null) + { + downstream.OnError(ex); + DisposeMain(); + continue; + } + } + + if (queue.TryDequeue(out var source)) + { + var sad = new SingleAssignmentDisposable(); + if (innerObserver.SetDisposable(sad)) + { + Interlocked.Exchange(ref active, 1); + sad.Disposable = source.SubscribeSafe(innerObserver); + } + } + else + { + if (isDone) + { + downstream.OnCompleted(); + DisposeMain(); + } + } + } + } + } while (Interlocked.Decrement(ref trampoline) != 0); + } + + internal sealed class InnerObserver : IObserver, IDisposable + { + readonly ConcatManyOuterObserver parent; + + internal SingleAssignmentDisposable upstream; + + static readonly SingleAssignmentDisposable DISPOSED; + + static InnerObserver() + { + DISPOSED = new SingleAssignmentDisposable(); + DISPOSED.Dispose(); + } + + internal InnerObserver(ConcatManyOuterObserver parent) + { + this.parent = parent; + } + + internal bool SetDisposable(SingleAssignmentDisposable sad) + { + return Interlocked.CompareExchange(ref upstream, sad, null) == null; + } + + internal bool Finish() + { + var sad = Volatile.Read(ref upstream); + if (sad != DISPOSED) + { + if (Interlocked.CompareExchange(ref upstream, null, sad) == sad) + { + sad.Dispose(); + return true; + } + } + return false; + } + + public void Dispose() + { + Interlocked.Exchange(ref upstream, DISPOSED)?.Dispose(); + } + + public void OnCompleted() + { + parent.InnerComplete(); + } + + public void OnError(Exception error) + { + parent.InnerError(error); + } + + public void OnNext(T value) + { + parent.InnerNext(value); + } + } + } + } +} diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs index 7d42abf259..67f9e94ca3 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs @@ -230,7 +230,7 @@ public virtual IObservable Concat(IObservable> s private IObservable Concat_(IObservable> sources) { - return Merge(sources, 1); + return new ConcatMany(sources); } #endregion From 9b1ac947a45e49a7e7c1d242dce822be8e642141 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 4 May 2018 15:42:17 +0200 Subject: [PATCH 2/3] Add license header. --- .../src/System.Reactive/Linq/Observable/ConcatMany.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs index c92bb52125..60dc3b1742 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs @@ -1,8 +1,9 @@ -using System; +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + using System.Collections.Concurrent; -using System.Collections.Generic; using System.Reactive.Disposables; -using System.Text; using System.Threading; namespace System.Reactive.Linq.ObservableImpl From c3bb5946f7184d10657f8d955b68c0e2beda3164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Fri, 18 May 2018 19:12:13 +0200 Subject: [PATCH 3/3] Fix error handling --- .../Source/src/System.Reactive/Linq/Observable/ConcatMany.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs index 60dc3b1742..f0d1cb8de0 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs @@ -82,7 +82,7 @@ public void OnCompleted() public void OnError(Exception error) { - if (Interlocked.CompareExchange(ref error, error, null) == null) + if (Interlocked.CompareExchange(ref this.error, error, null) == null) { Volatile.Write(ref done, true); Drain(); @@ -104,7 +104,7 @@ void InnerError(Exception error) { if (innerObserver.Finish()) { - if (Interlocked.CompareExchange(ref error, error, null) == null) + if (Interlocked.CompareExchange(ref this.error, error, null) == null) { Volatile.Write(ref done, true); Volatile.Write(ref active, 0);