Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review TaskObservableExtensions #760

Merged
merged 8 commits into from
Jul 4, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public void OnNext(Unit value)
public Subscription(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync, IObserver<TResult> observer)
{
_subscription = subscribeAsync(observer, _cts.Token)
.ToObservable()
.Subscribe(new TaskCompletionObserver(observer));
}

Expand Down Expand Up @@ -180,7 +179,6 @@ public Subscription(Func<IObserver<TResult>, CancellationToken, Task<IDisposable
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
subscribeAsync(observer, _cts.Token)
.ToObservable()
.Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
}

Expand Down Expand Up @@ -262,7 +260,6 @@ public Subscription(Func<IObserver<TResult>, CancellationToken, Task<Action>> su
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
subscribeAsync(observer, _cts.Token)
.ToObservable()
.Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Linq.ObservableImpl;
using System.Reactive.Subjects;
Expand Down Expand Up @@ -58,48 +59,30 @@ public static IObservable<Unit> ToObservable(this Task task, IScheduler schedule

private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
{
var res = default(IObservable<Unit>);

if (task.IsCompleted)
{
scheduler = scheduler ?? ImmediateScheduler.Instance;

switch (task.Status)
{
case TaskStatus.RanToCompletion:
res = new Return<Unit>(Unit.Default, scheduler);
break;
case TaskStatus.Faulted:
res = new Throw<Unit>(task.Exception.InnerException, scheduler);
break;
return new Throw<Unit>(task.Exception.InnerException, scheduler);
case TaskStatus.Canceled:
res = new Throw<Unit>(new TaskCanceledException(task), scheduler);
break;
return new Throw<Unit>(new TaskCanceledException(task), scheduler);
}
}
else
{
//
// Separate method to avoid closure in synchronous completion case.
//
res = ToObservableSlow(task, scheduler);
}

return res;
}
return new Return<Unit>(Unit.Default, scheduler);
}

private static IObservable<Unit> ToObservableSlow(Task task, IScheduler scheduler)
{
var subject = new AsyncSubject<Unit>();

var options = GetTaskContinuationOptions(scheduler);

task.ContinueWith(t => ToObservableDone(task, subject), options);
task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);

return ToObservableResult(subject, scheduler);
return subject.ToObservableResult(scheduler);
}

private static void ToObservableDone(Task task, IObserver<Unit> subject)
private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
{
switch (task.Status)
{
Expand All @@ -116,6 +99,26 @@ private static void ToObservableDone(Task task, IObserver<Unit> subject)
}
}

internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
{
if (task.IsCompleted)
{
task.EmitTaskResult(observer);
return Disposable.Empty;
}

var cts = new CancellationDisposable();

task.ContinueWith(
(t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject),
observer,
cts.Token,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);

return cts;
}

/// <summary>
/// Returns an observable sequence that propagates the result of the task.
/// </summary>
Expand Down Expand Up @@ -160,48 +163,30 @@ public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task

private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
{
var res = default(IObservable<TResult>);

if (task.IsCompleted)
{
scheduler = scheduler ?? ImmediateScheduler.Instance;

switch (task.Status)
{
case TaskStatus.RanToCompletion:
res = new Return<TResult>(task.Result, scheduler);
break;
case TaskStatus.Faulted:
res = new Throw<TResult>(task.Exception.InnerException, scheduler);
break;
return new Throw<TResult>(task.Exception.InnerException, scheduler);
case TaskStatus.Canceled:
res = new Throw<TResult>(new TaskCanceledException(task), scheduler);
break;
return new Throw<TResult>(new TaskCanceledException(task), scheduler);
}
}
else
{
//
// Separate method to avoid closure in synchronous completion case.
//
res = ToObservableSlow(task, scheduler);
}

return res;
}
return new Return<TResult>(task.Result, scheduler);
}

private static IObservable<TResult> ToObservableSlow<TResult>(Task<TResult> task, IScheduler scheduler)
{
var subject = new AsyncSubject<TResult>();

var options = GetTaskContinuationOptions(scheduler);

task.ContinueWith(t => ToObservableDone(task, subject), options);
task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);

return ToObservableResult(subject, scheduler);
return subject.ToObservableResult(scheduler);
}

private static void ToObservableDone<TResult>(Task<TResult> task, IObserver<TResult> subject)
private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
{
switch (task.Status)
{
Expand Down Expand Up @@ -240,7 +225,7 @@ private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler sch
return options;
}

private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TResult> subject, IScheduler scheduler)
private static IObservable<TResult> ToObservableResult<TResult>(this AsyncSubject<TResult> subject, IScheduler scheduler)
{
if (scheduler != null)
{
Expand All @@ -250,6 +235,26 @@ private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TRe
return subject.AsObservable();
}

internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
{
if (task.IsCompleted)
{
task.EmitTaskResult(observer);
return Disposable.Empty;
}

var cts = new CancellationDisposable();

task.ContinueWith(
(t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject),
observer,
cts.Token,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);

return cts;
}

/// <summary>
/// Returns a task that will receive the last value or the exception produced by the observable sequence.
/// </summary>
Expand Down