From d1ec2243300425bca4ec1c142e32e83c2cc7c2f8 Mon Sep 17 00:00:00 2001 From: "Andy(Jingzhang)Chen" Date: Thu, 29 Aug 2024 02:12:35 +0800 Subject: [PATCH] additional mailbox selector for typed props (#1096) * additional mailbox selector for typed props * add unit test * chore change of unit test * Revert "configuration typo" This reverts commit 7917feb32a4b042124513d98068db832cc59a541. * fix pekko imports * mention interoperability in doc * share configuration in tests * revert configuration change * fix new typo * fix jdocs tests * optimized import * mention api version in doc * resolve import issue --- .../pekko/typed/DispatchersDocTest.java | 23 +++ .../apache/pekko/typed/MailboxDocTest.java | 30 ++++ .../pekko/typed/DispatchersDocSpec.scala | 21 +++ .../apache/pekko/typed/MailboxDocSpec.scala | 24 ++++ .../scaladsl/DispatcherSelectorSpec.scala | 37 ++--- .../typed/scaladsl/MailboxSelectorSpec.scala | 132 ++++++++++++++++++ .../org/apache/pekko/actor/typed/Props.scala | 8 ++ actor/src/main/resources/reference.conf | 2 +- docs/src/main/paradox/typed/dispatchers.md | 13 +- docs/src/main/paradox/typed/mailboxes.md | 11 ++ 10 files changed, 283 insertions(+), 18 deletions(-) create mode 100644 actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/MailboxSelectorSpec.scala diff --git a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/DispatchersDocTest.java b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/DispatchersDocTest.java index 183f697eda1..df8b6728102 100644 --- a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/DispatchersDocTest.java +++ b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/DispatchersDocTest.java @@ -29,4 +29,27 @@ public static void spawnDispatchers(ActorContext context, Behavior context, Behavior behavior) { + // #interoperability-with-mailbox + context.spawn( + behavior, + "ExplicitDefaultDispatcher", + DispatcherSelector.defaultDispatcher().withMailboxFromConfig("my-app.my-special-mailbox")); + context.spawn( + behavior, + "BlockingDispatcher", + DispatcherSelector.blocking().withMailboxFromConfig("my-app.my-special-mailbox")); + context.spawn( + behavior, + "ParentDispatcher", + DispatcherSelector.sameAsParent().withMailboxFromConfig("my-app.my-special-mailbox")); + context.spawn( + behavior, + "DispatcherFromConfig", + DispatcherSelector.fromConfig("your-dispatcher") + .withMailboxFromConfig("my-app.my-special-mailbox")); + // #interoperability-with-mailbox + } } diff --git a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/MailboxDocTest.java b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/MailboxDocTest.java index c0486f57d8c..9d6b64c1e8e 100644 --- a/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/MailboxDocTest.java +++ b/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/MailboxDocTest.java @@ -19,6 +19,7 @@ import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; import org.apache.pekko.actor.typed.ActorRef; import org.apache.pekko.actor.typed.Behavior; +import org.apache.pekko.actor.typed.Dispatchers; import org.apache.pekko.actor.typed.MailboxSelector; import org.apache.pekko.actor.typed.javadsl.Behaviors; import com.typesafe.config.ConfigFactory; @@ -59,4 +60,33 @@ public void startSomeActorsWithDifferentMailboxes() { ActorRef ref = testKit.spawn(setup); testProbe.receiveMessage(); } + + @Test + public void startSomeActorsWithMailboxSelectorInteroperability() { + TestProbe testProbe = testKit.createTestProbe(); + Behavior childBehavior = Behaviors.empty(); + + Behavior setup = + Behaviors.setup( + context -> { + // #interoperability-with-dispatcher + context.spawn( + childBehavior, + "bounded-mailbox-child", + MailboxSelector.bounded(100).withDispatcherDefault()); + + context.spawn( + childBehavior, + "from-config-mailbox-child", + MailboxSelector.fromConfig("my-app.my-special-mailbox") + .withDispatcherFromConfig(Dispatchers.DefaultDispatcherId())); + // #interoperability-with-dispatcher + + testProbe.ref().tell(Done.getInstance()); + return Behaviors.stopped(); + }); + + ActorRef ref = testKit.spawn(setup); + testProbe.receiveMessage(); + } } diff --git a/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala b/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala index 54753d6f8de..c7dcdb1fbac 100644 --- a/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala +++ b/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala @@ -40,6 +40,9 @@ object DispatchersDocSpec { throughput = 1 } //#config + your-mailbox { + mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" + } """.stripMargin) case class WhichDispatcher(replyTo: ActorRef[Dispatcher]) @@ -65,6 +68,24 @@ object DispatchersDocSpec { Behaviors.same } + val interoperableExample = Behaviors.receive[Any] { (context, _) => + // #interoperability-with-mailbox + import org.apache.pekko.actor.typed.DispatcherSelector + + context.spawn(yourBehavior, "DefaultDispatcher") + context.spawn(yourBehavior, "ExplicitDefaultDispatcher", + DispatcherSelector.default().withMailboxFromConfig("your-mailbox")) + context.spawn(yourBehavior, "BlockingDispatcher", + DispatcherSelector.blocking().withMailboxFromConfig("your-mailbox")) + context.spawn(yourBehavior, "ParentDispatcher", + DispatcherSelector.sameAsParent().withMailboxFromConfig("your-mailbox")) + context.spawn(yourBehavior, "DispatcherFromConfig", + DispatcherSelector.fromConfig("your-dispatcher").withMailboxFromConfig("your-mailbox")) + // #interoperability-with-mailbox + + Behaviors.same + } + } class DispatchersDocSpec diff --git a/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/MailboxDocSpec.scala b/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/MailboxDocSpec.scala index 1d24535dab9..aecf77f4ba8 100644 --- a/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/MailboxDocSpec.scala +++ b/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/MailboxDocSpec.scala @@ -18,6 +18,7 @@ import pekko.Done import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.typed.Behavior +import pekko.actor.typed.Dispatchers import pekko.actor.typed.MailboxSelector import pekko.actor.typed.scaladsl.Behaviors import com.typesafe.config.ConfigFactory @@ -47,6 +48,29 @@ class MailboxDocSpec probe.receiveMessage() } + + "interoperability with DispatcherSelector" in { + + val probe = createTestProbe[Done]() + val childBehavior: Behavior[String] = Behaviors.empty + val parent: Behavior[Unit] = Behaviors.setup { context => + // #interoperability-with-dispatcher + context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100).withDispatcherDefault) + + val props = + MailboxSelector.fromConfig("my-app.my-special-mailbox").withDispatcherFromConfig( + Dispatchers.DefaultDispatcherId) + context.spawn(childBehavior, "from-config-mailbox-child", props) + // #interoperability-with-dispatcher + + probe.ref ! Done + Behaviors.stopped + } + spawn(parent) + + probe.receiveMessage() + + } } } diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/DispatcherSelectorSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/DispatcherSelectorSpec.scala index c1ce21cc991..62dcd12c5e4 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/DispatcherSelectorSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/DispatcherSelectorSpec.scala @@ -28,7 +28,9 @@ import pekko.actor.testkit.typed.scaladsl.TestProbe import pekko.actor.typed.ActorRef import pekko.actor.typed.ActorSystem import pekko.actor.typed.Behavior +import pekko.actor.typed.DispatcherSelector import pekko.actor.typed.Props +import pekko.actor.typed.scaladsl.AskPattern._ import pekko.actor.typed.SpawnProtocol object DispatcherSelectorSpec { @@ -64,7 +66,7 @@ class DispatcherSelectorSpec(config: Config) "DispatcherSelector" must { - "select dispatcher from config" in { + "select dispatcher from empty Props" in { val probe = createTestProbe[Pong]() val pingPong = spawn(PingPong(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher")) pingPong ! Ping(probe.ref) @@ -73,18 +75,27 @@ class DispatcherSelectorSpec(config: Config) response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher") } + "select dispatcher from DispatcherSelector" in { + val probe = createTestProbe[Pong]() + val pingPong = spawn(PingPong(), DispatcherSelector.fromConfig("ping-pong-dispatcher")) + pingPong ! Ping(probe.ref) + + val response = probe.receiveMessage() + response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher") + } + "detect unknown dispatcher from config" in { val probe = createTestProbe[Pong]() LoggingTestKit.error("Spawn failed").expect { - val ref = spawn(PingPong(), Props.empty.withDispatcherFromConfig("unknown")) + val ref = spawn(PingPong(), DispatcherSelector.fromConfig("unknown")) probe.expectTerminated(ref) } } "select same dispatcher as parent" in { - val parent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher")) + val parent = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher")) val childProbe = createTestProbe[ActorRef[Ping]]() - parent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref) + parent ! SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), childProbe.ref) val probe = createTestProbe[Pong]() val child = childProbe.receiveMessage() @@ -95,19 +106,13 @@ class DispatcherSelectorSpec(config: Config) } "select same dispatcher as parent, several levels" in { - val grandParent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher")) - val parentProbe = createTestProbe[ActorRef[SpawnProtocol.Spawn[Ping]]]() - grandParent ! SpawnProtocol.Spawn( - SpawnProtocol(), - "parent", - Props.empty.withDispatcherSameAsParent, - parentProbe.ref) - - val childProbe = createTestProbe[ActorRef[Ping]]() - grandParent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref) + val guardian = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher")) + val parent: ActorRef[SpawnProtocol.Command] = guardian.ask((replyTo: ActorRef[ActorRef[SpawnProtocol.Command]]) => + SpawnProtocol.Spawn(SpawnProtocol(), "parent", DispatcherSelector.sameAsParent(), replyTo)).futureValue + val child: ActorRef[Ping] = parent.ask((reply: ActorRef[ActorRef[Ping]]) => + SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), reply)).futureValue val probe = createTestProbe[Pong]() - val child = childProbe.receiveMessage() child ! Ping(probe.ref) val response = probe.receiveMessage() @@ -119,7 +124,7 @@ class DispatcherSelectorSpec(config: Config) PingPong(), "DispatcherSelectorSpec2", ActorSystemSetup.create(BootstrapSetup()), - Props.empty.withDispatcherSameAsParent) + DispatcherSelector.sameAsParent()) try { val probe = TestProbe[Pong]()(sys) sys ! Ping(probe.ref) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/MailboxSelectorSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/MailboxSelectorSpec.scala new file mode 100644 index 00000000000..0f9b6a42c7b --- /dev/null +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/scaladsl/MailboxSelectorSpec.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.actor.typed.scaladsl + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import org.apache.pekko +import pekko.actor.ActorCell +import pekko.actor.testkit.typed.scaladsl.LogCapturing +import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import pekko.actor.typed.ActorRef +import pekko.actor.typed.Behavior +import pekko.actor.typed.DispatcherSelector +import pekko.actor.typed.MailboxSelector +import pekko.actor.typed.Props +import pekko.actor.typed.internal.adapter.ActorContextAdapter +import pekko.actor.typed.scaladsl.AskPattern._ +import pekko.dispatch.BoundedMessageQueueSemantics +import pekko.dispatch.BoundedNodeMessageQueue +import pekko.dispatch.Dispatchers +import pekko.dispatch.MessageQueue +import pekko.dispatch.NodeMessageQueue + +object MailboxSelectorSpec { + val config = ConfigFactory.parseString( + """ + specific-mailbox { + mailbox-type = "org.apache.pekko.dispatch.NonBlockingBoundedMailbox" + mailbox-capacity = 4 + } + """) + + object PingPong { + case class Ping(replyTo: ActorRef[Pong]) + + case class Pong(threadName: String) + + def apply(): Behavior[Ping] = + Behaviors.receiveMessage[Ping] { message => + message.replyTo ! Pong(Thread.currentThread().getName) + Behaviors.same + } + + } + +} + +class MailboxSelectorSpec(config: Config) + extends ScalaTestWithActorTestKit(config) + with AnyWordSpecLike + with LogCapturing { + + def this() = this(MailboxSelectorSpec.config) + + sealed trait Command + case class WhatsYourMailbox(replyTo: ActorRef[MessageQueue]) extends Command + case class WhatsYourDispatcher(replyTo: ActorRef[String]) extends Command + + private def extract[R](context: ActorContext[_], f: ActorCell => R): R = { + context match { + case adapter: ActorContextAdapter[_] => + adapter.classicActorContext match { + case cell: ActorCell => f(cell) + case unexpected => throw new RuntimeException(s"Unexpected: $unexpected") + } + case unexpected => throw new RuntimeException(s"Unexpected: $unexpected") + } + } + + private def behavior: Behavior[Command] = + Behaviors.setup { context => + Behaviors.receiveMessage[Command] { + case WhatsYourMailbox(replyTo) => + replyTo ! extract(context, cell => cell.mailbox.messageQueue) + Behaviors.same + case WhatsYourDispatcher(replyTo) => + replyTo ! extract(context, cell => cell.dispatcher.id) + Behaviors.same + } + } + + "MailboxSelectorSpec" must { + + "default is unbounded" in { + val actor = spawn(behavior) + val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue + mailbox shouldBe a[NodeMessageQueue] + } + + "select an specific mailbox from MailboxSelector " in { + val actor = spawn(behavior, MailboxSelector.fromConfig("specific-mailbox")) + val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue + mailbox shouldBe a[BoundedMessageQueueSemantics] + mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4) + } + + "select an specific mailbox from empty Props " in { + val actor = spawn(behavior, Props.empty.withMailboxFromConfig("specific-mailbox")) + val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue + mailbox shouldBe a[BoundedMessageQueueSemantics] + mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4) + } + + "select an specific mailbox from DispatcherSelector " in { + val actor = spawn(behavior, DispatcherSelector.blocking().withMailboxFromConfig("specific-mailbox")) + val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue + mailbox shouldBe a[BoundedMessageQueueSemantics] + mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4) + val dispatcher = actor.ask(WhatsYourDispatcher(_)).futureValue + dispatcher shouldBe Dispatchers.DefaultBlockingDispatcherId + } + + } + +} diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/Props.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/Props.scala index afc2ece0b35..3c8a8d23f3c 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/Props.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/Props.scala @@ -81,6 +81,14 @@ abstract class Props private[pekko] () extends Product with Serializable { */ def withDispatcherFromConfig(path: String): Props = DispatcherFromConfig(path, this) + /** + * Prepend a selection of the mailbox found at the given Config path to this Props. + * The path is relative to the configuration root of the [[ActorSystem]] that looks up the + * mailbox. + * @since 1.1.0 + */ + def withMailboxFromConfig(path: String): Props = MailboxFromConfigSelector(path, this) + /** * Prepend a selection of the same executor as the parent actor to this Props. */ diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 46890c5ba1f..6831cf056db 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -324,7 +324,7 @@ pekko { chance-of-exploration = 0.4 # When downsizing after a long streak of under-utilization, the resizer - # will downsize the pool to the highest utilization multiplied by a + # will downsize the pool to the highest utilization multiplied by # a downsize ratio. This downsize ratio determines the new pools size # in comparison to the highest utilization. # E.g. if the highest utilization is 10, and the down size ratio diff --git a/docs/src/main/paradox/typed/dispatchers.md b/docs/src/main/paradox/typed/dispatchers.md index c98be3a043e..2347dc73001 100644 --- a/docs/src/main/paradox/typed/dispatchers.md +++ b/docs/src/main/paradox/typed/dispatchers.md @@ -55,7 +55,7 @@ A default dispatcher is used for all actors that are spawned without specifying This is suitable for all actors that don't block. Blocking in actors needs to be carefully managed, more details @ref:[here](#blocking-needs-careful-management). -To select a dispatcher use `DispatcherSelector` to create a `Props` instance for spawning your actor: +To select a dispatcher use @apidoc[DispatcherSelector](DispatcherSelector$) to create a @apidoc[Props](typed.Props) instance for spawning your actor: Scala : @@snip [DispatcherDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala) { #spawn-dispatcher } @@ -74,6 +74,17 @@ The final example shows how to load a custom dispatcher from configuration and r @@snip [DispatcherDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala) { #config } +### Interoperability with MailboxSelector + +The @apidoc[DispatcherSelector](DispatcherSelector$) will create a @apidoc[Props](typed.Props) instance that can be both set up Dispatcher and Mailbox, +which means that you can continue to set up Mailbox through chain calls. + +Scala +: @@snip [DispatcherDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/DispatchersDocSpec.scala) { #interoperability-with-mailbox } + +Java +: @@snip [DispatcherDocTest.java](/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/DispatchersDocTest.java) { #interoperability-with-mailbox } + ## Types of dispatchers There are 2 different types of message dispatchers: diff --git a/docs/src/main/paradox/typed/mailboxes.md b/docs/src/main/paradox/typed/mailboxes.md index e2ed3e8b0c9..5c73293ae5a 100644 --- a/docs/src/main/paradox/typed/mailboxes.md +++ b/docs/src/main/paradox/typed/mailboxes.md @@ -58,6 +58,17 @@ configuration section from the @apidoc[ActorSystem](typed.ActorSystem) configura `id` key with the configuration path of the mailbox type and adding a fall-back to the default mailbox configuration section. +### Interoperability with DispatcherSelector + +The @apidoc[MailboxSelector](MailboxSelector$) will create a @apidoc[Props](typed.Props) instance that can be both set up Dispatcher and Mailbox, +which means that you can continue to set up Dispatcher through chain calls. + +Scala +: @@snip [MailboxDocSpec.scala](/actor-typed-tests/src/test/scala/docs/org/apache/pekko/typed/MailboxDocSpec.scala) { #interoperability-with-dispatcher } + +Java +: @@snip [MailboxDocTest.java](/actor-typed-tests/src/test/java/jdocs/org/apache/pekko/typed/MailboxDocTest.java) { #interoperability-with-dispatcher } + ## Mailbox Implementations Pekko ships with a number of mailbox implementations: