From e5cdce1b85043dfe8c623eae2875d086b9364abc Mon Sep 17 00:00:00 2001 From: "AndyChen(Jingzhang)" Date: Tue, 16 Jul 2024 15:49:33 +0800 Subject: [PATCH] fix: resolves and addition tests for child preStart supervise (#1385) (#1396) * chore: add unit test for child actor initial exception supervise * make assertion correct * trying to fix assertion failed * distinguish between fatal and normal fault * fix unit tests * trying to fix * fix NPE * revert isFailed condition * revert isFailed place * additional tests --- .../pekko/actor/ActorLifeCycleSpec.scala | 27 +++++++++ .../apache/pekko/actor/SupervisorSpec.scala | 36 ++++++++++++ .../org/apache/pekko/actor/ActorCell.scala | 4 +- .../pekko/actor/dungeon/FaultHandling.scala | 58 +++++++++---------- 4 files changed, 95 insertions(+), 30 deletions(-) diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorLifeCycleSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorLifeCycleSpec.scala index da5c03875f1..fe0dd5a139e 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorLifeCycleSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorLifeCycleSpec.scala @@ -160,6 +160,33 @@ class ActorLifeCycleSpec extends PekkoSpec with BeforeAndAfterEach with Implicit a ! "hello" expectMsg(42) } + + "not break supervisor strategy due to unhandled exception in preStart" in { + val id = newUuid.toString + val gen = new AtomicInteger(0) + val maxRetryNum = 3 + + val childProps = Props(new LifeCycleTestActor(testActor, id, gen) { + override def preStart(): Unit = { + report("preStart") + throw new Exception("test exception") + } + }).withDeploy(Deploy.local) + + val supervisorStrategy: SupervisorStrategy = + OneForOneStrategy(maxNrOfRetries = maxRetryNum, timeout.duration) { + case _: ActorInitializationException => SupervisorStrategy.Restart + case _ => SupervisorStrategy.Escalate + } + val supervisor = system.actorOf(Props(classOf[Supervisor], supervisorStrategy)) + Await.result((supervisor ? childProps).mapTo[ActorRef], timeout.duration) + + (0 to maxRetryNum).foreach(i => { + expectMsg(("preStart", id, i)) + }) + expectNoMessage() + system.stop(supervisor) + } } "have a non null context after termination" in { diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/SupervisorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/SupervisorSpec.scala index 5d16c504ef6..22801ed2e1d 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/SupervisorSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/SupervisorSpec.scala @@ -95,6 +95,17 @@ object SupervisorSpec { } } + class Child(cnt: Int) extends Actor { + override def preStart(): Unit = { + if (cnt == 0) throw new RuntimeException("deliberate test failure") + } + + def receive = { + case PingMessage => + sender() ! PongMessage + } + } + def creator(target: ActorRef, fail: Boolean = false) = { val p = Props(new Creator(target)) if (fail) p.withMailbox("error-mailbox") else p @@ -532,6 +543,31 @@ class SupervisorSpec } } + + "supervise exceptions on child actor initialize" in { + val parent = system.actorOf(Props(new Actor { + val cnt: AtomicInteger = new AtomicInteger(0) + var childRef: ActorRef = _ + + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { + case _: ActorInitializationException => SupervisorStrategy.Restart + case _ => SupervisorStrategy.Stop + } + + override def preStart(): Unit = { + childRef = context.actorOf(Props(new Child(cnt.getAndIncrement())), "child") + } + + def receive = { + case msg if msg == PingMessage && childRef != null => + childRef.tell(PingMessage, sender()) + } + }), "parent") + + val probe = TestProbe() + probe.send(parent, PingMessage) + probe.expectMsg(PongMessage) + } } "restarts a child infinitely if maxNrOfRetries = -1 and withinTimeRange = Duration.Inf" in { diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala index 986b2182e0b..490ba64a117 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala @@ -643,7 +643,6 @@ private[pekko] class ActorCell( def failActor(): Unit = if (_actor != null) { clearActorFields(actor, recreate = false) - setFailedFatally() _actor = null // ensure that we know that we failed during creation } @@ -657,10 +656,13 @@ private[pekko] class ActorCell( publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { case e: InterruptedException => + setFailedFatally() failActor() Thread.currentThread().interrupt() throw ActorInitializationException(self, "interruption during creation", e) case NonFatal(e) => + if (actor == null) setFailed(system.deadLetters) + else setFailed(actor.self) failActor() e match { case i: InstantiationException => diff --git a/actor/src/main/scala/org/apache/pekko/actor/dungeon/FaultHandling.scala b/actor/src/main/scala/org/apache/pekko/actor/dungeon/FaultHandling.scala index 2e57945a8cf..c952b685b35 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/dungeon/FaultHandling.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/dungeon/FaultHandling.scala @@ -76,7 +76,7 @@ private[pekko] trait FaultHandling { this: ActorCell => case FailedRef(ref) => ref case _ => null } - private def setFailed(perpetrator: ActorRef): Unit = _failed = _failed match { + protected def setFailed(perpetrator: ActorRef): Unit = _failed = _failed match { case FailedFatally => FailedFatally case _ => FailedRef(perpetrator) } @@ -214,34 +214,34 @@ private[pekko] trait FaultHandling { this: ActorCell => @InternalStableApi final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = { // prevent any further messages to be processed until the actor has been restarted - if (!isFailed) - try { - suspendNonRecursive() - // suspend children - val skip: Set[ActorRef] = currentMessage match { - case Envelope(Failed(_, _, _), child) => setFailed(child); Set(child) - case _ => setFailed(self); Set.empty - } - suspendChildren(exceptFor = skip ++ childrenNotToSuspend) - t match { - // tell supervisor - case _: InterruptedException => - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid)) - case _ => - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(Failed(self, t, uid)) - } - } catch handleNonFatalOrInterruptedException { e => - publish( - Error( - e, - self.path.toString, - clazz(actor), - "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t))) - try children.foreach(stop) - finally finishTerminate() - } + try { + suspendNonRecursive() + // suspend children + val skip: Set[ActorRef] = currentMessage match { + case Envelope(Failed(_, _, _), child) if !isFailed => setFailed(child); Set(child) + case _ if !isFailed => setFailed(self); Set.empty + case _ => Set.empty + } + suspendChildren(exceptFor = skip ++ childrenNotToSuspend) + t match { + // tell supervisor + case _: InterruptedException => + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid)) + case _ => + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + parent.sendSystemMessage(Failed(self, t, uid)) + } + } catch handleNonFatalOrInterruptedException { e => + publish( + Error( + e, + self.path.toString, + clazz(actor), + "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t))) + try children.foreach(stop) + finally finishTerminate() + } } private def finishTerminate(): Unit = {