Skip to content

Commit

Permalink
suggestions from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 2, 2024
1 parent 3203ac8 commit f2ace61
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ import org.slf4j.Logger
}
}

private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = {
if (offset.seen.isEmpty) None else Some(offset.seen.values.max)
}
private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] =
Option.when(offset.seen.nonEmpty)(offset.seen.values.max)

object Buckets {
type EpochSeconds = Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
maxSlice: Int): String = {
// not caching, too many combinations

// If more events than the buffer size are all on the same timestamp, then the query will get stuck on that same
// timestamp. Avoid this by also starting from the highest seen sequence number for that timestamp, using the fact
// that events are ordered by db_timestamp, seq_nr. Note that sequence numbers are per persistence id, so a later
// timestamp can have an earlier sequence number. Add a logical conditional only when db_timestamp = fromTimestamp
// to also filter for seq_nr >= fromSeqNr. Expressed in a logically equivalent form, where A -> B === ~A v B.
def fromSeqNrParamCondition =
if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,27 @@ class EventsBySliceSpec
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val pid3 = nextPid(entityType)
val pid4 = nextPid(entityType)
val slice1 = query.sliceForPersistenceId(pid1)
val slice2 = query.sliceForPersistenceId(pid2)
val slice3 = query.sliceForPersistenceId(pid3)
val slices = Seq(slice1, slice2, slice3)

val timestamp = InstantFactory.now()
writeEvent(slice1, pid1, 1L, timestamp, "A1")
writeEvent(slice1, pid1, 2L, timestamp, "A2")
writeEvent(slice1, pid1, 3L, timestamp, "A3")
writeEvent(slice1, pid1, 4L, timestamp, "A4")
writeEvent(slice1, pid1, 5L, timestamp, "A5")
writeEvent(slice1, pid1, 6L, timestamp, "A6")
writeEvent(slice2, pid2, 3L, timestamp, "B3")
writeEvent(slice2, pid2, 4L, timestamp, "B4")
writeEvent(slice3, pid3, 3L, timestamp, "C3")
val slice4 = query.sliceForPersistenceId(pid4)
val slices = Seq(slice1, slice2, slice3, slice4)
val t1 = InstantFactory.now()
val t2 = t1.plusMillis(1)

writeEvent(slice1, pid1, 1L, t1, "A1")
writeEvent(slice1, pid1, 2L, t1, "A2")
writeEvent(slice1, pid1, 3L, t1, "A3")
writeEvent(slice1, pid1, 4L, t1, "A4")
writeEvent(slice1, pid1, 5L, t1, "A5")
writeEvent(slice1, pid1, 6L, t1, "A6")
writeEvent(slice2, pid2, 3L, t1, "B3")
writeEvent(slice2, pid2, 4L, t1, "B4")
writeEvent(slice3, pid3, 3L, t1, "C3")
writeEvent(slice4, pid4, 1L, t2, "D1")
writeEvent(slice4, pid4, 2L, t2, "D2")
writeEvent(slice4, pid4, 3L, t2, "D3")

val queryWithSmallBuffer = PersistenceQuery(testKit.system) // buffer size = 4
.readJournalFor[R2dbcReadJournal]("akka.persistence.r2dbc-small-buffer.query")
Expand All @@ -259,15 +265,20 @@ class EventsBySliceSpec
val result: TestSubscriber.Probe[EventEnvelope[String]] =
doQuery(entityType, slices.min, slices.max, NoOffset, queryWithSmallBuffer)
.runWith(sinkProbe)
.request(10)
.request(15)

def take(n: Int): Set[String] =
(1 to n).map(_ => result.expectNext().event).toSet

take(2) shouldBe Set("A1", "A2")
take(1) shouldBe Set("A1")
take(1) shouldBe Set("A2")
take(3) shouldBe Set("A3", "B3", "C3")
take(2) shouldBe Set("A4", "B4")
take(2) shouldBe Set("A5", "A6")
take(1) shouldBe Set("A5")
take(1) shouldBe Set("A6")
take(1) shouldBe Set("D1")
take(1) shouldBe Set("D2")
take(1) shouldBe Set("D3")

assertFinished(result)
}
Expand Down

0 comments on commit f2ace61

Please sign in to comment.