Skip to content

Commit

Permalink
Merge pull request #917 from rogeralsing/remove-reentrancy
Browse files Browse the repository at this point in the history
Removed Reentrancy behavior
  • Loading branch information
Aaronontheweb committed Apr 28, 2015
2 parents 56b567c + 5f2dd97 commit aa54a2a
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 121 deletions.
38 changes: 7 additions & 31 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Event;
using Akka.TestKit;
using Xunit;

Expand All @@ -29,8 +27,6 @@ public AsyncActor()
}
});
}

ILoggingAdapter Log = Context.GetLogger();
}

public class SuspendActor : ReceiveActor
Expand All @@ -42,7 +38,7 @@ public SuspendActor()
{
state = 1;
});
Receive<string>(AsyncBehavior.Suspend, async _ =>
Receive<string>(async _ =>
{
Self.Tell("change");
await Task.Delay(TimeSpan.FromSeconds(1));
Expand All @@ -51,21 +47,7 @@ public SuspendActor()
});
}
}
public class ReentrantActor : ReceiveActor
{
public ReentrantActor()
{
var state = 0;
Receive<string>(s => s == "change", _ => state = 1);
Receive<string>(AsyncBehavior.Reentrant, async _ =>
{
Self.Tell("change");
await Task.Delay(TimeSpan.FromSeconds(1));
//we expect that state should have changed due to an incoming message
Sender.Tell(state);
});
}
}

public class AsyncAwaitActor : ReceiveActor
{
public AsyncAwaitActor()
Expand All @@ -89,7 +71,7 @@ protected override void OnReceive(object message)
{
if (message is string)
{
RunTask(AsyncBehavior.Suspend, async () =>
RunTask(async () =>
{
var sender = Sender;
var self = Self;
Expand Down Expand Up @@ -132,7 +114,7 @@ protected override void OnReceive(object message)
{
if (message is string)
{
RunTask(AsyncBehavior.Suspend, async () =>
RunTask(async () =>
{
var sender = Sender;
var self = Self;
Expand Down Expand Up @@ -210,7 +192,7 @@ public AsyncTplActor()
Receive<string>(m =>
{
//this is also safe, all tasks complete in the actor context
RunTask(AsyncBehavior.Suspend, () =>
RunTask(() =>
{
Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { Sender.Tell("done"); });
Expand All @@ -228,7 +210,7 @@ public AsyncTplExceptionActor(IActorRef callback)
_callback = callback;
Receive<string>(m =>
{
RunTask(AsyncBehavior.Suspend, () =>
RunTask(() =>
{
Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { throw new Exception("foo"); });
Expand Down Expand Up @@ -320,13 +302,7 @@ public void Actors_should_be_able_to_supervise_exception_ContinueWith()
asker.Tell("start");
ExpectMsg("done", TimeSpan.FromSeconds(5));
}
[Fact]
public async Task Actors_should_be_able_to_reenter()
{
var asker = Sys.ActorOf(Props.Create(() => new ReentrantActor()));
var res = await asker.Ask<int>("start",TimeSpan.FromSeconds(5));
res.ShouldBe(1);
}


[Fact]
public async Task Actors_should_be_able_to_suspend_reentrancy()
Expand Down
22 changes: 0 additions & 22 deletions src/core/Akka/Actor/ActorCell.DefaultMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ public void SystemInvoke(Envelope envelope)
else if (m is Recreate) FaultRecreate((m as Recreate).Cause);
else if (m is Suspend) FaultSuspend();
else if (m is Resume) FaultResume((m as Resume).CausedByFailure);
else if (m is SuspendReentrancy) HandleSuspendReentrancy();
else if (m is ResumeReentrancy) HandleResumeReentrancy();
else if (m is Terminate) Terminate();
else if (m is Supervise)
{
Expand All @@ -193,16 +191,6 @@ public void SystemInvoke(Envelope envelope)
}
}

private void HandleSuspendReentrancy()
{
Mailbox.Suspend(MailboxSuspendStatus.AwaitingTask);
}

private void HandleResumeReentrancy()
{
Mailbox.Resume(MailboxSuspendStatus.AwaitingTask);
}

private void HandleCompleteTask(CompleteTask task)
{
CurrentMessage = task.State.Message;
Expand Down Expand Up @@ -357,16 +345,6 @@ public void Suspend()
SendSystemMessage(Dispatch.SysMsg.Suspend.Instance);
}

public void SuspendReentrancy()
{
SendSystemMessage(Dispatch.SysMsg.SuspendReentrancy.Instance);
}

public void ResumeReentrancy()
{
SendSystemMessage(Dispatch.SysMsg.ResumeReentrancy.Instance);
}

private void SendSystemMessage(ISystemMessage systemMessage)
{
try
Expand Down
12 changes: 1 addition & 11 deletions src/core/Akka/Actor/ReceiveActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,7 @@ protected void Receive<T>(Func<T,Task> handler)
_matchHandlerBuilders.Peek().Match<T>( m =>
{
Func<Task> wrap = () => handler(m);
ActorTaskScheduler.RunTask(AsyncBehavior.Suspend, wrap);
});
}

protected void Receive<T>(AsyncBehavior behavior, Func<T, Task> handler)
{
EnsureMayConfigureMessageHandlers();
_matchHandlerBuilders.Peek().Match<T>(m =>
{
Func<Task> wrap = () => handler(m);
ActorTaskScheduler.RunTask(behavior, wrap);
ActorTaskScheduler.RunTask(wrap);
});
}

Expand Down
8 changes: 4 additions & 4 deletions src/core/Akka/Actor/UntypedActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ protected sealed override bool Receive(object message)
return true;
}

protected void RunTask(AsyncBehavior behavior, Action action)
protected void RunTask(Action action)
{
ActorTaskScheduler.RunTask(behavior,action);
ActorTaskScheduler.RunTask(action);
}

protected void RunTask(AsyncBehavior behavior, Func<Task> action)
protected void RunTask(Func<Task> action)
{
ActorTaskScheduler.RunTask(behavior,action);
ActorTaskScheduler.RunTask(action);
}

/// <summary>
Expand Down
31 changes: 10 additions & 21 deletions src/core/Akka/Dispatch/ActorTaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@

namespace Akka.Dispatch
{
public enum AsyncBehavior
{
Reentrant,
Suspend
}

public class AmbientState
{
public IActorRef Self { get; set; }
Expand Down Expand Up @@ -85,8 +79,8 @@ protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQu
//Is the current cell and the current state the same?
if (cell != null &&
s != null &&
cell.Self == s.Self &&
cell.Sender == s.Sender &&
Equals(cell.Self, s.Self) &&
Equals(cell.Sender, s.Sender) &&
cell.CurrentMessage == s.Message)
{
var res = TryExecuteTask(task);
Expand All @@ -96,24 +90,22 @@ protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQu
return false;
}

public static void RunTask(AsyncBehavior behavior, Action action)
public static void RunTask(Action action)
{
RunTask(behavior, () =>
RunTask(() =>
{
action();
return Task.FromResult(0);
});
}

public static void RunTask(AsyncBehavior behavior, Func<Task> action)
public static void RunTask(Func<Task> action)
{
var context = ActorCell.Current;
var mailbox = context.Mailbox;

//if reentrancy is not allowed, suspend user message processing
if (behavior == AsyncBehavior.Suspend)
{
context.SuspendReentrancy();
}
//suspend the mailbox
mailbox.Suspend(MailboxSuspendStatus.AwaitingTask);

SetCurrentState(context.Self, context.Sender, null);

Expand All @@ -134,11 +126,8 @@ await action()
Faulted,
TaskContinuationOptions.None);
//if reentrancy was suspended, make sure we re-enable message processing again
if (behavior == AsyncBehavior.Suspend)
{
context.ResumeReentrancy();
}
//if mailbox was suspended, make sure we re-enable message processing again
mailbox.Resume(MailboxSuspendStatus.AwaitingTask);
},
Outer,
CancellationToken.None,
Expand Down
32 changes: 0 additions & 32 deletions src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,38 +360,6 @@ public override string ToString()
}
}

/// <summary>
/// Class SuspendReentrancy.
/// </summary>
public sealed class SuspendReentrancy : ISystemMessage
{
private SuspendReentrancy() { }
private static readonly SuspendReentrancy _instance = new SuspendReentrancy();
public static SuspendReentrancy Instance
{
get
{
return _instance;
}
}
}

/// <summary>
/// Class ResumeReentrancy.
/// </summary>
public sealed class ResumeReentrancy : ISystemMessage
{
private ResumeReentrancy() { }
private static readonly ResumeReentrancy _instance = new ResumeReentrancy();
public static ResumeReentrancy Instance
{
get
{
return _instance;
}
}
}

/// <summary>
/// Class Stop.
/// </summary>
Expand Down

0 comments on commit aa54a2a

Please sign in to comment.