diff --git a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs index 68d419ee11..a231699441 100644 --- a/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs +++ b/Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs @@ -286,43 +286,41 @@ public static Task ToTask(this IObservable observable return observable.ToTask(cancellationToken, state: null); } - private sealed class ToTaskObserver : IObserver + private sealed class ToTaskObserver : SafeObserver { private readonly CancellationToken _ct; - private readonly IDisposable _disposable; private readonly TaskCompletionSource _tcs; private readonly CancellationTokenRegistration _ctr = default(CancellationTokenRegistration); private bool _hasValue; private TResult _lastValue; - public ToTaskObserver(TaskCompletionSource tcs, IDisposable disposable, CancellationToken ct) + public ToTaskObserver(TaskCompletionSource tcs, CancellationToken ct) { _ct = ct; _tcs = tcs; - _disposable = disposable; if (ct.CanBeCanceled) { - _ctr = ct.Register(Cancel); + _ctr = ct.Register(@this => ((ToTaskObserver)@this).Cancel(), this); } } - public void OnNext(TResult value) + public override void OnNext(TResult value) { _hasValue = true; _lastValue = value; } - public void OnError(Exception error) + public override void OnError(Exception error) { _tcs.TrySetException(error); _ctr.Dispose(); // no null-check needed (struct) - _disposable.Dispose(); + Dispose(); } - public void OnCompleted() + public override void OnCompleted() { if (_hasValue) { @@ -334,12 +332,12 @@ public void OnCompleted() } _ctr.Dispose(); // no null-check needed (struct) - _disposable.Dispose(); + Dispose(); } private void Cancel() { - _disposable.Dispose(); + Dispose(); #if HAS_TPL46 _tcs.TrySetCanceled(_ct); #else @@ -364,9 +362,7 @@ public static Task ToTask(this IObservable observable var tcs = new TaskCompletionSource(state); - var disposable = new SingleAssignmentDisposable(); - - var taskCompletionObserver = new ToTaskObserver(tcs, disposable, cancellationToken); + var taskCompletionObserver = new ToTaskObserver(tcs, cancellationToken); // // Subtle race condition: if the source completes before we reach the line below, the SingleAssigmentDisposable @@ -382,7 +378,7 @@ public static Task ToTask(this IObservable observable // exception handling logic here for the reason explained above. We cannot afford to throw here // and as a result never set the TaskCompletionSource, so we tunnel everything through here. // - disposable.Disposable = observable.Subscribe/*Unsafe*/(taskCompletionObserver); + taskCompletionObserver.SetResource(observable.Subscribe/*Unsafe*/(taskCompletionObserver)); } catch (Exception ex) {