Skip to content

Commit

Permalink
fix: resolves and addition tests for child preStart supervise (#1385)
Browse files Browse the repository at this point in the history
* chore: add unit test for child actor initial exception supervise

* make assertion correct

* trying to fix assertion failed

* distinguish between fatal and normal fault

* fix unit tests

* trying to fix

* fix NPE

* revert isFailed condition

* revert isFailed place

* additional tests
  • Loading branch information
Roiocam authored Jul 15, 2024
1 parent 0c2eb9f commit 73c9362
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,33 @@ class ActorLifeCycleSpec extends PekkoSpec with BeforeAndAfterEach with Implicit
a ! "hello"
expectMsg(42)
}

"not break supervisor strategy due to unhandled exception in preStart" in {
val id = newUuid.toString
val gen = new AtomicInteger(0)
val maxRetryNum = 3

val childProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preStart(): Unit = {
report("preStart")
throw new Exception("test exception")
}
}).withDeploy(Deploy.local)

val supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = maxRetryNum, timeout.duration) {
case _: ActorInitializationException => SupervisorStrategy.Restart
case _ => SupervisorStrategy.Escalate
}
val supervisor = system.actorOf(Props(classOf[Supervisor], supervisorStrategy))
Await.result((supervisor ? childProps).mapTo[ActorRef], timeout.duration)

(0 to maxRetryNum).foreach(i => {
expectMsg(("preStart", id, i))
})
expectNoMessage()
system.stop(supervisor)
}
}

"have a non null context after termination" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ object SupervisorSpec {
}
}

class Child(cnt: Int) extends Actor {
override def preStart(): Unit = {
if (cnt == 0) throw new RuntimeException("deliberate test failure")
}

def receive = {
case PingMessage =>
sender() ! PongMessage
}
}

def creator(target: ActorRef, fail: Boolean = false) = {
val p = Props(new Creator(target))
if (fail) p.withMailbox("error-mailbox") else p
Expand Down Expand Up @@ -532,6 +543,31 @@ class SupervisorSpec
}

}

"supervise exceptions on child actor initialize" in {
val parent = system.actorOf(Props(new Actor {
val cnt: AtomicInteger = new AtomicInteger(0)
var childRef: ActorRef = _

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _: ActorInitializationException => SupervisorStrategy.Restart
case _ => SupervisorStrategy.Stop
}

override def preStart(): Unit = {
childRef = context.actorOf(Props(new Child(cnt.getAndIncrement())), "child")
}

def receive = {
case msg if msg == PingMessage && childRef != null =>
childRef.tell(PingMessage, sender())
}
}), "parent")

val probe = TestProbe()
probe.send(parent, PingMessage)
probe.expectMsg(PongMessage)
}
}

"restarts a child infinitely if maxNrOfRetries = -1 and withinTimeRange = Duration.Inf" in {
Expand Down
4 changes: 3 additions & 1 deletion actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,6 @@ private[pekko] class ActorCell(
def failActor(): Unit =
if (_actor != null) {
clearActorFields(actor, recreate = false)
setFailedFatally()
_actor = null // ensure that we know that we failed during creation
}

Expand All @@ -657,10 +656,13 @@ private[pekko] class ActorCell(
publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case e: InterruptedException =>
setFailedFatally()
failActor()
Thread.currentThread().interrupt()
throw ActorInitializationException(self, "interruption during creation", e)
case NonFatal(e) =>
if (actor == null) setFailed(system.deadLetters)
else setFailed(actor.self)
failActor()
e match {
case i: InstantiationException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private[pekko] trait FaultHandling { this: ActorCell =>
case FailedRef(ref) => ref
case _ => null
}
private def setFailed(perpetrator: ActorRef): Unit = _failed = _failed match {
protected def setFailed(perpetrator: ActorRef): Unit = _failed = _failed match {
case FailedFatally => FailedFatally
case _ => FailedRef(perpetrator)
}
Expand Down Expand Up @@ -214,34 +214,34 @@ private[pekko] trait FaultHandling { this: ActorCell =>
@InternalStableApi
final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
// prevent any further messages to be processed until the actor has been restarted
if (!isFailed)
try {
suspendNonRecursive()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(_, _, _), child) => setFailed(child); Set(child)
case _ => setFailed(self); Set.empty
}
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
t match {
// tell supervisor
case _: InterruptedException =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
case _ =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, t, uid))
}
} catch handleNonFatalOrInterruptedException { e =>
publish(
Error(
e,
self.path.toString,
clazz(actor),
"emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
try children.foreach(stop)
finally finishTerminate()
}
try {
suspendNonRecursive()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(_, _, _), child) if !isFailed => setFailed(child); Set(child)
case _ if !isFailed => setFailed(self); Set.empty
case _ => Set.empty
}
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
t match {
// tell supervisor
case _: InterruptedException =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
case _ =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, t, uid))
}
} catch handleNonFatalOrInterruptedException { e =>
publish(
Error(
e,
self.path.toString,
clazz(actor),
"emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
try children.foreach(stop)
finally finishTerminate()
}
}

private def finishTerminate(): Unit = {
Expand Down

0 comments on commit 73c9362

Please sign in to comment.