From beb1817e3e47067c88b10f2e957dd3840a3296bf Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 18:27:26 +0100 Subject: [PATCH] Fix the flaky test --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../zio/kafka/consumer/internal/PartitionStreamControl.scala | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 624104691..f0dcc25a0 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -377,7 +377,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { subscriptions.isEmpty ) } - } @@ flaky(3), + }, test("a slow producer doesnot interrupt the stream") { ZIO.scoped { for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index f1ab25db1..7866e542a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -154,7 +154,10 @@ object PartitionStreamControl { dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data)) }.flattenTake .chunksWith(_.tap(records => registerPull(queueInfo, records))) - .interruptWhen(interruptionPromise) + // Due to https://github.com/zio/zio/issues/8515 we cannot use Zstream.interruptWhen. + .mapZIO { chunk => + interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk) + } } yield new PartitionStreamControl( tp, stream,