Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT #6281] Add ReceiveAsync to TestActorRef #6286

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/core/Akka.TestKit.Tests/TestActorRefTests/ExceptionHandling.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// -----------------------------------------------------------------------
// <copyright file="ExceptionHandling.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using static FluentAssertions.FluentActions;

namespace Akka.TestKit.Tests.TestActorRefTests
{
public class ExceptionHandling: TestKit.Xunit2.TestKit
{
private class GiveError
{ }

private class GiveErrorAsync
{ }

private class ExceptionActor : ReceiveActor
{
public ExceptionActor()
{
Receive<GiveError>((b) => throw new Exception("WAT"));

ReceiveAsync<GiveErrorAsync>(async (b) =>
{
await Task.Delay(TimeSpan.FromSeconds(0.1));
throw new Exception("WATASYNC");
});
}
}

public ExceptionHandling(ITestOutputHelper helper) : base("akka.loglevel = debug", helper)
{
}

[Fact]
public void GetException()
{
var props = Props.Create<ExceptionActor>();
var subject = new TestActorRef<ExceptionActor>(Sys, props, null, "testA");
Invoking(() => subject.Receive(new GiveError()))
.Should().Throw<Exception>().WithMessage("WAT");
}

[Fact]
public async Task GetExceptionAsync()
{
var props = Props.Create<ExceptionActor>();
var subject = new TestActorRef<ExceptionActor>(Sys, props, null, "testB");
await Awaiting(() => subject.ReceiveAsync(new GiveErrorAsync()))
.Should().ThrowAsync<Exception>().WithMessage("WATASYNC");
}
}
}
74 changes: 68 additions & 6 deletions src/core/Akka.TestKit/Internal/InternalTestActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
using Akka.Pattern;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -89,6 +90,14 @@ public void Receive(object message, IActorRef sender = null)
cell.UseThreadContext(() => cell.ReceiveMessageForTest(envelope));
}

public Task ReceiveAsync(object message, IActorRef sender = null)
{
var cell = (TestActorCell)Cell;
sender = sender.IsNobody() ? cell.System.DeadLetters : sender;
var envelope = new Envelope(message, sender);
return cell.UseThreadContextAsync(() => cell.ReceiveMessageForTestAsync(envelope));
}

/// <summary>
/// TBD
/// </summary>
Expand Down Expand Up @@ -245,25 +254,73 @@ public override ActorTaskScheduler TaskScheduler
if (taskScheduler != null)
return taskScheduler;

taskScheduler = new TestActorTaskScheduler(this);
taskScheduler = new TestActorTaskScheduler(this, TaskFailureHook);
return Interlocked.CompareExchange(ref _taskScheduler, taskScheduler, null) ?? taskScheduler;
}
}


private readonly Dictionary<object, TaskCompletionSource<Done>> _testActorTasks =
new Dictionary<object, TaskCompletionSource<Done>>();

/// <summary>
/// This is only intended to be called from TestKit's TestActorRef
/// </summary>
/// <param name="envelope">TBD</param>
public Task ReceiveMessageForTestAsync(Envelope envelope)
{
var tcs = new TaskCompletionSource<Done>();
_testActorTasks[envelope.Message] = tcs;
ReceiveMessageForTest(envelope);
return tcs.Task;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="actionAsync">TBD</param>
public Task UseThreadContextAsync(Func<Task> actionAsync)
{
var tmp = InternalCurrentActorCellKeeper.Current;
InternalCurrentActorCellKeeper.Current = this;
try
{
return actionAsync();
}
finally
{
//ensure we set back the old context
InternalCurrentActorCellKeeper.Current = tmp;
}
}

private void TaskFailureHook(object message, Exception exception)
{
if (!_testActorTasks.TryGetValue(message, out var tcs))
return;
if (!(exception is null))
tcs.TrySetException(exception);
else
tcs.TrySetResult(Done.Instance);
_testActorTasks.Remove(message);
}

/// <summary>
/// TBD
/// </summary>
public new object Actor { get { return base.Actor; } }
}

internal class TestActorTaskScheduler : ActorTaskScheduler
internal class TestActorTaskScheduler : ActorTaskScheduler, IAsyncResultInterceptor
{
private readonly ActorCell _testActorCell;
private readonly TestActorCell _testActorCell;
private readonly Action<object, Exception> _taskCallback;

/// <inheritdoc />
internal TestActorTaskScheduler(ActorCell testActorCell) : base(testActorCell)
internal TestActorTaskScheduler(ActorCell testActorCell, Action<object, Exception> taskCallback) : base(testActorCell)
{
_testActorCell = testActorCell;
_taskCallback = taskCallback;
_testActorCell = (TestActorCell) testActorCell;
}

/// <inheritdoc />
Expand All @@ -277,6 +334,11 @@ protected override void OnAfterTaskCompleted()
{
ActorCellKeepingSynchronizationContext.AsyncCache = null;
}

public void OnTaskCompleted(object message, Exception exception)
{
_taskCallback(message, exception);
}
}

/// <summary>
Expand Down
17 changes: 17 additions & 0 deletions src/core/Akka.TestKit/TestActorRefBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
Expand Down Expand Up @@ -51,6 +52,22 @@ public void Receive(object message, IActorRef sender = null)
_internalRef.Receive(message, sender);
}

/// <summary>
/// Directly inject messages into actor ReceiveAsync behavior. Any exceptions
/// thrown will be available to you, while still being able to use
/// become/unbecome.
/// Note: This method violates the actor model and could cause unpredictable
/// behavior. For example, a Receive call to an actor could run simultaneously
/// (2 simultaneous threads running inside the actor) with the actor's handling
/// of a previous Tell call.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="sender">The sender.</param>
public Task ReceiveAsync(object message, IActorRef sender = null)
{
return _internalRef.ReceiveAsync(message, sender);
}

/// <summary>
/// TBD
/// </summary>
Expand Down
15 changes: 11 additions & 4 deletions src/core/Akka/Dispatch/ActorTaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public static void RunTask(Func<Task> asyncAction)
//suspend the mailbox
dispatcher.Suspend(context);

ActorTaskScheduler actorScheduler = context.TaskScheduler;
var actorScheduler = context.TaskScheduler;
actorScheduler.CurrentMessage = context.CurrentMessage;

actorScheduler.OnBeforeTaskStarted();
Expand All @@ -158,18 +158,21 @@ public static void RunTask(Func<Task> asyncAction)
.Unwrap()
.ContinueWith(parent =>
{
Exception exception = GetTaskException(parent);

var exception = GetTaskException(parent);
if (exception == null)
{
dispatcher.Resume(context);

context.CheckReceiveTimeout();
}
else
{
context.Self.AsInstanceOf<IInternalActorRef>().SendSystemMessage(new ActorTaskSchedulerMessage(exception, actorScheduler.CurrentMessage));
}

// Used by TestActorRef to intercept async execution result
if(actorScheduler is IAsyncResultInterceptor interceptor)
interceptor.OnTaskCompleted(actorScheduler.CurrentMessage, exception);

//clear the current message field of the scheduler
actorScheduler.CurrentMessage = null;
actorScheduler.OnAfterTaskCompleted();
Expand Down Expand Up @@ -203,3 +206,7 @@ private static Exception TryUnwrapAggregateException(AggregateException aggregat
}
}

internal interface IAsyncResultInterceptor
{
void OnTaskCompleted(object message, Exception exception);
}