From 79f525bd3cd5fb7f4acb9114fc6830d4c1ca883b Mon Sep 17 00:00:00 2001 From: Guy Youansi Date: Sun, 12 Jul 2020 22:57:42 +0200 Subject: [PATCH] Avoid dead letter logs on event-sourced entity passivation #390 --- .../io/cloudstate/proxy/crud/CrudEntity.scala | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/crud/CrudEntity.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/crud/CrudEntity.scala index 21f00d637..1d9662eec 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/crud/CrudEntity.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/crud/CrudEntity.scala @@ -24,7 +24,7 @@ import akka.actor._ import akka.cluster.sharding.ShardRegion import akka.persistence._ import akka.stream.scaladsl._ -import akka.stream.{Materializer, OverflowStrategy} +import akka.stream.{CompletionStrategy, Materializer, OverflowStrategy} import akka.util.Timeout import com.google.protobuf.any.{Any => pbAny} import io.cloudstate.protocol.crud.CrudAction.Action.{Delete, Update} @@ -75,6 +75,8 @@ final class CrudEntitySupervisor(client: CrudClient, import CrudEntitySupervisor._ + private var streamTerminated: Boolean = false + override final def receive: Receive = PartialFunction.empty override final def preStart(): Unit = { @@ -99,20 +101,38 @@ final class CrudEntitySupervisor(client: CrudClient, context .actorOf(CrudEntity.props(configuration, entityId, relayRef, concurrencyEnforcer, statsCollector), "entity") ) - context.become(forwarding(manager)) + context.become(forwarding(manager, relayRef)) unstashAll() case _ => stash() } - private[this] final def forwarding(manager: ActorRef): Receive = { + private[this] final def forwarding(manager: ActorRef, relay: ActorRef): Receive = { case Terminated(`manager`) => - context.stop(self) + if (streamTerminated) { + context.stop(self) + } else { + relay ! Status.Success(CompletionStrategy.draining) + context.become(stopping) + } case toParent if sender() == manager => context.parent ! toParent + case CrudEntity.StreamClosed => + streamTerminated = true + manager forward CrudEntity.StreamClosed + case failed: CrudEntity.StreamFailed => + streamTerminated = true + manager forward failed case msg => manager forward msg } + private def stopping: Receive = { + case CrudEntity.StreamClosed => + context.stop(self) + case _: CrudEntity.StreamFailed => + context.stop(self) + } + override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy } @@ -121,6 +141,7 @@ object CrudEntity { final case object Stop final case object StreamClosed extends DeadLetterSuppression + final case class StreamFailed(cause: Throwable) extends DeadLetterSuppression final case class Configuration( serviceName: String, @@ -317,7 +338,7 @@ final class CrudEntity(configuration: CrudEntity.Configuration, notifyOutstandingRequests("Unexpected CRUD entity termination") context.stop(self) - case Status.Failure(error) => + case CrudEntity.StreamFailed(error) => notifyOutstandingRequests("Unexpected CRUD entity termination") throw error