From d399b849bb559cf6d3be6ac3ed563737025d1b15 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 10 Jun 2024 10:40:55 +0200 Subject: [PATCH] Allow stream to be interrupted when there is no traffic (#1251) When a partition is lost while there is no traffic, `PartitionStreamControl` is blocked waiting for data. We fix this by racing with the `interruptionPromise`. (Thanks @josdirksen for the analysis! See #1250.) Note: this situation can only occur with lost partitions. Timeouts (the other reason for interrupts) do not occur when there is no traffic. Currently, we have no way to test lost partitions. Therefore, there are no added tests. This PR does _not_ change how lost partitions are handled. That is, the stream for the partition that is lost is interrupted, the other streams are closed gracefully, the consumer aborts with an error. --- .../consumer/internal/PartitionStreamControl.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index ea2735824..550bc4349 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -38,7 +38,7 @@ final class PartitionStreamControl private ( val tp: TopicPartition, stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], - interruptionPromise: Promise[Throwable, Unit], + interruptionPromise: Promise[Throwable, Nothing], val completedPromise: Promise[Nothing, Option[Offset]], queueInfoRef: Ref[QueueInfo], maxPollInterval: Duration @@ -135,16 +135,18 @@ object PartitionStreamControl { for { _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}") - interruptionPromise <- Promise.make[Throwable, Unit] + interruptionPromise <- 
Promise.make[Throwable, Nothing] completedPromise <- Promise.make[Nothing, Option[Offset]] dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] now <- Clock.nanoTime queueInfo <- Ref.make(QueueInfo(now, 0, None, 0)) requestAndAwaitData = for { - _ <- commandQueue.offer(RunloopCommand.Request(tp)) - _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) - taken <- dataQueue.takeBetween(1, Int.MaxValue) + _ <- commandQueue.offer(RunloopCommand.Request(tp)) + _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) + taken <- dataQueue + .takeBetween(1, Int.MaxValue) + .race(interruptionPromise.await) } yield taken stream = ZStream.logAnnotate(