From 1990de7c854d9155b2ca3bc9a8d078efd045d358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 22 Aug 2018 11:14:53 +0200 Subject: [PATCH 1/4] Fix Zip(IEnumerable) NullReferenceException if a source completes immediately (cherry picked from commit 1b19cb6b565abc3e073e0177d517ab589da31036) --- .../System.Reactive/Linq/Observable/Zip.cs | 6 +++- .../Tests/Linq/Observable/ZipTest.cs | 35 +++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs index b78960cf3e..50b8a94257 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs @@ -661,7 +661,11 @@ private void OnCompleted(int index) } else { - _subscriptions[index].Dispose(); + var subscriptions = Volatile.Read(ref _subscriptions); + if (subscriptions != null && subscriptions != Array.Empty()) + { + Disposable.TryDispose(ref subscriptions[index]); + } } } } diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs index 4a0cd1eed6..48cec2be4a 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ZipTest.cs @@ -4448,6 +4448,41 @@ public void Zip_AtLeastOneThrows4() #endregion + [Fact] + public void Zip2WithImmediateReturn() + { + Observable.Zip( + Observable.Return(Unit.Default), + Observable.Return(Unit.Default), + (_, __) => Unit.Default + ) + .Subscribe(_ => { }); + } + + [Fact] + public void Zip3WithImmediateReturn() + { + Observable.Zip( + Observable.Return(Unit.Default), + Observable.Return(Unit.Default), + Observable.Return(Unit.Default), + (_, __, ___) => Unit.Default + ) + .Subscribe(_ => { }); + } + + [Fact] + public void ZipEnumerableWithImmediateReturn() + { + Enumerable.Range(0, 100) + .Select(_ => Observable.Return(Unit.Default)) + .Zip() + .Subscribe(_ => + { + + } + ); + } } #pragma warning restore IDE0039 // Use local function } From 1a5abeb2ebb47bda07d2167e4484c2523749ac2f Mon Sep 17 00:00:00 2001 From: "Daniel C. Weber" Date: Tue, 28 Aug 2018 12:17:19 +0200 Subject: [PATCH 2/4] Emit a warning if the half-serializer ignores a value in ForwardOnNext. (cherry picked from commit 7d2d4c4ec475a382ef42c3ed46e8f59c159f0d10) --- Rx.NET/Source/Directory.build.targets | 6 +++--- .../Source/src/System.Reactive/Internal/HalfSerializer.cs | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Rx.NET/Source/Directory.build.targets b/Rx.NET/Source/Directory.build.targets index 59c66f3578..5a547415a5 100644 --- a/Rx.NET/Source/Directory.build.targets +++ b/Rx.NET/Source/Directory.build.targets @@ -1,7 +1,7 @@ - $(DefineConstants);HAS_WINRT;PREFER_ASYNC;HAS_TPL46;DESKTOPCLR + $(DefineConstants);HAS_TRACE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;DESKTOPCLR 10.0.16299.0 @@ -9,9 +9,9 @@ $(DefineConstants);NO_CODE_COVERAGE_ATTRIBUTE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING;NO_SERIALIZABLE;CRIPPLED_REFLECTION;NO_THREAD;WINDOWS - $(DefineConstants);HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING;WINDOWS + $(DefineConstants);HAS_TRACE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING;WINDOWS - $(DefineConstants);HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING + $(DefineConstants);HAS_TRACE;HAS_WINRT;PREFER_ASYNC;HAS_TPL46;NO_REMOTING \ No newline at end of file diff --git a/Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs b/Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs index 74c7da55a7..37478653d5 100644 --- a/Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs +++ b/Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. +using System.Diagnostics; using System.Threading; namespace System.Reactive @@ -47,6 +48,10 @@ public static void ForwardOnNext(ISink sink, T item, ref int wip, ref Exce } } } +#if (HAS_TRACE) + else if (error == null) + Trace.TraceWarning("OnNext called while another OnNext call was in progress on the same Observer."); +#endif } /// From 8de749f5a66f5f999f00943450475c386fd6ae76 Mon Sep 17 00:00:00 2001 From: Paulo Morgado Date: Tue, 4 Sep 2018 11:05:05 +0100 Subject: [PATCH 3/4] Added debugger display information to TestScheduler and HistoricalScheduler (cherry picked from commit 72c3b7d9551db2851b3b577b7682915b059552dd) --- .../Source/src/Microsoft.Reactive.Testing/TestScheduler.cs | 5 +++++ .../src/System.Reactive/Concurrency/HistoricalScheduler.cs | 5 +++++ .../Api/ApiApprovalTests.Core.approved.txt | 1 + .../Api/ApiApprovalTests.Testing.approved.txt | 1 + 4 files changed, 12 insertions(+) diff --git a/Rx.NET/Source/src/Microsoft.Reactive.Testing/TestScheduler.cs b/Rx.NET/Source/src/Microsoft.Reactive.Testing/TestScheduler.cs index 8bd795632a..82994a9149 100644 --- a/Rx.NET/Source/src/Microsoft.Reactive.Testing/TestScheduler.cs +++ b/Rx.NET/Source/src/Microsoft.Reactive.Testing/TestScheduler.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Diagnostics; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; @@ -12,6 +13,10 @@ namespace Microsoft.Reactive.Testing /// /// Virtual time scheduler used for testing applications and libraries built using Reactive Extensions. /// + [DebuggerDisplay("\\{ " + + nameof(Clock) + " = {" + nameof(Clock) + "} " + + nameof(Now) + " = {" + nameof(Now) + ".ToString(\"O\")} " + + "\\}")] public class TestScheduler : VirtualTimeScheduler { /// diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs index e14e2806a2..9919f7b6e9 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/HistoricalScheduler.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Diagnostics; namespace System.Reactive.Concurrency { @@ -67,6 +68,10 @@ protected override DateTimeOffset Add(DateTimeOffset absolute, TimeSpan relative /// /// Provides a virtual time scheduler that uses for absolute time and for relative time. /// + [DebuggerDisplay("\\{ " + + nameof(Clock) + " = {" + nameof(Clock) + "} " + + nameof(Now) + " = {" + nameof(Now) + ".ToString(\"O\")} " + + "\\}")] public class HistoricalScheduler : HistoricalSchedulerBase { private readonly SchedulerQueue _queue = new SchedulerQueue(); diff --git a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt index 3c723b219b..a6477c99f0 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt +++ b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt @@ -267,6 +267,7 @@ namespace System.Reactive.Concurrency public System.IDisposable SchedulePeriodic(TState state, System.TimeSpan period, System.Func action) { } public override System.Reactive.Concurrency.IStopwatch StartStopwatch() { } } + [System.Diagnostics.DebuggerDisplayAttribute("\\{ Clock = {Clock} Now = {Now.ToString(\"O\")} \\}")] public class HistoricalScheduler : System.Reactive.Concurrency.HistoricalSchedulerBase { public HistoricalScheduler() { } diff --git a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Testing.approved.txt b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Testing.approved.txt index 52015e4216..d6b262a213 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Testing.approved.txt +++ b/Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Testing.approved.txt @@ -71,6 +71,7 @@ namespace Microsoft.Reactive.Testing public override int GetHashCode() { } public override string ToString() { } } + [System.Diagnostics.DebuggerDisplayAttribute("\\{ Clock = {Clock} Now = {Now.ToString(\"O\")} \\}")] public class TestScheduler : System.Reactive.Concurrency.VirtualTimeScheduler { public TestScheduler() { } From 2baca5735f3f48a6a0280267e3782ac36ec7a229 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 28 Sep 2018 17:18:32 +0200 Subject: [PATCH 4/4] 4.x: Fix accidental behavior change with Task-based Create methods completing when the body ends (cherry picked from commit b2a449fd67473ea428d942757aefa8f5e2a15bea) --- .../Linq/QueryLanguage.Creation.cs | 2 - .../Tests/Linq/Observable/CreateAsyncTest.cs | 41 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs index 1ef584791d..e117ebe915 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs @@ -155,7 +155,6 @@ public void Dispose() public void OnCompleted() { - _observer.OnCompleted(); } public void OnError(Exception error) @@ -233,7 +232,6 @@ public void Dispose() public void OnCompleted() { - _observer.OnCompleted(); } public void OnError(Exception error) diff --git a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs index cde3803bc4..7806a9e535 100644 --- a/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs +++ b/Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs @@ -705,5 +705,46 @@ public void CreateAsync_Action_Token() Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10))); } + + [Fact] + public void CreateWithTaskDisposable_NoPrematureTermination() + { + var obs = Observable.Create(async o => + { + // avoid warning on async o due to no await + await Task.CompletedTask; + + var inner = Observable.Range(1, 3); + + return inner.Subscribe(x => + { + o.OnNext(x); + }); + }); + + var result = obs.Take(1).Wait(); + } + + [Fact] + public void CreateWithTaskAction_NoPrematureTermination() + { + var obs = Observable.Create(async o => + { + // avoid warning on async o due to no await + await Task.CompletedTask; + + var inner = Observable.Range(1, 3); + + var d = inner.Subscribe(x => + { + o.OnNext(x); + }); + + Action a = () => d.Dispose(); + return a; + }); + + var result = obs.Take(1).Wait(); + } } }