Skip to content

Commit

Permalink
unique heartbeat persistenceId
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 20, 2024
1 parent a7b26f2 commit 1be050a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.r2dbc.query.scaladsl
import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import scala.annotation.tailrec
Expand Down Expand Up @@ -114,6 +115,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

// key is tuple of entity type and slice
private val heartbeatPersistenceIds = new ConcurrentHashMap[(String, Int), String]()
private val heartbeatUuid = UUID.randomUUID().toString
log.debug("Using heartbeat UUID [{}]", heartbeatUuid)

private def heartbeatPersistenceId(entityType: String, slice: Int): String = {
val key = entityType -> slice
Expand All @@ -129,7 +132,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat

@tailrec private def generateHeartbeatPersistenceId(entityType: String, slice: Int, n: Int = 1): String = {
if (n < 1000000) {
val pid = PersistenceId.concat(entityType, s"_ht-$n")
// including a uuid to make sure it is not the same as any persistence id of the application
val pid = PersistenceId.concat(entityType, s"_hb-$heartbeatUuid-$n")
if (persistenceExt.sliceForPersistenceId(pid) == slice)
pid
else
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object Dependencies {
val Scala3 = "3.3.3"
val Scala2Versions = Seq(Scala213)
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3
val AkkaVersion = System.getProperty("override.akka.version", "2.9.4")
val AkkaVersion = System.getProperty("override.akka.version", "2.9.5")
val AkkaVersionInDocs = VersionNumber(AkkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }
val AkkaPersistenceJdbcVersion = "5.4.0" // only in migration tool tests
val AkkaProjectionVersionInDocs = "current"
Expand Down

0 comments on commit 1be050a

Please sign in to comment.