Skip to content

Commit

Permalink
Added lazy RefCount operator for IObservables. Lazy RefCount connects…
Browse files Browse the repository at this point in the history
… like RefCount but may delay disconnection. This is useful whenever a lot of connect/disconnect cycles are expected within a short timespan but with a significant overhead in connecting/disconnecting. Some unit tests have been added. Lazy RefCount has been excluded from methods that must be present for Qbservable as well. I leave it up to others to decide what Lazy RefCount means for Qbservable and whether there should be an implementation. (#133)
  • Loading branch information
danielcweber authored Jun 12, 2018
1 parent 24fb6db commit bdb5413
Show file tree
Hide file tree
Showing 7 changed files with 429 additions and 48 deletions.
2 changes: 2 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ internal interface IQueryLanguage
IConnectableObservable<TSource> PublishLast<TSource>(IObservable<TSource> source);
IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay);
IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler schedulder);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source);
IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, IScheduler scheduler);
IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector);
Expand Down
42 changes: 42 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,48 @@ public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable
return s_impl.RefCount<TSource>(source);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, TimeSpan disconnectDelay)
{
if (source == null)
throw new ArgumentNullException("source");

if (disconnectDelay < TimeSpan.Zero)
throw new ArgumentException("disconnectDelay");

return s_impl.RefCount<TSource>(source, disconnectDelay);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> RefCount<TSource>(this IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");

if (scheduler == null)
throw new ArgumentNullException("scheduler");

if (disconnectDelay < TimeSpan.Zero)
throw new ArgumentException("disconnectDelay");

return s_impl.RefCount<TSource>(source, disconnectDelay, scheduler);
}

#endregion

#region + AutoConnect +
Expand Down
171 changes: 126 additions & 45 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,159 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq.ObservableImpl
{
internal sealed class RefCount<TSource> : Producer<TSource, RefCount<TSource>._>
internal static class RefCount<TSource>
{
private readonly IConnectableObservable<TSource> _source;

private readonly object _gate;
private int _count;
private IDisposable _connectableSubscription;

public RefCount(IConnectableObservable<TSource> source)
internal sealed class Eager : Producer<TSource, Eager._>
{
_source = source;
_gate = new object();
_count = 0;
_connectableSubscription = default(IDisposable);
}
private readonly IConnectableObservable<TSource> _source;

protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);

protected override void Run(_ sink) => sink.Run();

internal sealed class _ : IdentitySink<TSource>
{
readonly RefCount<TSource> _parent;
private readonly object _gate;
private int _count;
private IDisposable _connectableSubscription;

public _(IObserver<TSource> observer, RefCount<TSource> parent)
: base(observer)
public Eager(IConnectableObservable<TSource> source)
{
this._parent = parent;
_source = source;
_gate = new object();
_count = 0;
_connectableSubscription = default(IDisposable);
}

public void Run()
protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);

protected override void Run(_ sink) => sink.Run();

internal sealed class _ : IdentitySink<TSource>
{
base.Run(_parent._source);
readonly Eager _parent;

public _(IObserver<TSource> observer, Eager parent)
: base(observer)
{
this._parent = parent;
}

lock (_parent._gate)
public void Run()
{
if (++_parent._count == 1)
base.Run(_parent._source);

lock (_parent._gate)
{
// We need to set _connectableSubscription to something
// before Connect because if Connect terminates synchronously,
// Dispose(bool) gets executed and will try to dispose
// _connectableSubscription of null.
// ?.Dispose() is no good because the dispose action has to be
// executed anyway.
// We can't inline SAD either because the IDisposable of Connect
// may belong to the wrong connection.
var sad = new SingleAssignmentDisposable();
_parent._connectableSubscription = sad;

sad.Disposable = _parent._source.Connect();
if (++_parent._count == 1)
{
// We need to set _connectableSubscription to something
// before Connect because if Connect terminates synchronously,
// Dispose(bool) gets executed and will try to dispose
// _connectableSubscription of null.
// ?.Dispose() is no good because the dispose action has to be
// executed anyway.
// We can't inline SAD either because the IDisposable of Connect
// may belong to the wrong connection.
var sad = new SingleAssignmentDisposable();
_parent._connectableSubscription = sad;

sad.Disposable = _parent._source.Connect();
}
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
lock (_parent._gate)
{
if (--_parent._count == 0)
{
_parent._connectableSubscription.Dispose();
}
}
}
}
}
}

internal sealed class Lazy : Producer<TSource, Lazy._>
{
private readonly object _gate;
private readonly IScheduler _scheduler;
private readonly TimeSpan _disconnectTime;
private readonly IConnectableObservable<TSource> _source;
private readonly SerialDisposable _serial = new SerialDisposable();

private int _count;
private IDisposable _connectableSubscription;

protected override void Dispose(bool disposing)
public Lazy(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
{
base.Dispose(disposing);
_source = source;
_gate = new object();
_disconnectTime = disconnectTime;
_scheduler = scheduler;
_count = 0;
_connectableSubscription = default(IDisposable);
}

if (disposing)
protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

protected override void Run(_ sink) => sink.Run(this);

internal sealed class _ : IdentitySink<TSource>
{
public _(IObserver<TSource> observer)
: base(observer)
{
lock (_parent._gate)
}

public void Run(Lazy parent)
{
var subscription = parent._source.SubscribeSafe(this);

lock (parent._gate)
{
if (--_parent._count == 0)
if (++parent._count == 1)
{
_parent._connectableSubscription.Dispose();
if (parent._connectableSubscription == null)
parent._connectableSubscription = parent._source.Connect();

parent._serial.Disposable = new SingleAssignmentDisposable();
}
}

SetUpstream(Disposable.Create(() =>
{
subscription.Dispose();
lock (parent._gate)
{
if (--parent._count == 0)
{
var cancelable = (SingleAssignmentDisposable)parent._serial.Disposable;
cancelable.Disposable = parent._scheduler.Schedule(cancelable, parent._disconnectTime, (self, state) =>
{
lock (parent._gate)
{
if (object.ReferenceEquals(parent._serial.Disposable, state))
{
parent._connectableSubscription.Dispose();
parent._connectableSubscription = null;
}
}
return Disposable.Empty;
});
}
}
}));
}
}
}
Expand Down
72 changes: 70 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* WARNING: Auto-generated file (05/28/2018 22:20:18)
/*
* WARNING: Auto-generated file (06/12/2018 13:00:48)
* Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
*/

Expand Down Expand Up @@ -10667,6 +10667,74 @@ public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider pr
);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, TimeSpan disconnectDelay)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(TimeSpan))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(disconnectDelay, typeof(TimeSpan))
)
);
}

/// <summary>
/// Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
/// </summary>
/// <param name="provider">Query provider used to construct the <see cref="IQbservable{T}"/> data source.</param>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Connectable observable sequence.</param>
/// <param name="disconnectDelay">The time span that should be waited before possibly unsubscribing from the connectable observable.</param>
/// <param name="scheduler">The scheduler to use for delayed unsubscription.</param>
/// <returns>An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> RefCount<TSource>(this IQbservableProvider provider, IConnectableObservable<TSource> source, TimeSpan disconnectDelay, IScheduler scheduler)
{
if (provider == null)
throw new ArgumentNullException(nameof(provider));
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.RefCount<TSource>(default(IQbservableProvider), default(IConnectableObservable<TSource>), default(TimeSpan), default(IScheduler))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
Expression.Constant(provider, typeof(IQbservableProvider)),
Expression.Constant(source, typeof(IConnectableObservable<TSource>)),
Expression.Constant(disconnectDelay, typeof(TimeSpan)),
Expression.Constant(scheduler, typeof(IScheduler))
)
);
}

/// <summary>
/// Generates an observable sequence that repeats the given element infinitely.
/// </summary>
Expand Down
12 changes: 11 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Binding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,17 @@ public virtual IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TS

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source)
{
return new RefCount<TSource>(source);
return new RefCount<TSource>.Eager(source);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime)
{
return RefCount(source, disconnectTime, Scheduler.Default);
}

public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
{
return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler);
}

#endregion
Expand Down
Loading

0 comments on commit bdb5413

Please sign in to comment.