Allow stream to be interrupted when there is no traffic (#1251)
When a partition is lost while there is no traffic,
`PartitionStreamControl` is blocked waiting for data. We fix this by
racing the wait for data against the `interruptionPromise`. (Thanks
@josdirksen for the analysis! See #1250.)

Note: this situation can only occur with lost partitions. Timeouts (the
other reason for interrupts) do not occur when there is no traffic.
Currently, we have no way to test lost partitions. Therefore, there are
no added tests.

This PR does _not_ change how lost partitions are handled. That is, the
stream for the lost partition is interrupted, the other streams are
closed gracefully, and the consumer aborts with an error.
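
The type change from `Promise[Throwable, Unit]` to `Promise[Throwable, Nothing]` is what lets the wait on the promise be raced against the queue read without widening the result type. Below is a minimal sketch of that idea, assuming ZIO 2; `RaceTypingSketch`, `takeOrInterrupt` and `demo` are hypothetical names for illustration and are not part of zio-kafka.

```scala
import zio._

object RaceTypingSketch extends ZIOAppDefault {

  // Hypothetical helper for illustration; not part of zio-kafka.
  // Because the promise's success type is Nothing, `interruption.await` has
  // type IO[Throwable, Nothing], so the raced result type stays Chunk[A].
  def takeOrInterrupt[A](
    queue: Queue[A],
    interruption: Promise[Throwable, Nothing]
  ): Task[Chunk[A]] =
    queue.takeBetween(1, Int.MaxValue).race(interruption.await)

  // Data-arrives path: the take succeeds and the await on the promise
  // is interrupted as the loser of the race.
  val demo: Task[Chunk[Int]] =
    for {
      queue        <- Queue.unbounded[Int]
      interruption <- Promise.make[Throwable, Nothing]
      _            <- queue.offerAll(List(1, 2, 3))
      taken        <- takeOrInterrupt(queue, interruption)
    } yield taken

  def run = demo.flatMap(taken => Console.printLine(s"taken: $taken"))
}
```

The sketch exercises only the path where data arrives. How the race resolves when the promise fails first depends on the semantics of ZIO's `race`, which is documented as returning the first success, whereas `raceFirst` returns whichever side completes first.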
erikvanoosten authored Jun 10, 2024
1 parent b7c5894 commit d399b84
Showing 1 changed file with 7 additions and 5 deletions.
@@ -38,7 +38,7 @@ final class PartitionStreamControl private (
   val tp: TopicPartition,
   stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
   dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
-  interruptionPromise: Promise[Throwable, Unit],
+  interruptionPromise: Promise[Throwable, Nothing],
   val completedPromise: Promise[Nothing, Option[Offset]],
   queueInfoRef: Ref[QueueInfo],
   maxPollInterval: Duration
@@ -135,16 +135,18 @@ object PartitionStreamControl {

   for {
     _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}")
-    interruptionPromise <- Promise.make[Throwable, Unit]
+    interruptionPromise <- Promise.make[Throwable, Nothing]
     completedPromise <- Promise.make[Nothing, Option[Offset]]
     dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
     now <- Clock.nanoTime
     queueInfo <- Ref.make(QueueInfo(now, 0, None, 0))
     requestAndAwaitData =
       for {
-        _ <- commandQueue.offer(RunloopCommand.Request(tp))
-        _ <- diagnostics.emit(DiagnosticEvent.Request(tp))
-        taken <- dataQueue.takeBetween(1, Int.MaxValue)
+        _ <- commandQueue.offer(RunloopCommand.Request(tp))
+        _ <- diagnostics.emit(DiagnosticEvent.Request(tp))
+        taken <- dataQueue
+                   .takeBetween(1, Int.MaxValue)
+                   .race(interruptionPromise.await)
       } yield taken

     stream = ZStream.logAnnotate(