diff --git a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs index eb14022b53c..47d23ce82fb 100644 --- a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs +++ b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs @@ -8,8 +8,6 @@ using System; using System.Threading.Tasks; using Akka.Actor; -using Akka.Dispatch; -using Akka.Event; using Akka.TestKit; using Xunit; @@ -29,8 +27,6 @@ public AsyncActor() } }); } - - ILoggingAdapter Log = Context.GetLogger(); } public class SuspendActor : ReceiveActor @@ -42,7 +38,7 @@ public SuspendActor() { state = 1; }); - Receive(AsyncBehavior.Suspend, async _ => + Receive(async _ => { Self.Tell("change"); await Task.Delay(TimeSpan.FromSeconds(1)); @@ -51,21 +47,7 @@ public SuspendActor() }); } } - public class ReentrantActor : ReceiveActor - { - public ReentrantActor() - { - var state = 0; - Receive(s => s == "change", _ => state = 1); - Receive(AsyncBehavior.Reentrant, async _ => - { - Self.Tell("change"); - await Task.Delay(TimeSpan.FromSeconds(1)); - //we expect that state should have changed due to an incoming message - Sender.Tell(state); - }); - } - } + public class AsyncAwaitActor : ReceiveActor { public AsyncAwaitActor() @@ -89,7 +71,7 @@ protected override void OnReceive(object message) { if (message is string) { - RunTask(AsyncBehavior.Suspend, async () => + RunTask(async () => { var sender = Sender; var self = Self; @@ -132,7 +114,7 @@ protected override void OnReceive(object message) { if (message is string) { - RunTask(AsyncBehavior.Suspend, async () => + RunTask(async () => { var sender = Sender; var self = Self; @@ -210,7 +192,7 @@ public AsyncTplActor() Receive(m => { //this is also safe, all tasks complete in the actor context - RunTask(AsyncBehavior.Suspend, () => + RunTask(() => { Task.Delay(TimeSpan.FromSeconds(1)) .ContinueWith(t => { Sender.Tell("done"); }); @@ -228,7 +210,7 @@ public AsyncTplExceptionActor(IActorRef callback) _callback = callback; Receive(m => { - RunTask(AsyncBehavior.Suspend, () => + RunTask(() => { Task.Delay(TimeSpan.FromSeconds(1)) .ContinueWith(t => { throw new Exception("foo"); }); @@ -320,13 +302,7 @@ public void Actors_should_be_able_to_supervise_exception_ContinueWith() asker.Tell("start"); ExpectMsg("done", TimeSpan.FromSeconds(5)); } - [Fact] - public async Task Actors_should_be_able_to_reenter() - { - var asker = Sys.ActorOf(Props.Create(() => new ReentrantActor())); - var res = await asker.Ask("start",TimeSpan.FromSeconds(5)); - res.ShouldBe(1); - } + [Fact] public async Task Actors_should_be_able_to_suspend_reentrancy() diff --git a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs index 9b2e36d4279..4d6b48e630c 100644 --- a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs +++ b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs @@ -174,8 +174,6 @@ public void SystemInvoke(Envelope envelope) else if (m is Recreate) FaultRecreate((m as Recreate).Cause); else if (m is Suspend) FaultSuspend(); else if (m is Resume) FaultResume((m as Resume).CausedByFailure); - else if (m is SuspendReentrancy) HandleSuspendReentrancy(); - else if (m is ResumeReentrancy) HandleResumeReentrancy(); else if (m is Terminate) Terminate(); else if (m is Supervise) { @@ -193,16 +191,6 @@ public void SystemInvoke(Envelope envelope) } } - private void HandleSuspendReentrancy() - { - Mailbox.Suspend(MailboxSuspendStatus.AwaitingTask); - } - - private void HandleResumeReentrancy() - { - Mailbox.Resume(MailboxSuspendStatus.AwaitingTask); - } - private void HandleCompleteTask(CompleteTask task) { CurrentMessage = task.State.Message; @@ -357,16 +345,6 @@ public void Suspend() SendSystemMessage(Dispatch.SysMsg.Suspend.Instance); } - public void SuspendReentrancy() - { - SendSystemMessage(Dispatch.SysMsg.SuspendReentrancy.Instance); - } - - public void ResumeReentrancy() - { - SendSystemMessage(Dispatch.SysMsg.ResumeReentrancy.Instance); - } - private void SendSystemMessage(ISystemMessage systemMessage) { try diff --git a/src/core/Akka/Actor/ReceiveActor.cs b/src/core/Akka/Actor/ReceiveActor.cs index a6ef0aafbb5..f6bee454c03 100644 --- a/src/core/Akka/Actor/ReceiveActor.cs +++ b/src/core/Akka/Actor/ReceiveActor.cs @@ -116,17 +116,7 @@ protected void Receive(Func handler) _matchHandlerBuilders.Peek().Match( m => { Func wrap = () => handler(m); - ActorTaskScheduler.RunTask(AsyncBehavior.Suspend, wrap); - }); - } - - protected void Receive(AsyncBehavior behavior, Func handler) - { - EnsureMayConfigureMessageHandlers(); - _matchHandlerBuilders.Peek().Match(m => - { - Func wrap = () => handler(m); - ActorTaskScheduler.RunTask(behavior, wrap); + ActorTaskScheduler.RunTask(wrap); }); } diff --git a/src/core/Akka/Actor/UntypedActor.cs b/src/core/Akka/Actor/UntypedActor.cs index 37f50f1af02..996631e10d0 100644 --- a/src/core/Akka/Actor/UntypedActor.cs +++ b/src/core/Akka/Actor/UntypedActor.cs @@ -22,14 +22,14 @@ protected sealed override bool Receive(object message) return true; } - protected void RunTask(AsyncBehavior behavior, Action action) + protected void RunTask(Action action) { - ActorTaskScheduler.RunTask(behavior,action); + ActorTaskScheduler.RunTask(action); } - protected void RunTask(AsyncBehavior behavior, Func action) + protected void RunTask(Func action) { - ActorTaskScheduler.RunTask(behavior,action); + ActorTaskScheduler.RunTask(action); } /// diff --git a/src/core/Akka/Dispatch/ActorTaskScheduler.cs b/src/core/Akka/Dispatch/ActorTaskScheduler.cs index cc94ed6f18e..659b9481080 100644 --- a/src/core/Akka/Dispatch/ActorTaskScheduler.cs +++ b/src/core/Akka/Dispatch/ActorTaskScheduler.cs @@ -15,12 +15,6 @@ namespace Akka.Dispatch { - public enum AsyncBehavior - { - Reentrant, - Suspend - } - public class AmbientState { public IActorRef Self { get; set; } @@ -85,8 +79,8 @@ protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQu //Is the current cell and the current state the same? if (cell != null && s != null && - cell.Self == s.Self && - cell.Sender == s.Sender && + Equals(cell.Self, s.Self) && + Equals(cell.Sender, s.Sender) && cell.CurrentMessage == s.Message) { var res = TryExecuteTask(task); @@ -96,24 +90,22 @@ protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQu return false; } - public static void RunTask(AsyncBehavior behavior, Action action) + public static void RunTask(Action action) { - RunTask(behavior, () => + RunTask(() => { action(); return Task.FromResult(0); }); } - public static void RunTask(AsyncBehavior behavior, Func action) + public static void RunTask(Func action) { var context = ActorCell.Current; + var mailbox = context.Mailbox; - //if reentrancy is not allowed, suspend user message processing - if (behavior == AsyncBehavior.Suspend) - { - context.SuspendReentrancy(); - } + //suspend the mailbox + mailbox.Suspend(MailboxSuspendStatus.AwaitingTask); SetCurrentState(context.Self, context.Sender, null); @@ -134,11 +126,8 @@ await action() Faulted, TaskContinuationOptions.None); - //if reentrancy was suspended, make sure we re-enable message processing again - if (behavior == AsyncBehavior.Suspend) - { - context.ResumeReentrancy(); - } + //if mailbox was suspended, make sure we re-enable message processing again + mailbox.Resume(MailboxSuspendStatus.AwaitingTask); }, Outer, CancellationToken.None, diff --git a/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs b/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs index 5b932286876..54694b7a68a 100644 --- a/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs +++ b/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs @@ -360,38 +360,6 @@ public override string ToString() } } - /// - /// Class SuspendReentrancy. - /// - public sealed class SuspendReentrancy : ISystemMessage - { - private SuspendReentrancy() { } - private static readonly SuspendReentrancy _instance = new SuspendReentrancy(); - public static SuspendReentrancy Instance - { - get - { - return _instance; - } - } - } - - /// - /// Class ResumeReentrancy. - /// - public sealed class ResumeReentrancy : ISystemMessage - { - private ResumeReentrancy() { } - private static readonly ResumeReentrancy _instance = new ResumeReentrancy(); - public static ResumeReentrancy Instance - { - get - { - return _instance; - } - } - } - /// /// Class Stop. ///