diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala index b2355e302e7..5dc0dd7697e 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala @@ -32,10 +32,16 @@ import pekko.pattern.StatusReply import pekko.testkit.TestException import pekko.util.Timeout +import scala.util.Failure + object AskSpec { sealed trait Msg final case class Foo(s: String, replyTo: ActorRef[String]) extends Msg + final case class Bar(s: String, duration: FiniteDuration, replyTo: ActorRef[String]) extends Msg final case class Stop(replyTo: ActorRef[Unit]) extends Msg + sealed trait Proxy + final case class ProxyMsg(s: String) extends Proxy + final case class ProxyReply(s: String) extends Proxy } class AskSpec extends ScalaTestWithActorTestKit(""" @@ -52,6 +58,9 @@ class AskSpec extends ScalaTestWithActorTestKit(""" case (_, foo: Foo) => foo.replyTo ! "foo" Behaviors.same + case (ctx, bar: Bar) => + ctx.scheduleOnce(bar.duration, bar.replyTo, "bar") + Behaviors.same case (_, Stop(r)) => r ! (()) Behaviors.stopped @@ -119,6 +128,65 @@ class AskSpec extends ScalaTestWithActorTestKit(""" } } + "publish dead-letter if the context.ask has completed on timeout" in { + import pekko.actor.typed.internal.adapter.ActorRefAdapter._ + implicit val timeout: Timeout = 1.millis + + val actor: ActorRef[Msg] = spawn(behavior) + val mockActor: ActorRef[Proxy] = spawn(Behaviors.receive[Proxy]((context, msg) => + msg match { + case ProxyMsg(s) => + context.ask[Msg, String](actor, Bar(s, 10.millis, _)) { + case Success(result) => ProxyReply(result) + case Failure(ex) => throw ex + } + Behaviors.same + case ProxyReply(s) => + throw new IllegalArgumentException(s"unexpected reply: $s") + })) + + mockActor ! ProxyMsg("foo") + + val deadLetterProbe = createDeadLetterProbe() + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message match { + case s: String => s should ===("bar") + case _ => fail(s"unexpected DeadLetter: $deadLetter") + } + + val deadLettersRef = system.classicSystem.deadLetters + deadLetter.recipient shouldNot equal(deadLettersRef) + deadLetter.recipient shouldNot equal(toClassic(actor)) + deadLetter.recipient shouldNot equal(toClassic(mockActor)) + } + "publish dead-letter if the AskPattern.ask has completed on timeout" in { + implicit val timeout: Timeout = 1.millis + + val deadLetterProbe = createDeadLetterProbe() + val mockProbe = createTestProbe[Msg]() + val mockBusyRef = mockProbe.ref + // this will not completed unit worker reply. + val askResult: Future[String] = mockBusyRef.ask(replyTo => Foo("foo", replyTo)) + val request = mockProbe.expectMessageType[Foo](1.seconds) + // waiting for temporary ask actor terminated with timeout + mockProbe.expectTerminated(request.replyTo) + // verify ask timeout + val result = askResult.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should startWith("Ask timed out on") + // mock reply manually + request match { + case Foo(s, replyTo) => replyTo ! s + } + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message shouldBe a[String] + val deadLettersRef = system.classicSystem.deadLetters + // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. + deadLetter.recipient shouldNot equal(deadLettersRef) + } + "transform a replied org.apache.pekko.actor.Status.Failure to a failed future" in { // It's unlikely but possible that this happens, since the receiving actor would // have to accept a message with an actoref that accepts AnyRef or be doing crazy casting diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala deleted file mode 100644 index 35b596ecfb7..00000000000 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/DeadLetterSpec.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.pekko.actor.typed - -import org.apache.pekko.actor.IllegalActorStateException -import org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing -import org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import org.apache.pekko.actor.typed.scaladsl.AskPattern.Askable -import org.apache.pekko.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem -import org.apache.pekko.actor.typed.scaladsl.Behaviors -import org.apache.pekko.actor.typed.scaladsl.Behaviors._ -import org.apache.pekko.util.Timeout -import org.scalatest.wordspec.AnyWordSpecLike - -import java.util.concurrent.CountDownLatch -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.TimeoutException -import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success - -sealed trait Command -case class Multiply(a: Int, b: Int, forwardRef: ActorRef[WorkerCommand], replyTo: ActorRef[Int]) extends Command -case class ReplyResult(num: Int, replyTo: ActorRef[Int]) extends Command -case class Ignore() extends Command - -sealed trait WorkerCommand -case class WorkerMultiply(a: Int, b: Int, replyTo: ActorRef[WorkerResult]) extends WorkerCommand -case class WorkerResult(num: Int) extends WorkerCommand - -class ManualTerminatedTestSetup(val workerLatch: CountDownLatch) { - implicit val timeout: Timeout = 10.millis - - def forwardBehavior: Behavior[Command] = - setup[Command] { context => - Behaviors.receiveMessage[Command] { msg => - msg match { - case Multiply(a, b, ref, replyTo) => - // context.ask is asynchronous - context.ask[WorkerCommand, WorkerResult](ref, resultReply => WorkerMultiply(a, b, resultReply)) { - case Success(result) => ReplyResult(result.num, replyTo) - case Failure(_) => Ignore() - } - Behaviors.same - case ReplyResult(num, replyTo) => - replyTo ! num - Behaviors.same - case Ignore() => Behaviors.same - } - } - } - - def workerBehavior: Receive[WorkerCommand] = - Behaviors.receiveMessage[WorkerCommand] { msg => - msg match { - case WorkerMultiply(a, b, replyTo) => - workerLatch.await() - replyTo ! WorkerResult(a * b) - Behaviors.stopped - case _ => - throw IllegalActorStateException("worker actor should not receive other message.") - } - } -} - -class DeadLetterSpec extends ScalaTestWithActorTestKit( - """ - pekko.loglevel=DEBUG - pekko.actor.debug.event-stream = on - """) with AnyWordSpecLike with LogCapturing { - - implicit def executor: ExecutionContext = - system.executionContext - - "DeadLetterActor" must { - - "publish dead letter with recipient when context.ask terminated" in new ManualTerminatedTestSetup( - workerLatch = new CountDownLatch(1)) { - val deadLetterProbe = createDeadLetterProbe() - val forwardRef = spawn(forwardBehavior) - val workerRef = spawn(workerBehavior) - - // this will not completed unit worker reply. - val multiplyResult: Future[Int] = forwardRef.ask(replyTo => Multiply(3, 9, workerRef, replyTo)) - // waiting for temporary ask actor terminated with timeout - val result = multiplyResult.failed.futureValue - result shouldBe a[TimeoutException] - result.getMessage should startWith("Ask timed out on") - // unlock worker replying - workerLatch.countDown() - - val deadLetter = deadLetterProbe.receiveMessage() - deadLetter.message shouldBe a[WorkerResult] - val deadLettersRef = system.classicSystem.deadLetters - // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. - deadLetter.recipient shouldNot equal(deadLettersRef) - } - - "publish dead letter with recipient when AskPattern timeout" in new ManualTerminatedTestSetup( - workerLatch = new CountDownLatch(1)) { - val deadLetterProbe = createDeadLetterProbe() - val workerRef = spawn(workerBehavior) - - // this not completed unit countDown. - val multiplyResult: Future[WorkerResult] = workerRef.ask(replyTo => WorkerMultiply(3, 9, replyTo)) - // waiting for temporary ask actor terminated with timeout - val result = multiplyResult.failed.futureValue - result shouldBe a[TimeoutException] - result.getMessage should startWith("Ask timed out on") - // unlock worker replying - workerLatch.countDown() - - val deadLetter = deadLetterProbe.receiveMessage() - deadLetter.message shouldBe a[WorkerResult] - val deadLettersRef = system.classicSystem.deadLetters - // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. - deadLetter.recipient shouldNot equal(deadLettersRef) - } - } -}