Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

additional mailbox selector for typed props #1096

Merged
merged 13 commits into from
Aug 28, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,27 @@ public static void spawnDispatchers(ActorContext<Integer> context, Behavior<Stri
behavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"));
// #spawn-dispatcher
}

public static void spawnDispatchersWithInteroperability(
ActorContext<Integer> context, Behavior<String> behavior) {
// #interoperability-with-mailbox
context.spawn(
behavior,
"ExplicitDefaultDispatcher",
DispatcherSelector.defaultDispatcher().withMailboxFromConfig("my-app.my-special-mailbox"));
context.spawn(
behavior,
"BlockingDispatcher",
DispatcherSelector.blocking().withMailboxFromConfig("my-app.my-special-mailbox"));
context.spawn(
behavior,
"ParentDispatcher",
DispatcherSelector.sameAsParent().withMailboxFromConfig("my-app.my-special-mailbox"));
context.spawn(
behavior,
"DispatcherFromConfig",
DispatcherSelector.fromConfig("your-dispatcher")
.withMailboxFromConfig("my-app.my-special-mailbox"));
// #interoperability-with-mailbox
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,33 @@ public void startSomeActorsWithDifferentMailboxes() {
ActorRef<Void> ref = testKit.spawn(setup);
testProbe.receiveMessage();
}

@Test
public void startSomeActorsWithMailboxSelectorInteroperability() {
TestProbe<Done> testProbe = testKit.createTestProbe();
Behavior<String> childBehavior = Behaviors.empty();

Behavior<Void> setup =
Behaviors.setup(
context -> {
// #interoperability-with-dispatcher
context.spawn(
childBehavior,
"bounded-mailbox-child",
MailboxSelector.bounded(100).withDispatcherDefault());

context.spawn(
childBehavior,
"from-config-mailbox-child",
MailboxSelector.fromConfig("my-app.my-special-mailbox")
.withMailboxFromConfig("your-dispatcher"));
// #interoperability-with-dispatcher

testProbe.ref().tell(Done.getInstance());
return Behaviors.stopped();
});

ActorRef<Void> ref = testKit.spawn(setup);
testProbe.receiveMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,29 @@ object DispatchersDocSpec {
Behaviors.same
}

val interoperableExample = Behaviors.receive[Any] { (context, _) =>
// #interoperability-with-mailbox
import org.apache.pekko.actor.typed.DispatcherSelector

context.spawn(yourBehavior, "DefaultDispatcher")
context.spawn(yourBehavior, "ExplicitDefaultDispatcher",
DispatcherSelector.default().withMailboxFromConfig("my-app.my-special-mailbox"))
context.spawn(yourBehavior, "BlockingDispatcher",
DispatcherSelector.blocking().withMailboxFromConfig("my-app.my-special-mailbox"))
context.spawn(yourBehavior, "ParentDispatcher",
DispatcherSelector.sameAsParent().withMailboxFromConfig("my-app.my-special-mailbox"))
context.spawn(yourBehavior, "DispatcherFromConfig",
DispatcherSelector.fromConfig("your-dispatcher").withMailboxFromConfig("my-app.my-special-mailbox"))
// #interoperability-with-mailbox

Behaviors.same
}

}

class DispatchersDocSpec
extends ScalaTestWithActorTestKit(DispatchersDocSpec.config)
extends ScalaTestWithActorTestKit(
DispatchersDocSpec.config.withFallback(ConfigFactory.load("mailbox-config-sample.conf")))
with AnyWordSpecLike
with LogCapturing {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

class MailboxDocSpec
extends ScalaTestWithActorTestKit(ConfigFactory.load("mailbox-config-sample.conf"))
extends ScalaTestWithActorTestKit(
ConfigFactory.load("mailbox-config-sample.conf").withFallback(DispatchersDocSpec.config))
with AnyWordSpecLike
with LogCapturing {

Expand All @@ -47,6 +48,28 @@ class MailboxDocSpec

probe.receiveMessage()
}

"interoperability with DispatcherSelector" in {

val probe = createTestProbe[Done]()
val childBehavior: Behavior[String] = Behaviors.empty
val parent: Behavior[Unit] = Behaviors.setup { context =>
// #interoperability-with-dispatcher
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100).withDispatcherDefault)

val props =
MailboxSelector.fromConfig("my-app.my-special-mailbox").withDispatcherFromConfig("your-dispatcher")
context.spawn(childBehavior, "from-config-mailbox-child", props)
// #interoperability-with-dispatcher

probe.ref ! Done
Behaviors.stopped
}
spawn(parent)

probe.receiveMessage()

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package org.apache.pekko.actor.typed.scaladsl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

import org.apache.pekko
import pekko.actor.typed.DispatcherSelector
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you reorder the imports and put back the empty line above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about the delay, could you review this again and contain this in release 1.1.0?

import pekko.actor.typed.Props
import pekko.actor.typed.scaladsl.AskPattern._
import pekko.actor.BootstrapSetup
import pekko.actor.setup.ActorSystemSetup
import pekko.actor.testkit.typed.scaladsl.ActorTestKit
Expand All @@ -28,7 +30,6 @@ import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.Props
import pekko.actor.typed.SpawnProtocol

object DispatcherSelectorSpec {
Expand Down Expand Up @@ -64,7 +65,7 @@ class DispatcherSelectorSpec(config: Config)

"DispatcherSelector" must {

"select dispatcher from config" in {
"select dispatcher from empty Props" in {
val probe = createTestProbe[Pong]()
val pingPong = spawn(PingPong(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
pingPong ! Ping(probe.ref)
Expand All @@ -73,18 +74,27 @@ class DispatcherSelectorSpec(config: Config)
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
}

"select dispatcher from DispatcherSelector" in {
val probe = createTestProbe[Pong]()
val pingPong = spawn(PingPong(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
pingPong ! Ping(probe.ref)

val response = probe.receiveMessage()
response.threadName should startWith("DispatcherSelectorSpec-ping-pong-dispatcher")
}

"detect unknown dispatcher from config" in {
val probe = createTestProbe[Pong]()
LoggingTestKit.error("Spawn failed").expect {
val ref = spawn(PingPong(), Props.empty.withDispatcherFromConfig("unknown"))
val ref = spawn(PingPong(), DispatcherSelector.fromConfig("unknown"))
probe.expectTerminated(ref)
}
}

"select same dispatcher as parent" in {
val parent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val parent = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
val childProbe = createTestProbe[ActorRef[Ping]]()
parent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
parent ! SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), childProbe.ref)

val probe = createTestProbe[Pong]()
val child = childProbe.receiveMessage()
Expand All @@ -95,19 +105,13 @@ class DispatcherSelectorSpec(config: Config)
}

"select same dispatcher as parent, several levels" in {
val grandParent = spawn(SpawnProtocol(), Props.empty.withDispatcherFromConfig("ping-pong-dispatcher"))
val parentProbe = createTestProbe[ActorRef[SpawnProtocol.Spawn[Ping]]]()
grandParent ! SpawnProtocol.Spawn(
SpawnProtocol(),
"parent",
Props.empty.withDispatcherSameAsParent,
parentProbe.ref)

val childProbe = createTestProbe[ActorRef[Ping]]()
grandParent ! SpawnProtocol.Spawn(PingPong(), "child", Props.empty.withDispatcherSameAsParent, childProbe.ref)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix children not spawn from the parent

val guardian = spawn(SpawnProtocol(), DispatcherSelector.fromConfig("ping-pong-dispatcher"))
val parent: ActorRef[SpawnProtocol.Command] = guardian.ask((replyTo: ActorRef[ActorRef[SpawnProtocol.Command]]) =>
SpawnProtocol.Spawn(SpawnProtocol(), "parent", DispatcherSelector.sameAsParent(), replyTo)).futureValue
val child: ActorRef[Ping] = parent.ask((reply: ActorRef[ActorRef[Ping]]) =>
SpawnProtocol.Spawn(PingPong(), "child", DispatcherSelector.sameAsParent(), reply)).futureValue

val probe = createTestProbe[Pong]()
val child = childProbe.receiveMessage()
child ! Ping(probe.ref)

val response = probe.receiveMessage()
Expand All @@ -119,7 +123,7 @@ class DispatcherSelectorSpec(config: Config)
PingPong(),
"DispatcherSelectorSpec2",
ActorSystemSetup.create(BootstrapSetup()),
Props.empty.withDispatcherSameAsParent)
DispatcherSelector.sameAsParent())
try {
val probe = TestProbe[Pong]()(sys)
sys ! Ping(probe.ref)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.actor.typed.scaladsl

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorCell
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.actor.typed.DispatcherSelector
import pekko.actor.typed.MailboxSelector
import pekko.actor.typed.Props
import pekko.actor.typed.internal.adapter.ActorContextAdapter
import pekko.actor.typed.scaladsl.AskPattern._
import pekko.dispatch.BoundedMessageQueueSemantics
import pekko.dispatch.BoundedNodeMessageQueue
import pekko.dispatch.Dispatchers
import pekko.dispatch.MessageQueue
import pekko.dispatch.NodeMessageQueue
import org.scalatest.wordspec.AnyWordSpecLike

object MailboxSelectorSpec {
val config = ConfigFactory.parseString(
"""
specific-mailbox {
mailbox-type = "org.apache.pekko.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 4
}
""")

object PingPong {
case class Ping(replyTo: ActorRef[Pong])

case class Pong(threadName: String)

def apply(): Behavior[Ping] =
Behaviors.receiveMessage[Ping] { message =>
message.replyTo ! Pong(Thread.currentThread().getName)
Behaviors.same
}

}

}

class MailboxSelectorSpec(config: Config)
extends ScalaTestWithActorTestKit(config)
with AnyWordSpecLike
with LogCapturing {

def this() = this(MailboxSelectorSpec.config)

sealed trait Command
case class WhatsYourMailbox(replyTo: ActorRef[MessageQueue]) extends Command
case class WhatsYourDispatcher(replyTo: ActorRef[String]) extends Command

private def extract[R](context: ActorContext[_], f: ActorCell => R): R = {
context match {
case adapter: ActorContextAdapter[_] =>
adapter.classicActorContext match {
case cell: ActorCell => f(cell)
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
}
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
}
}

private def behavior: Behavior[Command] =
Behaviors.setup { context =>
Behaviors.receiveMessage[Command] {
case WhatsYourMailbox(replyTo) =>
replyTo ! extract(context, cell => cell.mailbox.messageQueue)
Behaviors.same
case WhatsYourDispatcher(replyTo) =>
replyTo ! extract(context, cell => cell.dispatcher.id)
Behaviors.same
}
}

"MailboxSelectorSpec" must {

"default is unbounded" in {
val actor = spawn(behavior)
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[NodeMessageQueue]
}

"select an specific mailbox from MailboxSelector " in {
val actor = spawn(behavior, MailboxSelector.fromConfig("specific-mailbox"))
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
}

"select an specific mailbox from empty Props " in {
val actor = spawn(behavior, Props.empty.withMailboxFromConfig("specific-mailbox"))
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
}

"select an specific mailbox from DispatcherSelector " in {
val actor = spawn(behavior, DispatcherSelector.blocking().withMailboxFromConfig("specific-mailbox"))
val mailbox = actor.ask(WhatsYourMailbox(_)).futureValue
mailbox shouldBe a[BoundedMessageQueueSemantics]
mailbox.asInstanceOf[BoundedNodeMessageQueue].capacity should ===(4)
val dispatcher = actor.ask(WhatsYourDispatcher(_)).futureValue
dispatcher shouldBe Dispatchers.DefaultBlockingDispatcherId
}

}

}
4 changes: 2 additions & 2 deletions actor-typed/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pekko.actor.typed {
# buffer. If the capacity is exceed then additional incoming messages are dropped.
restart-stash-capacity = 1000

# Typed mailbox defaults to the single consumer mailbox as balancing dispatcher is not supported
# Typed mailbox defaults to the single consumber mailbox as balancing dispatcher is not supported
Roiocam marked this conversation as resolved.
Show resolved Hide resolved
default-mailbox {
mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
Expand Down Expand Up @@ -73,7 +73,7 @@ pekko.reliable-delivery {
# To avoid head of line blocking from serialization and transfer
# of large messages this can be enabled.
# Large messages are chunked into pieces of the given size in bytes. The
# chunked messages are sent separately and assembled on the consumer side.
# chunked messages are sent separatetely and assembled on the consumer side.
Roiocam marked this conversation as resolved.
Show resolved Hide resolved
# Serialization and deserialization is performed by the ProducerController and
# ConsumerController respectively instead of in the remote transport layer.
chunk-large-messages = off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ abstract class Props private[pekko] () extends Product with Serializable {
*/
def withDispatcherFromConfig(path: String): Props = DispatcherFromConfig(path, this)

/**
* Prepend a selection of the mailbox found at the given Config path to this Props.
* The path is relative to the configuration root of the [[ActorSystem]] that looks up the
* mailbox.
Roiocam marked this conversation as resolved.
Show resolved Hide resolved
*/
def withMailboxFromConfig(path: String): Props = MailboxFromConfigSelector(path, this)

/**
* Prepend a selection of the same executor as the parent actor to this Props.
*/
Expand Down
Loading
Loading