Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Improve the performance of ToObservable() #705

Merged
merged 4 commits into from
Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ private static void Main()
typeof(SwitchBenchmark),
typeof(BufferCountBenchmark),
typeof(RangeBenchmark),
typeof(ToObservableBenchmark),
typeof(RepeatBenchmark),
typeof(AppendPrependBenchmark)
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using BenchmarkDotNet.Attributes;

namespace Benchmarks.System.Reactive
{
[MemoryDiagnoser]
public class ToObservableBenchmark
{
[Params(1, 10, 100, 1000, 10000, 100000, 1000000)]
public int N;

int _store;

[Benchmark]
public void Exact()
{
Enumerable.Range(1, N)
.ToObservable()
.Subscribe(v => Volatile.Write(ref _store, v));
}
}
}
134 changes: 86 additions & 48 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/ToObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,41 @@
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class ToObservable<TSource> : Producer<TSource, ToObservable<TSource>._>
internal sealed class ToObservableRecursive<TSource> : Producer<TSource, ToObservableRecursive<TSource>._>
{
private readonly IEnumerable<TSource> _source;
private readonly IScheduler _scheduler;

public ToObservable(IEnumerable<TSource> source, IScheduler scheduler)
public ToObservableRecursive(IEnumerable<TSource> source, IScheduler scheduler)
{
_source = source;
_scheduler = scheduler;
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

protected override void Run(_ sink) => sink.Run(this);
protected override void Run(_ sink) => sink.Run(_source, _scheduler);

internal sealed class _ : IdentitySink<TSource>
{
IEnumerator<TSource> _enumerator;

volatile bool _disposed;

public _(IObserver<TSource> observer)
: base(observer)
{
}

public void Run(ToObservable<TSource> parent)
public void Run(IEnumerable<TSource> source, IScheduler scheduler)
{
var e = default(IEnumerator<TSource>);
try
{
e = parent._source.GetEnumerator();
_enumerator = source.GetEnumerator();
}
catch (Exception exception)
{
Expand All @@ -44,62 +48,45 @@ public void Run(ToObservable<TSource> parent)
return;
}

var longRunning = parent._scheduler.AsLongRunning();
if (longRunning != null)
{
//
// Long-running schedulers have the contract they should *never* prevent
// the work from starting, such that the scheduled work has the chance
// to observe the cancellation and perform proper clean-up. In this case,
// we're sure Loop will be entered, allowing us to dispose the enumerator.
//
SetUpstream(longRunning.ScheduleLongRunning((@this: this, e), (tuple, cancelable) => [email protected](tuple.e, cancelable)));
}
else
{
//
// We never allow the scheduled work to be cancelled. Instead, the flag
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
var flag = new BooleanDisposable();
parent._scheduler.Schedule(new State(this, flag, e), (state, action) => state.sink.LoopRec(state, action));
SetUpstream(flag);
}
//
// We never allow the scheduled work to be cancelled. Instead, the _disposed flag
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
}

private struct State
protected override void Dispose(bool disposing)
{
public readonly _ sink;
public readonly ICancelable flag;
public readonly IEnumerator<TSource> enumerator;

public State(_ sink, ICancelable flag, IEnumerator<TSource> enumerator)
base.Dispose(disposing);
if (disposing)
{
this.sink = sink;
this.flag = flag;
this.enumerator = enumerator;
_disposed = true;
}
}

private void LoopRec(State state, Action<State> recurse)
private IDisposable LoopRec(IScheduler scheduler)
{
var hasNext = false;
var ex = default(Exception);
var current = default(TSource);

if (state.flag.IsDisposed)
var enumerator = _enumerator;

if (_disposed)
{
state.enumerator.Dispose();
return;
_enumerator.Dispose();
_enumerator = null;

return Disposable.Empty;
}

try
{
hasNext = state.enumerator.MoveNext();
hasNext = enumerator.MoveNext();
if (hasNext)
{
current = state.enumerator.Current;
current = enumerator.Current;
}
}
catch (Exception exception)
Expand All @@ -109,22 +96,73 @@ private void LoopRec(State state, Action<State> recurse)

if (ex != null)
{
state.enumerator.Dispose();
enumerator.Dispose();
_enumerator = null;

ForwardOnError(ex);
return;
return Disposable.Empty;
}

if (!hasNext)
{
state.enumerator.Dispose();
enumerator.Dispose();
_enumerator = null;

ForwardOnCompleted();
return;
return Disposable.Empty;
}

ForwardOnNext(current);
recurse(state);

//
// We never allow the scheduled work to be cancelled. Instead, the _disposed flag
// is used to have LoopRec bail out and perform proper clean-up of the
// enumerator.
//
scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));

return Disposable.Empty;
}
}
}

internal sealed class ToObservableLongRunning<TSource> : Producer<TSource, ToObservableLongRunning<TSource>._>
{
private readonly IEnumerable<TSource> _source;
private readonly ISchedulerLongRunning _scheduler;

public ToObservableLongRunning(IEnumerable<TSource> source, ISchedulerLongRunning scheduler)
{
_source = source;
_scheduler = scheduler;
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

protected override void Run(_ sink) => sink.Run(_source, _scheduler);

internal sealed class _ : IdentitySink<TSource>
{
public _(IObserver<TSource> observer)
: base(observer)
{
}

public void Run(IEnumerable<TSource> source, ISchedulerLongRunning scheduler)
{
var e = default(IEnumerator<TSource>);
try
{
e = source.GetEnumerator();
}
catch (Exception exception)
{
ForwardOnError(exception);

return;
}

SetUpstream(scheduler.ScheduleLongRunning((@this: this, e), (tuple, cancelable) => [email protected](tuple.e, cancelable)));
}

private void Loop(IEnumerator<TSource> enumerator, ICancelable cancel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObse

private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
{
var longRunning = scheduler.AsLongRunning();
if (longRunning != null)
{
//
// [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
//
return new ToObservableLongRunning<TSource>(source, longRunning).Subscribe/*Unsafe*/(observer);
}
//
// [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
//
return new ToObservable<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
return new ToObservableRecursive<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
}

#endregion
Expand Down Expand Up @@ -72,12 +80,28 @@ public virtual IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(IObser

public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
{
return new ToObservable<TSource>(source, SchedulerDefaults.Iteration);
return ToObservable_(source, SchedulerDefaults.Iteration);
}

public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
{
return new ToObservable<TSource>(source, scheduler);
return ToObservable_(source, scheduler);
}

private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
{
var longRunning = scheduler.AsLongRunning();
if (longRunning != null)
{
//
// [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
//
return new ToObservableLongRunning<TSource>(source, longRunning);
}
//
// [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
//
return new ToObservableRecursive<TSource>(source, scheduler);
}

#endregion
Expand Down