From 1ca1f9496cb4bc4d728422aa88d6a8f69938e954 Mon Sep 17 00:00:00 2001 From: Flavien Bert Date: Thu, 16 Nov 2023 17:37:42 +0100 Subject: [PATCH 1/3] Make access to the java consumer fair --- build.sbt | 3 ++- .../zio/kafka/consumer/internal/ConsumerAccess.scala | 10 ++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index 34b68ef70..f50527959 100644 --- a/build.sbt +++ b/build.sbt @@ -100,7 +100,8 @@ lazy val zioKafka = .settings( libraryDependencies ++= Seq( kafkaClients, - scalaCollectionCompat + scalaCollectionCompat, + "dev.zio" %% "zio-concurrent" % zioVersion.value ) ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 3d2f2e3dd..3d5fa9e45 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -8,16 +8,18 @@ import zio.kafka.consumer.ConsumerSettings import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import scala.jdk.CollectionConverters._ +import zio.concurrent.ReentrantLock private[consumer] final class ConsumerAccess( private[consumer] val consumer: ByteArrayKafkaConsumer, - access: Semaphore + access: ReentrantLock ) { + def withConsumer[A](f: ByteArrayKafkaConsumer => A): Task[A] = withConsumerZIO[Any, A](c => ZIO.attempt(f(c))) def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = - access.withPermit(withConsumerNoPermit(f)) + access.lock *> withConsumerNoPermit(f) <* access.unlock private[consumer] def withConsumerNoPermit[R, A]( f: ByteArrayKafkaConsumer => RIO[R, A] @@ -34,7 +36,7 @@ private[consumer] final class ConsumerAccess( * Do not use this method outside of the Runloop */ private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] = - access.withPermit(f(consumer)) + access.lock *> f(consumer) <* access.unlock } private[consumer] object ConsumerAccess { @@ -58,6 +60,6 @@ private[consumer] object ConsumerAccess { def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] = for { - access <- Semaphore.make(1) + access <- ReentrantLock.make(fairness = true) } yield new ConsumerAccess(consumer, access) } From d5df43b21be5ffe168b6b73787c0925873504036 Mon Sep 17 00:00:00 2001 From: Flavien Bert Date: Fri, 17 Nov 2023 10:12:46 +0100 Subject: [PATCH 2/3] review + test --- .../zio/kafka/consumer/ConsumerSpec.scala | 33 +++++++++++++++++++ .../consumer/internal/ConsumerAccess.scala | 4 +-- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ee456d1ab..55d7d76a7 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -445,6 +445,39 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group))) } yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata))) }, + test("access to the java consumer must be fair") { + val kvs = (1 to 10).toList.map(i => (s"key$i", s"msg$i")) + + val expectedResult = (0 to 9).toList.map(i => i.toLong -> i.toLong) + + for { + topic <- randomTopic + client <- randomClient + group <- randomGroup + + _ <- produceMany(topic, kvs) + committedOffsetRef <- Ref.make(Seq.empty[(Long, Long)]) + + topicPartition = new TopicPartition(topic, 0) + + _ <- Consumer + .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string) + .take(10) + .map(_.offset) + .mapZIO(offsetBatch => + Consumer + .committed(Set(topicPartition)) + .flatMap(offset => + committedOffsetRef.update(map => + map :+ (offsetBatch.offset -> offset(topicPartition).map(_.offset()).getOrElse(0L)) + ) *> offsetBatch.commit + ) + ) + .runDrain + .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds)) + offsets <- committedOffsetRef.get + } yield assert(offsets)(equalTo(expectedResult)) + }, test("handle rebalancing by completing topic-partition streams") { val nrMessages = 50 val nrPartitions = 6 // Must be even and strictly positive diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 3d5fa9e45..a3d6e1fbb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -19,7 +19,7 @@ private[consumer] final class ConsumerAccess( withConsumerZIO[Any, A](c => ZIO.attempt(f(c))) def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = - access.lock *> withConsumerNoPermit(f) <* access.unlock + access.lock.zipRight(withConsumerNoPermit(f)).ensuring(access.unlock) private[consumer] def withConsumerNoPermit[R, A]( f: ByteArrayKafkaConsumer => RIO[R, A] @@ -36,7 +36,7 @@ private[consumer] final class ConsumerAccess( * Do not use this method outside of the Runloop */ private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] = - access.lock *> f(consumer) <* access.unlock + access.lock.zipRight(f(consumer)).ensuring(access.unlock) } private[consumer] object ConsumerAccess { From f915d4fa1ff430d20eb1132ccdc17b6bcf98ec0c Mon Sep 17 00:00:00 2001 From: Flavien Bert Date: Fri, 17 Nov 2023 10:20:21 +0100 Subject: [PATCH 3/3] Add a TestAspect.timeout in the test --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 55d7d76a7..fa10ff9ef 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -477,7 +477,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds)) offsets <- committedOffsetRef.get } yield assert(offsets)(equalTo(expectedResult)) - }, + } @@ TestAspect.timeout(20.seconds), test("handle rebalancing by completing topic-partition streams") { val nrMessages = 50 val nrPartitions = 6 // Must be even and strictly positive