From f125c73559957f7eedb8432864c167551cb9c6da Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 20 Sep 2024 11:54:29 +0200 Subject: [PATCH] unique heartbeat persistenceId --- .../persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 314e780c..32aa691f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -7,6 +7,7 @@ package akka.persistence.r2dbc.query.scaladsl import java.time.Clock import java.time.Instant import java.time.{ Duration => JDuration } +import java.util.UUID import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec @@ -113,6 +114,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat // key is tuple of entity type and slice private val heartbeatPersistenceIds = new ConcurrentHashMap[(String, Int), String]() + private val heartbeatUuid = UUID.randomUUID().toString + log.debug("Using heartbeat UUID [{}]", heartbeatUuid) private def heartbeatPersistenceId(entityType: String, slice: Int): String = { val key = entityType -> slice @@ -128,7 +131,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat @tailrec private def generateHeartbeatPersistenceId(entityType: String, slice: Int, n: Int = 1): String = { if (n < 1000000) { - val pid = PersistenceId.concat(entityType, s"_ht-$n") + // including a uuid to make sure it is not the same as any persistence id of the application + val pid = PersistenceId.concat(entityType, s"_hb-$heartbeatUuid-$n") if (persistenceExt.sliceForPersistenceId(pid) == slice) pid else