Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resolves and addition tests for child preStart supervise (#1385) #1396

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -160,6 +160,33 @@ class ActorLifeCycleSpec extends PekkoSpec with BeforeAndAfterEach with Implicit
a ! "hello"
expectMsg(42)
}

"not break supervisor strategy due to unhandled exception in preStart" in {
val id = newUuid.toString
val gen = new AtomicInteger(0)
val maxRetryNum = 3

val childProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preStart(): Unit = {
report("preStart")
throw new Exception("test exception")
}
}).withDeploy(Deploy.local)

val supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = maxRetryNum, timeout.duration) {
case _: ActorInitializationException => SupervisorStrategy.Restart
case _ => SupervisorStrategy.Escalate
}
val supervisor = system.actorOf(Props(classOf[Supervisor], supervisorStrategy))
Await.result((supervisor ? childProps).mapTo[ActorRef], timeout.duration)

(0 to maxRetryNum).foreach(i => {
expectMsg(("preStart", id, i))
})
expectNoMessage()
system.stop(supervisor)
}
}

"have a non null context after termination" in {
Original file line number Diff line number Diff line change
@@ -95,6 +95,17 @@ object SupervisorSpec {
}
}

class Child(cnt: Int) extends Actor {
override def preStart(): Unit = {
if (cnt == 0) throw new RuntimeException("deliberate test failure")
}

def receive = {
case PingMessage =>
sender() ! PongMessage
}
}

def creator(target: ActorRef, fail: Boolean = false) = {
val p = Props(new Creator(target))
if (fail) p.withMailbox("error-mailbox") else p
@@ -532,6 +543,31 @@ class SupervisorSpec
}

}

"supervise exceptions on child actor initialize" in {
val parent = system.actorOf(Props(new Actor {
val cnt: AtomicInteger = new AtomicInteger(0)
var childRef: ActorRef = _

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _: ActorInitializationException => SupervisorStrategy.Restart
case _ => SupervisorStrategy.Stop
}

override def preStart(): Unit = {
childRef = context.actorOf(Props(new Child(cnt.getAndIncrement())), "child")
}

def receive = {
case msg if msg == PingMessage && childRef != null =>
childRef.tell(PingMessage, sender())
}
}), "parent")

val probe = TestProbe()
probe.send(parent, PingMessage)
probe.expectMsg(PongMessage)
}
}

"restarts a child infinitely if maxNrOfRetries = -1 and withinTimeRange = Duration.Inf" in {
4 changes: 3 additions & 1 deletion actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala
Original file line number Diff line number Diff line change
@@ -643,7 +643,6 @@ private[pekko] class ActorCell(
def failActor(): Unit =
if (_actor != null) {
clearActorFields(actor, recreate = false)
setFailedFatally()
_actor = null // ensure that we know that we failed during creation
}

@@ -657,10 +656,13 @@ private[pekko] class ActorCell(
publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case e: InterruptedException =>
setFailedFatally()
failActor()
Thread.currentThread().interrupt()
throw ActorInitializationException(self, "interruption during creation", e)
case NonFatal(e) =>
if (actor == null) setFailed(system.deadLetters)
else setFailed(actor.self)
failActor()
e match {
case i: InstantiationException =>
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ private[pekko] trait FaultHandling { this: ActorCell =>
case FailedRef(ref) => ref
case _ => null
}
private def setFailed(perpetrator: ActorRef): Unit = _failed = _failed match {
protected def setFailed(perpetrator: ActorRef): Unit = _failed = _failed match {
case FailedFatally => FailedFatally
case _ => FailedRef(perpetrator)
}
@@ -214,34 +214,34 @@ private[pekko] trait FaultHandling { this: ActorCell =>
@InternalStableApi
final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
// prevent any further messages to be processed until the actor has been restarted
if (!isFailed)
try {
suspendNonRecursive()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(_, _, _), child) => setFailed(child); Set(child)
case _ => setFailed(self); Set.empty
}
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
t match {
// tell supervisor
case _: InterruptedException =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
case _ =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, t, uid))
}
} catch handleNonFatalOrInterruptedException { e =>
publish(
Error(
e,
self.path.toString,
clazz(actor),
"emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
try children.foreach(stop)
finally finishTerminate()
}
try {
suspendNonRecursive()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(_, _, _), child) if !isFailed => setFailed(child); Set(child)
case _ if !isFailed => setFailed(self); Set.empty
case _ => Set.empty
}
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
t match {
// tell supervisor
case _: InterruptedException =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
case _ =>
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, t, uid))
}
} catch handleNonFatalOrInterruptedException { e =>
publish(
Error(
e,
self.path.toString,
clazz(actor),
"emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
try children.foreach(stop)
finally finishTerminate()
}
}

private def finishTerminate(): Unit = {