From 669590b6bcaba928aa6328b9a22214678990c47d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 7 May 2018 14:25:49 +0200 Subject: [PATCH] 4.x: Add the IConnectableObservable.AutoConnect() operator --- .../System.Reactive/Linq/IQueryLanguage.cs | 1 + .../Linq/Observable.Binding.cs | 22 +++ .../Linq/Observable/AutoConnect.cs | 47 ++++++ .../Linq/Qbservable.Generated.cs | 37 ++++- .../Linq/QueryLanguage.Binding.cs | 17 +++ .../Tests/Linq/ObservableAutoConnectTest.cs | 140 ++++++++++++++++++ 6 files changed, 262 insertions(+), 2 deletions(-) create mode 100644 Rx.NET/Source/src/System.Reactive/Linq/Observable/AutoConnect.cs create mode 100644 Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/ObservableAutoConnectTest.cs diff --git a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs index 4cfd66bf36..655cfccefa 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs @@ -323,6 +323,7 @@ internal interface IQueryLanguage IObservable Replay(IObservable source, Func, IObservable> selector, int bufferSize, TimeSpan window); IConnectableObservable Replay(IObservable source, int bufferSize, TimeSpan window, IScheduler scheduler); IObservable Replay(IObservable source, Func, IObservable> selector, int bufferSize, TimeSpan window, IScheduler scheduler); + IObservable AutoConnect(IConnectableObservable source, int minObservers, Action onConnect); #endregion diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs index eb08f32915..efe8a39c91 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs @@ -205,6 +205,28 @@ public static IObservable RefCount(this IConnectableObservable #endregion + #region + AutoConnect + + + /// + /// Automatically connect the upstream IConnectableObservable at most once when the + /// specified number of IObservers have subscribed to this IObservable. + /// + /// The type of the elements in the source sequence. + /// Connectable observable sequence. + /// The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription. + /// If not null, the connection's IDisposable is provided to it. + /// An observable sequence that connects to the source at most once when the given number of observers have subscribed to it. + /// is null. + public static IObservable AutoConnect(this IConnectableObservable source, int minObservers = 1, Action onConnect = null) + { + if (source == null) + throw new ArgumentNullException(nameof(source)); + + return s_impl.AutoConnect(source, minObservers, onConnect); + } + + #endregion + #region + Replay + /// diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/AutoConnect.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AutoConnect.cs new file mode 100644 index 0000000000..0c756438f9 --- /dev/null +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/AutoConnect.cs @@ -0,0 +1,47 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System.Reactive.Subjects; +using System.Threading; + +namespace System.Reactive.Linq.ObservableImpl +{ + /// + /// Automatically connect the upstream IConnectableObservable once the + /// specified number of IObservers have subscribed to this IObservable. + /// + /// The upstream value type. + internal sealed class AutoConnect : IObservable + { + readonly IConnectableObservable source; + + readonly int minObservers; + + readonly Action onConnect; + + int count; + + internal AutoConnect(IConnectableObservable source, int minObservers, Action onConnect) + { + this.source = source; + this.minObservers = minObservers; + this.onConnect = onConnect; + } + + public IDisposable Subscribe(IObserver observer) + { + var d = source.Subscribe(observer); + + if (Volatile.Read(ref count) < minObservers) + { + if (Interlocked.Increment(ref count) == minObservers) + { + var c = source.Connect(); + onConnect?.Invoke(c); + } + } + return d; + } + } +} diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs b/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs index 762a6c8d44..f3ad905622 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs @@ -1,4 +1,4 @@ -/* +/* * WARNING: Auto-generated file (5/1/2015 21:21:20) * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory). */ @@ -10627,7 +10627,40 @@ public static IQbservable RefCount(this IQbservableProvider pr ) ); } - + + /// + /// Automatically connect the upstream IConnectableObservable at most once when the + /// specified number of IObservers have subscribed to this IObservable. + /// + /// The type of the elements in the source sequence. + /// Connectable observable sequence. + /// The number of observers required to subscribe before the connection to source happens, non-positive value will trigger an immediate subscription. + /// If not null, the connection's IDisposable is provided to it. + /// An observable sequence that connects to the source at most once when the given number of observers have subscribed to it. + /// is null. + public static IQbservable AutoConnect(this IQbservableProvider provider, IConnectableObservable source, int minObservers, Action onConnect) + { + if (provider == null) + throw new ArgumentNullException(nameof(provider)); + if (source == null) + throw new ArgumentNullException(nameof(source)); + + return provider.CreateQuery( + Expression.Call( + null, +#if CRIPPLED_REFLECTION + InfoOf(() => Qbservable.AutoConnect(default(IQbservableProvider), default(IConnectableObservable), default(int), default(Action))), +#else + ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), +#endif + Expression.Constant(provider, typeof(IQbservableProvider)), + Expression.Constant(source, typeof(IConnectableObservable)), + Expression.Constant(minObservers, typeof(int)), + Expression.Constant(onConnect, typeof(Action)) + ) + ); + } + /// /// Generates an observable sequence that repeats the given element infinitely. /// diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs index dd69696813..cbaafa8556 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs @@ -71,6 +71,23 @@ public virtual IObservable RefCount(IConnectableObservable(source); } + #endregion + + #region + AutoConnect + + + public virtual IObservable AutoConnect(IConnectableObservable source, int minObservers = 1, Action onConnect = null) + { + if (minObservers <= 0) + { + var d = source.Connect(); + onConnect?.Invoke(d); + return source; + } + + return new AutoConnect(source, minObservers, onConnect); + } + + #endregion #region + Replay + diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/ObservableAutoConnectTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/ObservableAutoConnectTest.cs new file mode 100644 index 0000000000..c86c270518 --- /dev/null +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/ObservableAutoConnectTest.cs @@ -0,0 +1,140 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using Microsoft.Reactive.Testing; +using Xunit; +using ReactiveTests.Dummies; +using System.Reflection; +using System.Reactive.Subjects; + +namespace ReactiveTests.Tests +{ + public class ObservableAutoConnectTest : ReactiveTest + { + [Fact] + public void AutoConnect_Basic() + { + int called = 0; + + var source = Observable.Defer(() => + { + called++; + return Observable.Range(1, 5); + }) + .Replay() + .AutoConnect(); + + Assert.Equal(0, called); + + var list = source.ToList().First(); + + Assert.Equal(1, called); + Assert.Equal(new List() { 1, 2, 3, 4, 5 }, list); + + list = source.ToList().First(); + + Assert.Equal(1, called); + Assert.Equal(new List() { 1, 2, 3, 4, 5 }, list); + } + + [Fact] + public void AutoConnect_Immediately() + { + int called = 0; + + var source = Observable.Defer(() => + { + called++; + return Observable.Range(1, 5); + }) + .Replay() + .AutoConnect(0); + + Assert.Equal(1, called); + + var list = source.ToList().First(); + + Assert.Equal(1, called); + Assert.Equal(new List() { 1, 2, 3, 4, 5 }, list); + + list = source.ToList().First(); + + Assert.Equal(1, called); + Assert.Equal(new List() { 1, 2, 3, 4, 5 }, list); + } + + [Fact] + public void AutoConnect_TwoConsumers() + { + int called = 0; + + var source = Observable.Defer(() => + { + called++; + return Observable.Range(1, 5); + }) + .Replay() + .AutoConnect(2); + + Assert.Equal(0, called); + + var list0 = new List(); + + source.Subscribe(v => list0.Add(v)); + + Assert.Equal(0, called); + Assert.Equal(0, list0.Count); + + var list = source.ToList().First(); + + Assert.Equal(1, called); + Assert.Equal(new List() { 1, 2, 3, 4, 5 }, list); + + Assert.Equal(new List() { 1, 2, 3, 4, 5 }, list0); + + list = source.ToList().First(); + + Assert.Equal(1, called); + Assert.Equal(new List() { 1, 2, 3, 4, 5 }, list); + } + + [Fact] + public void AutoConnect_Dispose() + { + var subject = new Subject(); + + var disposable = new IDisposable[1]; + + var source = subject + .Replay() + .AutoConnect(1, d => disposable[0] = d); + + Assert.Null(disposable[0]); + + var list = new List(); + + source.Subscribe(v => list.Add(v)); + + Assert.NotNull(disposable[0]); + + subject.OnNext(1); + subject.OnNext(2); + subject.OnNext(3); + + disposable[0].Dispose(); + + subject.OnNext(4); + subject.OnNext(5); + + Assert.Equal(new List() { 1, 2, 3 }, list); + + } + } +}