Skip to content

Commit

Permalink
4.x: Add the IConnectableObservable.AutoConnect() operator (#497)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and Oren Novotny committed May 26, 2018
1 parent 10a44ad commit 3964f9f
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 1 deletion.
1 change: 1 addition & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ internal interface IQueryLanguage
IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, TimeSpan window, IScheduler scheduler);
IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window, IScheduler scheduler);
IObservable<TSource> AutoConnect<TSource>(IConnectableObservable<TSource> source, int minObservers, Action<IDisposable> onConnect);

#endregion

Expand Down
22 changes: 22 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,28 @@ public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable

#endregion

#region + AutoConnect +

/// <summary>
/// Automatically connect the upstream IConnectableObservable at most once when the
/// specified number of IObservers have subscribed to this IObservable.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription.</param>
/// <param name="onConnect">If not null, the connection's IDisposable is provided to it.</param>
/// <returns>An observable sequence that connects to the source at most once when the given number of observers have subscribed to it.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> AutoConnect<TSource>(this IConnectableObservable<TSource> source, int minObservers = 1, Action<IDisposable> onConnect = null)
{
if (source == null)
throw new ArgumentNullException(nameof(source));

return s_impl.AutoConnect(source, minObservers, onConnect);
}

#endregion

#region + Replay +

/// <summary>
Expand Down
47 changes: 47 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/AutoConnect.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Subjects;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Automatically connect the upstream IConnectableObservable once the
/// specified number of IObservers have subscribed to this IObservable.
/// </summary>
/// <typeparam name="T">The upstream value type.</typeparam>
internal sealed class AutoConnect<T> : IObservable<T>
{
readonly IConnectableObservable<T> source;

readonly int minObservers;

readonly Action<IDisposable> onConnect;

int count;

internal AutoConnect(IConnectableObservable<T> source, int minObservers, Action<IDisposable> onConnect)
{
this.source = source;
this.minObservers = minObservers;
this.onConnect = onConnect;
}

public IDisposable Subscribe(IObserver<T> observer)
{
var d = source.Subscribe(observer);

if (Volatile.Read(ref count) < minObservers)
{
if (Interlocked.Increment(ref count) == minObservers)
{
var c = source.Connect();
onConnect?.Invoke(c);
}
}
return d;
}
}
}
35 changes: 34 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10627,7 +10627,40 @@ public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider pr
)
);
}


/// <summary>
/// Automatically connect the upstream IConnectableObservable at most once when the
/// specified number of IObservers have subscribed to this IObservable.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription.</param>
/// <param name="onConnect">If not null, the connection's IDisposable is provided to it.</param>
/// <returns>An observable sequence that connects to the source at most once when the given number of observers have subscribed to it.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IQbservable<TSource> AutoConnect<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers, Action<IDisposable> onConnect)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.AutoConnect<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int), default(Action<IDisposable>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(minObservers, typeof(int)),
Expression.Constant(onConnect, typeof(Action<IDisposable>))
)
);
}

/// <summary>
/// Generates an observable sequence that repeats the given element infinitely.
/// </summary>
Expand Down
17 changes: 17 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSo
return new RefCount<TSource>(source);
}

#endregion

#region + AutoConnect +

public virtual IObservable<TSource> AutoConnect<TSource>(IConnectableObservable<TSource> source, int minObservers = 1, Action<IDisposable> onConnect = null)
{
if (minObservers <= 0)
{
var d = source.Connect();
onConnect?.Invoke(d);
return source;
}

return new AutoConnect<TSource>(source, minObservers, onConnect);
}


#endregion

#region + Replay +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using Xunit;
using ReactiveTests.Dummies;
using System.Reflection;
using System.Reactive.Subjects;

namespace ReactiveTests.Tests
{
public class ObservableAutoConnectTest : ReactiveTest
{
[Fact]
public void AutoConnect_Basic()
{
int called = 0;

var source = Observable.Defer(() =>
{
called++;
return Observable.Range(1, 5);
})
.Replay()
.AutoConnect();

Assert.Equal(0, called);

var list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);

list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
}

[Fact]
public void AutoConnect_Immediately()
{
int called = 0;

var source = Observable.Defer(() =>
{
called++;
return Observable.Range(1, 5);
})
.Replay()
.AutoConnect(0);

Assert.Equal(1, called);

var list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);

list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
}

[Fact]
public void AutoConnect_TwoConsumers()
{
int called = 0;

var source = Observable.Defer(() =>
{
called++;
return Observable.Range(1, 5);
})
.Replay()
.AutoConnect(2);

Assert.Equal(0, called);

var list0 = new List<int>();

source.Subscribe(v => list0.Add(v));

Assert.Equal(0, called);
Assert.Equal(0, list0.Count);

var list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);

Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list0);

list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
}

[Fact]
public void AutoConnect_Dispose()
{
var subject = new Subject<int>();

var disposable = new IDisposable[1];

var source = subject
.Replay()
.AutoConnect(1, d => disposable[0] = d);

Assert.Null(disposable[0]);

var list = new List<int>();

source.Subscribe(v => list.Add(v));

Assert.NotNull(disposable[0]);

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);

disposable[0].Dispose();

subject.OnNext(4);
subject.OnNext(5);

Assert.Equal(new List<int>() { 1, 2, 3 }, list);

}
}
}

0 comments on commit 3964f9f

Please sign in to comment.