Skip to content

Commit

Permalink
Avoid dead letter logs on event-sourced entity passivation cloudstate…
Browse files Browse the repository at this point in the history
  • Loading branch information
ralphlaude committed Jul 12, 2020
1 parent b877e79 commit 79f525b
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import akka.actor._
import akka.cluster.sharding.ShardRegion
import akka.persistence._
import akka.stream.scaladsl._
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.{CompletionStrategy, Materializer, OverflowStrategy}
import akka.util.Timeout
import com.google.protobuf.any.{Any => pbAny}
import io.cloudstate.protocol.crud.CrudAction.Action.{Delete, Update}
Expand Down Expand Up @@ -75,6 +75,8 @@ final class CrudEntitySupervisor(client: CrudClient,

import CrudEntitySupervisor._

private var streamTerminated: Boolean = false

override final def receive: Receive = PartialFunction.empty

override final def preStart(): Unit = {
Expand All @@ -99,20 +101,38 @@ final class CrudEntitySupervisor(client: CrudClient,
context
.actorOf(CrudEntity.props(configuration, entityId, relayRef, concurrencyEnforcer, statsCollector), "entity")
)
context.become(forwarding(manager))
context.become(forwarding(manager, relayRef))
unstashAll()
case _ => stash()
}

private[this] final def forwarding(manager: ActorRef): Receive = {
private[this] final def forwarding(manager: ActorRef, relay: ActorRef): Receive = {
case Terminated(`manager`) =>
context.stop(self)
if (streamTerminated) {
context.stop(self)
} else {
relay ! Status.Success(CompletionStrategy.draining)
context.become(stopping)
}
case toParent if sender() == manager =>
context.parent ! toParent
case CrudEntity.StreamClosed =>
streamTerminated = true
manager forward CrudEntity.StreamClosed
case failed: CrudEntity.StreamFailed =>
streamTerminated = true
manager forward failed
case msg =>
manager forward msg
}

private def stopping: Receive = {
case CrudEntity.StreamClosed =>
context.stop(self)
case _: CrudEntity.StreamFailed =>
context.stop(self)
}

override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
}

Expand All @@ -121,6 +141,7 @@ object CrudEntity {
final case object Stop

final case object StreamClosed extends DeadLetterSuppression
final case class StreamFailed(cause: Throwable) extends DeadLetterSuppression

final case class Configuration(
serviceName: String,
Expand Down Expand Up @@ -317,7 +338,7 @@ final class CrudEntity(configuration: CrudEntity.Configuration,
notifyOutstandingRequests("Unexpected CRUD entity termination")
context.stop(self)

case Status.Failure(error) =>
case CrudEntity.StreamFailed(error) =>
notifyOutstandingRequests("Unexpected CRUD entity termination")
throw error

Expand Down

0 comments on commit 79f525b

Please sign in to comment.