Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review QueryLanguage.Creation #744

Merged
merged 4 commits into from
Jul 4, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 138 additions & 104 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,48 +68,61 @@ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Can

private sealed class CreateWithTaskTokenObservable<TResult> : ObservableBase<TResult>
{
private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;

public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
{
_subscribeAsync = subscribeAsync;
}

protected override IDisposable SubscribeCore(IObserver<TResult> observer)
private sealed class Subscription : IDisposable
{
var cancellable = new CancellationDisposable();
private sealed class TaskCompletionObserver : IObserver<Unit>
{
private readonly IObserver<TResult> _observer;

var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new TaskCompletionObserver(observer);
var subscription = taskObservable.Subscribe(taskCompletionObserver);
public TaskCompletionObserver(IObserver<TResult> observer)
{
_observer = observer;
}

return StableCompositeDisposable.Create(cancellable, subscription);
}
public void OnCompleted()
{
_observer.OnCompleted();
}

private sealed class TaskCompletionObserver : IObserver<Unit>
{
private readonly IObserver<TResult> _observer;
public void OnError(Exception error)
{
_observer.OnError(error);
}

public TaskCompletionObserver(IObserver<TResult> observer)
{
_observer = observer;
public void OnNext(Unit value)
{
// deliberately ignored
}
}

public void OnCompleted()
{
_observer.OnCompleted();
}
private readonly IDisposable _subscription;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public void OnError(Exception error)
public Subscription(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync, IObserver<TResult> observer)
{
_observer.OnError(error);
_subscription = subscribeAsync(observer, _cts.Token)
.ToObservable()
.Subscribe(new TaskCompletionObserver(observer));
}

public void OnNext(Unit value)
public void Dispose()
{
// deliberately ignored
_cts.Cancel();
_subscription.Dispose();
}
}

private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;

public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
{
_subscribeAsync = subscribeAsync;
}

protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
return new Subscription(_subscribeAsync, observer);
}
}

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
Expand All @@ -124,59 +137,70 @@ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Can

private sealed class CreateWithTaskDisposable<TResult> : ObservableBase<TResult>
{
private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;

public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
private sealed class Subscription : IDisposable
{
_subscribeAsync = subscribeAsync;
}
private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
{
private readonly IObserver<TResult> _observer;
private IDisposable _disposable;

protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
var cancellable = new CancellationDisposable();
public TaskDisposeCompletionObserver(IObserver<TResult> observer)
{
_observer = observer;
}

var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
public void Dispose()
{
Disposable.TryDispose(ref _disposable);
}

var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
public void OnCompleted()
{
_observer.OnCompleted();
}

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);
public void OnError(Exception error)
{
_observer.OnError(error);
}

return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
}
public void OnNext(IDisposable value)
{
Disposable.SetSingle(ref _disposable, value);
}
}

private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
{
private readonly IObserver<TResult> _observer;
private IDisposable _disposable;
private readonly TaskDisposeCompletionObserver _observer;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public TaskDisposeCompletionObserver(IObserver<TResult> observer)
public Subscription(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync, IObserver<TResult> observer)
{
_observer = observer;
//
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
subscribeAsync(observer, _cts.Token)
.ToObservable()
.Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
}

public void Dispose()
{
Disposable.TryDispose(ref _disposable);
_cts.Cancel();
_observer.Dispose();
}
}

public void OnCompleted()
{
_observer.OnCompleted();
}
private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;

public void OnError(Exception error)
{
_observer.OnError(error);
}
public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
{
_subscribeAsync = subscribeAsync;
}

public void OnNext(IDisposable value)
{
Disposable.SetSingle(ref _disposable, value);
}
protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
return new Subscription(_subscribeAsync, observer);
}
}

Expand All @@ -192,63 +216,73 @@ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Can

private sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
{
private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;

public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
private sealed class Subscription : IDisposable
{
_subscribeAsync = subscribeAsync;
}
private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
{
private readonly IObserver<TResult> _observer;
private Action _disposable;

protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
var cancellable = new CancellationDisposable();
public TaskDisposeCompletionObserver(IObserver<TResult> observer)
{
_observer = observer;
}

var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
public void Dispose()
{
Interlocked.Exchange(ref _disposable, Stubs.Nop)?.Invoke();
}

var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
public void OnCompleted()
{
_observer.OnCompleted();
}

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);
public void OnError(Exception error)
{
_observer.OnError(error);
}

return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
}
public void OnNext(Action value)
{
if (Interlocked.CompareExchange(ref _disposable, value, null) != null)
{
value?.Invoke();
}
}
}

private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
{
private readonly IObserver<TResult> _observer;
private Action _disposable;
private static readonly Action DisposedAction = () => { };
private readonly TaskDisposeCompletionObserver _observer;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public TaskDisposeCompletionObserver(IObserver<TResult> observer)
public Subscription(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync, IObserver<TResult> observer)
{
_observer = observer;
//
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
subscribeAsync(observer, _cts.Token)
.ToObservable()
.Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
}

public void Dispose()
{
Interlocked.Exchange(ref _disposable, DisposedAction)?.Invoke();
_cts.Cancel();
_observer.Dispose();
}
}

public void OnCompleted()
{
_observer.OnCompleted();
}
private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;

public void OnError(Exception error)
{
_observer.OnError(error);
}
public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
{
_subscribeAsync = subscribeAsync;
}

public void OnNext(Action value)
{
if (Interlocked.CompareExchange(ref _disposable, value, null) != null)
{
value?.Invoke();
}
}
protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
return new Subscription(_subscribeAsync, observer);
}
}

Expand Down