From 8827aca5edd4764fb709328011c30d8c9baf831c Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 4 Dec 2024 07:28:04 +0100
Subject: [PATCH] chore: More information in query log prefix (#112)

* and some more debug logging
---
 .../dynamodb/internal/BySliceQuery.scala      | 27 +++++++------------
 .../query/scaladsl/DynamoDBReadJournal.scala  | 27 ++++++++++++++-----
 2 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala
index de83618..05ee3e3 100644
--- a/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala
+++ b/core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala
@@ -150,10 +150,9 @@ import org.slf4j.Logger
 
         if (state.queryCount != 0 && log.isDebugEnabled())
           log.debug(
-            "{} next query [{}] from slice [{}], between time [{} - {}]. Found [{}] items in previous query.",
+            "{} next query [{}], between time [{} - {}]. Found [{}] items in previous query.",
             logPrefix,
             state.queryCount,
-            slice,
             state.latest.timestamp,
             toTimestamp,
             state.itemCount)
@@ -168,10 +167,9 @@ import org.slf4j.Logger
       } else {
         if (log.isDebugEnabled)
           log.debug(
-            "{} query [{}] from slice [{}] completed. Found [{}] items in previous query.",
+            "{} query [{}] completed. Found [{}] items in previous query.",
             logPrefix,
             state.queryCount,
-            slice,
             state.itemCount)
 
         state -> None
@@ -180,12 +178,7 @@ import org.slf4j.Logger
 
     val currentTimestamp = InstantFactory.now()
     if (log.isDebugEnabled())
-      log.debug(
-        "{} query slice [{}], from time [{}] until now [{}].",
-        logPrefix,
-        slice,
-        initialOffset.timestamp,
-        currentTimestamp)
+      log.debug("{} query from time [{}] until now [{}].", logPrefix, initialOffset.timestamp, currentTimestamp)
 
     ContinuousQuery[QueryState, Envelope](
       initialState = QueryState.empty.copy(latest = initialOffset),
@@ -205,7 +198,7 @@ import org.slf4j.Logger
     val initialOffset = toTimestampOffset(offset)
 
     if (log.isDebugEnabled())
-      log.debug("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)
+      log.debug("{} starting query from time [{}].", logPrefix, initialOffset.timestamp)
 
     def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
       if (EnvelopeOrigin.isHeartbeatEvent(envelope)) state
@@ -256,12 +249,7 @@ import org.slf4j.Logger
 
       if (log.isDebugEnabled)
         delay.foreach { d =>
-          log.debug(
-            "{} query [{}] from slice [{}] delay next [{}] ms.",
-            logPrefix,
-            state.queryCount,
-            slice,
-            d.toMillis)
+          log.debug("{} query [{}] delay next [{}] ms.", logPrefix, state.queryCount, d.toMillis)
         }
 
       delay
@@ -413,7 +401,10 @@ import org.slf4j.Logger
 
         val timestamp = state.startTimestamp.plus(
           JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
-        createHeartbeat(timestamp)
+        val h = createHeartbeat(timestamp)
+        if (h.isDefined)
+          log.debug("{} heartbeat timestamp [{}]", logPrefix, timestamp)
+        h
       } else None
     }
 
diff --git a/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala b/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala
index fd95303..1f6e2ec 100644
--- a/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala
@@ -219,6 +219,8 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
       fromSequenceNr: Long,
       toSequenceNr: Long,
       includeDeleted: Boolean): Source[SerializedJournalItem, NotUsed] = {
+    log.debug("[{}] eventsByPersistenceId from seqNr [{}] to [{}]", persistenceId, fromSequenceNr, toSequenceNr)
+
     queryDao.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, includeDeleted)
   }
 
@@ -244,7 +246,11 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
       offset: Offset): Source[EventEnvelope[Event], NotUsed] = {
     val bySliceQueries = (minSlice to maxSlice).map { slice =>
       bySlice[Event](entityType, slice)
-        .currentBySlice("currentEventsBySlices", entityType, slice, sliceStartOffset(slice, offset))
+        .currentBySlice(
+          s"[$entityType] currentEventsBySlice [$slice]: ",
+          entityType,
+          slice,
+          sliceStartOffset(slice, offset))
     }
 
     require(bySliceQueries.nonEmpty, s"maxSlice [$maxSlice] must be >= minSlice [$minSlice]")
@@ -286,7 +292,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
 
     val bySliceQueries = (minSlice to maxSlice).map { slice =>
       bySlice[Event](entityType, slice).liveBySlice(
-        "eventsBySlices",
+        s"[$entityType] eventsBySlice [$slice]: ",
         entityType,
         slice,
         sliceStartOffset(slice, offset))
@@ -333,7 +339,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
 
       val timestampOffset = TimestampOffset.toTimestampOffset(sliceStartOffset(slice, offset))
       val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, slice, transformSnapshot)
-        .currentBySlice("currentSnapshotsBySlice", entityType, slice, timestampOffset)
+        .currentBySlice(s"[$entityType] currentSnapshotsBySlice [$slice]: ", entityType, slice, timestampOffset)
 
       Source.fromGraph(
         new StartingFromSnapshotStage[Event](
@@ -357,7 +363,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
             snapshotOffsets.size)
 
           bySlice[Event](entityType, slice).currentBySlice(
-            "currentEventsBySlice",
+            s"[$entityType] currentEventsBySlice [$slice]: ",
             entityType,
             slice,
             initOffset,
@@ -395,7 +401,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
 
      val timestampOffset = TimestampOffset.toTimestampOffset(sliceStartOffset(slice, offset))
      val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, slice, transformSnapshot)
-        .currentBySlice("snapshotsBySlice", entityType, slice, timestampOffset)
+        .currentBySlice(s"[$entityType] snapshotsBySlice [$slice]: ", entityType, slice, timestampOffset)
 
      Source.fromGraph(
        new StartingFromSnapshotStage[Event](
@@ -419,7 +425,7 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
             snapshotOffsets.size)
 
           bySlice[Event](entityType, slice).liveBySlice(
-            "eventsBySlice",
+            s"[$entityType] eventsBySlice [$slice]: ",
             entityType,
             slice,
             initOffset,
@@ -624,11 +630,18 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
 
   // EventTimestampQuery
   override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
-    queryDao.timestampOfEvent(persistenceId, sequenceNr)
+    val result = queryDao.timestampOfEvent(persistenceId, sequenceNr)
+    if (log.isDebugEnabled) {
+      result.foreach { t =>
+        log.debug("[{}] timestampOf seqNr [{}] is [{}]", persistenceId, sequenceNr, t)
+      }
+    }
+    result
   }
 
   //LoadEventQuery
   override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]] = {
+    log.debug("[{}] loadEnvelope seqNr [{}]", persistenceId, sequenceNr)
    queryDao
      .loadEvent(persistenceId, sequenceNr, includePayload = true)
      .map {
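
Note (not part of the patch): a minimal, self-contained sketch of how the interpolated log prefix introduced above could appear at runtime. The entity type, slice number, and timestamp below are made-up illustration values; the message layout mirrors the SLF4J-style format string used in BySliceQuery.

object LogPrefixSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; in the plugin they come from the query parameters.
    val entityType = "ShoppingCart"
    val slice = 17

    // Same interpolation as the patched call sites in DynamoDBReadJournal.
    val logPrefix = s"[$entityType] eventsBySlice [$slice]: "

    // Roughly how BySliceQuery's debug line
    //   log.debug("{} starting query from time [{}].", logPrefix, initialOffset.timestamp)
    // renders once the placeholders are filled in (timestamp made up).
    println(s"$logPrefix starting query from time [2024-12-04T06:28:04Z].")
  }
}

Because the prefix now carries the entity type and slice, every per-query debug line identifies its slice on its own, which is why the separate slice argument could be dropped from the individual log.debug calls in BySliceQuery.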