From 609ab9cf25c3ba89856854a34fc5675ced8bce7c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 10 May 2022 00:18:38 +0700 Subject: [PATCH 1/2] Add async fluent builder feature to TestPublisher --- .../PublisherFluentBuilder.cs | 314 ++++++++++++++++++ .../SubscriberFluentBuilder.cs | 173 +++++++--- .../Akka.Streams.TestKit/TestPublisher.cs | 185 ++++------- .../TestPublisher_Fluent.cs | 113 +++++++ .../TestPublisher_Shared.cs | 141 ++++++++ .../Akka.Streams.TestKit/TestSubscriber.cs | 53 ++- .../TestSubscriber_Fluent.cs | 30 ++ .../TestSubscriber_Shared.cs | 4 +- src/core/Akka.Streams.TestKit/Utils.cs | 4 +- 9 files changed, 837 insertions(+), 180 deletions(-) create mode 100644 src/core/Akka.Streams.TestKit/PublisherFluentBuilder.cs create mode 100644 src/core/Akka.Streams.TestKit/TestPublisher_Fluent.cs create mode 100644 src/core/Akka.Streams.TestKit/TestPublisher_Shared.cs diff --git a/src/core/Akka.Streams.TestKit/PublisherFluentBuilder.cs b/src/core/Akka.Streams.TestKit/PublisherFluentBuilder.cs new file mode 100644 index 00000000000..5b8f5e9b9fd --- /dev/null +++ b/src/core/Akka.Streams.TestKit/PublisherFluentBuilder.cs @@ -0,0 +1,314 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +#nullable enable +using System; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Akka.TestKit; +using Reactive.Streams; +using static Akka.Streams.TestKit.TestPublisher; + +namespace Akka.Streams.TestKit +{ + public class PublisherFluentBuilder + { + private readonly List> _tasks = new List>(); + private bool _executed; + + internal PublisherFluentBuilder(ManualProbe probe) + { + Probe = probe; + } + + public ManualProbe Probe { get; } + + /// + /// Execute the async chain. + /// + /// + /// + /// + public async Task ExecuteAsync(Func? asyncAction = null, CancellationToken cancellationToken = default) + { + if (_executed) + throw new InvalidOperationException("Fluent async builder has already been executed."); + _executed = true; + + foreach (var func in _tasks) + { + await func(cancellationToken) + .ConfigureAwait(false); + } + + if (asyncAction != null) + await asyncAction() + .ConfigureAwait(false); + } + + #region ManualProbe wrapper + + /// + /// Execute the async chain and then receive messages for a given duration or until one does not match a given partial function. + /// NOTE: This method will execute the async chain + /// + public async IAsyncEnumerable ReceiveWhileAsync( + TimeSpan? max = null, + TimeSpan? idle = null, + Func? filter = null, + int msgCount = int.MaxValue, + [EnumeratorCancellation] CancellationToken cancellationToken = default) where TOther : class + { + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + await foreach (var item in ManualProbe.ReceiveWhileTask(Probe.Probe, max, idle, filter, msgCount, cancellationToken)) + { + yield return item; + } + } + + /// + /// Execute the async chain and then expect a publisher event from the stream. + /// NOTE: This method will execute the async chain + /// + public async Task ExpectEventAsync(CancellationToken cancellationToken = default) + { + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await ManualProbe.ExpectEventTask(Probe.Probe, cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Execute the async chain and then execute the code block while bounding its execution time between and + /// . blocks may be nested. + /// All methods in this class which take maximum wait times are available in a version which implicitly uses + /// the remaining time governed by the innermost enclosing block. + /// + /// + /// + /// Note that the timeout is scaled using , which uses the + /// configuration entry "akka.test.timefactor", while the min Duration is not. + /// + /// + /// { + /// test.Tell("ping"); + /// return ExpectMsg(); + /// }); + /// ]]> + /// + /// + /// { + /// test.Tell("ping"); + /// await ExpectMsgAsync("expected"); + /// }); + /// + /// NOTE: This method will execute the async chain + /// ]]> + /// + public async Task WithinAsync( + TimeSpan min, + TimeSpan max, + Func function, + CancellationToken cancellationToken = default) + { + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.WithinAsync(min, max, function, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Execute the async chain and then execute the code block while bounding its execution time between and + /// . blocks may be nested. + /// All methods in this class which take maximum wait times are available in a version which implicitly uses + /// the remaining time governed by the innermost enclosing block. + /// + /// + /// + /// Note that the timeout is scaled using , which uses the + /// configuration entry "akka.test.timefactor", while the min Duration is not. + /// + /// + /// { + /// test.Tell("ping"); + /// return await ExpectMsgAsync(); + /// }); + /// ]]> + /// + /// NOTE: This method will execute the async chain + /// + public async Task WithinAsync( + TimeSpan min, + TimeSpan max, + Func> function, + CancellationToken cancellationToken = default) + { + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.WithinAsync(min, max, function, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Sane as calling WithinAsync(TimeSpan.Zero, max, function, cancellationToken). + /// + /// NOTE: This method will execute the async chain + /// + public async Task WithinAsync(TimeSpan max, Func execute, CancellationToken cancellationToken = default) + { + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.WithinAsync(max, execute, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Sane as calling WithinAsync(TimeSpan.Zero, max, function, cancellationToken). + /// + /// NOTE: This method will execute the async chain + /// + public async Task WithinAsync(TimeSpan max, Func> execute, CancellationToken cancellationToken = default) + { + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.WithinAsync(max, execute, cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + + #endregion + + #region ManualProbe fluent wrapper + + /// + /// Fluent async DSL + /// Expect demand from the given subscription. + /// + public PublisherFluentBuilder ExpectRequest(ISubscription subscription, int nrOfElements) + { + _tasks.Add(ct => ManualProbe.ExpectRequestTask(Probe.Probe, subscription, nrOfElements, ct)); + return this; + } + + /// + /// Fluent async DSL + /// Expect no messages. + /// + public PublisherFluentBuilder ExpectNoMsg() + { + _tasks.Add(ct => ManualProbe.ExpectNoMsgTask(Probe.Probe, ct)); + return this; + } + + /// + /// Fluent async DSL + /// Expect no messages for given duration. + /// + public PublisherFluentBuilder ExpectNoMsg(TimeSpan duration, CancellationToken cancellationToken = default) + { + _tasks.Add(ct => ManualProbe.ExpectNoMsgTask(Probe.Probe, duration, ct)); + return this; + } + + #endregion + + #region Probe fluent wrapper + + /// + /// Asserts that a subscription has been received or will be received + /// + public PublisherFluentBuilder EnsureSubscription() + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(EnsureSubscription)} can only be used on a {nameof(Probe)} instance"); + } + _tasks.Add(ct => Probe.EnsureSubscriptionTask(probe, ct)); + return this; + } + + public PublisherFluentBuilder SendNext(T element) + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(SendNext)} can only be used on a {nameof(Probe)} instance"); + } + _tasks.Add(ct => Probe.SendNextTask(probe, element, ct)); + return this; + } + + public PublisherFluentBuilder SendNext(IEnumerable elements) + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(SendNext)} can only be used on a {nameof(Probe)} instance"); + } + _tasks.Add(ct => Probe.SendNextTask(probe, elements, ct)); + return this; + } + + public PublisherFluentBuilder UnsafeSendNext(T element) + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(UnsafeSendNext)} can only be used on a {nameof(Probe)} instance"); + } + _tasks.Add(ct => Probe.UnsafeSendNextTask(probe, element, ct)); + return this; + } + + public PublisherFluentBuilder SendComplete() + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(SendComplete)} can only be used on a {nameof(Probe)} instance"); + } + _tasks.Add(ct => Probe.SendCompleteTask(probe, ct)); + return this; + } + + public PublisherFluentBuilder SendError(Exception e) + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(SendError)} can only be used on a {nameof(Probe)} instance"); + } + _tasks.Add(ct => Probe.SendErrorTask(probe, e, ct)); + return this; + } + + public PublisherFluentBuilder ExpectCancellation() + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(ExpectCancellation)} can only be used on a {nameof(Probe)} instance"); + } + _tasks.Add(ct => Probe.ExpectCancellationTask(probe, ct)); + return this; + } + + #endregion + + public async Task ExpectRequestAsync(CancellationToken cancellationToken = default) + { + if (!(Probe is Probe probe)) + { + throw new InvalidOperationException($"{nameof(ExpectCancellation)} can only be used on a {nameof(Probe)} instance"); + } + + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.ExpectRequestTask(probe, cancellationToken) + .ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams.TestKit/SubscriberFluentBuilder.cs b/src/core/Akka.Streams.TestKit/SubscriberFluentBuilder.cs index 822463c6dc8..56513a8132c 100644 --- a/src/core/Akka.Streams.TestKit/SubscriberFluentBuilder.cs +++ b/src/core/Akka.Streams.TestKit/SubscriberFluentBuilder.cs @@ -27,7 +27,8 @@ public class SubscriberFluentBuilder /// public async Task ExpectSubscriptionAsync(CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectSubscriptionTask(Probe, cancellationToken) .ConfigureAwait(false); } @@ -39,7 +40,8 @@ public async Task ExpectSubscriptionAsync(CancellationToken cance /// public async Task ExpectEventAsync(CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectEventTask(Probe.TestProbe, (TimeSpan?)null, cancellationToken) .ConfigureAwait(false); } @@ -53,7 +55,8 @@ public async Task ExpectEventAsync( TimeSpan? max, CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectEventTask(Probe.TestProbe, max, cancellationToken) .ConfigureAwait(false); } @@ -64,7 +67,8 @@ public async Task ExpectEventAsync( /// public async Task ExpectNextAsync(CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectNextTask(Probe.TestProbe, null, cancellationToken) .ConfigureAwait(false); } @@ -75,7 +79,8 @@ public async Task ExpectNextAsync(CancellationToken cancellationToken = defau /// public async Task ExpectNextAsync(TimeSpan? timeout, CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectNextTask(Probe.TestProbe, timeout, cancellationToken) .ConfigureAwait(false); } @@ -89,7 +94,8 @@ public async IAsyncEnumerable ExpectNextNAsync( TimeSpan? timeout = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); await foreach (var item in ManualProbe.ExpectNextNTask(Probe.TestProbe, n, timeout, cancellationToken)) { yield return item; @@ -102,7 +108,8 @@ public async IAsyncEnumerable ExpectNextNAsync( /// public async Task ExpectErrorAsync(CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectErrorTask(Probe.TestProbe, cancellationToken) .ConfigureAwait(false); } @@ -115,7 +122,8 @@ public async Task ExpectErrorAsync(CancellationToken cancellationToke /// public async Task ExpectSubscriptionAndErrorAsync(CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectSubscriptionAndErrorTask(Probe, true, cancellationToken) .ConfigureAwait(false); } @@ -132,7 +140,8 @@ public async Task ExpectSubscriptionAndErrorAsync( bool signalDemand, CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectSubscriptionAndErrorTask(Probe, signalDemand, cancellationToken) .ConfigureAwait(false); } @@ -143,7 +152,8 @@ public async Task ExpectSubscriptionAndErrorAsync( /// public async Task ExpectNextOrErrorAsync(CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectNextOrErrorTask(Probe.TestProbe, cancellationToken) .ConfigureAwait(false); } @@ -154,7 +164,8 @@ public async Task ExpectNextOrErrorAsync(CancellationToken cancellationT /// public async Task ExpectNextOrCompleteAsync(CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectNextOrCompleteTask(Probe.TestProbe, cancellationToken) .ConfigureAwait(false); } @@ -171,7 +182,8 @@ public async Task ExpectNextAsync( Predicate predicate, CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectNextTask(Probe.TestProbe, predicate, cancellationToken) .ConfigureAwait(false); } @@ -180,7 +192,8 @@ public async Task ExpectEventAsync( Func func, CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); return await ManualProbe.ExpectEventTask(Probe.TestProbe, func, cancellationToken) .ConfigureAwait(false); } @@ -196,7 +209,8 @@ public async IAsyncEnumerable ReceiveWhileAsync( int msgs = int.MaxValue, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); await foreach (var item in Probe.TestProbe.ReceiveWhileAsync(max, idle, filter, msgs, cancellationToken)) { yield return item; @@ -212,7 +226,8 @@ public async IAsyncEnumerable ReceiveWithinAsync( int messages = int.MaxValue, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); await foreach (var item in ManualProbe.ReceiveWithinTask(Probe.TestProbe, max, messages, cancellationToken)) { yield return item; @@ -220,36 +235,93 @@ public async IAsyncEnumerable ReceiveWithinAsync( } /// - /// Execute code block while bounding its execution time between and . + /// Execute the async chain and then execute the code block while bounding its execution time between and . /// /// Note that the timeout is scaled using , which uses the /// configuration entry "akka.test.timefactor", while the min Duration is not. /// /// + /// var ret = await probe.AsyncBuilder().WithinAsync(Timespan.FromMilliseconds(50), Timespan.FromSeconds(3), () => /// { /// test.Tell("ping"); /// return ExpectMsg(); /// }); /// ]]> + /// + /// + /// { + /// test.Tell("ping"); + /// await ExpectMsgAsync("expected"); + /// }); + /// ]]> + /// + /// NOTE: This method will execute the async chain /// - public async Task WithinAsync(TimeSpan min, TimeSpan max, Func execute, CancellationToken cancellationToken = default) + public async Task WithinAsync( + TimeSpan min, + TimeSpan max, + Func function, + string? hint = null, + TimeSpan? epsilonValue = null, + CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); - return await Probe.TestProbe.WithinAsync(min, max, execute, cancellationToken: cancellationToken) + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.TestProbe.WithinAsync(min, max, function, hint, epsilonValue, cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Execute the async chain and then execute the code block while bounding its execution time between and . + /// + /// Note that the timeout is scaled using , which uses the + /// configuration entry "akka.test.timefactor", while the min Duration is not. + /// + /// + /// { + /// test.Tell("ping"); + /// await ExpectMsgAsync("expected"); + /// }); + /// ]]> + /// + /// NOTE: This method will execute the async chain + /// + public async Task WithinAsync( + TimeSpan min, + TimeSpan max, + Func> asyncFunction, + string? hint = null, + TimeSpan? epsilonValue = null, + CancellationToken cancellationToken = default) + { + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.TestProbe.WithinAsync(min, max, asyncFunction, hint, epsilonValue, cancellationToken) .ConfigureAwait(false); } /// - /// Execute code block while bounding its execution time with a timeout. + /// Execute the async chain and then execute code block while bounding its execution time with a timeout. /// /// + /// var ret = await probe.AsyncBuilder().WithinAsync(Timespan.FromSeconds(3), () => /// { /// test.Tell("ping"); /// return ExpectMsg(); /// }); /// ]]> + /// + /// + /// { + /// test.Tell("ping"); + /// await ExpectMsgAsync("expected"); + /// }); + /// ]]> + /// + /// NOTE: This method will execute the async chain /// public async Task WithinAsync(TimeSpan max, Func execute, CancellationToken cancellationToken = default) { @@ -258,31 +330,58 @@ public async Task WithinAsync(TimeSpan max, Func execute .ConfigureAwait(false); } - public async Task WithinAsync( + /// + /// Execute the async chain and then execute code block while bounding its execution time with a timeout. + /// + /// + /// { + /// test.Tell("ping"); + /// return await ExpectMsgAsync(); + /// }); + /// ]]> + /// NOTE: This method will execute the async chain + /// + public async Task WithinAsync( TimeSpan max, - Func actionAsync, + Func> actionAsync, TimeSpan? epsilonValue = null, CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); - await Probe.TestProbe.WithinAsync(max, actionAsync, epsilonValue, cancellationToken) + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + return await Probe.TestProbe.WithinAsync(max, actionAsync, epsilonValue, cancellationToken) .ConfigureAwait(false); } /// - /// Attempt to drain the stream into an IAsyncEnumerable (by requesting long.MaxValue elements). + /// Execute the async chain and then attempt to drain the stream into an IAsyncEnumerable (by requesting long.MaxValue elements). + /// NOTE: This method will execute the async chain /// public async IAsyncEnumerable ToStrictAsync( TimeSpan atMost, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - await ExecuteAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await ExecuteAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); await foreach (var item in ManualProbe.ToStrictTask(Probe, atMost, cancellationToken)) { yield return item; } } + #endregion + + private readonly List> _tasks = new List>(); + private bool _executed; + + internal SubscriberFluentBuilder(ManualProbe probe) + { + Probe = probe; + } + + public ManualProbe Probe { get; } + /// /// Execute the async chain. /// @@ -297,25 +396,15 @@ public async Task ExecuteAsync(Func? asyncAction = null, CancellationToken foreach (var func in _tasks) { - await func(cancellationToken); + await func(cancellationToken) + .ConfigureAwait(false); } if (asyncAction != null) - await asyncAction(); + await asyncAction() + .ConfigureAwait(false); } - #endregion - - private readonly List> _tasks = new List>(); - private bool _executed; - - internal SubscriberFluentBuilder(ManualProbe probe) - { - Probe = probe; - } - - public ManualProbe Probe { get; } - #region Probe wrapper /// diff --git a/src/core/Akka.Streams.TestKit/TestPublisher.cs b/src/core/Akka.Streams.TestKit/TestPublisher.cs index 2bd4ee258bd..1a2e542e9f3 100644 --- a/src/core/Akka.Streams.TestKit/TestPublisher.cs +++ b/src/core/Akka.Streams.TestKit/TestPublisher.cs @@ -21,7 +21,7 @@ namespace Akka.Streams.TestKit /// /// Provides factory methods for various Publishers. /// - public static class TestPublisher + public static partial class TestPublisher { #region messages @@ -66,13 +66,14 @@ public RequestMore(ISubscription subscription, long nrOfElements) /// This probe does not track demand.Therefore you need to expect demand before sending /// elements downstream. /// - public class ManualProbe : IPublisher + public partial class ManualProbe : IPublisher { - private readonly TestProbe _probe; + private volatile StreamTestKit.PublisherProbeSubscription _subscription_DoNotUseDirectly; + public TestProbe Probe { get; } internal ManualProbe(TestKitBase system, bool autoOnSubscribe = true) { - _probe = system.CreateTestProbe(); + Probe = system.CreateTestProbe(); AutoOnSubscribe = autoOnSubscribe; } @@ -80,13 +81,21 @@ internal ManualProbe(TestKitBase system, bool autoOnSubscribe = true) public IPublisher Publisher => this; + public StreamTestKit.PublisherProbeSubscription Subscription + { +#pragma warning disable CS0420 + get => Volatile.Read(ref _subscription_DoNotUseDirectly); + protected set => Volatile.Write(ref _subscription_DoNotUseDirectly, value); +#pragma warning restore CS0420 + } + /// /// Subscribes a given to this probe. /// public void Subscribe(ISubscriber subscriber) { - var subscription = new StreamTestKit.PublisherProbeSubscription(subscriber, _probe); - _probe.Ref.Tell(new Subscribe(subscription)); + var subscription = new StreamTestKit.PublisherProbeSubscription(subscriber, Probe); + Probe.Ref.Tell(new Subscribe(subscription)); if (AutoOnSubscribe) subscriber.OnSubscribe(subscription); } @@ -95,7 +104,7 @@ public void Subscribe(ISubscriber subscriber) /// public StreamTestKit.PublisherProbeSubscription ExpectSubscription( CancellationToken cancellationToken = default) - => ExpectSubscriptionAsync(cancellationToken) + => ExpectSubscriptionTask(Probe, cancellationToken) .ConfigureAwait(false).GetAwaiter().GetResult(); /// @@ -103,64 +112,32 @@ public StreamTestKit.PublisherProbeSubscription ExpectSubscription( /// public async Task> ExpectSubscriptionAsync( CancellationToken cancellationToken = default) - { - var msg = await _probe.ExpectMsgAsync(cancellationToken: cancellationToken) - .ConfigureAwait(false); - return (StreamTestKit.PublisherProbeSubscription) msg.Subscription; - } + => await ExpectSubscriptionTask(Probe, cancellationToken) + .ConfigureAwait(false); - /// - /// Expect demand from the given subscription. - /// - public void ExpectRequest( - ISubscription subscription, - int n, - CancellationToken cancellationToken = default) - => ExpectRequestAsync(subscription, n, cancellationToken) - .ConfigureAwait(false).GetAwaiter().GetResult(); - /// /// Expect demand from the given subscription. /// public async Task ExpectRequestAsync( ISubscription subscription, - int n, + int nrOfElements, CancellationToken cancellationToken = default) - { - await _probe.ExpectMsgAsync( - isMessage: x => x.NrOfElements == n && x.Subscription == subscription, - cancellationToken: cancellationToken); - } - - /// - /// Expect no messages. - /// - public void ExpectNoMsg(CancellationToken cancellationToken = default) - => ExpectNoMsgAsync(cancellationToken) - .ConfigureAwait(false).GetAwaiter().GetResult(); + => await ExpectRequestTask(Probe, subscription, nrOfElements, cancellationToken) + .ConfigureAwait(false); /// /// Expect no messages. /// public async Task ExpectNoMsgAsync(CancellationToken cancellationToken = default) - { - await _probe.ExpectNoMsgAsync(cancellationToken); - } - - /// - /// Expect no messages for given duration. - /// - public void ExpectNoMsg(TimeSpan duration, CancellationToken cancellationToken = default) - => ExpectNoMsgAsync(duration, cancellationToken) - .ConfigureAwait(false).GetAwaiter().GetResult(); + => await ExpectNoMsgTask(Probe, cancellationToken) + .ConfigureAwait(false); /// /// Expect no messages for given duration. /// public async Task ExpectNoMsgAsync(TimeSpan duration, CancellationToken cancellationToken = default) - { - await _probe.ExpectNoMsgAsync(duration, cancellationToken); - } + => await ExpectNoMsgTask(Probe, duration, cancellationToken) + .ConfigureAwait(false); /// /// Receive messages for a given duration or until one does not match a given partial function. @@ -169,10 +146,10 @@ public IEnumerable ReceiveWhile( TimeSpan? max = null, TimeSpan? idle = null, Func filter = null, - int msgs = int.MaxValue, + int msgCount = int.MaxValue, CancellationToken cancellationToken = default) where TOther : class - => ReceiveWhileAsync(max, idle, filter, msgs, cancellationToken) - .ToListAsync(cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult(); + => ReceiveWhileTask(Probe, max, idle, filter, msgCount, cancellationToken).ToListAsync(cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); /// /// Receive messages for a given duration or until one does not match a given partial function. @@ -181,16 +158,22 @@ public IAsyncEnumerable ReceiveWhileAsync( TimeSpan? max = null, TimeSpan? idle = null, Func filter = null, - int msgs = int.MaxValue, + int msgCount = int.MaxValue, CancellationToken cancellationToken = default) where TOther : class - => _probe.ReceiveWhileAsync(max, idle, filter, msgs, cancellationToken); + => ReceiveWhileTask(Probe, max, idle, filter, msgCount, cancellationToken); + /// + /// Expect a publisher event from the stream. + /// public IPublisherEvent ExpectEvent(CancellationToken cancellationToken = default) - => ExpectEventAsync(cancellationToken) + => ExpectEventTask(Probe, cancellationToken) .ConfigureAwait(false).GetAwaiter().GetResult(); + /// + /// Expect a publisher event from the stream. + /// public async Task ExpectEventAsync(CancellationToken cancellationToken = default) - => await _probe.ExpectMsgAsync(cancellationToken: cancellationToken) + => await ExpectEventTask(Probe, cancellationToken) .ConfigureAwait(false); /// @@ -254,7 +237,7 @@ public async Task WithinAsync( TimeSpan max, Func execute, CancellationToken cancellationToken = default) - => await _probe.WithinAsync(min, max, execute, cancellationToken: cancellationToken) + => await Probe.WithinAsync(min, max, execute, cancellationToken: cancellationToken) .ConfigureAwait(false); /// @@ -278,7 +261,7 @@ public async Task WithinAsync( /// /// /// - /// + /// /// /// public async Task WithinAsync( @@ -286,28 +269,28 @@ public async Task WithinAsync( TimeSpan max, Func> actionAsync, CancellationToken cancellationToken = default) - => await _probe.WithinAsync(min, max, actionAsync, cancellationToken: cancellationToken) + => await Probe.WithinAsync(min, max, actionAsync, cancellationToken: cancellationToken) .ConfigureAwait(false); /// /// Sane as calling Within(TimeSpan.Zero, max, function, cancellationToken). /// - public TOther Within(TimeSpan max, Func execute) - => WithinAsync(max, execute) + public TOther Within(TimeSpan max, Func execute, CancellationToken cancellationToken = default) + => WithinAsync(max, execute, cancellationToken) .ConfigureAwait(false).GetAwaiter().GetResult(); /// /// Sane as calling WithinAsync(TimeSpan.Zero, max, function, cancellationToken). /// public async Task WithinAsync(TimeSpan max, Func execute, CancellationToken cancellationToken = default) - => await _probe.WithinAsync(max, execute, cancellationToken: cancellationToken) + => await Probe.WithinAsync(max, execute, cancellationToken: cancellationToken) .ConfigureAwait(false); /// /// Sane as calling WithinAsync(TimeSpan.Zero, max, function, cancellationToken). /// public async Task WithinAsync(TimeSpan max, Func> execute, CancellationToken cancellationToken = default) - => await _probe.WithinAsync(max, execute, cancellationToken: cancellationToken) + => await Probe.WithinAsync(max, execute, cancellationToken: cancellationToken) .ConfigureAwait(false); } @@ -315,11 +298,10 @@ public async Task WithinAsync(TimeSpan max, Func> e /// Single subscription and demand tracking for . /// /// - public class Probe : ManualProbe + public partial class Probe : ManualProbe { private readonly long _initialPendingRequests; - private StreamTestKit.PublisherProbeSubscription _subscription = null; - + internal Probe(TestKitBase system, long initialPendingRequests) : base(system) { _initialPendingRequests = Pending = initialPendingRequests; @@ -334,71 +316,40 @@ internal Probe(TestKitBase system, long initialPendingRequests) : base(system) /// Asserts that a subscription has been received or will be received /// public void EnsureSubscription(CancellationToken cancellationToken = default) - { - if(_subscription == null) - _subscription = ExpectSubscription(cancellationToken); - } + => EnsureSubscriptionTask(this, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); public async Task EnsureSubscriptionAsync(CancellationToken cancellationToken = default) - { - if(_subscription == null) - _subscription = await ExpectSubscriptionAsync(cancellationToken) - .ConfigureAwait(false); - } - public Probe SendNext(T element) - { - EnsureSubscription(); - var sub = _subscription; - if (Pending == 0) - Pending = sub.ExpectRequest(); - Pending--; - sub.SendNext(element); - return this; - } + => await EnsureSubscriptionTask(this, cancellationToken) + .ConfigureAwait(false); - public Probe UnsafeSendNext(T element) - { - EnsureSubscription(); - _subscription.SendNext(element); - return this; - } + public async Task SendNextAsync(T element, CancellationToken cancellationToken = default) + => await SendNextTask(this, element, cancellationToken) + .ConfigureAwait(false); + + public async Task UnsafeSendNextAsync(T element, CancellationToken cancellationToken = default) + => await UnsafeSendNextTask(this, element, cancellationToken) + .ConfigureAwait(false); - public Probe SendComplete() - { - EnsureSubscription(); - _subscription.SendComplete(); - return this; - } + public async Task SendCompleteAsync(CancellationToken cancellationToken = default) + => await SendCompleteTask(this, cancellationToken) + .ConfigureAwait(false); - public Probe SendError(Exception e) - { - EnsureSubscription(); - _subscription.SendError(e); - return this; - } + public async Task SendErrorAsync(Exception e, CancellationToken cancellationToken = default) + => await SendErrorTask(this, e, cancellationToken) + .ConfigureAwait(false); public long ExpectRequest(CancellationToken cancellationToken = default) - => ExpectRequestAsync(cancellationToken) + => ExpectRequestTask(this, cancellationToken) .ConfigureAwait(false).GetAwaiter().GetResult(); public async Task ExpectRequestAsync(CancellationToken cancellationToken = default) - { - await EnsureSubscriptionAsync(cancellationToken); - var requests = await _subscription.ExpectRequestAsync(cancellationToken) + => await ExpectRequestTask(this, cancellationToken) .ConfigureAwait(false); - Pending += requests; - return requests; - } - public void ExpectCancellation(CancellationToken cancellationToken = default) - => ExpectCancellationAsync(cancellationToken) - .ConfigureAwait(false).GetAwaiter().GetResult(); - public async Task ExpectCancellationAsync(CancellationToken cancellationToken = default) - { - await EnsureSubscriptionAsync(cancellationToken); - await _subscription.ExpectCancellationAsync(cancellationToken); - } + => await ExpectCancellationTask(this, cancellationToken) + .ConfigureAwait(false); } internal sealed class LazyEmptyPublisher : IPublisher diff --git a/src/core/Akka.Streams.TestKit/TestPublisher_Fluent.cs b/src/core/Akka.Streams.TestKit/TestPublisher_Fluent.cs new file mode 100644 index 00000000000..1542a6bd06f --- /dev/null +++ b/src/core/Akka.Streams.TestKit/TestPublisher_Fluent.cs @@ -0,0 +1,113 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Reactive.Streams; + +namespace Akka.Streams.TestKit +{ + // Fluent implementation + public static partial class TestPublisher + { + public partial class ManualProbe + { + /// + /// Fluent async DSL. + /// This will return an instance of that will compose and run + /// all of its method call asynchronously. + /// Note that contains two types of methods: + /// * Methods that returns are used to chain test methods together + /// using a fluent builder pattern. + /// * Methods with names that ends with the postfix "Async" and returns either a or + /// a . These methods invokes the previously chained methods asynchronously one + /// after another before executing its own code. + /// + /// + public PublisherFluentBuilder AsyncBuilder() + => new PublisherFluentBuilder(this); + + /// + /// Fluent DSL + /// Expect demand from the given subscription. + /// + public ManualProbe ExpectRequest( + ISubscription subscription, + int nrOfElements, + CancellationToken cancellationToken = default) + { + ExpectRequestTask(Probe, subscription, nrOfElements, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + /// + /// Fluent DSL + /// Expect no messages. + /// + public ManualProbe ExpectNoMsg(CancellationToken cancellationToken = default) + { + ExpectNoMsgTask(Probe, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + /// + /// Fluent DSL + /// Expect no messages for given duration. + /// + public ManualProbe ExpectNoMsg(TimeSpan duration, CancellationToken cancellationToken = default) + { + ExpectNoMsgTask(Probe, duration, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + + } + + public partial class Probe + { + public Probe SendNext(T element, CancellationToken cancellationToken = default) + { + SendNextTask(this, element, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + public Probe UnsafeSendNext(T element, CancellationToken cancellationToken = default) + { + UnsafeSendNextTask(this, element, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + public Probe SendComplete(CancellationToken cancellationToken = default) + { + SendCompleteTask(this, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + public Probe SendError(Exception e, CancellationToken cancellationToken = default) + { + SendErrorTask(this, e, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + public Probe ExpectCancellation(CancellationToken cancellationToken = default) + { + ExpectCancellationTask(this, cancellationToken) + .ConfigureAwait(false).GetAwaiter().GetResult(); + return this; + } + + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams.TestKit/TestPublisher_Shared.cs b/src/core/Akka.Streams.TestKit/TestPublisher_Shared.cs new file mode 100644 index 00000000000..cedcb11c8a6 --- /dev/null +++ b/src/core/Akka.Streams.TestKit/TestPublisher_Shared.cs @@ -0,0 +1,141 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Akka.TestKit; +using Reactive.Streams; + +namespace Akka.Streams.TestKit +{ + public static partial class TestPublisher + { + public partial class ManualProbe + { + protected static async Task> ExpectSubscriptionTask( + TestProbe probe, + CancellationToken cancellationToken) + { + var msg = await probe.ExpectMsgAsync(cancellationToken: cancellationToken); + return (StreamTestKit.PublisherProbeSubscription) msg.Subscription; + } + + #region void methods + + internal static async Task ExpectRequestTask( + TestProbe probe, + ISubscription subscription, + int nrOfElements, + CancellationToken cancellationToken) + => await probe.ExpectMsgAsync( + isMessage: x => x.NrOfElements == nrOfElements && x.Subscription == subscription, + cancellationToken: cancellationToken); + + internal static async Task ExpectNoMsgTask(TestProbe probe, CancellationToken cancellationToken) + => await probe.ExpectNoMsgAsync(cancellationToken); + + internal static async Task ExpectNoMsgTask( + TestProbe probe, + TimeSpan duration, + CancellationToken cancellationToken) + => await probe.ExpectNoMsgAsync(duration, cancellationToken); + + #endregion + + #region Return type methods + + internal static IAsyncEnumerable ReceiveWhileTask( + TestProbe probe, + TimeSpan? max, + TimeSpan? idle, + Func filter, + int msgCount, + CancellationToken cancellationToken) where TOther : class + => probe.ReceiveWhileAsync(max, idle, filter, msgCount, cancellationToken); + + internal static ValueTask ExpectEventTask( + TestProbe probe, + CancellationToken cancellationToken) + => probe.ExpectMsgAsync(cancellationToken: cancellationToken); + + #endregion + } + + public partial class Probe + { + #region void methods + + internal static async Task EnsureSubscriptionTask(Probe probe, CancellationToken cancellationToken) + { + probe.Subscription ??= await ExpectSubscriptionTask(probe.Probe, cancellationToken); + } + + internal static async Task SendNextTask(Probe probe, T element, CancellationToken cancellationToken) + { + await EnsureSubscriptionTask(probe, cancellationToken); + var sub = probe.Subscription; + if (probe.Pending == 0) + probe.Pending = await sub.ExpectRequestAsync(cancellationToken); + probe.Pending--; + sub.SendNext(element); + } + + internal static async Task SendNextTask(Probe probe, IEnumerable elements, CancellationToken cancellationToken) + { + await EnsureSubscriptionTask(probe, cancellationToken); + var sub = probe.Subscription; + foreach (var element in elements) + { + if (probe.Pending == 0) + probe.Pending = await sub.ExpectRequestAsync(cancellationToken); + probe.Pending--; + sub.SendNext(element); + } + } + + internal static async Task UnsafeSendNextTask(Probe probe, T element, CancellationToken cancellationToken) + { + await EnsureSubscriptionTask(probe, cancellationToken); + probe.Subscription.SendNext(element); + } + + internal static async Task SendCompleteTask(Probe probe, CancellationToken cancellationToken) + { + await EnsureSubscriptionTask(probe, cancellationToken); + probe.Subscription.SendComplete(); + } + + internal static async Task SendErrorTask(Probe probe, Exception e, CancellationToken cancellationToken) + { + await EnsureSubscriptionTask(probe, cancellationToken); + probe.Subscription.SendError(e); + } + + #endregion + + #region Return type methods + + internal static async Task ExpectRequestTask(Probe probe, CancellationToken cancellationToken) + { + await EnsureSubscriptionTask(probe, cancellationToken); + var requests = await probe.Subscription.ExpectRequestAsync(cancellationToken); + probe.Pending += requests; + return requests; + } + + internal static async Task ExpectCancellationTask(Probe probe, CancellationToken cancellationToken = default) + { + await EnsureSubscriptionTask(probe, cancellationToken); + await probe.Subscription.ExpectCancellationAsync(cancellationToken); + } + + #endregion + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams.TestKit/TestSubscriber.cs b/src/core/Akka.Streams.TestKit/TestSubscriber.cs index 29ccb15c938..7968b79aa46 100644 --- a/src/core/Akka.Streams.TestKit/TestSubscriber.cs +++ b/src/core/Akka.Streams.TestKit/TestSubscriber.cs @@ -92,7 +92,7 @@ public partial class ManualProbe : ISubscriber { private readonly TestKitBase _testKit; internal readonly TestProbe TestProbe; - private volatile ISubscription _subscription; + private volatile ISubscription _subscription_DoNotUseDirectly; internal ManualProbe(TestKitBase testKit) { @@ -103,8 +103,8 @@ internal ManualProbe(TestKitBase testKit) public ISubscription Subscription { #pragma warning disable CS0420 - get => Volatile.Read(ref _subscription); - internal set => Volatile.Write(ref _subscription, value); + get => Volatile.Read(ref _subscription_DoNotUseDirectly); + protected set => Volatile.Write(ref _subscription_DoNotUseDirectly, value); #pragma warning restore CS0420 } @@ -188,6 +188,10 @@ public async Task ExpectNextAsync(TimeSpan? timeout, CancellationToken cancel => await ExpectNextTask(TestProbe, timeout, cancellationToken) .ConfigureAwait(false); + public async Task ExpectNextAsync(T element, CancellationToken cancellationToken = default) + => await ExpectNextTask(probe: TestProbe, element: element, timeout: null, cancellationToken: cancellationToken) + .ConfigureAwait(false); + /// /// Expect and return the next stream elements. /// @@ -459,32 +463,27 @@ public Probe EnsureSubscription(CancellationToken cancellationToken = default /// /// Ensure that the probe has received or will receive a subscription /// - public async Task> EnsureSubscriptionAsync(CancellationToken cancellationToken = default) - { - await EnsureSubscriptionTask(this, cancellationToken); - return this; - } + public async Task EnsureSubscriptionAsync(CancellationToken cancellationToken = default) + => await EnsureSubscriptionTask(this, cancellationToken) + .ConfigureAwait(false); - public Probe Request(long n) + public async Task RequestAsync(long n) { - EnsureSubscription(); + await EnsureSubscriptionAsync(); Subscription.Request(n); - return this; } - public Probe RequestNext(T element) + public async Task RequestNextAsync(T element) { - EnsureSubscription(); + await EnsureSubscriptionAsync(); Subscription.Request(1); - ExpectNext(element); - return this; + await ExpectNextAsync(element); } - public Probe Cancel() + public async Task CancelAsync() { - EnsureSubscription(); + await EnsureSubscriptionAsync(); Subscription.Cancel(); - return this; } /// @@ -497,6 +496,16 @@ public T RequestNext() return ExpectNext(); } + /// + /// Request and expect a stream element. + /// + public async Task RequestNextAsync() + { + await EnsureSubscriptionAsync(); + Subscription.Request(1); + return await ExpectNextAsync(); + } + /// /// Request and expect a stream element during the specified time or timeout. /// @@ -506,6 +515,14 @@ public T RequestNext(TimeSpan timeout) Subscription.Request(1); return ExpectNext(timeout); } + + public async Task RequestNextAsync(TimeSpan timeout) + { + await EnsureSubscriptionAsync(); + Subscription.Request(1); + return await ExpectNextAsync(timeout); + } + } public static ManualProbe CreateManualSubscriberProbe(this TestKitBase testKit) diff --git a/src/core/Akka.Streams.TestKit/TestSubscriber_Fluent.cs b/src/core/Akka.Streams.TestKit/TestSubscriber_Fluent.cs index 446b3227265..8af11c806e3 100644 --- a/src/core/Akka.Streams.TestKit/TestSubscriber_Fluent.cs +++ b/src/core/Akka.Streams.TestKit/TestSubscriber_Fluent.cs @@ -188,6 +188,9 @@ public ManualProbe ExpectComplete(CancellationToken cancellationToken = defau return this; } + public async Task ExpectCompleteAsync(CancellationToken cancellationToken = default) + => await ExpectCompleteTask(TestProbe, null, cancellationToken); + /// /// Fluent DSL. Expect completion with a timeout. /// @@ -276,5 +279,32 @@ public ManualProbe MatchNext(Predicate predicate, Cancellatio return this; } } + + public partial class Probe + { + public Probe Request(long n) + { + EnsureSubscription(); + Subscription.Request(n); + return this; + } + + public Probe Cancel() + { + EnsureSubscription(); + Subscription.Cancel(); + return this; + } + + public Probe RequestNext(T element) + { + EnsureSubscription(); + Subscription.Request(1); + ExpectNext(element); + return this; + } + + + } } } \ No newline at end of file diff --git a/src/core/Akka.Streams.TestKit/TestSubscriber_Shared.cs b/src/core/Akka.Streams.TestKit/TestSubscriber_Shared.cs index 59a5f6f23a0..c707eb289b5 100644 --- a/src/core/Akka.Streams.TestKit/TestSubscriber_Shared.cs +++ b/src/core/Akka.Streams.TestKit/TestSubscriber_Shared.cs @@ -305,11 +305,11 @@ internal static async IAsyncEnumerable ToStrictTask( TimeSpan atMost, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - var deadline = DateTime.UtcNow + atMost; // if no subscription was obtained yet, we expect it await EnsureSubscriptionTask(probe, cancellationToken); probe.Subscription.Request(long.MaxValue); + var deadline = DateTime.UtcNow + atMost; var result = new List(); while (true) { @@ -330,6 +330,8 @@ internal static async IAsyncEnumerable ToStrictTask( result.Add(next.Element); yield return next.Element; break; + default: + throw new InvalidOperationException($"Invalid response, expected {nameof(OnError)}, {nameof(OnComplete)}, or {nameof(OnNext)}, received [{e.GetType()}]"); } } } diff --git a/src/core/Akka.Streams.TestKit/Utils.cs b/src/core/Akka.Streams.TestKit/Utils.cs index c4942138bb7..5a2e457f34a 100644 --- a/src/core/Akka.Streams.TestKit/Utils.cs +++ b/src/core/Akka.Streams.TestKit/Utils.cs @@ -41,13 +41,13 @@ public static async Task AssertAllStagesStoppedAsync(this AkkaSpec spec, F public static async Task AssertAllStagesStoppedAsync(this AkkaSpec spec, Func> block, IMaterializer materializer) { + var result = await block(); if (!(materializer is ActorMaterializerImpl impl)) - return await block(); + return result; var probe = spec.CreateTestProbe(impl.System); probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance); await probe.ExpectMsgAsync(); - var result = await block(); await probe.WithinAsync(TimeSpan.FromSeconds(5), async () => { From 489af63dd0eb9158e246817dee0fb333089012c9 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 10 May 2022 21:17:22 +0700 Subject: [PATCH 2/2] Revert build.fsx changes --- build.fsx | 67 +++++++++++++++++++------------------------------------ 1 file changed, 23 insertions(+), 44 deletions(-) diff --git a/build.fsx b/build.fsx index f78ef249ef7..83350b7cabb 100644 --- a/build.fsx +++ b/build.fsx @@ -241,34 +241,27 @@ Target "RunTests" (fun _ -> | true -> !! "./src/**/*.Tests.*sproj" ++ "./src/**/Akka.Streams.Tests.TCK.csproj" -- "./src/**/*.Tests.MultiNode.csproj" - -- "./src/**/*.Tests.Performance.csproj" -- "./src/examples/**" | _ -> !! "./src/**/*.Tests.*sproj" // if you need to filter specs for Linux vs. Windows, do it here -- "./src/**/*.Tests.MultiNode.csproj" - -- "./src/**/*.Tests.Performance.csproj" -- "./src/examples/**" rawProjects |> Seq.choose filterProjects - let projectDlls = projects |> Seq.map ( fun project -> - let assemblyName = fileNameWithoutExt project - (directory project) @@ "bin" @@ "Release" @@ testNetFrameworkVersion @@ assemblyName + ".dll" - ) - - let runSingleProject projectDll = + let runSingleProject project = let arguments = match (hasTeamCity) with - | true -> (sprintf "test \"%s\" --blame-crash --blame-hang-timeout 25m -l:trx -l:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -teamcity" projectDll testNetFrameworkVersion outputTests) - | false -> (sprintf "test \"%s\" --blame-crash --blame-hang-timeout 25m -l:trx -l:\"console;verbosity=normal\" --framework %s --results-directory \"%s\"" projectDll testNetFrameworkVersion outputTests) - + | true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetFrameworkVersion outputTests) + | false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetFrameworkVersion outputTests) + let result = ExecProcess(fun info -> info.FileName <- "dotnet" - info.WorkingDirectory <- outputTests + info.WorkingDirectory <- (Directory.GetParent project).FullName info.Arguments <- arguments) (TimeSpan.FromMinutes 30.0) ResultHandling.failBuildIfXUnitReportedError TestRunnerErrorLevel.Error result - + CreateDir outputTests - projectDlls |> Seq.iter (runSingleProject) + projects |> Seq.iter (runSingleProject) ) Target "RunTestsNetCore" (fun _ -> @@ -278,34 +271,27 @@ Target "RunTestsNetCore" (fun _ -> | true -> !! "./src/**/*.Tests.*sproj" ++ "./src/**/Akka.Streams.Tests.TCK.csproj" -- "./src/**/*.Tests.MultiNode.csproj" - -- "./src/**/*.Tests.Performance.csproj" -- "./src/examples/**" | _ -> !! "./src/**/*.Tests.*sproj" // if you need to filter specs for Linux vs. Windows, do it here -- "./src/**/*.Tests.MultiNode.csproj" - -- "./src/**/*.Tests.Performance.csproj" -- "./src/examples/**" rawProjects |> Seq.choose filterProjects - let projectDlls = projects |> Seq.map ( fun project -> - let assemblyName = fileNameWithoutExt project - (directory project) @@ "bin" @@ "Release" @@ testNetCoreVersion @@ assemblyName + ".dll" - ) - - let runSingleProject projectDll = + let runSingleProject project = let arguments = match (hasTeamCity) with - | true -> (sprintf "test \"%s\" --blame-crash --blame-hang-timeout 25m -l:trx -l:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -teamcity" projectDll testNetCoreVersion outputTests) - | false -> (sprintf "test \"%s\" --blame-crash --blame-hang-timeout 25m -l:trx -l:\"console;verbosity=normal\" --framework %s --results-directory \"%s\"" projectDll testNetCoreVersion outputTests) + | true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetCoreVersion outputTests) + | false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetCoreVersion outputTests) let result = ExecProcess(fun info -> info.FileName <- "dotnet" - info.WorkingDirectory <- outputTests + info.WorkingDirectory <- (Directory.GetParent project).FullName info.Arguments <- arguments) (TimeSpan.FromMinutes 30.0) ResultHandling.failBuildIfXUnitReportedError TestRunnerErrorLevel.Error result CreateDir outputTests - projectDlls |> Seq.iter (runSingleProject) + projects |> Seq.iter (runSingleProject) ) Target "RunTestsNet" (fun _ -> @@ -315,34 +301,27 @@ Target "RunTestsNet" (fun _ -> | true -> !! "./src/**/*.Tests.*sproj" ++ "./src/**/Akka.Streams.Tests.TCK.csproj" -- "./src/**/*.Tests.MultiNode.csproj" - -- "./src/**/*.Tests.Performance.csproj" -- "./src/examples/**" | _ -> !! "./src/**/*.Tests.*sproj" // if you need to filter specs for Linux vs. Windows, do it here -- "./src/**/*.Tests.MultiNode.csproj" - -- "./src/**/*.Tests.Performance.csproj" -- "./src/examples/**" rawProjects |> Seq.choose filterProjects - let projectDlls = projects |> Seq.map ( fun project -> - let assemblyName = fileNameWithoutExt project - (directory project) @@ "bin" @@ "Release" @@ testNetVersion @@ assemblyName + ".dll" - ) - - let runSingleProject projectDll = + let runSingleProject project = let arguments = match (hasTeamCity) with - | true -> (sprintf "test \"%s\" --blame-crash --blame-hang-timeout 25m -l:trx -l:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -teamcity" projectDll testNetVersion outputTests) - | false -> (sprintf "test \"%s\" --blame-crash --blame-hang-timeout 25m -l:trx -l:\"console;verbosity=normal\" --framework %s --results-directory \"%s\"" projectDll testNetVersion outputTests) + | true -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none -teamcity" testNetVersion outputTests) + | false -> (sprintf "test -c Release --no-build --logger:trx --logger:\"console;verbosity=normal\" --framework %s --results-directory \"%s\" -- -parallel none" testNetVersion outputTests) let result = ExecProcess(fun info -> info.FileName <- "dotnet" - info.WorkingDirectory <- outputTests + info.WorkingDirectory <- (Directory.GetParent project).FullName info.Arguments <- arguments) (TimeSpan.FromMinutes 30.0) ResultHandling.failBuildIfXUnitReportedError TestRunnerErrorLevel.Error result CreateDir outputTests - projectDlls |> Seq.iter (runSingleProject) + projects |> Seq.iter (runSingleProject) ) Target "MultiNodeTestsNetCore" (fun _ -> @@ -363,13 +342,13 @@ Target "MultiNodeTestsNetCore" (fun _ -> let runSingleProject projectDll = let arguments = match (hasTeamCity) with - | true -> (sprintf "test \"%s\" -l:trx -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\" -- -teamcity" projectDll testNetCoreVersion outputMultiNode) - | false -> (sprintf "test \"%s\" -l:trx -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\"" projectDll testNetCoreVersion outputMultiNode) + | true -> (sprintf "test \"%s\" -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\" -- -teamcity" projectDll testNetCoreVersion outputMultiNode) + | false -> (sprintf "test \"%s\" -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\"" projectDll testNetCoreVersion outputMultiNode) let resultPath = (directory projectDll) File.WriteAllText( (resultPath @@ "xunit.multinode.runner.json"), - (sprintf "{\"outputDirectory\":\"%s\"}" outputMultiNode).Replace("\\", "\\\\")) + (sprintf "{\"outputDirectory\":\"%s\", \"useBuiltInTrxReporter\":true}" outputMultiNode).Replace("\\", "\\\\")) let result = ExecProcess(fun info -> info.FileName <- "dotnet" @@ -402,13 +381,13 @@ Target "MultiNodeTestsNet" (fun _ -> let runSingleProject projectDll = let arguments = match (hasTeamCity) with - | true -> (sprintf "test \"%s\" -l:trx -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\" -- -teamcity" projectDll testNetVersion outputMultiNode) - | false -> (sprintf "test \"%s\" -l:trx -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\"" projectDll testNetVersion outputMultiNode) + | true -> (sprintf "test \"%s\" -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\" -- -teamcity" projectDll testNetVersion outputMultiNode) + | false -> (sprintf "test \"%s\" -l:\"console;verbosity=detailed\" --framework %s --results-directory \"%s\"" projectDll testNetVersion outputMultiNode) let resultPath = (directory projectDll) File.WriteAllText( (resultPath @@ "xunit.multinode.runner.json"), - (sprintf "{\"outputDirectory\":\"%s\"}" outputMultiNode).Replace("\\", "\\\\")) + (sprintf "{\"outputDirectory\":\"%s\", \"useBuiltInTrxReporter\":true}" outputMultiNode).Replace("\\", "\\\\")) let result = ExecProcess(fun info -> info.FileName <- "dotnet"