From d2aa7f7307dfbfc7465b423e66fa697015428ce8 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Fri, 22 Dec 2023 13:45:26 +0100
Subject: [PATCH] Delete `RunloopTimeout` and small improvements (#1141)

* Delete `RunloopTimeout` as it is no longer used.
* Do not suppress errors in `commitTransactionWithOffsets` (even though the only invoker still dies upon any error).
* Better debug logging in `Runloop`.
* Shave tens of seconds off the test by never taking more messages than can be sent.
* Run the test less often; 3 times is enough (`nonFlaky(3)` runs the test 4 times!).
---
 .../scala/zio/kafka/consumer/ConsumerSpec.scala    |  9 +++++----
 .../main/scala/zio/kafka/consumer/Consumer.scala   |  3 +--
 .../zio/kafka/consumer/internal/Runloop.scala      | 14 ++++++++++----
 .../zio/kafka/producer/TransactionalProducer.scala |  2 +-
 4 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 6697272ca..b6f68ce41 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -1196,7 +1196,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
          * - A producer generates some messages on topic A,
          * - a transactional consumer/producer pair (copier1) reads these and copies them to topic B transactionally,
          * - after a few messages we start a second transactional consumer/producer pair (copier2) that does the same
-         *   (in the same consumer group) this triggers a rebalance,
+         *   (in the same consumer group), this triggers a rebalance,
          * - produce some more messages to topic A,
          * - a consumer that empties topic B,
          * - when enough messages have been received, the copiers are interrupted.
@@ -1347,8 +1347,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           messagesOnTopicB <- ZIO.logAnnotate("consumer", "validator") {
                                 Consumer
                                   .plainStream(Subscription.topics(topicB), Serde.string, Serde.string)
-                                  .map(_.value)
-                                  .timeout(5.seconds)
+                                  .mapChunks(_.map(_.value))
+                                  .take(messageCount.toLong)
+                                  .timeout(10.seconds)
                                   .runCollect
                                   .provideSome[Kafka](
                                     transactionalConsumer(
@@ -1373,7 +1374,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             // testForPartitionAssignmentStrategy[CooperativeStickyAssignor] // TODO not yet supported
           )

-      }: _*) @@ TestAspect.nonFlaky(3),
+      }: _*) @@ TestAspect.nonFlaky(2),
       test("running streams don't stall after a poll timeout") {
         for {
           topic <- randomTopic
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index fc3f17ada..c1b66da1d 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -161,8 +161,7 @@ trait Consumer {
 }

 object Consumer {
-  case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace
-  case object CommitTimeout  extends RuntimeException("Commit timeout") with NoStackTrace
+  case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace

   val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
     ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 1a2983f79..5dc1ffac6 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -202,9 +202,12 @@ private[consumer] final class Runloop private (
     val recordRebalanceRebalancingListener = RebalanceListener(
       onAssigned = (assignedTps, _) =>
         for {
-          _              <- ZIO.logDebug(s"${assignedTps.size} partitions are assigned")
           rebalanceEvent <- lastRebalanceEvent.get
-          state          <- currentStateRef.get
+          _ <- ZIO.logInfo {
+                 val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else ""
+                 s"${assignedTps.size} partitions are assigned$sameRebalance"
+               }
+          state <- currentStateRef.get
           streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
                          else Chunk.empty
           _ <- endStreams(state, streamsToEnd)
@@ -213,9 +216,12 @@
         } yield (),
       onRevoked = (revokedTps, _) =>
         for {
-          _              <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
           rebalanceEvent <- lastRebalanceEvent.get
-          state          <- currentStateRef.get
+          _ <- ZIO.logInfo {
+                 val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else ""
+                 s"${revokedTps.size} partitions are revoked$sameRebalance"
+               }
+          state <- currentStateRef.get
           streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
                          else state.assignedStreams.filter(control => revokedTps.contains(control.tp))
           _ <- endStreams(state, streamsToEnd)
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index 98380254b..173f43100 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -28,7 +28,7 @@ object TransactionalProducer {

   private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = {
     val sendOffsetsToTransaction: Task[Unit] =
-      ZIO.suspendSucceed {
+      ZIO.suspend {
         @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] =
           ZIO.fail(
             new InvalidGroupIdException(
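
A note on the `ZIO.suspendSucceed` -> `ZIO.suspend` change in `commitTransactionWithOffsets` above: the sketch below is not zio-kafka code (the object, method, and exception names are invented for illustration) and only shows the general difference between the two constructors. `ZIO.suspend` catches an exception thrown while the effect is being constructed and surfaces it as a typed `Throwable` failure the caller can observe, whereas `ZIO.suspendSucceed` lets it escape as a defect.

```scala
import zio._

// Minimal sketch, not zio-kafka code: all names here are hypothetical, for illustration only.
object SuspendVsSuspendSucceed extends ZIOAppDefault {

  // Effect *construction* that throws before any ZIO value is produced.
  def buildEffect(): Task[Unit] =
    throw new IllegalStateException("thrown while constructing the effect")

  // ZIO.suspend catches the throwable and turns it into a typed failure.
  val asFailure: Task[Unit] = ZIO.suspend(buildEffect())

  // ZIO.suspendSucceed does not catch it; it escapes as a defect (fiber death).
  val asDefect: Task[Unit] = ZIO.suspendSucceed(buildEffect())

  val run: UIO[Unit] =
    for {
      e1 <- asFailure.either // Left(IllegalStateException(...)), observable by the caller
      e2 <- asDefect.exit    // failed Exit whose cause is a defect (Cause.Die), not a typed error
      _  <- Console.printLine(s"suspend: $e1\nsuspendSucceed: $e2").orDie
    } yield ()
}
```

Under that reading, the switch lets errors thrown inside the block reach the `Task` error channel of `commitTransactionWithOffsets`, even though, as the commit message notes, its only current invoker still dies upon any error.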