Skip to content

Commit

Permalink
fix: don't trigger heartbeats from idle backtracking (#630)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter authored Jan 9, 2025
1 parent b1c2070 commit b3f1fca
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ import org.slf4j.Logger
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)
previousQueryWallClock = Instant.EPOCH,
idleCountBeforeHeartbeat = 0)
}

final case class QueryState(
Expand All @@ -69,7 +70,8 @@ import org.slf4j.Logger
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {
previousQueryWallClock: Instant,
idleCountBeforeHeartbeat: Long) {

def backtracking: Boolean = backtrackingCount > 0

Expand Down Expand Up @@ -435,6 +437,10 @@ import org.slf4j.Logger

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
val newIdleCountBeforeHeartbeat =
if (state.backtracking) state.idleCountBeforeHeartbeat
else if (state.rowCount == 0) state.idleCountBeforeHeartbeat + 1
else 0
// only start tracking query wall clock (for heartbeats) after initial backtracking query
val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() else Instant.EPOCH
val newState =
Expand All @@ -455,7 +461,8 @@ import org.slf4j.Logger
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else if (switchFromBacktracking(state)) {
// switching from backtracking
state.copy(
Expand All @@ -465,7 +472,8 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else {
// continue
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
Expand All @@ -477,7 +485,8 @@ import org.slf4j.Logger
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
}

val behindCurrentTime =
Expand Down Expand Up @@ -533,7 +542,7 @@ import org.slf4j.Logger
}

def heartbeat(state: QueryState): Option[Envelope] = {
if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) {
if (state.idleCountBeforeHeartbeat >= 2 && state.previousQueryWallClock != Instant.EPOCH) {
// using wall clock to measure duration since the start time (database timestamp) up to idle backtracking limit
val timestamp = state.startTimestamp.plus(
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,16 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
env.sequenceNr)
Nil
} else {
if (log.isDebugEnabled()) {
if (latestBacktracking.isAfter(t.timestamp))
log.debug(
"Event from query for persistenceId [{}] seqNr [{}] timestamp [{}]" +
" was before latest timestamp from backtracking or heartbeat [{}].",
env.persistenceId,
env.sequenceNr,
t.timestamp,
latestBacktracking)
}
env :: Nil
}
case _ =>
Expand Down

0 comments on commit b3f1fca

Please sign in to comment.