diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index c623841bc6..97d0e2d1e3 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -101,7 +101,6 @@ public void OnNext(Unit value) public Subscription(Func, CancellationToken, Task> subscribeAsync, IObserver observer) { _subscription = subscribeAsync(observer, _cts.Token) - .ToObservable() .Subscribe(new TaskCompletionObserver(observer)); } @@ -180,7 +179,6 @@ public Subscription(Func, CancellationToken, Task type, we get auto-detach behavior for free. // subscribeAsync(observer, _cts.Token) - .ToObservable() .Subscribe(_observer = new TaskDisposeCompletionObserver(observer)); } @@ -262,7 +260,6 @@ public Subscription(Func, CancellationToken, Task> su // Notice because we're using the AnonymousObservable type, we get auto-detach behavior for free. // subscribeAsync(observer, _cts.Token) - .ToObservable() .Subscribe(_observer = new TaskDisposeCompletionObserver(observer)); } diff --git a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs index 85011cea22..d657a34a6e 100644 --- a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs +++ b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Concurrency; +using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Linq.ObservableImpl; using System.Reactive.Subjects; @@ -58,48 +59,30 @@ public static IObservable ToObservable(this Task task, IScheduler schedule private static IObservable ToObservableImpl(Task task, IScheduler scheduler) { - var res = default(IObservable); - if (task.IsCompleted) { scheduler = scheduler ?? ImmediateScheduler.Instance; switch (task.Status) { - case TaskStatus.RanToCompletion: - res = new Return(Unit.Default, scheduler); - break; case TaskStatus.Faulted: - res = new Throw(task.Exception.InnerException, scheduler); - break; + return new Throw(task.Exception.InnerException, scheduler); case TaskStatus.Canceled: - res = new Throw(new TaskCanceledException(task), scheduler); - break; + return new Throw(new TaskCanceledException(task), scheduler); } - } - else - { - // - // Separate method to avoid closure in synchronous completion case. - // - res = ToObservableSlow(task, scheduler); - } - return res; - } + return new Return(Unit.Default, scheduler); + } - private static IObservable ToObservableSlow(Task task, IScheduler scheduler) - { var subject = new AsyncSubject(); - var options = GetTaskContinuationOptions(scheduler); - task.ContinueWith(t => ToObservableDone(task, subject), options); + task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject)subjectObject), subject, options); - return ToObservableResult(subject, scheduler); + return subject.ToObservableResult(scheduler); } - private static void ToObservableDone(Task task, IObserver subject) + private static void EmitTaskResult(this Task task, IObserver subject) { switch (task.Status) { @@ -116,6 +99,26 @@ private static void ToObservableDone(Task task, IObserver subject) } } + internal static IDisposable Subscribe(this Task task, IObserver observer) + { + if (task.IsCompleted) + { + task.EmitTaskResult(observer); + return Disposable.Empty; + } + + var cts = new CancellationDisposable(); + + task.ContinueWith( + (t, observerObject) => t.EmitTaskResult((IObserver)observerObject), + observer, + cts.Token, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + + return cts; + } + /// /// Returns an observable sequence that propagates the result of the task. /// @@ -160,48 +163,30 @@ public static IObservable ToObservable(this Task task private static IObservable ToObservableImpl(Task task, IScheduler scheduler) { - var res = default(IObservable); - if (task.IsCompleted) { scheduler = scheduler ?? ImmediateScheduler.Instance; switch (task.Status) { - case TaskStatus.RanToCompletion: - res = new Return(task.Result, scheduler); - break; case TaskStatus.Faulted: - res = new Throw(task.Exception.InnerException, scheduler); - break; + return new Throw(task.Exception.InnerException, scheduler); case TaskStatus.Canceled: - res = new Throw(new TaskCanceledException(task), scheduler); - break; + return new Throw(new TaskCanceledException(task), scheduler); } - } - else - { - // - // Separate method to avoid closure in synchronous completion case. - // - res = ToObservableSlow(task, scheduler); - } - return res; - } + return new Return(task.Result, scheduler); + } - private static IObservable ToObservableSlow(Task task, IScheduler scheduler) - { var subject = new AsyncSubject(); - var options = GetTaskContinuationOptions(scheduler); - task.ContinueWith(t => ToObservableDone(task, subject), options); + task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject)subjectObject), subject, options); - return ToObservableResult(subject, scheduler); + return subject.ToObservableResult(scheduler); } - private static void ToObservableDone(Task task, IObserver subject) + private static void EmitTaskResult(this Task task, IObserver subject) { switch (task.Status) { @@ -240,7 +225,7 @@ private static TaskContinuationOptions GetTaskContinuationOptions(IScheduler sch return options; } - private static IObservable ToObservableResult(AsyncSubject subject, IScheduler scheduler) + private static IObservable ToObservableResult(this AsyncSubject subject, IScheduler scheduler) { if (scheduler != null) { @@ -250,6 +235,26 @@ private static IObservable ToObservableResult(AsyncSubject(this Task task, IObserver observer) + { + if (task.IsCompleted) + { + task.EmitTaskResult(observer); + return Disposable.Empty; + } + + var cts = new CancellationDisposable(); + + task.ContinueWith( + (t, observerObject) => t.EmitTaskResult((IObserver)observerObject), + observer, + cts.Token, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + + return cts; + } + /// /// Returns a task that will receive the last value or the exception produced by the observable sequence. ///