From 00f4f9e3235b9172548373ad6b11ae8b9acdca9e Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Thu, 24 Feb 2022 22:37:40 +0100 Subject: [PATCH] Change ReceiveWhile Test Methods to Sync over Async (#5682) * Fix the remaining `FishForMessage` `Sync` over `Async` methods * * Changed `ReceiveWhile` to `Sync` over `Async` * Created `ReceiveWhileAsync()` * Add missing TBD * Create `ReceiveNAsync()` * Potential fix for DocFx `StackOverflow` exception * * Changed `FishForMessage` to directly call its `async` version * Fix possible cause of `Stackoverflow` exception - methods inheriting docs from itself. * Fix build error * Added `CancellationToken` support * Changed Receive methods to sync-over-async * Add CancellationToken support to InternalReceiveNAsync, remove non-async private InternalReceiveN Co-authored-by: Gregorius Soedharmo --- src/core/Akka.TestKit/Akka.TestKit.csproj | 2 + src/core/Akka.TestKit/TestKitBase_Expect.cs | 22 +- src/core/Akka.TestKit/TestKitBase_Receive.cs | 237 ++++++++++++------- 3 files changed, 166 insertions(+), 95 deletions(-) diff --git a/src/core/Akka.TestKit/Akka.TestKit.csproj b/src/core/Akka.TestKit/Akka.TestKit.csproj index a09cb7fbba6..f56c969e28c 100644 --- a/src/core/Akka.TestKit/Akka.TestKit.csproj +++ b/src/core/Akka.TestKit/Akka.TestKit.csproj @@ -28,8 +28,10 @@ + + diff --git a/src/core/Akka.TestKit/TestKitBase_Expect.cs b/src/core/Akka.TestKit/TestKitBase_Expect.cs index 54581346cce..7d25b21b3fc 100644 --- a/src/core/Akka.TestKit/TestKitBase_Expect.cs +++ b/src/core/Akka.TestKit/TestKitBase_Expect.cs @@ -9,9 +9,11 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.TestKit.Internal; using Akka.Util; +using Nito.AsyncEx.Synchronous; namespace Akka.TestKit { @@ -369,19 +371,29 @@ public IReadOnlyCollection ExpectMsgAllOf(TimeSpan max, params T[] message return InternalExpectMsgAllOf(dilated, messages); } - private IReadOnlyCollection InternalExpectMsgAllOf(TimeSpan max, IReadOnlyCollection messages, Func areEqual = null, bool shouldLog=false) + private IReadOnlyCollection InternalExpectMsgAllOf(TimeSpan max, IReadOnlyCollection messages, Func areEqual = null, bool shouldLog=false, CancellationToken cancellationToken = default) + { + var task = InternalExpectMsgAllOfAsync(max, messages, areEqual, shouldLog, cancellationToken); + task.WaitAndUnwrapException(cancellationToken); + return task.Result; + } + + private async Task> InternalExpectMsgAllOfAsync(TimeSpan max, + IReadOnlyCollection messages, Func areEqual = null, bool shouldLog = false, + CancellationToken cancellationToken = default) { ConditionalLog(shouldLog, "Expecting {0} messages during {1}", messages.Count, max); areEqual = areEqual ?? ((x, y) => Equals(x, y)); var start = Now; - var receivedMessages = InternalReceiveN(messages.Count, max, shouldLog).ToList(); - var missing = messages.Where(m => !receivedMessages.Any(r => r is T && areEqual((T)r, m))).ToList(); - var unexpected = receivedMessages.Where(r => !messages.Any(m => r is T && areEqual((T)r, m))).ToList(); + + var receivedMessages = await InternalReceiveNAsync(messages.Count, max, shouldLog, cancellationToken).ToListAsync(cancellationToken); + + var missing = messages.Where(m => !receivedMessages.Any(r => r is T obj && areEqual(obj, m))).ToList(); + var unexpected = receivedMessages.Where(r => !messages.Any(m => r is T obj && areEqual(obj, m))).ToList(); CheckMissingAndUnexpected(missing, unexpected, "not found", "found unexpected", shouldLog, string.Format("Expected {0} messages during {1}. Failed after {2}. ", messages.Count, max, Now-start)); return receivedMessages.Cast().ToList(); } - private void CheckMissingAndUnexpected(IReadOnlyCollection missing, IReadOnlyCollection unexpected, string missingMessage, string unexpectedMessage, bool shouldLog, string hint) { var missingIsEmpty = missing.Count == 0; diff --git a/src/core/Akka.TestKit/TestKitBase_Receive.cs b/src/core/Akka.TestKit/TestKitBase_Receive.cs index 8cfbf5162ec..0d4c0c7ab5b 100644 --- a/src/core/Akka.TestKit/TestKitBase_Receive.cs +++ b/src/core/Akka.TestKit/TestKitBase_Receive.cs @@ -9,6 +9,7 @@ using System.Collections; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Akka.TestKit.Internal; @@ -28,10 +29,19 @@ public abstract partial class TestKitBase /// The is message. /// The maximum. /// The hint. + /// /// Returns the message that matched - public object FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "") + public object FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - return FishForMessage(isMessage, max, hint); + var task = FishForMessageAsync(isMessage, max, hint, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) + { + return await FishForMessageAsync(isMessage, max, hint, cancellationToken); } /// @@ -42,10 +52,19 @@ public object FishForMessage(Predicate isMessage, TimeSpan? max = null, /// The is message. /// The maximum. /// The hint. + /// /// Returns the message that matched - public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "") + public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - return FishForMessage(isMessage: isMessage, max: max, hint: hint, allMessages: null); + var task = FishForMessageAsync(isMessage, max, hint, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) + { + return await FishForMessageAsync(isMessage: isMessage, max: max, hint: hint, allMessages: null, cancellationToken: cancellationToken); } /// @@ -56,17 +75,18 @@ public T FishForMessage(Predicate isMessage, TimeSpan? max = null, string /// The is message. /// The maximum. /// The hint. + /// /// If null then will be ignored. If not null then will be initially cleared, then filled with all the messages until returns true /// Returns the message that matched - public T FishForMessage(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "") + public T FishForMessage(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { - var task = FishForMessageAsync(isMessage, allMessages, max, hint).AsTask(); + var task = FishForMessageAsync(isMessage, allMessages, max, hint, cancellationToken).AsTask(); task.WaitAndUnwrapException(); return task.Result; } - /// - public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "") + /// + public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayList allMessages, TimeSpan? max = null, string hint = "", CancellationToken cancellationToken = default) { var maxValue = RemainingOrDilated(max); var end = Now + maxValue; @@ -74,7 +94,7 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayLi while (true) { var left = end - Now; - var msg = await ReceiveOneAsync(left).ConfigureAwait(false); + var msg = await ReceiveOneAsync(left, cancellationToken).ConfigureAwait(false); _assertions.AssertTrue(msg != null, "Timeout ({0}) during fishForMessage{1}", maxValue, string.IsNullOrEmpty(hint) ? "" : ", hint: " + hint); if (msg is T msg1 && isMessage(msg1)) { @@ -92,16 +112,14 @@ public async ValueTask FishForMessageAsync(Predicate isMessage, ArrayLi /// /// The type that the message is not supposed to be. /// Optional. The maximum wait duration. Defaults to when unset. - public async Task FishUntilMessageAsync(TimeSpan? max = null) + /// + public async Task FishUntilMessageAsync(TimeSpan? max = null, CancellationToken cancellationToken = default) { - await Task.Run(() => + await ReceiveWhileAsync(max: max, shouldContinue: x => { - ReceiveWhile(max: max, shouldContinue: x => - { - _assertions.AssertFalse(x is T, "did not expect a message of type {0}", typeof(T)); - return true; // please continue receiving, don't stop - }); - }); + _assertions.AssertFalse(x is T, "did not expect a message of type {0}", typeof(T)); + return true; // please continue receiving, don't stop + },cancellationToken: cancellationToken); } /// @@ -110,29 +128,27 @@ await Task.Run(() => /// /// A temporary period of 'radio-silence'. /// The method asserts that is never reached. + /// /// If set to null then this method will loop for an infinite number of periods. /// NOTE: If set to null and radio-silence is never reached then this method will never return. /// Returns all the messages encountered before 'radio-silence' was reached. - public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint? maxMessages = null) + public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint? maxMessages = null, CancellationToken cancellationToken = default) { - return await Task.Run(() => - { - var messages = new ArrayList(); + var messages = new ArrayList(); - for (uint i = 0; ; i++) - { - _assertions.AssertFalse(maxMessages.HasValue && i > maxMessages.Value, $"{nameof(maxMessages)} violated (current iteration: {i})."); + for (uint i = 0; ; i++) + { + _assertions.AssertFalse(maxMessages.HasValue && i > maxMessages.Value, $"{nameof(maxMessages)} violated (current iteration: {i})."); - var message = ReceiveOne(max: max); + var message = await ReceiveOneAsync(max: max, cancellationToken); - if (message == null) - { - return ArrayList.ReadOnly(messages); - } - - messages.Add(message); + if (message == null) + { + return ArrayList.ReadOnly(messages); } - }); + + messages.Add(message); + } } /// /// Receive one message from the internal queue of the TestActor. @@ -144,19 +160,20 @@ public async Task WaitForRadioSilenceAsync(TimeSpan? max = null, uint /// If null the config value "akka.test.single-expect-default" is used as timeout. /// If set to a negative value or , blocks forever. /// This method does NOT automatically scale its Duration parameter using ! + /// /// The message if one was received; null otherwise - public object ReceiveOne(TimeSpan? max = null) + public object ReceiveOne(TimeSpan? max = null,CancellationToken cancellationToken = default) { - var task = ReceiveOneAsync(max).AsTask(); + var task = ReceiveOneAsync(max, cancellationToken).AsTask(); task.WaitAndUnwrapException(); var received = task.Result; return received; } - /// - public async ValueTask ReceiveOneAsync(TimeSpan? max = null) + /// + public async ValueTask ReceiveOneAsync(TimeSpan? max = null, CancellationToken cancellationToken = default) { - var received = await TryReceiveOneAsync(max, CancellationToken.None); + var received = await TryReceiveOneAsync(max, cancellationToken); if (received.success) return received.envelope.Message; @@ -348,30 +365,6 @@ public async ValueTask PeekOneAsync(CancellationToken cancellationToken) return null; } - /// - /// Peek one message from the head of the internal queue of the TestActor within - /// the specified duration. The method blocks the specified duration. - /// Note! that the returned - /// is a containing the sender and the message. - /// This method does NOT automatically scale its Duration parameter using ! - /// - /// The received envelope. - /// Optional: The maximum duration to wait. - /// If null the config value "akka.test.single-expect-default" is used as timeout. - /// If set to a negative value or , blocks forever. - /// This method does NOT automatically scale its Duration parameter using ! - /// True if a message was received within the specified duration; false otherwise. - public bool TryPeekOne(out MessageEnvelope envelope, TimeSpan? max = null) - { - return InternalTryPeekOne(out envelope, max, CancellationToken.None, true); - } - - /// - public async ValueTask<(bool success, MessageEnvelope envelope)> TryPeekOneAsync(TimeSpan? max = null) - { - return await InternalTryPeekOneAsync(max, CancellationToken.None, true); - } - /// /// Peek one message from the head of the internal queue of the TestActor within /// the specified duration. @@ -469,12 +462,20 @@ private bool InternalTryPeekOne(out MessageEnvelope envelope, TimeSpan? max, Can /// TBD /// TBD /// TBD + /// /// TBD - public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, int msgs = int.MaxValue) where T : class + public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) where T : class { - return ReceiveWhile(filter, max, Timeout.InfiniteTimeSpan, msgs); + var task = ReceiveWhileAsync(max, filter, msgs, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; } + /// + public async ValueTask> ReceiveWhileAsync(TimeSpan? max, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) where T : class + { + return await ReceiveWhileAsync(filter, max, Timeout.InfiniteTimeSpan, msgs, cancellationToken); + } /// /// Receive a series of messages until the function returns null or the idle /// timeout is met or the overall maximum duration is elapsed or @@ -489,10 +490,19 @@ public IReadOnlyList ReceiveWhile(TimeSpan? max, Func filter, i /// TBD /// TBD /// TBD + /// /// TBD - public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue) + public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) + { + var task = ReceiveWhileAsync(max, idle, filter, msgs, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + /// + public async ValueTask> ReceiveWhileAsync(TimeSpan? max, TimeSpan? idle, Func filter, int msgs = int.MaxValue, CancellationToken cancellationToken = default) { - return ReceiveWhile(filter, max, idle, msgs); + return await ReceiveWhileAsync(filter, max, idle, msgs, cancellationToken); } /// @@ -509,8 +519,18 @@ public IReadOnlyList ReceiveWhile(TimeSpan? max, TimeSpan? idle, FuncTBD /// TBD /// TBD + /// /// TBD - public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue) + public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, CancellationToken cancellationToken = default) + { + var task = ReceiveWhileAsync(filter, max, idle, msgs, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + + /// + public async ValueTask> ReceiveWhileAsync(Func filter, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, CancellationToken cancellationToken = default) { var maxValue = RemainingOrDilated(max); var start = Now; @@ -522,27 +542,32 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = MessageEnvelope msg = NullMessageEnvelope.Instance; while (count < msgs) { + cancellationToken.ThrowIfCancellationRequested(); // Peek the message on the front of the queue - if (!TryPeekOne(out var envelope, (stop - Now).Min(idleValue))) + var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken) + .ConfigureAwait(false); + if (!peeked.success) { _testState.LastMessage = msg; break; } - var message = envelope.Message; + var message = peeked.envelope.Message; var result = filter(message); - + // If the message is accepted by the filter, remove it from the queue if (result != null) { // This should happen immediately (zero timespan). Something is wrong if this fails. - if (!InternalTryReceiveOne(out var removed, TimeSpan.Zero, CancellationToken.None, true)) + var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, cancellationToken, true) + .ConfigureAwait(false); + if (!received.success) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); - + // The removed item should be equal to the one peeked previously - if(!ReferenceEquals(envelope, removed)) + if (!ReferenceEquals(peeked.envelope, received.envelope)) throw new InvalidOperationException("[RACY] Dequeued item does not match earlier peeked item"); - - msg = envelope; + + msg = peeked.envelope; } // If the message is rejected by the filter, stop the loop else @@ -550,7 +575,7 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = _testState.LastMessage = msg; break; } - + // Store the accepted message and continue. acc.Add(result); count++; @@ -561,7 +586,6 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = return acc; } - /// /// Receive a series of messages. /// It will continue to receive messages until the predicate returns false or the idle @@ -580,8 +604,17 @@ public IReadOnlyList ReceiveWhile(Func filter, TimeSpan? max = /// TBD /// TBD /// TBD + /// /// TBD - public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true) where T : class + public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true, CancellationToken cancellationToken = default) where T : class + { + var task = ReceiveWhileAsync(shouldContinue, max, idle, msgs, shouldIgnoreOtherMessageTypes, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + /// + public async ValueTask> ReceiveWhileAsync(Predicate shouldContinue, TimeSpan? max = null, TimeSpan? idle = null, int msgs = int.MaxValue, bool shouldIgnoreOtherMessageTypes = true, CancellationToken cancellationToken = default) where T : class { var start = Now; var maxValue = RemainingOrDilated(max); @@ -594,12 +627,14 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m MessageEnvelope msg = NullMessageEnvelope.Instance; while (count < msgs) { - if (!TryPeekOne(out var envelope, (stop - Now).Min(idleValue))) + cancellationToken.ThrowIfCancellationRequested(); + var peeked = await TryPeekOneAsync((stop - Now).Min(idleValue), cancellationToken).ConfigureAwait(false); + if (!peeked.success) { _testState.LastMessage = msg; break; } - var message = envelope.Message; + var message = peeked.envelope.Message; var typedMessage = message as T; var shouldStop = false; if (typedMessage != null) @@ -623,11 +658,13 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m if (!shouldStop) { // This should happen immediately (zero timespan). Something is wrong if this fails. - if (!InternalTryReceiveOne(out var removed, TimeSpan.Zero, CancellationToken.None, true)) + var received = await InternalTryReceiveOneAsync(TimeSpan.Zero, cancellationToken, true) + .ConfigureAwait(false); + if (!received.success) throw new InvalidOperationException("[RACY] Could not dequeue an item from test queue."); - + // The removed item should be equal to the one peeked previously - if(!ReferenceEquals(envelope, removed)) + if (!ReferenceEquals(peeked.envelope, received.envelope)) throw new InvalidOperationException("[RACY] Dequeued item does not match earlier peeked item"); } // If the message is rejected by the filter, stop the loop @@ -636,7 +673,7 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m _testState.LastMessage = msg; break; } - msg = envelope; + msg = peeked.envelope; } ConditionalLog("Received {0} messages with filter during {1}", count, Now - start); @@ -648,10 +685,19 @@ public IReadOnlyList ReceiveWhile(Predicate shouldContinue, TimeSpan? m /// Receive the specified number of messages using as timeout. /// /// The number of messages. + /// /// The received messages - public IReadOnlyCollection ReceiveN(int numberOfMessages) + public IReadOnlyCollection ReceiveN(int numberOfMessages, CancellationToken cancellationToken = default) { - var result = InternalReceiveN(numberOfMessages, RemainingOrDefault, true).ToList(); + var task = ReceiveNAsync(numberOfMessages, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + /// + public async ValueTask> ReceiveNAsync(int numberOfMessages, CancellationToken cancellationToken) + { + var result = await InternalReceiveNAsync(numberOfMessages, RemainingOrDefault, true, cancellationToken).ToListAsync(); return result; } @@ -661,24 +707,35 @@ public IReadOnlyCollection ReceiveN(int numberOfMessages) /// /// The number of messages. /// The timeout scaled by "akka.test.timefactor" using . + /// /// The received messages - public IReadOnlyCollection ReceiveN(int numberOfMessages, TimeSpan max) + public IReadOnlyCollection ReceiveN(int numberOfMessages, TimeSpan max, CancellationToken cancellationToken = default) + { + var task = ReceiveNAsync(numberOfMessages, max, cancellationToken).AsTask(); + task.WaitAndUnwrapException(); + return task.Result; + } + + /// + public async ValueTask> ReceiveNAsync(int numberOfMessages, TimeSpan max, CancellationToken cancellationToken = default) { max.EnsureIsPositiveFinite("max"); var dilated = Dilated(max); - var result = InternalReceiveN(numberOfMessages, dilated, true).ToList(); + var result = await InternalReceiveNAsync(numberOfMessages, dilated, true, cancellationToken).ToListAsync(cancellationToken); return result; } - private IEnumerable InternalReceiveN(int numberOfMessages, TimeSpan max, bool shouldLog) + private async IAsyncEnumerable InternalReceiveNAsync(int numberOfMessages, TimeSpan max, bool shouldLog, [EnumeratorCancellation] CancellationToken cancellationToken) { var start = Now; var stop = max + start; ConditionalLog(shouldLog, "Trying to receive {0} messages during {1}.", numberOfMessages, max); - for (int i = 0; i < numberOfMessages; i++) + for (var i = 0; i < numberOfMessages; i++) { + cancellationToken.ThrowIfCancellationRequested(); + var timeout = stop - Now; - var o = ReceiveOne(timeout); + var o = await ReceiveOneAsync(timeout, cancellationToken); var condition = o != null; if (!condition) {