More logging around rebalancing when rebalanceSafeCommits is true (#1360)

Follow up of #1358 

See also #1132
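
For context, rebalance-safe commits are an opt-in consumer setting. A rough sketch of how a consumer might enable them follows; it is not part of this commit, and the withRebalanceSafeCommits and withMaxRebalanceDuration builder names are assumed from the zio-kafka ConsumerSettings API.

  import zio._
  import zio.kafka.consumer.ConsumerSettings

  // Sketch only: the builder names below are assumptions, not code from this commit.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("example-group")
      .withRebalanceSafeCommits(true)       // hold up a rebalance until consumed offsets are committed
      .withMaxRebalanceDuration(60.seconds) // upper bound on how long the rebalance may be delayed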

---------

Co-authored-by: Erik van Oosten <[email protected]>
svroonland and erikvanoosten authored Nov 10, 2024
1 parent 16a4482 commit 37ad5e7
Showing 2 changed files with 97 additions and 32 deletions.
@@ -769,7 +769,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
clientId = clientId,
groupId = Some(groupId),
`max.poll.records` = 1,
rebalanceSafeCommits = rebalanceSafeCommits,
maxRebalanceDuration = 60.seconds
)
consumer <- Consumer.make(settings)
} yield consumer
126 changes: 95 additions & 31 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -111,6 +111,8 @@ private[consumer] final class Runloop private (
): Task[Unit] = {
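// The deadline caps how long this rebalance may be held up; timeToDeadlineMillis reports the remaining time for the log messages below.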
val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos

def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L

val endingTps = streamsToEnd.map(_.tp).toSet

def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] =
@@ -130,36 +132,92 @@
ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie
}

sealed trait EndOffsetCommitStatus
case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" }
case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" }
case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" }

final case class StreamCompletionStatus(
  tp: TopicPartition,
  streamEnded: Boolean,
  lastPulledOffset: Option[Long],
  endOffsetCommitStatus: EndOffsetCommitStatus
) {
  override def toString: String =
    s"${tp}: " +
      s"${if (streamEnded) "stream ended" else "stream is running"}, " +
      s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " +
      endOffsetCommitStatus
}

def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String =
  "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ")

def getStreamCompletionStatuses(newCommits: Chunk[Commit]): UIO[Chunk[StreamCompletionStatus]] =
  for {
    committedOffsets <- committedOffsetsRef.get
    allPendingCommitOffsets =
      (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map {
        case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset())
      }
    streamResults <-
      ZIO.foreach(streamsToEnd) { stream =>
        for {
          isDone           <- stream.completedPromise.isDone
          lastPulledOffset <- stream.lastPulledOffset
          endOffset        <- if (isDone) stream.completedPromise.await else ZIO.none
          endOffsetCommitStatus =
            endOffset match {
              case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) =>
                EndOffsetCommitted
              case Some(endOffset) if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) =>
                EndOffsetCommitPending
              case _ => EndOffsetNotCommitted
            }
        } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus)
      }
  } yield streamResults

@inline
def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = {
val statusStrings = completionStatusesAsString(completionStatuses)
ZIO.logInfo(
s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " +
s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings"
)
}

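// Logged once, before entering the loop below that waits for the ending streams to commit their final offsets.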
def logInitialStreamCompletionStatuses: UIO[Unit] =
for {
completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk.empty)
_ <- logStreamCompletionStatuses(completionStatuses)
} yield ()

def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] =
for {
completionStatuses <- getStreamCompletionStatuses(newCommits)
_ <- logStreamCompletionStatuses(completionStatuses)
} yield completionStatuses.forall { status =>
// A stream is complete when it never got any records, or when it committed the offset of the last consumed record
status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted)
}

def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): UIO[Unit] =
if (completed)
ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.")
else
for {
completionStatuses <- getStreamCompletionStatuses(newCommits)
statusStrings = completionStatusesAsString(completionStatuses)
_ <-
ZIO.logWarning(
s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " +
s"the records they consumed; the rebalance will continue. " +
s"This might cause another consumer to process some records again. $statusStrings"
)
} yield ()

def commitSync: Task[Unit] =
ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout))

@@ -179,17 +237,23 @@
//
// Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty.
// Instead, we poll the queue in a loop.
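// Each iteration sleeps briefly, drains the commit queue, forwards those commits, and re-checks whether
// all ending streams have completed and committed, until that holds or the deadline passes.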
for {
  _ <- logInitialStreamCompletionStatuses
  completedAndCommits <-
    ZStream
      .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll)
      .tap(commitAsync)
      .forever
      .takeWhile(_ => java.lang.System.nanoTime() <= deadline)
      .scan(Chunk.empty[Runloop.Commit])(_ ++ _)
      .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits)))
      .takeUntil { case (completed, _) => completed }
      .runLast
      .map(_.getOrElse((false, Chunk.empty)))
  _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2)
  _ <- commitSync
  _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end")
} yield ()
}

// During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times.
