Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Add the IConnectableObservable.AutoConnect() operator #497

Merged
merged 1 commit into from
May 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ internal interface IQueryLanguage
IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, TimeSpan window, IScheduler scheduler);
IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window, IScheduler scheduler);
IObservable<TSource> AutoConnect<TSource>(IConnectableObservable<TSource> source, int minObservers, Action<IDisposable> onConnect);

#endregion

Expand Down
22 changes: 22 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,28 @@ public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable

#endregion

#region + AutoConnect +

/// <summary>
/// Automatically connect the upstream IConnectableObservable at most once when the
/// specified number of IObservers have subscribed to this IObservable.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription.</param>
/// <param name="onConnect">If not null, the connection's IDisposable is provided to it.</param>
/// <returns>An observable sequence that connects to the source at most once when the given number of observers have subscribed to it.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> AutoConnect<TSource>(this IConnectableObservable<TSource> source, int minObservers = 1, Action<IDisposable> onConnect = null)
{
if (source == null)
throw new ArgumentNullException(nameof(source));

return s_impl.AutoConnect(source, minObservers, onConnect);
}

#endregion

#region + Replay +

/// <summary>
Expand Down
47 changes: 47 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/AutoConnect.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Subjects;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Automatically connect the upstream IConnectableObservable once the
/// specified number of IObservers have subscribed to this IObservable.
/// </summary>
/// <typeparam name="T">The upstream value type.</typeparam>
internal sealed class AutoConnect<T> : IObservable<T>
{
readonly IConnectableObservable<T> source;

readonly int minObservers;

readonly Action<IDisposable> onConnect;

int count;

internal AutoConnect(IConnectableObservable<T> source, int minObservers, Action<IDisposable> onConnect)
{
this.source = source;
this.minObservers = minObservers;
this.onConnect = onConnect;
}

public IDisposable Subscribe(IObserver<T> observer)
{
var d = source.Subscribe(observer);

if (Volatile.Read(ref count) < minObservers)
{
if (Interlocked.Increment(ref count) == minObservers)
{
var c = source.Connect();
onConnect?.Invoke(c);
}
}
return d;
}
}
}
37 changes: 35 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* WARNING: Auto-generated file (5/1/2015 21:21:20)
* Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
*/
Expand Down Expand Up @@ -10627,7 +10627,40 @@ public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider pr
)
);
}


/// <summary>
/// Automatically connect the upstream IConnectableObservable at most once when the
/// specified number of IObservers have subscribed to this IObservable.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="minObservers">The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription.</param>
/// <param name="onConnect">If not null, the connection's IDisposable is provided to it.</param>
/// <returns>An observable sequence that connects to the source at most once when the given number of observers have subscribed to it.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IQbservable<TSource> AutoConnect<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, int minObservers, Action<IDisposable> onConnect)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.AutoConnect<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(int), default(Action<IDisposable>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(minObservers, typeof(int)),
Expression.Constant(onConnect, typeof(Action<IDisposable>))
)
);
}

/// <summary>
/// Generates an observable sequence that repeats the given element infinitely.
/// </summary>
Expand Down
17 changes: 17 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSo
return new RefCount<TSource>(source);
}

#endregion

#region + AutoConnect +

public virtual IObservable<TSource> AutoConnect<TSource>(IConnectableObservable<TSource> source, int minObservers = 1, Action<IDisposable> onConnect = null)
{
if (minObservers <= 0)
{
var d = source.Connect();
onConnect?.Invoke(d);
return source;
}

return new AutoConnect<TSource>(source, minObservers, onConnect);
}


#endregion

#region + Replay +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using Xunit;
using ReactiveTests.Dummies;
using System.Reflection;
using System.Reactive.Subjects;

namespace ReactiveTests.Tests
{
public class ObservableAutoConnectTest : ReactiveTest
{
[Fact]
public void AutoConnect_Basic()
{
int called = 0;

var source = Observable.Defer(() =>
{
called++;
return Observable.Range(1, 5);
})
.Replay()
.AutoConnect();

Assert.Equal(0, called);

var list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);

list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
}

[Fact]
public void AutoConnect_Immediately()
{
int called = 0;

var source = Observable.Defer(() =>
{
called++;
return Observable.Range(1, 5);
})
.Replay()
.AutoConnect(0);

Assert.Equal(1, called);

var list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);

list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
}

[Fact]
public void AutoConnect_TwoConsumers()
{
int called = 0;

var source = Observable.Defer(() =>
{
called++;
return Observable.Range(1, 5);
})
.Replay()
.AutoConnect(2);

Assert.Equal(0, called);

var list0 = new List<int>();

source.Subscribe(v => list0.Add(v));

Assert.Equal(0, called);
Assert.Equal(0, list0.Count);

var list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);

Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list0);

list = source.ToList().First();

Assert.Equal(1, called);
Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, list);
}

[Fact]
public void AutoConnect_Dispose()
{
var subject = new Subject<int>();

var disposable = new IDisposable[1];

var source = subject
.Replay()
.AutoConnect(1, d => disposable[0] = d);

Assert.Null(disposable[0]);

var list = new List<int>();

source.Subscribe(v => list.Add(v));

Assert.NotNull(disposable[0]);

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);

disposable[0].Dispose();

subject.OnNext(4);
subject.OnNext(5);

Assert.Equal(new List<int>() { 1, 2, 3 }, list);

}
}
}