Skip to content

Commit

Permalink
additional mailbox selector for typed props (#1096)
Browse files Browse the repository at this point in the history
* additional mailbox selector for typed props

* add unit test

* chore change of unit test

* Revert "configuration typo"

This reverts commit 7917feb.

* fix pekko imports

* mention interoperability in doc

* share configuration in tests

* revert configuration change

* fix new typo

* fix jdocs tests

* optimized import

* mention api version in doc

* resolve import issue
  • Loading branch information
Roiocam authored Aug 28, 2024
1 parent db8d20d commit d1ec224
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,27 @@ public static void spawnDispatchers(ActorContext<Integer> context, Behavior<Stri
behavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"));
// #spawn-dispatcher
}

public static void spawnDispatchersWithInteroperability(
ActorContext<Integer> context, Behavior<String> behavior) {
// #interoperability-with-mailbox
context.spawn(
behavior,
"ExplicitDefaultDispatcher",
DispatcherSelector.defaultDispatcher().withMailboxFromConfig("my-app.my-special-mailbox"));
context.spawn(
behavior,
"BlockingDispatcher",
DispatcherSelector.blocking().withMailboxFromConfig("my-app.my-special-mailbox"));
context.spawn(
behavior,
"ParentDispatcher",
DispatcherSelector.sameAsParent().withMailboxFromConfig("my-app.my-special-mailbox"));
context.spawn(
behavior,
"DispatcherFromConfig",
DispatcherSelector.fromConfig("your-dispatcher")
.withMailboxFromConfig("my-app.my-special-mailbox"));
// #interoperability-with-mailbox
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.Dispatchers;
import org.apache.pekko.actor.typed.MailboxSelector;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import com.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -59,4 +60,33 @@ public void startSomeActorsWithDifferentMailboxes() {
ActorRef<Void> ref = testKit.spawn(setup);
testProbe.receiveMessage();
}

@Test
public void startSomeActorsWithMailboxSelectorInteroperability() {
TestProbe<Done> testProbe = testKit.createTestProbe();
Behavior<String> childBehavior = Behaviors.empty();

Behavior<Void> setup =
Behaviors.setup(
context -> {
// #interoperability-with-dispatcher
context.spawn(
childBehavior,
"bounded-mailbox-child",
MailboxSelector.bounded(100).withDispatcherDefault());

context.spawn(
childBehavior,
"from-config-mailbox-child",
MailboxSelector.fromConfig("my-app.my-special-mailbox")
.withDispatcherFromConfig(Dispatchers.DefaultDispatcherId()));
// #interoperability-with-dispatcher

testProbe.ref().tell(Done.getInstance());
return Behaviors.stopped();
});

ActorRef<Void> ref = testKit.spawn(setup);
testProbe.receiveMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ object DispatchersDocSpec {
throughput = 1
}
//#config
your-mailbox {
mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
""".stripMargin)

case class WhichDispatcher(replyTo: ActorRef[Dispatcher])
Expand All @@ -65,6 +68,24 @@ object DispatchersDocSpec {
Behaviors.same
}

val interoperableExample = Behaviors.receive[Any] { (context, _) =>
// #interoperability-with-mailbox
import org.apache.pekko.actor.typed.DispatcherSelector

context.spawn(yourBehavior, "DefaultDispatcher")
context.spawn(yourBehavior, "ExplicitDefaultDispatcher",
DispatcherSelector.default().withMailboxFromConfig("your-mailbox"))
context.spawn(yourBehavior, "BlockingDispatcher",
DispatcherSelector.blocking().withMailboxFromConfig("your-mailbox"))
context.spawn(yourBehavior, "ParentDispatcher",
DispatcherSelector.sameAsParent().withMailboxFromConfig("your-mailbox"))
context.spawn(yourBehavior, "DispatcherFromConfig",
DispatcherSelector.fromConfig("your-dispatcher").withMailboxFromConfig("your-mailbox"))
// #interoperability-with-mailbox

Behaviors.same
}

}

class DispatchersDocSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import pekko.Done
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.typed.Behavior
import pekko.actor.typed.Dispatchers
import pekko.actor.typed.MailboxSelector
import pekko.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory
Expand Down Expand Up @@ -47,6 +48,29 @@ class MailboxDocSpec

probe.receiveMessage()
}

"interoperability with DispatcherSelector" in {

val probe = createTestProbe[Done]()
val childBehavior: Behavior[String] = Behaviors.empty
val parent: Behavior[Unit] = Behaviors.setup { context =>
// #interoperability-with-dispatcher
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100).withDispatcherDefault)

val props =
MailboxSelector.fromConfig("my-app.my-special-mailbox").withDispatcherFromConfig(
Dispatchers.DefaultDispatcherId)
context.spawn(childBehavior, "from-config-mailbox-child", props)
// #interoperability-with-dispatcher

probe.ref ! Done
Behaviors.stopped
}
spawn(parent)

probe.receiveMessage()

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.DispatcherSelector
import pekko.actor.typed.Props
import pekko.actor.typed.scaladsl.AskPattern._
import pekko.actor.typed.SpawnProtocol

object DispatcherSelectorSpec {
Expand Down Expand Up @@ -64,7 +66,7 @@ class DispatcherSelectorSpec(config: Config)

"DispatcherSelector" must {

"select dispatcher from config" in {
"select dispatcher from empty Props" in {
val probe = createTestProbe[Pong]()
val pingPong = spawn(PingPong(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
pingPong ! Ping(probe.ref)
Expand All @@ -73,18 +75,27 @@ class DispatcherSelectorSpec(config: Config)
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
}

"select dispatcher from DispatcherSelector" in {
val probe = createTestProbe[Pong]()
val pingPong = spawn(PingPong(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
pingPong ! Ping(probe.ref)

val response = probe.receiveMessage()
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
}

"detect unknown dispatcher from config" in {
val probe = createTestProbe[Pong]()
LoggingTestKit.error("Spawn failed").expect {
val ref = spawn(PingPong(), Props.empty.withDispatcherFromConfig("unknown"))
val ref = spawn(PingPong(), DispatcherSelector.fromConfig("unknown"))
probe.expectTerminated(ref)
}
}

"select same dispatcher as parent" in {
val parent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val parent = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
val childProbe = createTestProbe[ActorRef[Ping]]()
parent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
parent ! SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), childProbe.ref)

val probe = createTestProbe[Pong]()
val child = childProbe.receiveMessage()
Expand All @@ -95,19 +106,13 @@ class DispatcherSelectorSpec(config: Config)
}

"select same dispatcher as parent, several levels" in {
val grandParent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val parentProbe = createTestProbe[ActorRef[SpawnProtocol.Spawn[Ping]]]()
grandParent ! SpawnProtocol.Spawn(
SpawnProtocol(),
"parent",
Props.empty.withDispatcherSameAsParent,
parentProbe.ref)

val childProbe = createTestProbe[ActorRef[Ping]]()
grandParent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
val guardian = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
val parent: ActorRef[SpawnProtocol.Command] = guardian.ask((replyTo: ActorRef[ActorRef[SpawnProtocol.Command]]) =>
SpawnProtocol.Spawn(SpawnProtocol(), "parent", DispatcherSelector.sameAsParent(), replyTo)).futureValue
val child: ActorRef[Ping] = parent.ask((reply: ActorRef[ActorRef[Ping]]) =>
SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), reply)).futureValue

val probe = createTestProbe[Pong]()
val child = childProbe.receiveMessage()
child ! Ping(probe.ref)

val response = probe.receiveMessage()
Expand All @@ -119,7 +124,7 @@ class DispatcherSelectorSpec(config: Config)
PingPong(),
"DispatcherSelectorSpec2",
ActorSystemSetup.create(BootstrapSetup()),
Props.empty.withDispatcherSameAsParent)
DispatcherSelector.sameAsParent())
try {
val probe = TestProbe[Pong]()(sys)
sys ! Ping(probe.ref)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.actor.typed.scaladsl

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

import org.apache.pekko
import pekko.actor.ActorCell
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.DispatcherSelector
import pekko.actor.typed.MailboxSelector
import pekko.actor.typed.Props
import pekko.actor.typed.internal.adapter.ActorContextAdapter
import pekko.actor.typed.scaladsl.AskPattern._
import pekko.dispatch.BoundedMessageQueueSemantics
import pekko.dispatch.BoundedNodeMessageQueue
import pekko.dispatch.Dispatchers
import pekko.dispatch.MessageQueue
import pekko.dispatch.NodeMessageQueue

object MailboxSelectorSpec {
val config = ConfigFactory.parseString(
"""
specific-mailbox {
mailbox-type = "org.apache.pekko.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 4
}
""")

object PingPong {
case class Ping(replyTo: ActorRef[Pong])

case class Pong(threadName: String)

def apply(): Behavior[Ping] =
Behaviors.receiveMessage[Ping] { message =>
message.replyTo ! Pong(Thread.currentThread().getName)
Behaviors.same
}

}

}

class MailboxSelectorSpec(config: Config)
extends ScalaTestWithActorTestKit(config)
with AnyWordSpecLike
with LogCapturing {

def this() = this(MailboxSelectorSpec.config)

sealed trait Command
case class WhatsYourMailbox(replyTo: ActorRef[MessageQueue]) extends Command
case class WhatsYourDispatcher(replyTo: ActorRef[String]) extends Command

private def extract[R](context: ActorContext[_], f: ActorCell => R): R = {
context match {
case adapter: ActorContextAdapter[_] =>
adapter.classicActorContext match {
case cell: ActorCell => f(cell)
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
}
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
}
}

private def behavior: Behavior[Command] =
Behaviors.setup { context =>
Behaviors.receiveMessage[Command] {
case WhatsYourMailbox(replyTo) =>
replyTo ! extract(context, cell => cell.mailbox.messageQueue)
Behaviors.same
case WhatsYourDispatcher(replyTo) =>
replyTo ! extract(context, cell => cell.dispatcher.id)
Behaviors.same
}
}

"MailboxSelectorSpec" must {

"default is unbounded" in {
val actor = spawn(behavior)
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[NodeMessageQueue]
}

"select an specific mailbox from MailboxSelector " in {
val actor = spawn(behavior, MailboxSelector.fromConfig("specific-mailbox"))
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
}

"select an specific mailbox from empty Props " in {
val actor = spawn(behavior, Props.empty.withMailboxFromConfig("specific-mailbox"))
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
}

"select an specific mailbox from DispatcherSelector " in {
val actor = spawn(behavior, DispatcherSelector.blocking().withMailboxFromConfig("specific-mailbox"))
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
val dispatcher = actor.ask(WhatsYourDispatcher(_)).futureValue
dispatcher shouldBe Dispatchers.DefaultBlockingDispatcherId
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ abstract class Props private[pekko] () extends Product with Serializable {
*/
def withDispatcherFromConfig(path: String): Props = DispatcherFromConfig(path, this)

/**
* Prepend a selection of the mailbox found at the given Config path to this Props.
* The path is relative to the configuration root of the [[ActorSystem]] that looks up the
* mailbox.
* @since 1.1.0
*/
def withMailboxFromConfig(path: String): Props = MailboxFromConfigSelector(path, this)

/**
* Prepend a selection of the same executor as the parent actor to this Props.
*/
Expand Down
2 changes: 1 addition & 1 deletion actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ pekko {
chance-of-exploration = 0.4

# When downsizing after a long streak of under-utilization, the resizer
# will downsize the pool to the highest utilization multiplied by a
# will downsize the pool to the highest utilization multiplied by
# a downsize ratio. This downsize ratio determines the new pools size
# in comparison to the highest utilization.
# E.g. if the highest utilization is 10, and the down size ratio
Expand Down
Loading

0 comments on commit d1ec224

Please sign in to comment.