Handle RebalanceInProgressExceptions transparently (zio#437)
* Fix for zio#413

* Simplify / reduce diff

* Test that reproduces a RebalanceInProgressException

But the catchAll needs to be moved to the callback

* Test (WIP)

* Implement retries at the correct place

* Add diagnostic event for partitions lost

* Test assertion

* Cleanup and improve test

* Cleanup

* Restore

* Restore
svroonland authored Sep 26, 2022
1 parent 926cf09 commit 82376bc
Showing 3 changed files with 104 additions and 5 deletions.
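For context, the commit title and the Runloop change below indicate that, with cooperative rebalancing, an offset commit issued while a rebalance is in progress fails with RebalanceInProgressException, and before this change that failure surfaced to application code (zio#413). A rough sketch of the kind of workaround an application would have needed before this commit (hypothetical user code, not part of this change):

import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Hypothetical pre-fix workaround: keep retrying a commit for as long as it
// fails only because a rebalance is in progress.
val commitWithRetry =
  Consumer
    .subscribeAnd(Subscription.topics("my-topic"))
    .plainStream(Serde.string, Serde.string)
    .map(_.offset)
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(batch =>
      batch.commit.retry(
        Schedule.recurWhile[Throwable] {
          case _: RebalanceInProgressException => true
          case _                               => false
        }
      )
    )
    .runDrain

With this commit the retry happens inside the Runloop, so a plain batch.commit suffices and the exception is never observed by the caller.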
93 changes: 92 additions & 1 deletion zio-kafka-test/src/test/scala/zio/kafka/ConsumerSpec.scala
@@ -1,6 +1,7 @@
package zio.kafka.consumer

import io.github.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.consumer.{ ConsumerConfig, CooperativeStickyAssignor }
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.KafkaTestUtils._
@@ -750,6 +751,96 @@ object ConsumerSpec extends ZIOSpecWithKafka {
// the exact count cannot be known because fib2's termination triggers fib1's rebalancing asynchronously.
} yield assert(messagesPerPartition0)(forall(equalTo(nrMessages / nrPartitions))) &&
assert(messagesPerPartition.view.sum)(isGreaterThan(0) && isLessThanEqualTo(nrMessages - 20))
},
test("handles RebalanceInProgressExceptions transparently") {
val nrPartitions = 5
val nrMessages = 10000

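// Consumer layer using the cooperative-sticky assignor and short poll interval/timeout,
// so that a second member joining the group causes incremental rebalances while offset
// commits are in flight (the condition that triggers RebalanceInProgressException).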
def customConsumer(clientId: String, groupId: Option[String]) =
(ZLayer(
consumerSettings(
clientId = clientId,
groupId = groupId,
clientInstanceId = None
).map(
_.withProperties(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
)
.withPollInterval(500.millis)
.withPollTimeout(500.millis)
)
) ++ ZLayer.succeed(Diagnostics.NoOp) >>> Consumer.live)

for {
// Produce messages on several partitions
topic <- randomTopic
group <- randomGroup

_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
_ <- ZIO
.foreachDiscard(1 to nrMessages) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}
.forkScoped

// Consume messages
messagesReceivedConsumer1 <- Ref.make[Int](0)
messagesReceivedConsumer2 <- Ref.make[Int](0)
drainCount <- Ref.make(0)
subscription = Subscription.topics(topic)
stopConsumer1 <- Promise.make[Nothing, Unit]
fib <-
Consumer
.subscribeAnd(subscription)
.partitionedAssignmentStream(Serde.string, Serde.string)
.rechunk(1)
.mapZIOPar(16) { partitions =>
ZIO.logInfo(s"Consumer 1 got new partition assignment: ${partitions.map(_._1.toString)}") *>
ZStream
.fromIterable(partitions.map(_._2))
.flatMapPar(Int.MaxValue)(s => s)
.mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(offsetBatch => offsetBatch.commit)
.runDrain
}
.mapZIO(_ => drainCount.updateAndGet(_ + 1))
.interruptWhen(stopConsumer1.await)
.runDrain
.provideSomeLayer[Kafka](
customConsumer("consumer1", Some(group)) ++ Scope.default
)
.tapError(e => ZIO.logErrorCause(e.getMessage, Cause.fail(e)))
.forkScoped

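// Wait until consumer 1 has received some messages before starting consumer 2,
// whose join will trigger a cooperative rebalance of consumer 1's partitions.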
_ <- messagesReceivedConsumer1.get
.repeat(Schedule.recurUntil((n: Int) => n >= 20) && Schedule.fixed(100.millis))
_ <- ZIO.logInfo("Starting consumer 2")

fib2 <-
Consumer
.subscribeAnd(subscription)
.plainStream(Serde.string, Serde.string)
.mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(offsetBatch => offsetBatch.commit)
.runDrain
.provideSomeLayer[Kafka](
customConsumer("consumer2", Some(group))
)
.tapError(e => ZIO.logErrorCause("Error in consumer 2", Cause.fail(e)))
.forkScoped

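// Once consumer 2 is also receiving messages, stop consumer 1 and join its fiber;
// a RebalanceInProgressException escaping the Runloop would fail that join and thus the test.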
_ <- messagesReceivedConsumer2.get
.repeat(Schedule.recurUntil((n: Int) => n >= 20) && Schedule.fixed(100.millis))
_ <- stopConsumer1.succeed(())
_ <- fib.join
_ <- fib2.interrupt
} yield assertCompletes
}
- ).provideSomeLayerShared[TestEnvironment with Kafka](producer) @@ withLiveClock @@ timeout(300.seconds)
+ ).provideSomeLayerShared[TestEnvironment with Kafka](producer ++ Scope.default) @@ withLiveClock @@ timeout(
+   300.seconds
+ )
}
1 change: 1 addition & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
Expand Up @@ -23,5 +23,6 @@ object DiagnosticEvent {
object Rebalance {
final case class Revoked(partitions: Set[TopicPartition]) extends Rebalance
final case class Assigned(partitions: Set[TopicPartition]) extends Rebalance
final case class Lost(partitions: Set[TopicPartition]) extends Rebalance
}
}
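The new Lost event makes partitions-lost rebalances observable through the Diagnostics hook. Purely as an illustration, a minimal handler one might write against these case classes (the logRebalance name and the way events are obtained are assumptions, not part of this commit):

import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent

def logRebalance(event: DiagnosticEvent): UIO[Unit] =
  event match {
    case DiagnosticEvent.Rebalance.Assigned(parts) => ZIO.logInfo(s"Assigned: $parts")
    case DiagnosticEvent.Rebalance.Revoked(parts)  => ZIO.logInfo(s"Revoked: $parts")
    case DiagnosticEvent.Rebalance.Lost(parts)     => ZIO.logWarning(s"Lost without revocation: $parts")
    case _                                         => ZIO.unit
  }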
15 changes: 11 additions & 4 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -2,9 +2,9 @@ package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.{ CommittableRecord, RebalanceListener }
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.Runloop.{
@@ -13,6 +13,7 @@ import zio.kafka.consumer.internal.Runloop.{
ByteArrayConsumerRecord,
Command
}
import zio.kafka.consumer.{ CommittableRecord, RebalanceListener }
import zio.stream._

import java.util
@@ -77,7 +78,8 @@ private[consumer] final class Runloop(

val emitDiagnostics = RebalanceListener(
(assigned, _) => diagnostics.emitIfEnabled(DiagnosticEvent.Rebalance.Assigned(assigned)),
- (revoked, _) => diagnostics.emitIfEnabled(DiagnosticEvent.Rebalance.Revoked(revoked))
+ (revoked, _) => diagnostics.emitIfEnabled(DiagnosticEvent.Rebalance.Revoked(revoked)),
+ (lost, _) => diagnostics.emitIfEnabled(DiagnosticEvent.Rebalance.Lost(lost))
)

lazy val revokeTopics = RebalanceListener(
@@ -135,8 +137,13 @@
val offsets = aggregateOffsets(cmds)
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(cmds)(_.cont.done(e))
val onSuccess = cont(Exit.succeed(())) <* diagnostics.emitIfEnabled(DiagnosticEvent.Commit.Success(offsets))
- val onFailure = (err: Throwable) =>
-   cont(Exit.fail(err)) <* diagnostics.emitIfEnabled(DiagnosticEvent.Commit.Failure(offsets, err))
+ val onFailure: Throwable => UIO[Unit] = {
+   case _: RebalanceInProgressException =>
+     ZIO.logInfo(s"Rebalance in progress, retrying ${cmds.size.toString} commits") *>
+       commitQueue.offerAll(cmds).unit
+   case err =>
+     cont(Exit.fail(err)) <* diagnostics.emitIfEnabled(DiagnosticEvent.Commit.Failure(offsets, err))
+ }

ZIO
.runtime[Any]
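The core of the change above: a commit that fails only because a rebalance is in progress is not completed with a failure; its commands are put back on the commit queue and retried on a later run-loop cycle, so the caller's commit effect simply completes once the rebalance settles. A stripped-down sketch of that pattern with generic names (not the actual Runloop types):

import org.apache.kafka.common.errors.RebalanceInProgressException
import zio._

// A pending commit carries the promise its caller is waiting on.
final case class Commit(cont: Promise[Throwable, Unit])

// On RebalanceInProgressException, re-offer the commands so they are retried
// with their original promises; for any other error, fail the callers.
def handleCommitFailure(queue: Queue[Commit], cmds: Chunk[Commit])(err: Throwable): UIO[Unit] =
  err match {
    case _: RebalanceInProgressException => queue.offerAll(cmds).unit
    case other                           => ZIO.foreachDiscard(cmds)(c => c.cont.fail(other).unit)
  }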
