diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index ea2735824..550bc4349 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -38,7 +38,7 @@ final class PartitionStreamControl private ( val tp: TopicPartition, stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], - interruptionPromise: Promise[Throwable, Unit], + interruptionPromise: Promise[Throwable, Nothing], val completedPromise: Promise[Nothing, Option[Offset]], queueInfoRef: Ref[QueueInfo], maxPollInterval: Duration @@ -135,16 +135,18 @@ object PartitionStreamControl { for { _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}") - interruptionPromise <- Promise.make[Throwable, Unit] + interruptionPromise <- Promise.make[Throwable, Nothing] completedPromise <- Promise.make[Nothing, Option[Offset]] dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] now <- Clock.nanoTime queueInfo <- Ref.make(QueueInfo(now, 0, None, 0)) requestAndAwaitData = for { - _ <- commandQueue.offer(RunloopCommand.Request(tp)) - _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) - taken <- dataQueue.takeBetween(1, Int.MaxValue) + _ <- commandQueue.offer(RunloopCommand.Request(tp)) + _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) + taken <- dataQueue + .takeBetween(1, Int.MaxValue) + .race(interruptionPromise.await) } yield taken stream = ZStream.logAnnotate(