Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Optimize Concat(IObservable<IObservable<T>>) #491

Merged
merged 3 commits into from
May 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 237 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Concurrent;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class ConcatMany<T> : IObservable<T>
{
readonly IObservable<IObservable<T>> sources;

internal ConcatMany(IObservable<IObservable<T>> sources)
{
this.sources = sources;
}

public IDisposable Subscribe(IObserver<T> observer)
{
var parent = new ConcatManyOuterObserver(observer);
var d = sources.SubscribeSafe(parent);
parent.OnSubscribe(d);
return parent;
}

internal sealed class ConcatManyOuterObserver : IObserver<IObservable<T>>, IDisposable
{
readonly IObserver<T> downstream;

readonly ConcurrentQueue<IObservable<T>> queue;

readonly InnerObserver innerObserver;

IDisposable upstream;

int trampoline;

Exception error;

bool done;

int active;

internal ConcatManyOuterObserver(IObserver<T> downstream)
{
this.downstream = downstream;
this.queue = new ConcurrentQueue<IObservable<T>>();
this.innerObserver = new InnerObserver(this);
}

internal void OnSubscribe(IDisposable d)
{
if (Interlocked.CompareExchange(ref upstream, d, null) != null)
{
d?.Dispose();
}
}

public void Dispose()
{
innerObserver.Dispose();
DisposeMain();
}

void DisposeMain()
{
Interlocked.Exchange(ref upstream, BooleanDisposable.True)?.Dispose();
}

bool IsDisposed()
{
return Volatile.Read(ref upstream) == BooleanDisposable.True;
}

public void OnCompleted()
{
Volatile.Write(ref done, true);
Drain();
}

public void OnError(Exception error)
{
if (Interlocked.CompareExchange(ref this.error, error, null) == null)
{
Volatile.Write(ref done, true);
Drain();
}
}

public void OnNext(IObservable<T> value)
{
queue.Enqueue(value);
Drain();
}

void InnerNext(T item)
{
downstream.OnNext(item);
}

void InnerError(Exception error)
{
if (innerObserver.Finish())
{
if (Interlocked.CompareExchange(ref this.error, error, null) == null)
{
Volatile.Write(ref done, true);
Volatile.Write(ref active, 0);
Drain();
}
}
}

void InnerComplete()
{
if (innerObserver.Finish())
{
Volatile.Write(ref active, 0);
Drain();
}
}

void Drain()
{
if (Interlocked.Increment(ref trampoline) != 1)
{
return;
}

do
{
if (IsDisposed())
{
while (queue.TryDequeue(out var _)) ;
}
else
{
if (Volatile.Read(ref active) == 0)
{
var isDone = Volatile.Read(ref done);

if (isDone)
{
var ex = Volatile.Read(ref error);
if (ex != null)
{
downstream.OnError(ex);
DisposeMain();
continue;
}
}

if (queue.TryDequeue(out var source))
{
var sad = new SingleAssignmentDisposable();
if (innerObserver.SetDisposable(sad))
{
Interlocked.Exchange(ref active, 1);
sad.Disposable = source.SubscribeSafe(innerObserver);
}
}
else
{
if (isDone)
{
downstream.OnCompleted();
DisposeMain();
}
}
}
}
} while (Interlocked.Decrement(ref trampoline) != 0);
}

internal sealed class InnerObserver : IObserver<T>, IDisposable
{
readonly ConcatManyOuterObserver parent;

internal SingleAssignmentDisposable upstream;

static readonly SingleAssignmentDisposable DISPOSED;

static InnerObserver()
{
DISPOSED = new SingleAssignmentDisposable();
DISPOSED.Dispose();
}

internal InnerObserver(ConcatManyOuterObserver parent)
{
this.parent = parent;
}

internal bool SetDisposable(SingleAssignmentDisposable sad)
{
return Interlocked.CompareExchange(ref upstream, sad, null) == null;
}

internal bool Finish()
{
var sad = Volatile.Read(ref upstream);
if (sad != DISPOSED)
{
if (Interlocked.CompareExchange(ref upstream, null, sad) == sad)
{
sad.Dispose();
return true;
}
}
return false;
}

public void Dispose()
{
Interlocked.Exchange(ref upstream, DISPOSED)?.Dispose();
}

public void OnCompleted()
{
parent.InnerComplete();
}

public void OnError(Exception error)
{
parent.InnerError(error);
}

public void OnNext(T value)
{
parent.InnerNext(value);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public virtual IObservable<TSource> Concat<TSource>(IObservable<Task<TSource>> s

private IObservable<TSource> Concat_<TSource>(IObservable<IObservable<TSource>> sources)
{
return Merge(sources, 1);
return new ConcatMany<TSource>(sources);
}

#endregion
Expand Down