From b2a449fd67473ea428d942757aefa8f5e2a15bea Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 28 Sep 2018 17:18:32 +0200 Subject: [PATCH] 4.x: Fix accidental behavior change with Task-based Create methods completing when the body ends --- .../Linq/QueryLanguage.Creation.cs | 2 - .../Tests/Linq/Observable/CreateAsyncTest.cs | 41 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index 1ef584791d..e117ebe915 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -155,7 +155,6 @@ public void Dispose() public void OnCompleted() { - _observer.OnCompleted(); } public void OnError(Exception error) @@ -233,7 +232,6 @@ public void Dispose() public void OnCompleted() { - _observer.OnCompleted(); } public void OnError(Exception error) diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs index cde3803bc4..7806a9e535 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs @@ -705,5 +705,46 @@ public void CreateAsync_Action_Token() Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10))); } + + [Fact] + public void CreateWithTaskDisposable_NoPrematureTermination() + { + var obs = Observable.Create(async o => + { + // avoid warning on async o due to no await + await Task.CompletedTask; + + var inner = Observable.Range(1, 3); + + return inner.Subscribe(x => + { + o.OnNext(x); + }); + }); + + var result = obs.Take(1).Wait(); + } + + [Fact] + public void CreateWithTaskAction_NoPrematureTermination() + { + var obs = Observable.Create(async o => + { + // avoid warning on async o due to no await + await Task.CompletedTask; + + var inner = Observable.Range(1, 3); + + var d = inner.Subscribe(x => + { + o.OnNext(x); + }); + + Action a = () => d.Dispose(); + return a; + }); + + var result = obs.Take(1).Wait(); + } } }