From 03c1aec6ca5dae92e4a14d2d8bf48e4481d8b029 Mon Sep 17 00:00:00 2001 From: JeffCyr Date: Sun, 13 Mar 2016 09:29:47 -0400 Subject: [PATCH] Replaced Receive(Func handler) by ReceiveAsync(...) --- src/benchmark/PingPong/ClientAsyncActor.cs | 2 +- .../CoreAPISpec.ApproveCore.approved.txt | 6 ++ .../Reporting/TestRunCoordinator.cs | 4 +- src/core/Akka.Remote/EndpointManager.cs | 23 +++-- src/core/Akka.Tests/Actor/ActorCellSpec.cs | 2 +- .../Akka.Tests/Dispatch/AsyncAwaitSpec.cs | 55 +++++++++-- .../Akka.Tests/Routing/TailChoppingSpec.cs | 2 +- src/core/Akka/Actor/ReceiveActor.cs | 92 ++++++++++++++++++- 8 files changed, 156 insertions(+), 30 deletions(-) diff --git a/src/benchmark/PingPong/ClientAsyncActor.cs b/src/benchmark/PingPong/ClientAsyncActor.cs index 4ab606fc77a..f3ede7b853c 100644 --- a/src/benchmark/PingPong/ClientAsyncActor.cs +++ b/src/benchmark/PingPong/ClientAsyncActor.cs @@ -19,7 +19,7 @@ public ClientAsyncActor(IActorRef actor, long repeat, TaskCompletionSource { var received = 0L; var sent = 0L; - Receive(async m => + ReceiveAsync(async m => { received++; if (sent < repeat) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 37479cfc60f..4e44d8e7f2e 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -1318,6 +1318,7 @@ namespace Akka.Actor protected void Become(System.Action configure, bool discardOld = True) { } protected void BecomeStacked(System.Action configure) { } protected virtual void OnReceive(object message) { } + [System.ObsoleteAttribute("Use ReceiveAsync instead. This method will be removed in future versions")] protected void Receive(System.Func handler) { } protected void Receive(System.Action handler, System.Predicate shouldHandle = null) { } protected void Receive(System.Predicate shouldHandle, System.Action handler) { } @@ -1326,6 +1327,11 @@ namespace Akka.Actor protected void Receive(System.Func handler) { } protected void Receive(System.Type messageType, System.Func handler) { } protected void ReceiveAny(System.Action handler) { } + protected void ReceiveAnyAsync(System.Func handler) { } + protected void ReceiveAsync(System.Func handler, System.Predicate shouldHandle = null) { } + protected void ReceiveAsync(System.Predicate shouldHandle, System.Func handler) { } + protected void ReceiveAsync(System.Type messageType, System.Func handler, System.Predicate shouldHandle = null) { } + protected void ReceiveAsync(System.Type messageType, System.Predicate shouldHandle, System.Func handler) { } } public class ReceiveTimeout : Akka.Actor.IPossiblyHarmful { diff --git a/src/core/Akka.MultiNodeTestRunner.Shared/Reporting/TestRunCoordinator.cs b/src/core/Akka.MultiNodeTestRunner.Shared/Reporting/TestRunCoordinator.cs index 4e27cc862bd..8136cd169f7 100644 --- a/src/core/Akka.MultiNodeTestRunner.Shared/Reporting/TestRunCoordinator.cs +++ b/src/core/Akka.MultiNodeTestRunner.Shared/Reporting/TestRunCoordinator.cs @@ -112,11 +112,11 @@ private void SetReceive() _currentSpecRunActor.Forward(message); }); Receive(spec => ReceiveBeginSpecRun(spec)); - Receive(spec => ReceiveEndSpecRun(spec)); + ReceiveAsync(spec => ReceiveEndSpecRun(spec)); Receive(state => Sender.Tell(TestRunData.Copy(TestRunPassed(TestRunData)))); Receive(messages => AddSubscriber(messages)); Receive(messages => RemoveSubscriber(messages)); - Receive(async run => + ReceiveAsync(async run => { //clean up the current spec, if it hasn't been done already if (_currentSpecRunActor != null) diff --git a/src/core/Akka.Remote/EndpointManager.cs b/src/core/Akka.Remote/EndpointManager.cs index a660c40e371..956a2cdf86f 100644 --- a/src/core/Akka.Remote/EndpointManager.cs +++ b/src/core/Akka.Remote/EndpointManager.cs @@ -470,18 +470,21 @@ private void Receiving() * those results will then be piped back to Remoting, who waits for the results of * listen.AddressPromise. * */ - Receive(listen => Listens.ContinueWith(listens => + Receive(listen => { - if (listens.IsFaulted) + Listens.ContinueWith(listens => { - return new ListensFailure(listen.AddressesPromise, listens.Exception); - } - else - { - return new ListensResult(listen.AddressesPromise, listens.Result); - } - }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) - .PipeTo(Self)); + if (listens.IsFaulted) + { + return new ListensFailure(listen.AddressesPromise, listens.Exception); + } + else + { + return new ListensResult(listen.AddressesPromise, listens.Result); + } + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) + .PipeTo(Self); + }); Receive(listens => { diff --git a/src/core/Akka.Tests/Actor/ActorCellSpec.cs b/src/core/Akka.Tests/Actor/ActorCellSpec.cs index b57d98fbc69..09e35a5fb04 100644 --- a/src/core/Akka.Tests/Actor/ActorCellSpec.cs +++ b/src/core/Akka.Tests/Actor/ActorCellSpec.cs @@ -32,7 +32,7 @@ public class DummyAsyncActor : ReceiveActor { public DummyAsyncActor(AutoResetEvent autoResetEvent) { - Receive(async m => + ReceiveAsync(async m => { await Task.Delay(500); autoResetEvent.Set(); diff --git a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs index f9f2ac5ed49..7253eb48a99 100644 --- a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs +++ b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs @@ -23,7 +23,7 @@ public ReceiveTimeoutAsyncActor() { _replyTo.Tell("GotIt"); }); - Receive(async s => + ReceiveAsync(async s => { _replyTo = Sender; @@ -36,7 +36,7 @@ class AsyncActor : ReceiveActor { public AsyncActor() { - Receive(async s => + ReceiveAsync(async s => { await Task.Yield(); await Task.Delay(TimeSpan.FromMilliseconds(100)); @@ -57,7 +57,7 @@ public SuspendActor() { state = 1; }); - Receive(async m_ => + ReceiveAsync(async m_ => { Self.Tell("change"); await Task.Delay(TimeSpan.FromSeconds(1)); @@ -75,7 +75,7 @@ public class AsyncAwaitActor : ReceiveActor { public AsyncAwaitActor() { - Receive(async _ => + ReceiveAsync(async _ => { var sender = Sender; var self = Self; @@ -85,6 +85,24 @@ public AsyncAwaitActor() Assert.Same(self, Self); Sender.Tell("done"); }); + + ReceiveAsync(async msg => + { + await Task.Yield(); + Sender.Tell("handled"); + }, i => i > 10); + + ReceiveAsync(typeof(double), async msg => + { + await Task.Yield(); + Sender.Tell("handled"); + }); + + ReceiveAnyAsync(async msg => + { + await Task.Yield(); + Sender.Tell("receiveany"); + }); } } @@ -112,7 +130,7 @@ public class Asker : ReceiveActor { public Asker(IActorRef other) { - Receive(async _ => + ReceiveAsync(async _ => { var sender = Sender; var self = Self; @@ -189,7 +207,7 @@ public class AsyncExceptionActor : ReceiveActor public AsyncExceptionActor(IActorRef callback) { _callback = callback; - Receive(async _ => + ReceiveAsync(async _ => { await Task.Yield(); ThrowException(); @@ -374,7 +392,7 @@ public class AsyncExceptionCatcherActor : ReceiveActor public AsyncExceptionCatcherActor() { - Receive(async m => + ReceiveAsync(async m => { _lastMessage = m; try @@ -410,7 +428,7 @@ public class AsyncFailingActor : ReceiveActor { public AsyncFailingActor() { - Receive(async m => + ReceiveAsync(async m => { ThrowException(); }); @@ -443,7 +461,7 @@ public class AsyncPipeToDelayActor : ReceiveActor { public AsyncPipeToDelayActor() { - Receive(async msg => + ReceiveAsync(async msg => { Task.Run(() => { @@ -460,7 +478,7 @@ public class AsyncReentrantActor : ReceiveActor { public AsyncReentrantActor() { - Receive(async msg => + ReceiveAsync(async msg => { var sender = Sender; Task.Run(() => @@ -492,6 +510,23 @@ public void Actor_PipeTo_should_not_be_delayed_by_async_receive() actor.Tell("hello"); ExpectMsg(m => "hello".Equals(m), TimeSpan.FromMilliseconds(1000)); } + + [Fact] + public async Task Actor_receiveasync_overloads_should_work() + { + var actor = Sys.ActorOf(); + + actor.Tell(11); + ExpectMsg(m => "handled".Equals(m), TimeSpan.FromMilliseconds(1000)); + + actor.Tell(9); + ExpectMsg(m => "receiveany".Equals(m), TimeSpan.FromMilliseconds(1000)); + + actor.Tell(1.0); + ExpectMsg(m => "handled".Equals(m), TimeSpan.FromMilliseconds(1000)); + + + } } } diff --git a/src/core/Akka.Tests/Routing/TailChoppingSpec.cs b/src/core/Akka.Tests/Routing/TailChoppingSpec.cs index 353ea6d4163..cf46db4ffbc 100644 --- a/src/core/Akka.Tests/Routing/TailChoppingSpec.cs +++ b/src/core/Akka.Tests/Routing/TailChoppingSpec.cs @@ -25,7 +25,7 @@ class TailChopTestActor : ReceiveActor public TailChopTestActor(int sleepTime) { - Receive(async command => + ReceiveAsync(async command => { switch (command) { diff --git a/src/core/Akka/Actor/ReceiveActor.cs b/src/core/Akka/Actor/ReceiveActor.cs index c3ae9f32e91..ac34f747ef4 100644 --- a/src/core/Akka/Actor/ReceiveActor.cs +++ b/src/core/Akka/Actor/ReceiveActor.cs @@ -110,14 +110,96 @@ private PartialAction CreateNewHandler(Action configure) return newHandler; } - protected void Receive(Func handler) + [Obsolete("Use ReceiveAsync instead. This method will be removed in future versions")] + protected void Receive(Func handler) { - EnsureMayConfigureMessageHandlers(); - _matchHandlerBuilders.Peek().Match( m => + ReceiveAsync(handler); + } + + private Action WrapAsyncHandler(Func asyncHandler) + { + return m => { - Func wrap = () => handler(m); + Func wrap = () => asyncHandler(m); ActorTaskScheduler.RunTask(wrap); - }); + }; + } + + /// + /// Registers an asynchronous handler for incoming messages of the specified type . + /// If !=null then it must return true before a message is passed to . + /// The actor will be suspended until the task returned by completes. + /// This method may only be called when constructing the actor or from or . + /// Note that handlers registered prior to this may have handled the message already. + /// In that case, this handler will not be invoked. + /// + /// The type of the message + /// The message handler that is invoked for incoming messages of the specified type + /// When not null it is used to determine if the message matches. + protected void ReceiveAsync(Func handler, Predicate shouldHandle = null) + { + Receive(WrapAsyncHandler(handler), shouldHandle); + } + + /// + /// Registers an asynchronous handler for incoming messages of the specified type . + /// If !=null then it must return true before a message is passed to . + /// The actor will be suspended until the task returned by completes. + /// This method may only be called when constructing the actor or from or . + /// Note that handlers registered prior to this may have handled the message already. + /// In that case, this handler will not be invoked. + /// + /// The type of the message + /// When not null it is used to determine if the message matches. + /// The message handler that is invoked for incoming messages of the specified type + protected void ReceiveAsync(Predicate shouldHandle, Func handler) + { + Receive(WrapAsyncHandler(handler), shouldHandle); + } + + /// + /// Registers an asynchronous handler for incoming messages of the specified . + /// If !=null then it must return true before a message is passed to . + /// The actor will be suspended until the task returned by completes. + /// This method may only be called when constructing the actor or from or . + /// Note that handlers registered prior to this may have handled the message already. + /// In that case, this handler will not be invoked. + /// + /// The type of the message + /// The message handler that is invoked for incoming messages of the specified + /// When not null it is used to determine if the message matches. + protected void ReceiveAsync(Type messageType, Func handler, Predicate shouldHandle = null) + { + Receive(messageType, WrapAsyncHandler(handler), shouldHandle); + } + + /// + /// Registers an asynchronous handler for incoming messages of the specified . + /// If !=null then it must return true before a message is passed to . + /// The actor will be suspended until the task returned by completes. + /// This method may only be called when constructing the actor or from or . + /// Note that handlers registered prior to this may have handled the message already. + /// In that case, this handler will not be invoked. + /// + /// The type of the message + /// When not null it is used to determine if the message matches. + /// The message handler that is invoked for incoming messages of the specified + protected void ReceiveAsync(Type messageType, Predicate shouldHandle, Func handler) + { + Receive(messageType, WrapAsyncHandler(handler), shouldHandle); + } + + /// + /// Registers an asynchronous handler for incoming messages of any type. + /// The actor will be suspended until the task returned by completes. + /// This method may only be called when constructing the actor or from or . + /// Note that handlers registered prior to this may have handled the message already. + /// In that case, this handler will not be invoked. + /// + /// The message handler that is invoked for all + protected void ReceiveAnyAsync(Func handler) + { + ReceiveAny(WrapAsyncHandler(handler)); } ///